In [25]:
import pandas as pd
from secrets_config import source_db_user, source_db_password, source_db_server_name, source_db_database_name
import jinja2 as j2 

# import libraries for sql 
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, Float, JSON 
from sqlalchemy.engine import URL
from sqlalchemy.dialects import postgresql

In [26]:
# Build the connection URL for the source PostgreSQL database (pg8000 driver).
# User, password, host and database name are the values imported from secrets_config.
source_connection_url = URL.create(
    "postgresql+pg8000",
    username=source_db_user,
    password=source_db_password,
    host=source_db_server_name,
    port=5432,
    database=source_db_database_name,
)

# Engine used to read from the source database.
source_engine = create_engine(source_connection_url)

In [27]:
import os 
import logging 
import datetime as dt 
import numpy as np

def get_incremental_value(table_name, path="extract_log"):
    """
    Returns the `incremental_value` recorded on the row with the highest `log_date` in the table's extract log.
    - `table_name`: the name of the table; the log is read from `{path}/{table_name}.csv`
    - `path`: the directory containing the log files. defaults to `extract_log`
    """
    extract_log = pd.read_csv(f"{path}/{table_name}.csv")
    latest_log_date = extract_log["log_date"].max()
    is_latest_entry = extract_log["log_date"] == latest_log_date
    # several rows may share the latest log_date; the first of them is used
    return extract_log.loc[is_latest_entry, "incremental_value"].values[0]

def is_incremental(table:str, path:str)->bool:
    """
    Checks whether the extract query of a table is configured as an incremental extract.
    The `.sql` file is expected to expose a jinja variable `config` whose `extract_type` equals "incremental" (case-insensitive).
    - `table`: name of the sql file without .sql
    - `path`: path to the directory containing the sql file

    Returns `False` when the template exposes no usable `config` / `extract_type`.
    An error opening the file itself is not suppressed.
    """
    # read sql contents into a variable
    with open(f"{path}/{table}.sql") as f:
        raw_sql = f.read()
    try:
        config = j2.Template(raw_sql).make_module().config
        return config["extract_type"].lower() == "incremental"
    except Exception:
        # Best-effort: a template without a (valid) config is treated as a full extract.
        # `Exception` rather than a bare `except` so KeyboardInterrupt / SystemExit still propagate.
        return False

def upsert_incremental_log(log_path, table_name, incremental_value)->bool:
    """
    Records a new incremental value in the table's extract log `{log_path}/{table_name}.csv`.
    The new row (current timestamp + value) is appended to the existing log, or the log is created if the table has none yet.
    - `log_path`: the directory containing the log files (must already exist)
    - `table_name`: the name of the table the value belongs to
    - `incremental_value`: the value to store
    """
    log_file = f"{log_path}/{table_name}.csv"

    # load the previous entries first, if this table has been logged before
    previous_entries = None
    if f"{table_name}.csv" in os.listdir(log_path):
        previous_entries = pd.read_csv(log_file)

    new_entry = pd.DataFrame(data={
        "log_date": [dt.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")],
        "incremental_value": [incremental_value]
    })

    if previous_entries is None:
        full_log = new_entry
    else:
        full_log = pd.concat([previous_entries, new_entry])

    full_log.to_csv(log_file, index=False)
    return True

def extract_from_database(table_name, engine, path="extract_queries", log_path="extract_log")->pd.DataFrame:
    """
    Extracts a table by rendering and running the query stored in `{path}/{table_name}.sql`.
    - `table_name`: the name of the table (without .sql)
    - `engine`: connection engine to the database the query is run against
    - `path`: the path to the extract queries directory containing the sql files. defaults to `extract_queries`
    - `log_path`: the directory holding the incremental extract logs. defaults to `extract_log`

    Queries flagged as incremental (see `is_incremental`) are rendered with `is_incremental=True` and the
    last logged `incremental_value` once a log exists for the table; otherwise a full extract is performed.
    After an incremental-type extract, the maximum of the configured `incremental_column` is written to the log.

    Returns the extracted dataframe, or `None` (after logging an error) when no sql file exists for the table.
    """
    logging.basicConfig(level=logging.INFO, format="[%(levelname)s][%(asctime)s]: %(message)s")

    logging.info(f"Extracting table: {table_name}")
    if f"{table_name}.sql" not in os.listdir(path):
        logging.error(f"Could not find table: {table_name}")
        return None

    # read sql contents into a variable
    with open(f"{path}/{table_name}.sql") as f:
        raw_sql = f.read()
    template = j2.Template(raw_sql)

    if not is_incremental(table=table_name, path=path):
        # full extract: the template's config is never needed here, so queries without a config still work
        parsed_sql = template.render(source_table = table_name, engine=engine)
        df = pd.read_sql(sql=parsed_sql, con=engine)
        logging.info(f"Successfully extracted table: {table_name}, rows extracted: {len(df)}")
        return df

    # only incremental queries need the config (is_incremental has confirmed it exists)
    incremental_column = template.make_module().config["incremental_column"]
    os.makedirs(log_path, exist_ok=True)

    if f"{table_name}.csv" in os.listdir(log_path):
        # a previous run was logged: extract only rows beyond the stored incremental value
        current_max_incremental_value = get_incremental_value(table_name, path=log_path)
        parsed_sql = template.render(source_table = table_name, engine=engine, is_incremental=True, incremental_value=current_max_incremental_value)
    else:
        # first run for this table: perform a full extract
        current_max_incremental_value = None
        parsed_sql = template.render(source_table = table_name, engine=engine)

    df = pd.read_sql(sql=parsed_sql, con=engine)

    if len(df) > 0:
        # store latest incremental value
        upsert_incremental_log(log_path=log_path, table_name=table_name, incremental_value=df[incremental_column].max())
    elif current_max_incremental_value is not None:
        # nothing new was extracted: re-log the previous value
        upsert_incremental_log(log_path=log_path, table_name=table_name, incremental_value=current_max_incremental_value)
    # else: empty first extract -> no log is written, because the max of an empty column is NaN
    # and would be stored as the incremental value used by the next run

    logging.info(f"Successfully extracted table: {table_name}, rows extracted: {len(df)}")
    return df

In [28]:
def get_key_columns(table:str, path:str="extract_queries")->list: 
    """
    get a list of key columns from the .sql file. key_columns are read from the `key_columns` entry of the jinja `config` variable set in the .sql file.
    - `table`: name of the sql file without .sql 
    - `path`: path to the directory containing the sql file 

    Returns an empty list when the template exposes no usable `config` / `key_columns`.
    An error opening the file itself is not suppressed.
    """
    # read sql contents into a variable 
    with open(f"{path}/{table}.sql") as f: 
        raw_sql = f.read()
    try: 
        return j2.Template(raw_sql).make_module().config["key_columns"] # get key columns 
    except Exception:
        # Best-effort: a template without key columns yields an empty list.
        # `Exception` rather than a bare `except` so KeyboardInterrupt / SystemExit still propagate.
        return []

In [29]:
from sqlalchemy import Integer, String, Float, JSON , DateTime, Boolean, BigInteger, Numeric


def get_sqlalchemy_column(column_name:str , source_datatype:str, primary_key:bool=False)->Column:
    """
    Builds a SQLAlchemy column for a dataframe column by translating the pandas dtype name into a SQLAlchemy type.
    - `column_name`: name of the column
    - `source_datatype`: name of the pandas dtype (e.g. `int64`)
    - `primary_key`: whether the column is part of the primary key

    Raises a `KeyError` for dtype names that are not in the mapping.
    """
    pandas_to_sqlalchemy = {
        "int64": BigInteger,
        "object": String,
        "datetime64[ns]": DateTime,
        "float64": Numeric,
        "bool": Boolean,
    }
    sqlalchemy_type = pandas_to_sqlalchemy[source_datatype]
    return Column(column_name, sqlalchemy_type, primary_key=primary_key)

def generate_sqlalchemy_schema(df: pd.DataFrame, key_columns:list, table_name, meta): 
    """
    Builds the SQLAlchemy `Table` describing the dataframe, registered on `meta`.
    One column is generated per dataframe column (in dataframe order); columns named in `key_columns` are flagged as primary key.
    """
    table_columns = [
        get_sqlalchemy_column(
            column_name=name,
            source_datatype=dtype.name,
            primary_key=name in key_columns,
        )
        for name, dtype in zip(df.columns, df.dtypes)
    ]
    return Table(table_name, meta, *table_columns)


In [30]:
import numpy as np
def upsert_in_chunks(df:pd.DataFrame, engine, table_schema:Table, key_columns:list, chunksize:int=1000)->bool:
    """
    performs the upsert with several rows at a time (i.e. a chunk of rows). this is better suited for very large sql statements that need to be broken into several steps. 
    - `df`: pandas dataframe holding the rows to write (NaN values are sent as NULL)
    - `engine`: connection engine to database 
    - `table_schema`: sqlalchemy table to upsert into 
    - `key_columns`: names of the columns used to detect conflicting rows 
    - `chunksize`: number of rows per statement; must be at least 1 

    Every chunk is executed and committed in its own transaction. Raises `ValueError` for a chunksize below 1.
    """
    if chunksize < 1:
        # a negative step would make the loop below a silent no-op that still reports success
        raise ValueError(f"chunksize must be a positive integer, got {chunksize}")
    max_length = len(df)
    df = df.replace({np.nan: None})
    for lower_bound in range(0, max_length, chunksize):
        upper_bound = min(lower_bound + chunksize, max_length)
        insert_statement = postgresql.insert(table_schema).values(df.iloc[lower_bound:upper_bound].to_dict(orient='records'))
        update_columns = {c.key: c for c in insert_statement.excluded if c.key not in key_columns}
        if update_columns:
            upsert_statement = insert_statement.on_conflict_do_update(
                index_elements=key_columns,
                set_=update_columns)
        else:
            # every column is a key column: there is nothing to update on conflict
            upsert_statement = insert_statement.on_conflict_do_nothing(index_elements=key_columns)
        logging.info(f"Inserting chunk: [{lower_bound}:{upper_bound}] out of index {max_length}")
        # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
        # engine.begin() commits on success and rolls back on error
        with engine.begin() as connection:
            connection.execute(upsert_statement)
    return True 

def upsert_all(df:pd.DataFrame, engine, table_schema:Table, key_columns:list)->bool:
    """
    performs the upsert with all rows at once. this may cause timeout issues if the sql statement is very large. 
    - `df`: pandas dataframe holding the rows to write (NaN values are sent as NULL, as in `upsert_in_chunks`)
    - `engine`: connection engine to database 
    - `table_schema`: sqlalchemy table to upsert into 
    - `key_columns`: names of the columns used to detect conflicting rows 
    """
    if len(df) == 0:
        # nothing to write; avoid building an INSERT without any rows
        logging.info("Insert/updated rows: 0")
        return True
    records = df.replace({np.nan: None}).to_dict(orient='records')
    insert_statement = postgresql.insert(table_schema).values(records)
    update_columns = {c.key: c for c in insert_statement.excluded if c.key not in key_columns}
    if update_columns:
        upsert_statement = insert_statement.on_conflict_do_update(
            index_elements=key_columns,
            set_=update_columns)
    else:
        # every column is a key column: there is nothing to update on conflict
        upsert_statement = insert_statement.on_conflict_do_nothing(index_elements=key_columns)
    # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
    # engine.begin() commits on success and rolls back on error
    with engine.begin() as connection:
        result = connection.execute(upsert_statement)
        rowcount = result.rowcount
    logging.info(f"Insert/updated rows: {rowcount}")
    return True 

def upsert_to_database(df: pd.DataFrame, table_name: str, key_columns: list, engine, chunksize:int=1000)->bool: 
    """
    Upsert dataframe to a database table. The table is created from the dataframe's schema if it does not exist yet. 
    - `df`: pandas dataframe 
    - `table_name`: name of the target table 
    - `key_columns`: list of key column names to be used for upserting (a single name may be passed as a string) 
    - `engine`: connection engine to database 
    - `chunksize`: if chunksize greater than 0 is specified, then the rows will be inserted in the specified chunksize. e.g. 1000 rows at a time. otherwise all rows are written in one statement. 

    Raises `ValueError` when no key columns are given or a key column is missing from the dataframe. 
    """
    logging.basicConfig(level=logging.INFO, format="[%(levelname)s][%(asctime)s]: %(message)s")
    if isinstance(key_columns, str):
        # accept a single column name; iterating a bare string would yield its characters
        key_columns = [key_columns]
    missing_keys = [key for key in key_columns if key not in df.columns]
    if not key_columns or missing_keys:
        # fail before a table without a primary key is created; the upsert needs the key
        # columns as its conflict target
        raise ValueError(
            f"Cannot upsert into {table_name}: key_columns must be a non-empty list of dataframe columns "
            f"(received: {list(key_columns)}, missing from dataframe: {missing_keys})"
        )
    meta = MetaData()
    logging.info(f"Generating table schema: {table_name}")
    table_schema = generate_sqlalchemy_schema(df=df, key_columns=key_columns,table_name=table_name, meta=meta)
    meta.create_all(engine)
    logging.info(f"Table schema generated: {table_name}")
    logging.info(f"Writing to table: {table_name}")
    if chunksize > 0:
        upsert_in_chunks(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns, chunksize=chunksize)
    else: 
        upsert_all(df=df, engine=engine, table_schema=table_schema, key_columns=key_columns)
    logging.info(f"Successful write to table: {table_name}")
    return True 

In [31]:
def overwrite_to_database(df: pd.DataFrame, table_name: str, engine)->bool: 
    """
    Overwrite a database table with the contents of a dataframe (any existing table of that name is replaced). 
    - `df`: pandas dataframe 
    - `table_name`: name of the target table 
    - `engine`: connection engine to database 
    """
    log_format = "[%(levelname)s][%(asctime)s]: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    logging.info(f"Writing to table: {table_name}")
    # the dataframe index is not written as a column
    df.to_sql(con=engine, name=table_name, index=False, if_exists="replace")

    rows_written = len(df)
    logging.info(f"Successful write to table: {table_name}, rows inserted/updated: {rows_written}")
    return True 

In [32]:
from secrets_config import target_db_user, target_db_password, target_db_server_name, target_db_database_name
# Build the connection URL for the target PostgreSQL database (pg8000 driver).
# User, password, host and database name are the values imported from secrets_config.
target_connection_url = URL.create(
    "postgresql+pg8000",
    username=target_db_user,
    password=target_db_password,
    host=target_db_server_name,
    port=5432,
    database=target_db_database_name,
)

# Engine used to write to the target database.
target_engine = create_engine(target_connection_url)

In [33]:
def extract_load_pipeline(source_engine, target_engine, path="extract_queries"): 
    """
    Extracts every table that has a `.sql` query in `path` from the source database and loads it into the target database. 
    - `source_engine`: connection engine to the source database 
    - `target_engine`: connection engine to the target database 
    - `path`: the path to the extract queries directory containing the sql files. defaults to `extract_queries` 

    Incremental tables are upserted on their key columns; all other tables overwrite the target table. 
    Directory entries that are not `.sql` files are ignored, and tables are processed in sorted file name order. 
    """
    for file in sorted(os.listdir(path)):
        table_name, extension = os.path.splitext(file)
        if extension != ".sql":
            # e.g. checkpoint folders or other files living next to the queries
            continue
        df = extract_from_database(table_name=table_name, engine=source_engine, path=path)
        if df is None:
            # the extract already logged why no dataframe was produced; there is nothing to load
            continue
        if is_incremental(table=table_name, path=path):
            key_columns = get_key_columns(table=table_name, path=path)
            upsert_to_database(df=df, table_name=table_name, key_columns=key_columns, engine=target_engine, chunksize=1000)
        else: 
            overwrite_to_database(df=df, table_name=table_name, engine=target_engine)

In [34]:
# Run the extract-and-load step for the queries stored in the `extract_queries` folder.
extract_load_pipeline(source_engine=source_engine, target_engine=target_engine, path="extract_queries")

[INFO][2022-07-24 18:04:37,277]: Extracting table: customer
[INFO][2022-07-24 18:04:37,362]: Successfully extracted table: customer, rows extracted: 599
[INFO][2022-07-24 18:04:37,365]: Writing to table: customer
[INFO][2022-07-24 18:04:37,789]: Successful write to table: customer, rows inserted/updated: 599
[INFO][2022-07-24 18:04:37,789]: Extracting table: film_category
[INFO][2022-07-24 18:04:37,828]: Successfully extracted table: film_category, rows extracted: 1000
[INFO][2022-07-24 18:04:37,830]: Writing to table: film_category
[INFO][2022-07-24 18:04:38,269]: Successful write to table: film_category, rows inserted/updated: 1000
[INFO][2022-07-24 18:04:38,270]: Extracting table: film
[INFO][2022-07-24 18:04:38,327]: Successfully extracted table: film, rows extracted: 1000
[INFO][2022-07-24 18:04:38,329]: Writing to table: film
[INFO][2022-07-24 18:04:38,981]: Successful write to table: film, rows inserted/updated: 1000
[INFO][2022-07-24 18:04:38,982]: Extracting table: staff
[INFO

In [35]:
import os 
import logging 

def build_model(model, engine, models_path="models")->bool:
    """
    Builds models with a matching file name in the models_path folder. 
    - `model`: the name of the model (without .sql)
    - `engine`: connection engine to the database the model is built in
    - `models_path`: the path to the models directory containing the sql files. defaults to `models`

    Returns `True` once the model's sql has been executed, `False` (after logging an error) when no
    sql file exists for the model.
    """
    logging.basicConfig(level=logging.INFO, format="[%(levelname)s][%(asctime)s]: %(message)s")

    if f"{model}.sql" not in os.listdir(models_path):
        logging.error(f"Could not find model: {model}")
        return False

    logging.info(f"Building model: {model}")

    # read sql contents into a variable 
    with open(f"{models_path}/{model}.sql") as f: 
        raw_sql = f.read()

    # parse sql using jinja 
    parsed_sql = j2.Template(raw_sql).render(target_table = model, engine=engine)

    # execute parsed sql 
    # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0. exec_driver_sql() passes the
    # rendered string to the driver as-is, and engine.begin() commits on success / rolls back on error.
    with engine.begin() as connection:
        result = connection.exec_driver_sql(parsed_sql)
        rows_affected = result.rowcount
    logging.info(f"Successfully built model: {model}, rows inserted/updated: {rows_affected}")
    return True 

In [6]:
# import TopologicalSorter
from graphlib import TopologicalSorter

In [7]:
# Describe the model DAG as {model: models it depends on} and let TopologicalSorter order it.
# The entries are listed in the order the nodes should be registered with the sorter.
ts = TopologicalSorter({
    "staging_films": (),
    "serving_sales_film": ("staging_films",),
    "serving_films_popular": ("staging_films",),
    "serving_sales_customer": (),
    "serving_sales_cumulative": (),
})
dag = tuple(ts.static_order())
print(dag)

('staging_films', 'serving_sales_customer', 'serving_sales_cumulative', 'serving_sales_film', 'serving_films_popular')


In [38]:
# Build the models one by one in the target database, following the order stored in `dag`.
for node in dag:
    build_model(
        model=node,
        engine=target_engine,
        models_path="models/",
    )

[INFO][2022-07-24 18:04:40,487]: Building model: staging_films
[INFO][2022-07-24 18:04:40,568]: Successfully built model: staging_films, rows inserted/updated: 958
[INFO][2022-07-24 18:04:40,569]: Building model: serving_sales_customer
[INFO][2022-07-24 18:04:40,618]: Successfully built model: serving_sales_customer, rows inserted/updated: 599
[INFO][2022-07-24 18:04:40,619]: Building model: serving_sales_cumulative
[INFO][2022-07-24 18:04:40,687]: Successfully built model: serving_sales_cumulative, rows inserted/updated: 14596
[INFO][2022-07-24 18:04:40,688]: Building model: serving_sales_film
[INFO][2022-07-24 18:04:40,717]: Successfully built model: serving_sales_film, rows inserted/updated: 958
[INFO][2022-07-24 18:04:40,719]: Building model: serving_films_popular
[INFO][2022-07-24 18:04:40,727]: Successfully built model: serving_films_popular, rows inserted/updated: 958
