Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
Updated get_intelx_breaches() to use API
Browse files Browse the repository at this point in the history
Updated the get_intelx_breaches() in db_query_source.py to use API instead of TSQL query
  • Loading branch information
arng4108 committed Jul 19, 2023
1 parent f3ed216 commit 7f0e5d9
Showing 1 changed file with 66 additions and 23 deletions.
89 changes: 66 additions & 23 deletions src/pe_source/data/pe_db/db_query_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import requests
import sys
import json
import time

# Third-Party Libraries
import pandas as pd
Expand All @@ -22,8 +23,9 @@
LOGGER = app.config["LOGGER"]

CONN_PARAMS_DIC = config()
PE_API_KEY = config(section='peapi').get('api_key')
PE_API_URL = config(section='peapi').get('api_url')
PE_API_KEY = config(section="peapi").get("api_key")
PE_API_URL = config(section="peapi").get("api_url")


def show_psycopg2_exception(err):
"""Handle errors for PostgreSQL issues."""
Expand Down Expand Up @@ -59,7 +61,7 @@ def get_orgs():
"""Query organizations table."""
headers = {
"Content-Type": "application/json",
"access_token": f'{PE_API_KEY}',
"access_token": f"{PE_API_KEY}",
}
try:
response = requests.post(PE_API_URL, headers=headers).json()
Expand Down Expand Up @@ -95,14 +97,16 @@ def get_data_source_uid(source):
urlOrgs = PE_API_URL
headers = {
"Content-Type": "application/json",
"access_token": f'{PE_API_KEY}',
"access_token": f"{PE_API_KEY}",
}
try:
response = requests.post(urlOrgs + "data_source/" + source, headers=headers).json()
#Change last viewed
response = requests.post(
urlOrgs + "data_source/" + source, headers=headers
).json()
# Change last viewed
uid = response[0]["data_source_uid"]
r = requests.put(urlOrgs + "update_last_viewed/" + uid, headers=headers)
LOGGER.info('Updated last viewed for %s', source)
LOGGER.info("Updated last viewed for %s", source)
return response
except requests.exceptions.HTTPError as errh:
print(errh)
Expand Down Expand Up @@ -460,23 +464,62 @@ def insert_intelx_breaches(df):


def get_intelx_breaches(source_uid):
"""Get IntelX credential breaches."""
conn = connect()
"""
Query API for all IntelX credential breaches.
Args:
source_uid: The data source uid to filter credential breaches by
Return:
Credential breach data that have the specified data_source_uid as a dataframe
"""
# Endpoint info
create_task_url = PE_API_URL + "cred_breach_intelx"
check_task_url = PE_API_URL + "cred_breach_intelx/task/"
headers = {
"Content-Type": "application/json",
"access_token": f"{PE_API_KEY}",
}
data = json.dumps({"source_uid": source_uid})
try:
cur = conn.cursor()
sql = """SELECT breach_name, credential_breaches_uid FROM credential_breaches where data_source_uid = %s"""
cur.execute(sql, [source_uid])
all_breaches = cur.fetchall()
for breach in all_breaches:
breach[0] = sanitize_text([0])
breach[1] = sanitize_uid(breach[1])
cur.close()
return all_breaches
except (Exception, psycopg2.DatabaseError) as error:
LOGGER.error("There was a problem with your database query %s", error)
finally:
if conn is not None:
close(conn)
# Create task for query
create_task_result = requests.post(
create_task_url, headers=headers, data=data
).json()
task_id = create_task_result.get("task_id")
LOGGER.info(
"Created task for cred_breach_intelx endpoint query, task_id: ", task_id
)
# Once task has been started, keep pinging task status until finished
check_task_url += task_id
task_status = "Pending"
while task_status != "Completed" and task_status != "Failed":
# Ping task status endpoint and get status
check_task_resp = requests.get(check_task_url, headers=headers).json()
task_status = check_task_resp.get("status")
LOGGER.info(
"\tPinged cred_breach_intelx status endpoint, status:", task_status
)
time.sleep(3)
except requests.exceptions.HTTPError as errh:
LOGGER.error(errh)
except requests.exceptions.ConnectionError as errc:
LOGGER.error(errc)
except requests.exceptions.Timeout as errt:
LOGGER.error(errt)
except requests.exceptions.RequestException as err:
LOGGER.error(err)
except json.decoder.JSONDecodeError as err:
LOGGER.error(err)
# Once task finishes, return result
if task_status == "Completed":
# Convert result to list of tuples to match original function
result = [tuple(row.values()) for row in check_task_resp.get("result")]
return result
else:
raise Exception(
"cred_breach_intelx query task failed, details: ", check_task_resp
)


def insert_intelx_credentials(df):
Expand Down

0 comments on commit 7f0e5d9

Please sign in to comment.