In [1]:
import requests
import pandas as pd
import yaml
import os
import datetime as dt 

from secrets_config import api_key

In [2]:
with open("config.yaml") as stream:
    config = yaml.safe_load(stream)

# Extract settings: the cities to query plus the two endpoint URL templates
# (each template contains a "{city}" placeholder that is filled in per request).
target_cities, salesResults_url, salesResults_listings_url = (
    config['extract']['target_cities'],
    config['extract']['api_urls']['salesResults'],
    config['extract']['api_urls']['salesResults_listings'],
)

In [3]:
# Query the salesResults endpoint once per configured city and combine the
# responses into a single DataFrame, `df_results`. Each row is prefixed with
# metadata columns recording which city/URL it came from and when it was fetched.
params = {
    "api_key": api_key
}

city_frames = []
for city in target_cities:
    url = salesResults_url.replace('{city}', city)

    # The timeout stops one unresponsive endpoint from hanging the notebook;
    # raise_for_status keeps HTTP error bodies out of the results table.
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()

    payload = response.json()  # parse once and reuse
    if payload is None:
        # Nothing to normalise for this city (pd.json_normalize(None) would raise).
        continue
    print(f"Status: {response.status_code} for {url}")

    city_frame = pd.json_normalize(data=payload)
    # The trailing "Z" declares UTC, so the timestamp must actually be taken in UTC.
    executed_at = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # Put the metadata columns in front of the API fields. Scalar values are
    # broadcast to every row, even if the payload normalises to several rows.
    for position, (column, value) in enumerate(
            [('City', city), ('URL', url), ('Execution time', executed_at)]):
        city_frame.insert(position, column, value)
    city_frames.append(city_frame)

# Rebuild the result from scratch on every run (a single concat) instead of
# appending to a `df_results` left over in the kernel, so re-running this cell
# never duplicates rows.
df_results = pd.concat(city_frames, ignore_index=True) if city_frames else pd.DataFrame()

Status: 200 for https://api.domain.com.au/v1/salesResults/Sydney
Status: 200 for https://api.domain.com.au/v1/salesResults/Melbourne
Status: 200 for https://api.domain.com.au/v1/salesResults/Adelaide


In [4]:
# Display the combined extract: metadata columns (City, URL, Execution time)
# followed by the normalised API fields. The captured run below produced one row per city.
df_results

Unnamed: 0,City,URL,Execution time,auctionedDate,lastModifiedDateTime,adjClearanceRate,median,numberAuctioned,numberListedForAuction,numberSold,numberUnreported,numberWithdrawn,totalSales
0,Sydney,https://api.domain.com.au/v1/salesResults/Sydney,2022-09-29T18:36:30.170563Z,2022-09-24,2022-09-29T11:32:12.094Z,0.600677,1435000,591,725,355,11,130,364296588
1,Melbourne,https://api.domain.com.au/v1/salesResults/Melb...,2022-09-29T18:36:30.741148Z,2022-09-24,2022-09-29T11:32:12.094Z,0.617647,1017500,68,97,42,0,11,30176100
2,Adelaide,https://api.domain.com.au/v1/salesResults/Adel...,2022-09-29T18:36:31.336804Z,2022-09-24,2022-09-29T11:32:12.094Z,0.731707,771000,41,49,30,2,1,18046000


In [35]:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, Float, JSON # https://www.tutorialspoint.com/sqlalchemy/sqlalchemy_core_creating_table.htm
from sqlalchemy.engine import URL
from sqlalchemy.dialects import postgresql
from secrets_config import db_user, db_password, db_server_name, db_database_name

In [36]:
# create connection to database 
# Build the SQLAlchemy engine for the Postgres database using the pg8000 driver.
# Credentials and host come from the secrets_config module imported above.
connection_url = URL.create(
    "postgresql+pg8000",
    username=db_user,
    password=db_password,
    host=db_server_name,
    port=5432,
    database=db_database_name,
)
engine = create_engine(connection_url)

In [37]:
# specify target table schema 
raw_table = "raw_sales_result"

meta = MetaData()
raw_sales_result = Table(
    raw_table, meta,
    # (City, lastModifiedDateTime) is the composite primary key; upserts conflict on it.
    Column("City", String, primary_key=True),
    Column("lastModifiedDateTime", String, primary_key=True),
    Column("URL", String),
    Column("Execution time", String),
    Column("auctionedDate", String),
    Column("adjClearanceRate", Float),
    Column("median", Integer),
    Column("numberAuctioned", Integer),
    Column("numberListedForAuction", Integer),
    Column("numberSold", Integer),
    Column("numberUnreported", Integer),
    Column("numberWithdrawn", Integer),
    # Presumably the dollar total of the week's sales (Sydney: 364,296,588 in the
    # sample run). Postgres INTEGER tops out at 2,147,483,647, so use BIGINT.
    Column("totalSales", postgresql.BIGINT)
)
# Creates the table only if it does not already exist. An existing table keeps
# its current column types, so an already-created INTEGER totalSales needs a manual ALTER.
meta.create_all(engine)

In [38]:
# Upsert the extracted rows into raw_sales_result. A row whose primary key already
# exists has all of its non-key columns overwritten with the newly fetched values.
primary_key_columns = [column.name for column in raw_sales_result.primary_key.columns]

# NaN (e.g. a field missing from one city's payload) is not a valid SQL value for
# Integer/String columns; send NULL instead.
records = df_results.astype(object).where(df_results.notna(), None).to_dict(orient='records')

if records:  # an INSERT needs at least one row of VALUES
    insert_statement = postgresql.insert(raw_sales_result).values(records)
    upsert_statement = insert_statement.on_conflict_do_update(
        index_elements=primary_key_columns,
        set_={c.key: c for c in insert_statement.excluded if c.key not in primary_key_columns})
    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() relies on legacy autocommit, which SQLAlchemy 2.0 removed.
    with engine.begin() as connection:
        connection.execute(upsert_statement)

In [39]:
import os 
import logging 
import jinja2 as j2

def build_model(model, engine, models_path="models/")->bool:
    """
    Render a Jinja-templated SQL model file and execute it against `engine`.

    Looks for `<models_path>/<model>.sql`, renders it with Jinja (the template
    receives `target_table` = the model name, and `engine`), then runs the rendered
    SQL inside a single transaction.

    - `model`: the name of the model (without .sql)
    - `engine`: SQLAlchemy engine the rendered SQL is executed on
    - `models_path`: the path to the models directory containing the sql files. defaults to `models/`

    Returns True when the model file was found and executed, and False when no
    matching file exists. Errors from reading, rendering or executing the SQL propagate.
    """
    logging.basicConfig(level=logging.INFO, format="[%(levelname)s][%(asctime)s]: %(message)s")

    # Read from the same `models_path` location the existence check uses.
    model_file = os.path.join(models_path, f"{model}.sql")
    if not os.path.isfile(model_file):
        logging.error(f"Could not find model: {model}")
        return False

    logging.info(f"Building model: {model}")

    # read sql contents into a variable
    with open(model_file) as f:
        raw_sql = f.read()

    # parse sql using jinja
    parsed_sql = j2.Template(raw_sql).render(target_table = model, engine=engine)

    # engine.begin() commits on success and rolls back on failure. exec_driver_sql
    # hands the rendered string straight to the DB driver, as engine.execute(str)
    # did, but it also exists on SQLAlchemy 2.0, where Engine.execute was removed.
    with engine.begin() as connection:
        connection.exec_driver_sql(parsed_sql)

    logging.info(f"Successfully built model: {model}")
    return True

In [40]:
# Render and run models/staging_sales_results.sql against the database.
build_model("staging_sales_results", engine, "models/")

[INFO][2022-09-29 19:52:48,806]: Building model: staging_sales_results
[INFO][2022-09-29 19:52:48,819]: Successfully built model: staging_sales_results


True

In [42]:
# Render and run models/serving_sales_results.sql against the database.
build_model("serving_sales_results", engine, "models/")

[INFO][2022-09-29 19:58:48,844]: Building model: serving_sales_results
[INFO][2022-09-29 19:58:48,852]: Successfully built model: serving_sales_results


True

In [None]:
# Render and run models/serving_sales_averages.sql against the database.
build_model("serving_sales_averages", engine, "models/")