In [360]:
import csv
import os
import sys
import ssl
import sqlite3
import smtplib
import requests
from os import environ
from datetime import date, datetime
from typing import List
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import pandas as pd

In [395]:
# Environment variables:
BASE_URL = r"https://data.usajobs.gov/api/"
USER_AGENT= r"recruiting@tasman.ai"
API_KEY= r"9MbHa87/i38Y36f7BjnF/HnyPGEsOUWXJdInME0B99E="
PAGE_LIMIT = 500

TITLES = ['Data Analyst', 'Data Scientist', 'Data Engineering']
KEYWORDS = ['data', 'analysis', 'analytics']

BASE_DIR = r'/Users/antoniosanmateu/desktop/tasman/tasman-de/'
DB_NAME = r'USAGOVJobs.db'

DATABASE_DIR = os.path.join(BASE_DIR, DB_NAME)
EXPORTS_DIR = os.path.join(BASE_DIR, r"exports")

EMAIL_PARAMS = {
    "FROM": r"no-reply@mail.com",
    "TO": r"antonio.sanmateu@gmail.com",
    "PORT":587,
    "SMTP_SERVER":r"smtp.gmail.com",
    "EMAIL":r"asanmateu@joor.com",
    "PASSWORD":r"yppk owpm mbub uzkn"
}

In [388]:
os.chdir(BASE_DIR)

In [389]:
def db_connect(db_path: str = DATABASE_DIR):
    """Connects to database and returns a database connection object. """
    try:
        db_connection = sqlite3.connect(DATABASE_DIR)
        return db_connection
    except sqlite3.Error as error:
        print("Failed to execute the above query", error)
        sys.exit(1)

In [390]:
def get_api_call(endpoint: str, params: dict, base_url: str = BASE_URL, page_limit: int = PAGE_LIMIT):
    """
    Makes a GET request with appropriate parameters, authentication,
    while respecting page and rate limits, and paginating if needed. 
    
    Returns a JSON API response object. """

    # Set up authentication
    headers = {
        "Host": "data.usajobs.gov",
        "User-Agent": USER_AGENT,
        "Authorization-Key": API_KEY
    }

    # Set up default parameters
    # params["DatePosted"] = date.today().strftime("%Y-%m-%d")
    params["ResultsPerPage"] = page_limit
    params["SortField"] = "DatePosted"
    params["SortOrder"] = "Descending"

    # Make the API call
    try:
        url = base_url + endpoint
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as err:
        print(err)
        sys.exit(1)

In [391]:
def parse_positions(response_json):
    """
    Parses a response JSON for wanted fields.

    Returns a list of positions of appropriate object type. """

    positions = []

    try:
        for job in response_json["SearchResult"]["SearchResultItems"]:
            position = {
                "PositionID": job["MatchedObjectDescriptor"]["PositionID"],
                "PositionTitle": job["MatchedObjectDescriptor"]["PositionTitle"].strip().title(),
                "OrganizationName": job["MatchedObjectDescriptor"]["OrganizationName"],
                "RemunerationMin": float(job["MatchedObjectDescriptor"]["PositionRemuneration"][0]["MinimumRange"]),
                "RemunerationMax": float(job["MatchedObjectDescriptor"]["PositionRemuneration"][0]["MaximumRange"]),
                "RemunerationRate": job["MatchedObjectDescriptor"]["PositionRemuneration"][0]["RateIntervalCode"],
                "WhoMayApply": job["MatchedObjectDescriptor"]["UserArea"]["Details"]["WhoMayApply"]["Name"],
                "ApplicationCloseDate": job["MatchedObjectDescriptor"]["ApplicationCloseDate"],
            }
            positions.append(position)

            return positions

    except() as err:
        print(err)
        sys.exit(1)

In [403]:
def extract_positions(titles: List[str], keywords: List[str]):
    """
    Makes API calls for titles and keywords, parses the responses.

    Returns the values ready to be loaded into database. """

    # Set up API query parameters
    params_titles = {
        "PositionTitle": titles
    }

    params_keywords = {
        "Keyword": keywords
    }

    # Retrieve API responses
    try:
        api_response_titles = get_api_call("Search", params_titles)
        api_response_keywords = get_api_call("Search", params_keywords)

        # Parse the API responses
        title_search = parse_positions(api_response_titles)
        keyword_search = parse_positions(api_response_keywords)

        # Merge search results on PositionID into a DataFrame
        merged_search = title_search + keyword_search
        search_df = pd.DataFrame(merged_search)
        search_df = search_df.drop_duplicates(subset="PositionID", keep="first")

        return search_df

    except requests.exceptions.HTTPError as err:
        print(err)
        sys.exit(1)

In [404]:
df = extract_positions(TITLES, KEYWORDS)

In [405]:
df

Unnamed: 0,PositionID,PositionTitle,OrganizationName,RemunerationMin,RemunerationMax,RemunerationRate,WhoMayApply,ApplicationCloseDate
0,20220251,Data Analyst,National Geospatial-Intelligence Agency,96119.0,164102.0,Per Year,,2022-05-07T23:59:59.9970
1,CES-11480215-22MJC,Data Scientist,Defense Information Systems Agency,106823.0,138868.0,Per Year,,2022-05-10T23:59:59.9970


In [297]:
def prep_database(db_name: str = DB_NAME) -> None:
    """Connects to database and creates tables if necessary. """
    # TODO: Could improve database design adding levels of granularity and foreign keys
    try:
        db_connection = db_connect(db_name)
        db_cursor = db_connection.cursor()
        db_cursor.execute("""CREATE TABLE IF NOT EXISTS POSITION (
                                TITLE_ID            VARCHAR(255)        PRIMARY KEY,
                                TITLE               VARCHAR(255)        NOT NULL,
                                ORGANISATION_NAME   VARCHAR(255)        NOT NULL,
                                REMUNERATION_MIN    REAL,
                                REMUNERATION_MAX    REAL,
                                REMUNERATION_RATE   VARCHAR(255),
                                WHO_MAY_APPLY       VARCHAR(255),
                                APPLICATION_CLOSE_DATE TIMESTAMP     NOT NULL);""")
        db_connection.commit()
        print(f"Database created: {db_name} at {DIR}")
        print("Table POSITION has been created")
    except sqlite3.Error as error:
        print("Failed to execute the above query", error)
        sys.exit(1)
    finally:
        if db_connection:
            db_connection.close()

In [202]:
prep_database()

Database created: USAGOVJobs.db at /Users/antoniosanmateu/desktop/tasman/tasman-de/
Table POSITION has been created
Database connection is now closed.


In [260]:
def load_df(df: pd.DataFrame, db_name: str = DB_NAME) -> None:
    """Loads dataframe into database using sqlalchemy. """
    try:
        db_connection = db_connect(db_name)
        with db_connection:
            db_cursor = db_connection.cursor()
            for index, row in df.iterrows():
                db_cursor.execute(""" INSERT OR REPLACE INTO POSITION (
                                                    TITLE_ID, 
                                                    TITLE, 
                                                    ORGANISATION_NAME, 
                                                    REMUNERATION_MIN, 
                                                    REMUNERATION_MAX, 
                                                    REMUNERATION_RATE, 
                                                    WHO_MAY_APPLY, 
                                                    APPLICATION_CLOSE_DATE
                                                    )    
                                                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                                  (row["PositionID"], 
                                   row["PositionTitle"], 
                                   row["OrganizationName"], 
                                   row["RemunerationMin"], 
                                   row["RemunerationMax"], 
                                   row["RemunerationRate"], 
                                   row["WhoMayApply"], 
                                   row["ApplicationCloseDate"]))
            db_connection.commit()
    except sqlite3.Error as error:
        print("Failed to execute the above query", error)
        sys.exit(1)
    finally:
        if db_connection:
            db_connection.close()

In [216]:
load_df(df)

Tables: []
Database connection is now closed.


In [222]:
# Let's check if data has been uploaded:
db_connection = db_connect(DB_NAME)
db_cursor = db_connection.cursor()
db_cursor.execute(""" SELECT * FROM POSITION """)
db_cursor.fetchall()

[('20220251',
  'Data Analyst',
  'National Geospatial-Intelligence Agency',
  96119.0,
  164102.0,
  'Per Year',
  '',
  '2022-05-07T23:59:59.9970'),
 ('CES-11480215-22MJC',
  'Data Scientist',
  'Defense Information Systems Agency',
  106823.0,
  138868.0,
  'Per Year',
  '',
  '2022-05-10T23:59:59.9970')]

In [310]:
def export_query_as_csv(query: str, query_name: str, path: str, db_name: str = DB_NAME) -> None:
    """Exports query results to CSV file. """
    
    try:
        
        # Get the database connection
        db_connection = db_connect(db_name)
        with db_connection:
            db_cursor = db_connection.cursor()
            db_cursor.execute(query)
            rows = db_cursor.fetchall()
        try:
            # Write to CSV file
            with open(os.path.join(path, query_name), "w") as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow([i[0] for i in db_cursor.description])
                writer.writerows(rows)
        except IOError as err:
            print(err)
            sys.exit(1)
    except sqlite3.Error as err:
        print(err)
        sys.exit(1)
    finally:
        if db_connection:
            db_cursor.close()
            db_connection.close()

In [329]:
def run_analysis(exports_path: str = EXPORTS_DIR):
    """
    Runs 3 SQL queries to obtain results that could answer the following questions:
    1. How do *monthly* starting salaries differ across positions with different titles and keywords?
    2. Do (filtered) positions for which 'United States Citizens' can apply have a higher average salary than those
       that 'Student/Internship Program Eligibles' can apply for? (by month)
    3. What are the organisations that have most open (filtered) positions?
    
    Exports results of queries into CSV files in the `output_path` directory.

    ** Feel free to break this function down into smaller units 
    (hint: potentially have a `export_csv(query_result)` function)  
    """
    
    query_1 = """ 
        SELECT DISTINCT TITLE_ID, TITLE, REMUNERATION_MIN   
        FROM POSITION
        WHERE REMUNERATION_RATE = 'Monthly'
        AND LOWER(TITLE) LIKE '%data%'
        GROUP BY 1
        ORDER BY 2 DESC 
        """
    query_2 = """
        SELECT DISTINCT WHO_MAY_APPLY, AVG(REMUNERATION_MIN), AVG(REMUNERATION_MAX) 
        FROM POSITION
        WHERE WHO_MAY_APPLY LIKE '%United States Citizens%' 
        OR WHO_MAY_APPLY LIKE '%Student/Internship Program Eligibles%'
        AND REMUNERATION_RATE = 'Monthly'
        GROUP BY WHO_MAY_APPLY
        ORDER BY AVG(REMUNERATION_MIN) DESC
        """
    query_3 = """
        SELECT ORGANISATION_NAME, COUNT(TITLE_ID)
        FROM POSITION
        WHERE APPLICATION_CLOSE_DATE > DATE('NOW')
        GROUP BY 1
        ORDER BY 2 DESC
        """
    
    curr_timestamp = int(datetime.timestamp(datetime.now()))
    
    global ANALYSIS_DIR
    ANALYSIS_DIR = os.path.join(exports_path, str(curr_timestamp))    
    
    try:
        # Create analysis folder if necessary
        if not os.path.exists(os.path.join(ANALYSIS_DIR)):
            os.makedirs(ANALYSIS_DIR)        
        # Export results of queries into CSV files in the `output_path` directory.
        for num, query in enumerate([query_1, query_2, query_3], start=1):
            export_query_as_csv(query, query_name=f"query{num}_{curr_timestamp}.csv", path=ANALYSIS_DIR)
                      
    except sqlite3.Error as error:
        print("Failed to execute the above query", error)
        sys.exit(1)

In [330]:
run_analysis()

In [368]:
def send_reports(reports_path: str):
    """
    Loops through present CSV files in reports_path, 
    and sends them via email to recipient. 

    Returns None
    """
    curr_timestamp = int(datetime.timestamp(datetime.now()))

    # Set up email parameters
    msg = MIMEMultipart()
    msg["From"] = EMAIL_PARAMS["FROM"]
    msg["To"] = EMAIL_PARAMS["TO"]
    msg["Subject"] = "Antonio - Data Analysis Reports {}".format(date.today())
    msg.attach(MIMEText("Please find attached reports for today's analysis."))
    
    context = ssl.create_default_context()

    # Loop through CSV files in reports_path, attach to email
    try:
        for file in os.listdir(reports_path):
            with open(os.path.join(reports_path, file), "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    "attachment; filename={}".format(file),
                )
                msg.attach(part)
    except FileNotFoundError:
        print("No reports found in {}".format(reports_path))
        sys.exit(1)

    try:
        # Send email using SMTP
        smtp_server = smtplib.SMTP(EMAIL_PARAMS["SMTP_SERVER"], EMAIL_PARAMS["PORT"])
        smtp_server.ehlo()
        # Start TLS for security
        smtp_server.starttls(context=context)
        # Identify ourselves to smtp gmail client
        smtp_server.ehlo()
        # Identify to server this time with encrypted connection
        smtp_server.login(EMAIL_PARAMS["EMAIL"], EMAIL_PARAMS["PASSWORD"])
        # Send email
        smtp_server.sendmail(EMAIL_PARAMS["EMAIL"], EMAIL_PARAMS["TO"], msg.as_string())
        # Quit server
        smtp_server.quit()
    except Exception as e:
        print(e)
        sys.exit(1)

In [373]:
send_reports(ANALYSIS_DIR)