diff --git a/src/pe_reports/data/database.ini b/src/pe_reports/data/database.ini index 1aa5ee69..4bd89f39 100644 --- a/src/pe_reports/data/database.ini +++ b/src/pe_reports/data/database.ini @@ -1,10 +1,3 @@ -[postgres] -host= -database= -user= -password= -port= - [blocklist] [dehashed] [dnstwist] @@ -13,6 +6,17 @@ port= [intelx] api_key= +[peapi] +api_key= +api_url= + +[postgres] +host= +database= +user= +password= +port= + [postgresql] host= database= diff --git a/src/pe_reports/data/db_query.py b/src/pe_reports/data/db_query.py index b7fa57d1..e793d771 100644 --- a/src/pe_reports/data/db_query.py +++ b/src/pe_reports/data/db_query.py @@ -1,6 +1,7 @@ """Query the PE PostgreSQL database.""" # Standard Python Libraries +import json import logging import re import sys @@ -11,6 +12,7 @@ import psycopg2 from psycopg2 import OperationalError from psycopg2.extensions import AsIs +import requests from .config import config @@ -20,6 +22,8 @@ LOGGER = logging.getLogger(__name__) CONN_PARAMS_DIC = config() +PE_API_KEY = config(section="peapi").get("api_key") +PE_API_URL = config(section="peapi").get("api_url") def sanitize_string(string): @@ -57,20 +61,25 @@ def close(conn): return -def get_orgs(conn): - """Query organizations table for orgs we report on.""" +def get_orgs(): + """Query organizations table.""" + headers = { + "Content-Type": "application/json", + "access_token": f"{PE_API_KEY}", + } try: - cur = conn.cursor() - sql = """SELECT * FROM organizations WHERE report_on""" - cur.execute(sql) - pe_orgs = cur.fetchall() - cur.close() - return pe_orgs - except (Exception, psycopg2.DatabaseError) as error: - LOGGER.error("There was a problem with your database query %s", error) - finally: - if conn is not None: - close(conn) + response = requests.post(PE_API_URL, headers=headers).json() + return response + except requests.exceptions.HTTPError as errh: + print(errh) + except requests.exceptions.ConnectionError as errc: + print(errc) + except requests.exceptions.Timeout 
as errt: + print(errt) + except requests.exceptions.RequestException as err: + print(err) + except json.decoder.JSONDecodeError as err: + print(err) def get_orgs_df(): diff --git a/src/pe_source/data/pe_db/db_query_source.py b/src/pe_source/data/pe_db/db_query_source.py index 65b3d2b9..95591d65 100644 --- a/src/pe_source/data/pe_db/db_query_source.py +++ b/src/pe_source/data/pe_db/db_query_source.py @@ -1,15 +1,17 @@ """Query the PE PostgreSQL database.""" # Standard Python Libraries -from datetime import datetime +import json import re import sys +import time # Third-Party Libraries import pandas as pd import psycopg2 from psycopg2 import OperationalError import psycopg2.extras as extras +import requests # cisagov Libraries from pe_reports import app @@ -20,6 +22,8 @@ LOGGER = app.config["LOGGER"] CONN_PARAMS_DIC = config() +PE_API_KEY = config(section="peapi").get("api_key") +PE_API_URL = config(section="peapi").get("api_url") def show_psycopg2_exception(err): @@ -53,28 +57,24 @@ def sanitize_text(string): def get_orgs(): - """Query organizations that receive reports and demo organizations.""" - conn = connect() + """Query organizations table.""" + headers = { + "Content-Type": "application/json", + "access_token": f"{PE_API_KEY}", + } try: - cur = conn.cursor() - sql = """SELECT * FROM organizations where report_on or demo""" - cur.execute(sql) - pe_orgs = cur.fetchall() - keys = ("org_uid", "org_name", "cyhy_db_name") - - for value in pe_orgs: - value[0] = sanitize_uid(value[0]) # org_uid - value[1] = value[1] - value[2] = sanitize_text(value[2]) # cyhy_db_name - - pe_orgs = [dict(zip(keys, values)) for values in pe_orgs] - cur.close() - return pe_orgs - except (Exception, psycopg2.DatabaseError) as error: - LOGGER.error("There was a problem with your database query %s", error) - finally: - if conn is not None: - close(conn) + response = requests.post(PE_API_URL + "get_orgs/", headers=headers).json() + return response + except requests.exceptions.HTTPError as 
errh: + print(errh) + except requests.exceptions.ConnectionError as errc: + print(errc) + except requests.exceptions.Timeout as errt: + print(errt) + except requests.exceptions.RequestException as err: + print(err) + except json.decoder.JSONDecodeError as err: + print(err) def get_ips(org_uid): @@ -92,27 +92,31 @@ def get_ips(org_uid): def get_data_source_uid(source): - """Get data source uid.""" - conn = connect() - cur = conn.cursor() - sql = """SELECT * FROM data_source WHERE name = '{}'""" - cur.execute(sql.format(source)) - source = cur.fetchone()[0] - - # Sanitize the data returned by fetchone()[0], - # returned data is data_source_uid (a uuid string) - source = sanitize_uid(source) - - cur.close() - cur = conn.cursor() - # Update last_run in data_source table - date = datetime.today().strftime("%Y-%m-%d") - sql = """update data_source set last_run = '{}' - where name = '{}';""" - cur.execute(sql.format(date, source)) - cur.close() - close(conn) - return source + """Query data_source table and update the last viewed time.""" + urlOrgs = PE_API_URL + headers = { + "Content-Type": "application/json", + "access_token": f"{PE_API_KEY}", + } + try: + response = requests.post( + urlOrgs + "data_source/" + source, headers=headers + ).json() + # Change last viewed + uid = response[0]["data_source_uid"] + requests.put(urlOrgs + "update_last_viewed/" + uid, headers=headers) + LOGGER.info("Updated last viewed for %s", source) + return response + except requests.exceptions.HTTPError as errh: + print(errh) + except requests.exceptions.ConnectionError as errc: + print(errc) + except requests.exceptions.Timeout as errt: + print(errt) + except requests.exceptions.RequestException as err: + print(err) + except json.decoder.JSONDecodeError as err: + print(err) def insert_sixgill_alerts(df): @@ -458,24 +462,64 @@ def insert_intelx_breaches(df): cursor.close() -def get_intelx_breaches(source_uid): - """Get IntelX credential breaches.""" - conn = connect() +def 
get_intelx_breaches(source_uid, redo_interval=3): +    """ +    Query API for all IntelX credential breaches. + +    Args: +        source_uid: The data source uid to filter credential breaches by +        redo_interval: The amount of time to pause before redoing api call + +    Return: +        Credential breach data that have the specified data_source_uid as a dataframe +    """ +    # Endpoint info +    create_task_url = PE_API_URL + "cred_breach_intelx" +    check_task_url = PE_API_URL + "cred_breach_intelx/task/" +    headers = { +        "Content-Type": "application/json", +        "access_token": f"{PE_API_KEY}", +    } +    data = json.dumps({"source_uid": source_uid})     try: -        cur = conn.cursor() -        sql = """SELECT breach_name, credential_breaches_uid FROM credential_breaches where data_source_uid = %s""" -        cur.execute(sql, [source_uid]) -        all_breaches = cur.fetchall() -        for breach in all_breaches: -            breach[0] = sanitize_text([0]) -            breach[1] = sanitize_uid(breach[1]) -        cur.close() -        return all_breaches -    except (Exception, psycopg2.DatabaseError) as error: -        LOGGER.error("There was a problem with your database query %s", error) -    finally: -        if conn is not None: -            close(conn) +        # Create task for query +        create_task_result = requests.post( +            create_task_url, headers=headers, data=data +        ).json() +        task_id = create_task_result.get("task_id") +        LOGGER.info( +            "Created task for cred_breach_intelx endpoint query, task_id: %s", task_id +        ) +        # Once task has been started, keep pinging task status until finished +        check_task_url += task_id +        task_status = "Pending" +        while task_status != "Completed" and task_status != "Failed": +            # Ping task status endpoint and get status +            check_task_resp = requests.get(check_task_url, headers=headers).json() +            task_status = check_task_resp.get("status") +            LOGGER.info( +                "\tPinged cred_breach_intelx status endpoint, status: %s", task_status +            ) +            time.sleep(redo_interval) +    except requests.exceptions.HTTPError as errh: +        LOGGER.error(errh) +    except requests.exceptions.ConnectionError as errc: +        
LOGGER.error(errc) + except requests.exceptions.Timeout as errt: + LOGGER.error(errt) + except requests.exceptions.RequestException as err: + LOGGER.error(err) + except json.decoder.JSONDecodeError as err: + LOGGER.error(err) + # Once task finishes, return result + if task_status == "Completed": + # Convert result to list of tuples to match original function + result = [tuple(row.values()) for row in check_task_resp.get("result")] + return result + else: + raise Exception( + "cred_breach_intelx query task failed, details: ", check_task_resp + ) def insert_intelx_credentials(df):