Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
Merge pull request #640 from cisagov/EM-get-org-api
Browse files Browse the repository at this point in the history
Replace get_orgs database query with PE API call
  • Loading branch information
cduhn17 committed Jul 25, 2023
2 parents 818ecce + 8057968 commit 998fa20
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 80 deletions.
18 changes: 11 additions & 7 deletions src/pe_reports/data/database.ini
@@ -1,10 +1,3 @@
[postgres]
host=
database=
user=
password=
port=

[blocklist]
[dehashed]
[dnstwist]
Expand All @@ -13,6 +6,17 @@ port=
[intelx]
api_key=

[peapi]
api_key=
api_url=

[postgres]
host=
database=
user=
password=
port=

[postgresql]
host=
database=
Expand Down
35 changes: 22 additions & 13 deletions src/pe_reports/data/db_query.py
@@ -1,6 +1,7 @@
"""Query the PE PostgreSQL database."""

# Standard Python Libraries
import json
import logging
import re
import sys
Expand All @@ -11,6 +12,7 @@
import psycopg2
from psycopg2 import OperationalError
from psycopg2.extensions import AsIs
import requests

from .config import config

Expand All @@ -20,6 +22,8 @@
LOGGER = logging.getLogger(__name__)

CONN_PARAMS_DIC = config()
PE_API_KEY = config(section="peapi").get("api_key")
PE_API_URL = config(section="peapi").get("api_url")


def sanitize_string(string):
Expand Down Expand Up @@ -57,20 +61,25 @@ def close(conn):
return


def get_orgs():
    """Query the PE API for the organizations we report on.

    Returns:
        The decoded JSON response from the API (expected to be the list of
        organizations), or None if the request or JSON decoding failed.
    """
    headers = {
        "Content-Type": "application/json",
        "access_token": f"{PE_API_KEY}",
    }
    try:
        # NOTE(review): the sibling implementation in db_query_source.py posts
        # to PE_API_URL + "get_orgs/"; confirm the bare base URL is intended here.
        # A timeout is supplied so the Timeout handler below is actually reachable;
        # without one, requests can block indefinitely.
        response = requests.post(PE_API_URL, headers=headers, timeout=30).json()
        return response
    except requests.exceptions.HTTPError as errh:
        # Use the module logger (matches the rest of this file) instead of print.
        LOGGER.error(errh)
    except requests.exceptions.ConnectionError as errc:
        LOGGER.error(errc)
    except requests.exceptions.Timeout as errt:
        LOGGER.error(errt)
    except requests.exceptions.RequestException as err:
        LOGGER.error(err)
    except json.decoder.JSONDecodeError as err:
        LOGGER.error(err)


def get_orgs_df():
Expand Down
164 changes: 104 additions & 60 deletions src/pe_source/data/pe_db/db_query_source.py
@@ -1,15 +1,17 @@
"""Query the PE PostgreSQL database."""

# Standard Python Libraries
from datetime import datetime
import json
import re
import sys
import time

# Third-Party Libraries
import pandas as pd
import psycopg2
from psycopg2 import OperationalError
import psycopg2.extras as extras
import requests

# cisagov Libraries
from pe_reports import app
Expand All @@ -20,6 +22,8 @@
LOGGER = app.config["LOGGER"]

CONN_PARAMS_DIC = config()
PE_API_KEY = config(section="peapi").get("api_key")
PE_API_URL = config(section="peapi").get("api_url")


def show_psycopg2_exception(err):
Expand Down Expand Up @@ -53,28 +57,24 @@ def sanitize_text(string):


def get_orgs():
    """Query the PE API "get_orgs/" endpoint for all organizations.

    Returns:
        The decoded JSON response from the API (expected to be the list of
        organizations), or None if the request or JSON decoding failed.
    """
    headers = {
        "Content-Type": "application/json",
        "access_token": f"{PE_API_KEY}",
    }
    try:
        # A timeout is supplied so the Timeout handler below is actually
        # reachable; without one, requests can block indefinitely.
        response = requests.post(
            PE_API_URL + "get_orgs/", headers=headers, timeout=30
        ).json()
        return response
    except requests.exceptions.HTTPError as errh:
        # Use the module logger (this module already defines LOGGER) instead of print.
        LOGGER.error(errh)
    except requests.exceptions.ConnectionError as errc:
        LOGGER.error(errc)
    except requests.exceptions.Timeout as errt:
        LOGGER.error(errt)
    except requests.exceptions.RequestException as err:
        LOGGER.error(err)
    except json.decoder.JSONDecodeError as err:
        LOGGER.error(err)


def get_ips(org_uid):
Expand All @@ -92,27 +92,31 @@ def get_ips(org_uid):


def get_data_source_uid(source):
    """Query the data_source API endpoint and update the last viewed time.

    Args:
        source: Name of the data source to look up (appended to the URL path).

    Returns:
        The decoded JSON response for the data source (a list whose first
        element carries "data_source_uid"), or None if a request/decoding
        error occurred.
    """
    url_orgs = PE_API_URL
    headers = {
        "Content-Type": "application/json",
        "access_token": f"{PE_API_KEY}",
    }
    try:
        # Timeouts make the Timeout handler below reachable; without them,
        # requests can block indefinitely.
        response = requests.post(
            url_orgs + "data_source/" + source, headers=headers, timeout=30
        ).json()
        # Record that this source was just used.
        # NOTE(review): response[0]["data_source_uid"] raises if the API returns
        # an empty list or an error payload — confirm the endpoint always returns
        # at least one record for a known source.
        uid = response[0]["data_source_uid"]
        requests.put(
            url_orgs + "update_last_viewed/" + uid, headers=headers, timeout=30
        )
        LOGGER.info("Updated last viewed for %s", source)
        return response
    except requests.exceptions.HTTPError as errh:
        # Use the module logger (consistent with the rest of the file) instead of print.
        LOGGER.error(errh)
    except requests.exceptions.ConnectionError as errc:
        LOGGER.error(errc)
    except requests.exceptions.Timeout as errt:
        LOGGER.error(errt)
    except requests.exceptions.RequestException as err:
        LOGGER.error(err)
    except json.decoder.JSONDecodeError as err:
        LOGGER.error(err)


def insert_sixgill_alerts(df):
Expand Down Expand Up @@ -458,24 +462,64 @@ def insert_intelx_breaches(df):
cursor.close()


def get_intelx_breaches(source_uid, redo_interval=3):
    """Query the API for all IntelX credential breaches.

    Creates an asynchronous query task via the cred_breach_intelx endpoint,
    polls the task-status endpoint until it completes or fails, then returns
    the result.

    Args:
        source_uid: The data source uid to filter credential breaches by.
        redo_interval: Seconds to pause between task-status polls.

    Returns:
        The credential breach records as a list of tuples (matching the shape
        of the original database query this replaced).

    Raises:
        Exception: If the query task failed or never started.
    """
    # Endpoint info
    create_task_url = PE_API_URL + "cred_breach_intelx"
    check_task_url = PE_API_URL + "cred_breach_intelx/task/"
    headers = {
        "Content-Type": "application/json",
        "access_token": f"{PE_API_KEY}",
    }
    data = json.dumps({"source_uid": source_uid})
    # Initialize before the try block so the post-try code below cannot hit a
    # NameError when the initial POST raises before these are assigned.
    task_status = None
    check_task_resp = None
    try:
        # Create task for query. Timeouts make the Timeout handler reachable.
        create_task_result = requests.post(
            create_task_url, headers=headers, data=data, timeout=30
        ).json()
        task_id = create_task_result.get("task_id")
        # Lazy %-style logging; the original passed task_id with no placeholder,
        # so it was silently dropped by the logging module.
        LOGGER.info(
            "Created task for cred_breach_intelx endpoint query, task_id: %s",
            task_id,
        )
        # Once task has been started, keep pinging task status until finished.
        # str() guards against task_id being None in the URL concatenation.
        check_task_url += str(task_id)
        task_status = "Pending"
        while task_status not in ("Completed", "Failed"):
            # Ping task status endpoint and get status
            check_task_resp = requests.get(
                check_task_url, headers=headers, timeout=30
            ).json()
            task_status = check_task_resp.get("status")
            LOGGER.info(
                "\tPinged cred_breach_intelx status endpoint, status: %s",
                task_status,
            )
            time.sleep(redo_interval)
    except requests.exceptions.HTTPError as errh:
        LOGGER.error(errh)
    except requests.exceptions.ConnectionError as errc:
        LOGGER.error(errc)
    except requests.exceptions.Timeout as errt:
        LOGGER.error(errt)
    except requests.exceptions.RequestException as err:
        LOGGER.error(err)
    except json.decoder.JSONDecodeError as err:
        LOGGER.error(err)
    # Once task finishes, return result
    if task_status == "Completed":
        # Convert result to list of tuples to match original function
        result = [tuple(row.values()) for row in check_task_resp.get("result")]
        return result
    else:
        raise Exception(
            "cred_breach_intelx query task failed, details: ", check_task_resp
        )


def insert_intelx_credentials(df):
Expand Down

0 comments on commit 998fa20

Please sign in to comment.