In [1]:
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
import os
from dotenv import load_dotenv
import time
from enum import Enum
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey, DateTime, Float
from sqlalchemy.engine import URL, Engine

## Seting up environment

In [27]:
SAMPLE_JOB_TITLES = ["data science", "data engineer", "data analyst"]

load_dotenv()

# Get the API key from the .env file
OER_API_KEY = os.getenv("OER_API_KEY")
REED_API_KEY = os.getenv('REED_USER')

# Get the database connection settings from the .env file
DB_USER = os.getenv("TEST_STAGING_DB_USERNAME")
DB_PASSWORD = os.getenv("TEST_STAGING_DB_PASSWORD")
DB_HOST = os.getenv("TEST_STAGING_SERVER_NAME")
DB_PORT = os.getenv("TEST_STAGING_DB_PORT")
DB_NAME = os.getenv("TEST_STAGING_DB_NAME")

# Table names
JOB_TABLE_NAME = "job_advertisements"
EXCHANGE_TABLE_NAME = "open_exchange_rates"

## Reed API

In [3]:
class ReedAPI:
    def __init__(self, api_key: str):
        self.user = api_key
        self.password = ""
        self.base_url = "https://www.reed.co.uk/api/1.0/search/"
        self.page_size = 50

    def get_all_jobs(self, job_title: str) -> list[dict]:
        jobs, total_results = self.get_jobs_partial(job_title, self.page_size, 0)
        for page in range(1, total_results // self.page_size + 1):
            time.sleep(1) # TODO Check the API rate limit
            jobs.extend(self.get_jobs_partial(job_title, self.page_size, page)[0])

            if page > 1: # TODO Remove this, it's just for testing
                break
        return jobs
    
    def get_jobs_partial(self, job_title: str, count: int, page: int) -> tuple[list[dict], int]:
        print(f"Getting jobs for {job_title} page {page}")
        params = {
            "keywords" : job_title,
            "resultsToTake" : count,
            "resultsToSkip" : page * count
        }
        auth = HTTPBasicAuth(self.user, self.password)
        response = requests.get(self.base_url, auth=auth, params=params)
        if response.status_code != 200:
            raise Exception("Failed to get jobs")
        response_json = response.json()
        print(f"Got {len(response_json.get('results'))} jobs for {job_title} page {page}. Total results: {response_json.get('totalResults')}")
        return (response_json.get("results"), int(response_json.get("totalResults")))
    
def reed_load(dict_list: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(dict_list)
    df_selected = df[['jobId', 'employerId','employerName', 'jobTitle', 'locationName', 'minimumSalary', 'maximumSalary', 'currency', 'date', 'applications', 'jobUrl']].copy()
    df_selected['date'] = pd.to_datetime(df_selected['date'], format="%d/%m/%Y", errors='coerce')
    return df_selected

## Open Exchange Rates API

In [4]:
class Currency(Enum):
    GBP = "GBP"
    EUR = "EUR"
    USD = "USD"
    JPY = "JPY"
    CHF = "CHF"
    CAD = "CAD"
    AUD = "AUD"
    NZD = "NZD"
    ZAR = "ZAR"
    CNY = "CNY"
    HKD = "HKD"

class OpenExchangeRateApi:
    def __init__(self, api_key: str) -> None:
        self.api_key = api_key
        self.base_url = "https://openexchangerates.org/api/latest.json"
        self.base_currency = Currency.USD # TODO: not supported in free plan
        self.default_exchange_currencies = [
            Currency.GBP,
            Currency.EUR,
            Currency.USD,
            Currency.JPY, 
            Currency.CHF, 
            Currency.CAD, 
            Currency.AUD]

    def get_latest(self, base_currency: Currency = Currency.GBP, exchangee_currencies: list[Currency] = None ) -> dict:
        if exchangee_currencies is None:
            exchangee_currencies = self.default_exchange_currencies
        params = {
            "app_id": self.api_key,
            # "base": self.base_currency, # TODO: not supported in free plan
            "symbols": ",".join([currency.value for currency in exchangee_currencies])
        }
        response = requests.get(self.base_url, params=params)
        if response.status_code != 200:
            print(response.status_code)
            print(response.text)
            raise Exception("Failed to get jobs")
        return response.json()

def oer_load(dict_list: dict) -> pd.DataFrame:
    timestamp = dict_list["timestamp"] # UTC timestamp indicating the time the data was collected
    pandas_timestamp = pd.Timestamp(timestamp, unit="s")
    base_currency = dict_list["base"]
    rates = dict_list["rates"]
    df = pd.DataFrame.from_dict(rates, orient="index", columns=["rate"])
    df["timestamp"] = pandas_timestamp
    df["base_currency"] = base_currency
    df.reset_index(inplace=True)
    df.rename(columns={"index": "exchange_currency"}, inplace=True)
    return df

## Getting data from the web

In [5]:
reed_api = ReedAPI(REED_API_KEY)
job_advertisements = dict()
for job_title in SAMPLE_JOB_TITLES:
    print(f"Getting jobs for {job_title}")
    advertisements_raw = reed_api.get_all_jobs(job_title)
    advertisements_df = reed_load(advertisements_raw)
    job_advertisements[job_title] = advertisements_df
    print(f"Got {len(advertisements_df)} jobs for {job_title}")
print("Finished getting jobs")

Getting jobs for data science
Getting jobs for data science page 0


Got 50 jobs for data science page 0. Total results: 1422
Getting jobs for data science page 1
Got 50 jobs for data science page 1. Total results: 1422
Getting jobs for data science page 2
Got 50 jobs for data science page 2. Total results: 1422
Got 150 jobs for data science
Getting jobs for data engineer
Getting jobs for data engineer page 0
Got 50 jobs for data engineer page 0. Total results: 3608
Getting jobs for data engineer page 1
Got 50 jobs for data engineer page 1. Total results: 3608
Getting jobs for data engineer page 2
Got 50 jobs for data engineer page 2. Total results: 3608
Got 150 jobs for data engineer
Getting jobs for data analyst
Getting jobs for data analyst page 0
Got 50 jobs for data analyst page 0. Total results: 3284
Getting jobs for data analyst page 1
Got 50 jobs for data analyst page 1. Total results: 3284
Getting jobs for data analyst page 2
Got 50 jobs for data analyst page 2. Total results: 3284
Got 150 jobs for data analyst
Finished getting jobs


In [6]:
oer_api = OpenExchangeRateApi(OER_API_KEY)
oer_latest_raw = oer_api.get_latest()
oer_df = oer_load(oer_latest_raw)
print(f"Got {len(oer_df)} exchange rates")

Got 7 exchange rates


## Initialise the database connection

In [7]:
test_db_url = URL.create(
    drivername="postgresql+pg8000",
    username=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME
)

engine = create_engine(test_db_url)
with engine.connect() as conn:
    print("Successfully connected to the database")

Successfully connected to the database


## Creating the database tables

In [23]:
def create_table(engine: Engine, table_name: str, metadata: MetaData, df: pd.DataFrame) -> Table:
    # Drop the table if it already exists
    if engine.has_table(table_name):
        print(f"Dropping table {table_name}")
        metadata.tables[table_name].drop(engine)
        metadata.remove(metadata.tables[table_name])
        print(f"Dropped table {table_name}")
    print(f"Creating table {table_name}")

    # get the data types for each column
    column_types = dict()
    for column_name in df.columns:
        column_type = df[column_name].dtype
        if column_type == "object":
            column_types[column_name] = String
        elif column_type == "int64":
            column_types[column_name] = Integer
        elif column_type == "float64":
            column_types[column_name] = Float
        elif column_type == "datetime64[ns]":
            column_types[column_name] = DateTime
        else:
            raise Exception(f"Unsupported data type {column_type} for column {column_name}")
    
    print(f"Column types: {column_types}")
    # Create the table
    table = Table(
        table_name,
        metadata,
        Column("id", Integer, primary_key=True),
        *(Column(column_name, column_type) for column_name, column_type in column_types.items())
    )
    table.create(engine)
    print(f"Created table {table_name}")
    return table

In [29]:
metadata = MetaData(bind=engine)
metadata.reflect() # get target table schemas into metadata object 
job_advertisements_table = create_table(engine, JOB_TABLE_NAME, metadata, list(job_advertisements.values())[0])
oer_table = create_table(engine, EXCHANGE_TABLE_NAME, metadata, oer_df)

  if engine.has_table(table_name):


Dropping table job_advertisements
Dropped table job_advertisements
Creating table job_advertisements
Column types: {'jobId': <class 'sqlalchemy.sql.sqltypes.Integer'>, 'employerId': <class 'sqlalchemy.sql.sqltypes.Integer'>, 'employerName': <class 'sqlalchemy.sql.sqltypes.String'>, 'jobTitle': <class 'sqlalchemy.sql.sqltypes.String'>, 'locationName': <class 'sqlalchemy.sql.sqltypes.String'>, 'minimumSalary': <class 'sqlalchemy.sql.sqltypes.Float'>, 'maximumSalary': <class 'sqlalchemy.sql.sqltypes.Float'>, 'currency': <class 'sqlalchemy.sql.sqltypes.String'>, 'date': <class 'sqlalchemy.sql.sqltypes.DateTime'>, 'applications': <class 'sqlalchemy.sql.sqltypes.Integer'>, 'jobUrl': <class 'sqlalchemy.sql.sqltypes.String'>}
Created table job_advertisements
Dropping table open_exchange_rates
Dropped table open_exchange_rates
Creating table open_exchange_rates
Column types: {'exchange_currency': <class 'sqlalchemy.sql.sqltypes.String'>, 'rate': <class 'sqlalchemy.sql.sqltypes.Float'>, 'timesta

## Populating the database with data

In [28]:
# Insert the data into the tables
print("Inserting data into the job_advertisements table")
for job_title, job_advertisements_df in job_advertisements.items():
    print(f"Inserting data for {job_title}")
    job_advertisements_df.to_sql(JOB_TABLE_NAME, engine, if_exists="append", index=False, chunksize=100, method="multi")
print("Inserted data into the job_advertisements table")    

print("Inserting data into the open_exchange_rates table")
oer_df.to_sql(EXCHANGE_TABLE_NAME, engine, if_exists="append", index=False, chunksize=100, method="multi")
print("Inserted data into the open_exchange_rates table")

# Check the data was inserted correctly
print("Checking the data was inserted correctly")
verify_df = pd.read_sql_table(JOB_TABLE_NAME, engine)
print(verify_df.head())
verify_df = pd.read_sql_table(EXCHANGE_TABLE_NAME, engine)
print(verify_df.head())
print("Finished checking the data was inserted correctly")

Inserting data into the job_advertisements table


KeyboardInterrupt: 