In [1]:
import yaml
import pandera as pa

import pandas as pd
from utils.utility import get_cocktail_by_glass, extract_and_validate, generate_date_dim
# from utils.sql import transaction_table, glass_table, bar_table, cocktail_table, date_table
from utils.data_extractor import APIExtractor, CSVExtractor



In [2]:
config_path = "config.yaml"
# Read through a context manager so the file handle is closed as soon as the
# YAML is parsed, and reuse config_path instead of repeating the literal.
with open(config_path, "r") as config_file:
    config = yaml.safe_load(config_file)

# Split the parsed document into its three top-level sections.
db_config = config["DATABASE"]
csv_config = config["CSV"]
api_config = config["API"]

In [3]:
from utils.utility import trainsaction_schema, bar_stock_schema

In [12]:
# CSV-backed source parameters. Both names refer to the same "bar_stock" entry.
bar_stock_param = csv_config["bar_stock"]
bar_param = csv_config["bar_stock"]

# API-backed source parameters.
drink_param = api_config["cocktail"]
glass_param = api_config["glass"]

In [14]:
# Glass reference data from the API.
glass_df = extract_and_validate(parameters=glass_param, extract_func=APIExtractor)

# De-duplicate the glass names before the per-glass cocktail lookup, the same
# way load_data_to_staging builds its glass list, so a repeated glass is not
# requested twice.
glass_list = glass_df["glass"].unique().tolist()
cocktail_df = get_cocktail_by_glass(parameters=drink_param, glass_list=glass_list, extract_func=APIExtractor)

# Bar stock from CSV, validated against bar_stock_schema.
stock_df = extract_and_validate(parameters=bar_stock_param, extract_func=CSVExtractor, schema=bar_stock_schema)

In [84]:
# Print every drink that occurs in the transactions but not in the cocktail
# list. The cocktail names are collected once into a set instead of being
# recomputed and scanned on every loop iteration.
known_drinks = set(cocktail_df.drink.unique())
for x in transaction_df.drink.unique():
    if x not in known_drinks:
        print(x)

Valencia Cocktail
Orgasm
Sidecar
Zimadori Zinger
Thai Coffee
Sweet Tooth
Winter Rita
Porto Flip
Royal Gin Fizz
Vodka And Tonic
Tuxedo Cocktail
Paradise
Snowball
Turf Cocktail
Scooter
Vodka Martini
Zorbatini
White Lady
Victor
Waikiki Beachcomber
Yoghurt Cooler
Texas Rattlesnake
Russian Spring Punch
Sea Breeze
Winter Paloma
Yellow Bird
Thai Iced Coffee
Quentin
Ziemes Martini Apfelsaft
Passion Fruit Martini
Queen Bee
Vermouth Cassis


In [39]:
def _sorted_unique_drinks(frame):
    """Return the one-column ``drink`` frame of ``frame``, de-duplicated and sorted."""
    drink_only = frame[['drink']]
    return drink_only.drop_duplicates().sort_values(by=['drink'])


c = _sorted_unique_drinks(cocktail_df)
d = _sorted_unique_drinks(transaction_df)

In [86]:
# Exact-match lookup of "Valencia Cocktail" in both sorted drink lists (c, d);
# the cell displays the two filtered frames as a tuple.
c[c.drink=="Valencia Cocktail"] , d[d.drink=="Valencia Cocktail"]

(Empty DataFrame
 Columns: [drink]
 Index: [],
                drink
 5  Valencia Cocktail)

In [82]:
# Side-by-side view of a 20-row window from each sorted drink list (c and d).
# NOTE(review): i and j are not assigned in any visible cell; they appear to
# come from earlier kernel state. Define them before running this cell.
f = pd.DataFrame()
f["c"] = c[i:i+20].reset_index(drop=True)
f["d"] = d[j:j+20].reset_index(drop=True) 
f

Unnamed: 0,c,d
0,Bee'S Knees,Bellini
1,Belgian Blue,Bible Belt
2,Bellini,Big Red
3,Bellini Martini,Bijou
4,Bermuda Highball,Bloody Mary
5,Berry Deadly,Bob Marley
6,Between The Sheets,Bora Bora
7,Bible Belt,Boston Sour
8,Big Red,Boxcar
9,Bijou,Bramble


In [50]:
# Display the sorted, de-duplicated `drink` column taken from transaction_df.
d

Unnamed: 0,drink
231,A. J.
486,Ace
53,Adam
296,Addison
116,Affair
...,...
9,Zimadori Zinger
62,Zippy'S Revenge
137,Zizi Coin-Coin
552,Zoksel


In [None]:
# Collect one validated frame per configured transactions CSV and concatenate
# once at the end, rather than re-copying the accumulated frame on every
# iteration.
transaction_frames = []
for param in csv_config["transactions"]:
    tmp = extract_and_validate(parameters=param, extract_func=CSVExtractor, schema=trainsaction_schema)
    # Tag each row with the name of the source it was read from.
    tmp["location"] = param["name"]
    transaction_frames.append(tmp)

# pd.concat raises on an empty list, so keep the empty-frame result when no
# transaction sources are configured.
transaction_df = (
    pd.concat(transaction_frames, axis=0, ignore_index=True)
    if transaction_frames
    else pd.DataFrame()
)

In [4]:
from utils.utility import trainsaction_schema, bar_stock_schema

In [5]:
import os
import yaml
import pandas as pd
import logging
from sqlalchemy import create_engine
# from utils.utility import custom_logger
from utils.utility import get_cocktail_by_glass, extract_and_validate, generate_date_dim
from sql import glass_insert, cocktail_insert, stock_insert, transaction_insert, bar_insert
from sql import bar_table, glass_table, cocktail_table, stock_table, transaction_table, date_table
from utils.utility import trainsaction_schema, bar_stock_schema
from utils.data_extractor import APIExtractor, CSVExtractor

# initialize the custom logger
# logger = custom_logger(level="INFO", filename="data_pipeline.log")
# The custom logger above is disabled; the standard-library logging module is
# configured at INFO level instead, with a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_config(config_path):
    """Parse the YAML file at ``config_path`` and return the loaded object.

    The file is opened in text read mode and parsed with ``yaml.safe_load``.
    """
    with open(config_path, "r") as config_file:
        parsed = yaml.safe_load(config_file)
    return parsed

def create_db_connection(config):
    """Build a SQLAlchemy engine for the PostgreSQL database named in ``config``.

    User, password, host and port are read from the ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST`` and ``DB_PORT`` environment variables; the
    database name comes from ``config["database"]``.

    Args:
        config: Mapping that provides the ``"database"`` key.

    Returns:
        The engine returned by ``create_engine`` for the assembled URL.

    Raises:
        KeyError: If one of the environment variables or ``config["database"]``
            is missing.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    # Percent-encode the credentials so characters such as "@", ":" or "/" in
    # a user name or password are not misread as URL delimiters.
    db_user = quote(os.environ["DB_USER"], safe="")
    db_password = quote(os.environ["DB_PASSWORD"], safe="")
    db_host = os.environ["DB_HOST"]
    db_port = os.environ["DB_PORT"]
    db_name = config["database"]
    return create_engine(f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")

def create_tables(conn):
    """Execute each table-creation statement on ``conn``, logging start and end."""
    logger.info("Creating tables")
    # Same execution order as before: bar, glass, cocktail, stock, date, transaction.
    statements = (
        bar_table,
        glass_table,
        cocktail_table,
        stock_table,
        date_table,
        transaction_table,
    )
    for statement in statements:
        conn.execute(statement)
    logger.info("Tables creation completed")

def load_to_stage(df, con, table_name):
    """Write ``df`` into ``table_name`` through ``con``, replacing any existing table.

    An empty frame is skipped: nothing is written and nothing is logged.
    The frame index is not written.
    """
    if df.empty:
        return
    logger.info(f"Loading data into {table_name}")
    df.to_sql(name=table_name, con=con, if_exists="replace", index=False)
    logger.info(f"Loading data into {table_name} completed")

def load_to_report(query, conn):
    """Execute ``query`` on ``conn``, logging the statement before and after it runs."""
    started = f"Loading data into {query}"
    finished = f"Loading data into {query} completed"
    logger.info(started)
    conn.execute(query)
    logger.info(finished)

def load_data_to_staging(db_config, api_config, csv_config, conn):
    """Extract every source and write it to its staging table.

    Loads, in order: glass data (API), cocktails per glass (API), bar stock
    (CSV) and all configured transaction CSVs. When ``db_config["initial_load"]``
    is truthy, the date dimension is generated and loaded as well.

    Args:
        db_config: Mapping with the staging table names, ``"date_table"`` and
            ``"initial_load"``.
        api_config: Mapping with the ``"glass"`` and ``"cocktail"`` parameters.
        csv_config: Mapping with ``"bar_stock"`` parameters and a
            ``"transactions"`` list of per-source parameters.
        conn: Connection handed to ``load_to_stage``.
    """
    logger.info("Running load_data_to staging")

    # Local table names carry a *_stage_name suffix so they do not shadow the
    # glass_table / stock_table / transaction_table / date_table objects
    # imported at module level.

    # extract glass data from API
    glass_param = api_config["glass"]
    glass_stage_name = db_config["glass_table_stage"]
    glass_df = extract_and_validate(parameters=glass_param, extract_func=APIExtractor)
    load_to_stage(glass_df, conn, glass_stage_name)

    # cocktail data from API, one lookup per distinct glass
    drink_param = api_config["cocktail"]
    cocktail_stage_name = db_config["cocktail_table_stage"]
    glass_list = glass_df["glass"].unique().tolist()
    cocktail_df = get_cocktail_by_glass(parameters=drink_param, glass_list=glass_list, extract_func=APIExtractor)
    load_to_stage(cocktail_df, conn, cocktail_stage_name)

    # bar stock data from csv
    bar_stock_param = csv_config["bar_stock"]
    stock_stage_name = db_config["stock_table_stage"]
    stock_df = extract_and_validate(parameters=bar_stock_param, extract_func=CSVExtractor, schema=bar_stock_schema)
    load_to_stage(stock_df, conn, stock_stage_name)

    # extract transaction data from csv, validate and load into staging table.
    # Frames are collected in a list and concatenated once, instead of
    # re-copying the accumulated frame on every iteration.
    transaction_stage_name = db_config["transaction_table_stage"]
    transaction_frames = []
    for param in csv_config["transactions"]:
        source_df = extract_and_validate(parameters=param, extract_func=CSVExtractor, schema=trainsaction_schema)
        source_df["location"] = param["name"]
        transaction_frames.append(source_df)
    # pd.concat raises on an empty list; fall back to an empty frame, which
    # load_to_stage skips.
    transaction_df = (
        pd.concat(transaction_frames, axis=0, ignore_index=True)
        if transaction_frames
        else pd.DataFrame()
    )
    load_to_stage(transaction_df, conn, transaction_stage_name)

    # this will be a one time process as the date dimension table will be static
    if db_config["initial_load"]:
        date_table_name = db_config["date_table"]
        # NOTE(review): if generate_date_dim forwards freq to pandas, "H" is a
        # deprecated alias in pandas >= 2.2. Confirm before upgrading.
        date_df = generate_date_dim(start_date="2020-01-01", end_date="2030-12-31", freq="H")
        load_to_stage(date_df, conn, date_table_name)

def update_report_tables_from_staging(db_config, conn):
    """Run every staging-to-report insert statement in load order.

    Each insert template is formatted with its staging table name (looked up
    in ``db_config``) and executed through ``load_to_report``, so every load,
    including the bar load, is logged the same way.

    Args:
        db_config: Mapping with the ``*_table_stage`` names.
        conn: Connection the statements are executed on.
    """
    logger.info("Updating report tables from staging")

    # (insert template, db_config key of the staging table), in load order.
    load_plan = (
        # the bar insert is fed from the stock staging table
        (bar_insert, "stock_table_stage"),
        (glass_insert, "glass_table_stage"),
        (cocktail_insert, "cocktail_table_stage"),
        (stock_insert, "stock_table_stage"),
        (transaction_insert, "transaction_table_stage"),
    )
    for insert_template, stage_key in load_plan:
        load_to_report(insert_template.format(temp_table=db_config[stage_key]), conn)

    logger.info("Report tables update completed")
    
# Module-level setup: parse config.yaml, split out its three sections and
# build the database engine from the DATABASE section.
config = load_config("config.yaml")
csv_config, api_config, db_config = (
    config[section] for section in ("CSV", "API", "DATABASE")
)
engine = create_db_connection(db_config)
    
# def main():
#     logger.info("ETL process started")
#     config = load_config("config.yaml")
#     csv_config = config["CSV"]
#     api_config = config["API"]
#     db_config = config["DATABASE"]
#     engine = create_db_connection(db_config)
#     conn = engine.connect()
#     if db_config["initial_load"]:
#         create_tables(conn)
#     load_data_to_staging(db_config, api_config, csv_config, conn)
#     update_report_tables_from_staging(db_config, conn)
#     conn.close()
#     logger.info("ETL process completed")

# if __name__ == "__main__":
#     main()

In [281]:
# Both statements use the same named bind parameter (:name), so they can be
# executed through SQLAlchemy's text() construct with a {"name": ...} mapping.
# The previous mix of ":1" and "?" placeholders was inconsistent.
bar_data_insert = """INSERT INTO bars (name) VALUES (:name);"""

# NOTE(review): the Glass ORM model in this notebook maps a column called
# "glass", not "name". Confirm the target column before using this statement.
glass_data_insert = """
INSERT INTO glasses (name)
VALUES (:name);
"""

In [221]:
# Replace the contents of the glasses table with the rows of glass_df.
# The delete and the inserts share one transaction and are committed together,
# so the new rows are persisted and a failed insert cannot leave the table
# emptied.
session.query(Glass).delete()
session.add_all(
    Glass(**row._asdict()) for row in glass_df.itertuples(index=False)
)
session.commit()

In [131]:
# Preview the first rows of drink_df.
# NOTE(review): drink_df is not assigned in any visible cell; presumably it is
# left over from an earlier session. Confirm before relying on it.
drink_df.head()

Unnamed: 0,glass,drink
0,Highball glass,57 Chevy with a White License Plate
1,Highball glass,747 Drink
2,Highball glass,A Day at the Beach
3,Highball glass,A Furlong Too Late
4,Highball glass,A Night In Old Mandalay


In [132]:
# Preview the first rows of the glass frame.
glass_df.head()

Unnamed: 0,glass
0,Highball glass
1,Cocktail glass
2,Old-fashioned glass
3,Whiskey Glass
4,Collins glass


In [207]:
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Sequence, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# Table-level constraints are declared through __table_args__. A bare
# UniqueConstraint(...) or Sequence(...) expression in a class body is
# evaluated and discarded, so it never reaches the table definition.


class Bar(Base):
    """A bar, identified by a unique ``name``."""
    __tablename__ = 'bars'
    __table_args__ = (UniqueConstraint('name'),)
    id = Column(Integer, primary_key=True)
    name = Column(String)


class BarStock(Base):
    """Stock count of one glass type at one bar."""
    __tablename__ = 'bar_stock'
    id = Column(Integer, primary_key=True)
    bar_id = Column(Integer, ForeignKey('bars.id'))
    glass_id = Column(Integer, ForeignKey('glasses.id'))
    stock = Column(Integer)


class Transaction(Base):
    """One transaction row: a drink, a bar, a timestamp and an amount."""
    __tablename__ = 'transactions'
    id = Column(Integer, primary_key=True)
    drink_id = Column(Integer, ForeignKey('cocktail.id'))
    bar_id = Column(Integer, ForeignKey('bars.id'))
    date = Column(DateTime, nullable=False)
    amount = Column(Integer)


class Glass(Base):
    """A glass type, identified by a unique ``glass`` name."""
    __tablename__ = 'glasses'
    __table_args__ = (UniqueConstraint('glass'),)
    # The sequence is attached to the column so generated ids start at 1001.
    id = Column(
        Integer,
        Sequence("seq", start=1001, increment=1),
        primary_key=True,
        autoincrement=True,
        nullable=False,
    )
    glass = Column(String)


class Cocktail(Base):
    """A cocktail served in a given glass; (cocktail, glass_id) pairs are unique."""
    __tablename__ = 'cocktail'
    __table_args__ = (UniqueConstraint('cocktail', 'glass_id'),)
    id = Column(Integer, primary_key=True)
    glass_id = Column(Integer, ForeignKey('glasses.id'))
    cocktail = Column(String, nullable=False)

In [None]:
# Collect one validated frame per configured transactions CSV and concatenate
# once at the end, rather than re-copying the accumulated frame on every
# iteration.
transaction_frames = []
for param in csv_config["transactions"]:
    tmp = extract_and_validate(parameters=param, extract_func=CSVExtractor, schema=trainsaction_schema)
    # Tag each row with the name of the source it was read from.
    tmp["location"] = param["name"]
    transaction_frames.append(tmp)

# pd.concat raises on an empty list, so keep the empty-frame result when no
# transaction sources are configured.
transaction_df = (
    pd.concat(transaction_frames, axis=0, ignore_index=True)
    if transaction_frames
    else pd.DataFrame()
)

In [6]:
# Glass reference data: take the API parameters from the config, then extract
# and validate through the API extractor.
glass_param = api_config["glass"]
glass_df = extract_and_validate(
    parameters=glass_param,
    extract_func=APIExtractor,
)

In [15]:
# Parse the config inside a context manager so the file handle is closed.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)
csv_transactions = config['CSV']['transactions']
csv_bars = config['CSV']['bar']
api_config = config['API']

In [24]:
# Read the bar CSV with the pandas keyword arguments stored in the config and
# show the distinct values of its "bar" column.
pd.read_csv(**csv_bars["pandas_kwargs"])[["bar"]].drop_duplicates()

Unnamed: 0,bar
0,budapest
1,london
2,new york


In [None]:
# Database name constant.
# NOTE(review): not referenced by any other visible cell; confirm it is still needed.
DATABASE = "qs_warehouse"

In [None]:
class ExtractData:
    """Placeholder for the extraction stage; holds no state yet."""

    def __init__(self):
        """Create the extractor; nothing is initialised."""
    
class TransformData:
    """Placeholder for the transformation stage; holds no state yet."""

    def __init__(self):
        """Create the transformer; nothing is initialised."""

    def transform_data(self):
        """Not implemented yet; does nothing and returns ``None``."""

class LoadData:
    """Placeholder for the load stage; holds no state yet."""

    def __init__(self):
        """Create the loader; nothing is initialised."""

In [5]:

# transaction_df.to_sql(name=transaction_table, con=engine, if_exists="append", index=False)

# generate date dimension table with a grain of 1 hour
# this will be a one time process as the date dimension table will be static
# date_table = db_config["date_table"]
# date_df = generate_date_dim(start_date="2020-01-01", end_date="2020-12-31", freq="H")
# date_df.to_sql(name=date_table, con=engine, if_exists="append", index=False)

In [None]:
# import yaml
# config = yaml.safe_load(open("test.yaml"))
# conn = psycopg2.connect(database="****", user="****", password="****", host="****", port="****")
# conn.set_session(autocommit=True)
# cur = conn.cursor()
# engine = create_engine('postgresql://user:pass@localhost/db', echo=True)
# cur.execute(f"DROP DATABASE IF EXISTS {DB_NAME}")
# cur.execute(f"CREATE DATABASE {DB_NAME} WITH ENCODING 'utf8' TEMPLATE template0")

In [31]:
from sqlalchemy import create_engine

In [32]:
from sqlalchemy import (
    create_engine, 
    Table, 
    Column, )

In [None]:
# Shell commands (not Python): export the connection settings that
# create_db_connection reads from os.environ. Run them in the shell that
# launches the notebook rather than as a Python cell.
# NOTE(review): these are literal credentials; keep real secrets out of the notebook.
export DB_USER="hardey"
export DB_PASSWORD="root"
export DB_HOST="localhost"
export DB_PORT="5432"

In [30]:
# Create an engine from a placeholder URL with echo=True.
# NOTE(review): the recorded run of this cell failed with NameError; the
# create_engine import cells carry later execution counts than this cell.
engine = create_engine('postgresql://user:pass@localhost/db', echo=True)

NameError: name 'create_engine' is not defined

In [28]:
import yaml

# Parse the config inside a context manager so the file handle is closed.
with open("config.yaml") as config_file:
    config = yaml.safe_load(config_file)

In [1]:
import logging

In [2]:
# Module-named logger with its own level set to INFO.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

In [4]:
# Inspect the logger's instance attributes (name, level, handlers, ...).
log.__dict__

{'filters': [],
 'name': '__main__',
 'level': 20,
 'propagate': True,
 'handlers': [],
 'disabled': False,
 '_cache': {},
 'manager': <logging.Manager at 0x7f9acb41ad00>}

In [50]:
try:
    2 / 0
except ZeroDivisionError as zero_division:
    # logging.exception logs at ERROR level on the root logger and appends the
    # active traceback; the error is not re-raised, so the cell keeps running.
    logging.exception(zero_division)
    # raise zero_division
except Exception as other_error:
    print(other_error)

# Still reached, because the exception above was handled.
print("hello")

ERROR:root:division by zero
Traceback (most recent call last):
  File "/var/folders/98/116rntps01j3zcbq56ks9h3r0000gn/T/ipykernel_26747/3645587077.py", line 2, in <module>
    2 / 0
ZeroDivisionError: division by zero


hello


In [None]:
DimGlass
-
GlassID PK int
Name varchar(200) 

DimBar
-------
BarID PK int
Name varchar(200) 

FactTransactions
----
TransactionID PK 
BarID FK >- DimBar.BarID
CockTailID FK >- DimCockTail.CockTailID
Amount 
Date


DimCockTail
------------
CockTailID PK int
Name varchar(200) 
Amount


DimDate
----------
DateID PK int