In [1]:
import boto3
import json
import pandas as pd
from botocore.handlers import disable_signing
import os
import pyodbc

In [2]:




os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
s3 = boto3.resource('s3')
# Send unsigned (anonymous) requests so no AWS credentials are needed;
# this requires the objects read below to be publicly readable.
s3.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)

s3_dir = "data_engineering/test_1/"
bucket = "tests.arealytics.com.au"

bars_file = s3_dir + "bars.json"
beers_file = s3_dir + "beers.json"
visits_file = s3_dir + "visit_events.json"


def _load_s3_json(key):
    """Download object ``key`` from ``bucket`` and parse it as JSON.

    The response body stream is closed explicitly once parsed, instead of
    being left open until garbage collection.
    """
    body = s3.Object(bucket, key).get()['Body']
    try:
        return json.load(body)
    finally:
        body.close()


# Bars: one row per entry in each bar's ``stock`` list, with that bar's
# ``barName`` and ``address`` repeated on every row.
bars = _load_s3_json(bars_file)
df_bars = pd.json_normalize(
    bars, record_path=['stock'], meta=["barName", "address"]
).rename(columns={'name': 'drinkname', 'barName': 'barname'})

# Beers: flattened with json_normalize; only ``name`` is renamed.
beers = _load_s3_json(beers_file)
df_beers = pd.json_normalize(beers).rename(columns={'name': 'drinkname'})

# Visits: ``drinks`` is cast to text so it can be stored in the varchar staging column.
# NOTE(review): if ``drinks`` holds lists, astype(str) stores Python's repr
# (e.g. "['a', 'b']"), not JSON; confirm model.LoadVisits expects that format.
visits = _load_s3_json(visits_file)
df_visits = pd.json_normalize(visits)
df_visits['drinks'] = df_visits['drinks'].astype(str)


In [3]:
## database connection
def connect_to_sqldb(server, database, username, password):
    
    try:
    
        conn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
    except pyodbc.OperationalError as e:
        raise e
    else:
        print("Connected to SQL")
    return conn

In [4]:
#create staging tables
def create_staging(curr):
    """Ensure the ``staging`` schema exists, then drop and recreate its tables.

    The tables ``staging.bars``, ``staging.beers`` and ``staging.visits`` are
    dropped if present, so any previously staged rows are discarded. All
    columns are (n)varchar.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Its connection is committed once, after all statements
        succeed. On any failure it is rolled back and the error is re-raised.
    """
    create_staging_schema = ("""IF NOT EXISTS (SELECT 1 FROM SYS.SCHEMAS WHERE [name] = 'staging')
                                begin
                                EXEC ('CREATE SCHEMA [staging]')
                                end""")
    
    create_staging_bars = ( """IF OBJECT_ID(N'staging.bars', N'U') IS NOT NULL
                                  DROP TABLE staging.bars
                                  CREATE TABLE staging.bars(
                                  drinkName varchar(255),
                                  price varchar(255),
                                  barName varchar(255),
                                  address varchar(500))""")
    
    create_staging_beers = ( """IF OBJECT_ID(N'staging.beers', N'U') IS NOT NULL
                                   DROP TABLE staging.beers
                                   CREATE TABLE staging.beers(
                                   drinkName varchar(255),
                                   codeBar varchar(255),
                                   type varchar(255),
                                   alcoholUnits varchar(255))""")
    
    create_staging_visits = ( """IF OBJECT_ID(N'staging.visits', N'U') IS NOT NULL
                                    DROP TABLE staging.visits
                                    CREATE TABLE staging.visits(
                                    uuid nvarchar(255),
                                    barName varchar(255),
                                    drinks varchar(255),
                                    drinkName varchar(255),
                                    happyHour varchar(255),
                                    visited varchar(255))""")

    # The schema must exist before the tables that live in it are created.
    statements = (
        create_staging_schema,
        create_staging_bars,
        create_staging_beers,
        create_staging_visits,
    )
    try:
        for statement in statements:
            curr.execute(statement)
    except Exception:
        # Roll back rather than commit: partial DDL is never persisted, and a
        # failing commit cannot replace the original exception.
        curr.rollback()
        raise
    curr.commit()

In [5]:
# load staging bars
def insert_into_staging_bars(curr, drinkname, price, barname, address):
    """Insert one row into ``staging.bars`` and commit it.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed on success, rolled back on failure.
    drinkname, price, barname, address
        Values bound, in this order, to the table's four columns.

    Raises
    ------
    Exception
        Whatever ``curr.execute`` raised (e.g. ``pyodbc.Error``), after rollback.
    """
    insert_staging_bars = ("""INSERT INTO staging.bars (drinkName,price,barName,address) 
            values(?, ?, ?, ?);""")

    insert_row = (drinkname, price, barname, address)
    try:
        curr.execute(insert_staging_bars, insert_row)
    except Exception:
        # Never commit a failed statement, and keep the original error visible.
        curr.rollback()
        raise
    curr.commit()

In [6]:
def append_staging_bars(curr, df):
    """Bulk-insert the rows of ``df`` into ``staging.bars`` with a single commit.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed once on success; rolled back and the error
        re-raised on failure, so either all rows are staged or none are.
    df : pandas.DataFrame
        Must contain ``drinkname``, ``price``, ``barname`` and ``address``;
        any other columns are ignored.

    Missing values (NaN/None/NA, e.g. keys absent from the source JSON) are
    sent as NULL, because NaN is not valid SQL data. An empty frame is a
    no-op: nothing is executed or committed.
    """
    columns = ["drinkname", "price", "barname", "address"]
    rows = [
        tuple(None if pd.api.types.is_scalar(v) and pd.isna(v) else v for v in record)
        for record in df[columns].itertuples(index=False, name=None)
    ]
    if not rows:
        return
    insert_staging_bars = """INSERT INTO staging.bars (drinkName,price,barName,address) 
            values(?, ?, ?, ?);"""
    try:
        # A single executemany and one commit, instead of one execute + commit per row.
        curr.executemany(insert_staging_bars, rows)
    except Exception:
        curr.rollback()
        raise
    curr.commit()

In [7]:
# load staging beers
def insert_into_staging_beers(curr, drinkname, codebar, type, alcoholUnits):
    """Insert one row into ``staging.beers`` and commit it.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed on success, rolled back on failure.
    drinkname, codebar, type, alcoholUnits
        Values bound, in this order, to the table's four columns. (``type``
        shadows the builtin but is kept for caller compatibility.)

    Raises
    ------
    Exception
        Whatever ``curr.execute`` raised (e.g. ``pyodbc.Error``), after rollback.
    """
    insert_staging_beers = ("""INSERT INTO staging.beers (drinkName,codeBar,type,alcoholUnits) 
            values(?, ?, ?, ?);""")

    insert_row = (drinkname, codebar, type, alcoholUnits)
    try:
        curr.execute(insert_staging_beers, insert_row)
    except Exception:
        # Never commit a failed statement, and keep the original error visible.
        curr.rollback()
        raise
    curr.commit()

In [8]:
def append_staging_beers(curr, df):
    """Bulk-insert the rows of ``df`` into ``staging.beers`` with a single commit.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed once on success; rolled back and the error
        re-raised on failure, so either all rows are staged or none are.
    df : pandas.DataFrame
        Must contain ``drinkname``, ``codebar``, ``type`` and ``alcoholUnits``;
        any other columns are ignored.

    Missing values (NaN/None/NA) are sent as NULL, because NaN is not valid
    SQL data. An empty frame is a no-op: nothing is executed or committed.
    """
    columns = ["drinkname", "codebar", "type", "alcoholUnits"]
    rows = [
        tuple(None if pd.api.types.is_scalar(v) and pd.isna(v) else v for v in record)
        for record in df[columns].itertuples(index=False, name=None)
    ]
    if not rows:
        return
    insert_staging_beers = """INSERT INTO staging.beers (drinkName,codeBar,type,alcoholUnits) 
            values(?, ?, ?, ?);"""
    try:
        # A single executemany and one commit, instead of one execute + commit per row.
        curr.executemany(insert_staging_beers, rows)
    except Exception:
        curr.rollback()
        raise
    curr.commit()

In [9]:
# load staging visits
def insert_into_staging_visits(curr, uuid, bar_name, drinks, beverage, happy_hour, visited):
    """Insert one row into ``staging.visits`` and commit it.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed on success, rolled back on failure.
    uuid, bar_name, drinks, beverage, happy_hour, visited
        Bound, in order, to ``uuid, barName, drinks, drinkName, happyHour,
        visited`` (note that ``beverage`` is stored as ``drinkName``).

    Raises
    ------
    Exception
        Whatever ``curr.execute`` raised (e.g. ``pyodbc.Error``), after rollback.
    """
    insert_staging_visits = ("""INSERT INTO staging.visits (uuid, barName, drinks, drinkName, happyHour, visited) 
            values(?, ?, ?, ?, ?, ?);""")

    insert_row = (uuid, bar_name, drinks, beverage, happy_hour, visited)
    try:
        curr.execute(insert_staging_visits, insert_row)
    except Exception:
        # Never commit a failed statement, and keep the original error visible.
        curr.rollback()
        raise
    curr.commit()

In [10]:
def append_staging_visits(curr, df):
    """Bulk-insert the rows of ``df`` into ``staging.visits`` with a single commit.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed once on success; rolled back and the error
        re-raised on failure, so either all rows are staged or none are.
    df : pandas.DataFrame
        Must contain ``uuid``, ``bar_name``, ``drinks``, ``beverage``,
        ``happy_hour`` and ``visited``. These map, in order, to the columns
        ``uuid, barName, drinks, drinkName, happyHour, visited``. Any other
        columns are ignored.

    Missing values (NaN/None/NA) are sent as NULL, because NaN is not valid
    SQL data. An empty frame is a no-op: nothing is executed or committed.
    """
    columns = ["uuid", "bar_name", "drinks", "beverage", "happy_hour", "visited"]
    rows = [
        tuple(None if pd.api.types.is_scalar(v) and pd.isna(v) else v for v in record)
        for record in df[columns].itertuples(index=False, name=None)
    ]
    if not rows:
        return
    insert_staging_visits = """INSERT INTO staging.visits (uuid, barName, drinks, drinkName, happyHour, visited) 
            values(?, ?, ?, ?, ?, ?);"""
    try:
        # A single executemany and one commit, instead of one execute + commit per row.
        curr.executemany(insert_staging_visits, rows)
    except Exception:
        curr.rollback()
        raise
    curr.commit()

In [11]:
def populatemodel(curr, model):
    """Execute the stored procedure named ``model`` (``Exec <model>``) and commit.

    Parameters
    ----------
    curr : pyodbc.Cursor
        Open cursor. Committed on success, rolled back on failure.
    model : str
        Procedure name, e.g. ``"model.LoadBars"``. It is interpolated directly
        into the SQL text, so pass only trusted, hard-coded names.

    Raises
    ------
    Exception
        Whatever ``curr.execute`` raised (e.g. ``pyodbc.Error``), after rollback.
    """
    # Named ``sql`` (not ``run_model``) so the notebook's run_model function is not shadowed.
    sql = f"Exec {model}"
    try:
        curr.execute(sql)
    except Exception:
        # Never commit a failed procedure run, and keep the original error visible.
        curr.rollback()
        raise
    curr.commit()

In [12]:
def run_model(lst, cursor=None):
    """Run each stored procedure in ``lst``, in list order, via ``populatemodel``.

    Parameters
    ----------
    lst : iterable of str
        Procedure names such as ``"model.LoadBars"``.
    cursor : pyodbc.Cursor, optional
        Cursor to run them on. When omitted, falls back to the notebook-level
        ``curr`` so existing ``run_model(lst)`` calls keep working. Passing it
        explicitly avoids depending on that hidden global.
    """
    if cursor is None:
        cursor = curr
    for model in lst:
        populatemodel(cursor, model)

In [13]:
### connect to DB
# Connection details are read from environment variables when set, so secrets
# need not be typed into (and saved with) the notebook. The placeholders are
# used only as fallbacks.
server = os.environ.get('SQL_SERVER', '<your rds instance>')
database = os.environ.get('SQL_DATABASE', 'arealtcs')
username = os.environ.get('SQL_USERNAME', '<your username>')
password = os.environ.get('SQL_PASSWORD', '<your password>')
conn = connect_to_sqldb(server, database, username, password)

Connected to SQL


In [14]:
# One cursor shared by the staging loads and stored-procedure runs in the cells below
# (run_model also reads this global `curr`).
curr = conn.cursor()

In [15]:
# (Re)build the staging schema and tables before any rows are loaded.
create_staging(curr=curr)

In [16]:
# Load the flattened bar/stock rows into staging.bars.
append_staging_bars(curr=curr, df=df_bars)

In [17]:
# Load the beer rows into staging.beers.
append_staging_beers(curr=curr, df=df_beers)

In [18]:
# Load the visit events into staging.visits.
append_staging_visits(curr=curr, df=df_visits)

In [19]:
# Stored procedures that populate the model tables from staging; executed in list order.
modellist = [
    "model.LoadBars",
    "model.LoadDrinkType",
    "model.LoadDrinks",
    "model.LoadDrinkPrice",
    "model.LoadVisits",
]
run_model(lst=modellist)