# Data vault of the Northwind database

## Setup

In [2]:
import datetime as dt
import hashlib
import os
import re
import sqlite3

import pandas as pd

In [4]:
# configs
# Path of the Northwind SQLite file. Set the NORTHWIND_DB_PATH environment variable
# to run the notebook on another machine; the fallback is the original local path.
connection_string = os.environ.get(
    "NORTHWIND_DB_PATH",
    "/Users/deniel.horvatic/repository/dataengineer/awesome-dataengineer-masterclass/06_data_vault/data/raw/northwind.db",
)
# TODO2 get source tables from db
source_tables = ["customers", "shippers"]  # TODO1 Add more

In [6]:
# DB helper
def get_db():
    """Open and return a sqlite3 connection to the file named by ``connection_string``."""
    return sqlite3.connect(connection_string)

def get_cursor(db):
    """Create a cursor on ``db``, report it on stdout and return the cursor."""
    new_cursor = db.cursor()
    print("Connection established!")
    return new_cursor

def close_connection(cursor):
    """Close ``cursor`` and report it on stdout.

    Only the cursor is closed; nothing is called on the connection it belongs to.
    """
    cursor.close()
    for message in ("Connection closed!", "******************"):
        print(message)

def get_all_tables(cursor) -> list:
    """Return one ``(name,)`` tuple per entry of type 'table' in ``sqlite_master``."""
    cursor.execute(""" 
    SELECT name 
    FROM sqlite_master 
    WHERE type='table'; """)
    table_rows = cursor.fetchall()
    return table_rows

# def get_column_names(table, cursor):
#     query = f""" 
#     SELECT name 
#     FROM sqlite_master 
#     WHERE type='table'; """
#     cursor.execute(query)
#     return cursor.fetchall()

def get_column_names(table: str, db) -> list:
    """Return the column names of ``table`` as listed by ``PRAGMA table_info``."""
    table_info = pd.read_sql(f'PRAGMA table_info({table});', con=db)
    names = table_info['name']
    return names.to_list()
    
def get_primary_keys(table_name, db):
    """Return the name of the (first) primary-key column of ``table_name``.

    The key is taken from the ``pk`` column of ``PRAGMA table_info``, which numbers
    the primary-key columns starting at 1 and is 0 for every other column. Unlike a
    lookup through ``PRAGMA index_list`` this also works for ``INTEGER PRIMARY KEY``
    tables (they have no separate primary-key index) and does not depend on the
    position of the primary-key index in the index list.

    Parameters:
        table_name: name of the table to inspect.
        db: open sqlite3 connection.

    Returns:
        The primary-key column name (the first one for a composite key). If the
        table is unknown or declares no primary key, the historical fallback
        message string is returned instead of a column name.
    """
    table_info = pd.read_sql(f"PRAGMA table_info({table_name});", con=db)
    if table_info.empty or "pk" not in table_info.columns:
        key_columns = []
    else:
        # pk > 0 marks key columns; sorting by pk yields declaration order of the key.
        declared = table_info.loc[table_info["pk"] > 0].sort_values("pk")
        key_columns = declared["name"].to_list()

    if not key_columns:
        return f"Table 'index_info' not found, can't get primary keey of {table_name}"

    primary_key = key_columns[0]
    print("Get primary key:", primary_key)
    return primary_key

def get_foreign_keys(table_name, cursor):
    """Return the referencing columns of all foreign keys declared on ``table_name``.

    Parameters:
        table_name: name of the table to inspect.
        cursor: open sqlite3 cursor.

    Returns:
        A list with the fourth field of every ``PRAGMA foreign_key_list`` row
        (the "from" column); empty when no rows are returned.
    """
    rows = cursor.execute(f""" PRAGMA foreign_key_list({table_name})""").fetchall()
    # The previous IndexError handler was removed: it referenced an undefined name
    # and would have returned a message string instead of a list.
    return [row[3] for row in rows]

def generate_hash_key(DFrow, WSreplace="-"):
    """Return the SHA-256 hex digest of a normalised row.

    Every value of ``DFrow`` is cast to string, stripped, upper-cased and has each
    whitespace character replaced by ``WSreplace``. The values are then joined with
    ``_`` (missing values are written as ``#``) and the UTF-8 bytes of the joined
    text are hashed.

    Parameters:
        DFrow: pandas Series holding the values to hash, in hashing order.
        WSreplace: replacement for every single whitespace character.

    Returns:
        The hexadecimal SHA-256 digest as a str.
    """
    normalised = (
        DFrow.astype("string")
        .str.strip()
        .str.upper()
        # Raw string: "\s" in a plain literal is an invalid escape sequence.
        .str.replace(r"\s", WSreplace, regex=True)
    )
    hash_input = normalised.str.cat(sep="_", na_rep="#")
    return hashlib.sha256(hash_input.encode("utf-8")).hexdigest()


# def generate_hash_key(string_to_hash) -> str:
#     """function takes a string, converts every character to uppercase, replaces blankspaces and calculates MD5 of the result"""
#     print(f"Keys to hash: {string_to_hash}")
#     cleaned_string = string_to_hash.upper().replace(" ", "").encode("utf8")
#     md5hash = hashlib.md5(cleaned_string).hexdigest()
#     return md5hash

# create table
def create_table(table_name,attributes, cursor):
    """Create ``table_name`` with the column definition text ``attributes``.

    Stays best-effort: database errors (for example "table already exists") are
    reported on stdout instead of being raised. Anything that is not a
    ``sqlite3.Error`` now propagates instead of being swallowed silently.
    """
    try:
        cursor.execute(f"""CREATE TABLE {table_name} ({attributes})""")
    except sqlite3.Error as error:
        print(f"Could not create table {table_name}: {error}")

# drop table if it exists
def drop_table(table_name,cursor):
    """Drop ``table_name`` if it exists.

    Stays best-effort: database errors are reported on stdout instead of being
    raised. Anything that is not a ``sqlite3.Error`` now propagates instead of
    being swallowed silently.
    """
    try:
        cursor.execute(f"""DROP TABLE IF EXISTS {table_name}""")
    except sqlite3.Error as error:
        print(f"Could not drop table {table_name}: {error}")

def insert(table_name,db):
    """Insert one row with ``test = 1`` into ``table_name``.

    The statement runs on the ``db`` argument; the function previously used a
    global ``cursor`` that this file never defines. No commit is issued here.
    """
    db.execute(f"""INSERT INTO {table_name} (test) values(1)""")



## Hubs
A hub contains the set of business keys of one entity.
Business keys are unlikely to change.

In [8]:
class Hub:
    """Builds one hub table from a source table and writes it to the database.

    The hub holds four columns: ``HK`` (hash of business key and record source),
    ``BK`` (the source table's primary-key column), ``RS`` (record source) and
    ``LDTS`` (load timestamp).
    """

    def __init__(self,HUB_name, source_table, db, source_db_path):
        """Load ``source_table`` and create the table ``HUB_name`` right away.

        Parameters:
            HUB_name: name of the hub table to (re)create.
            source_table: table the business keys are read from.
            db: open sqlite3 connection used for reading and writing.
            source_db_path: text used as prefix of the record source.
        """
        # self.sources = {} # TODO 3 do we need multiple sources
        self.HUB_name = HUB_name
        self.source_db_path = source_db_path
        self.db = db
        self.df = None
        self.cursor = get_cursor(db)
        try:
            self.add_source(source_table)
        finally:
            # Release the cursor even when loading the source fails.
            close_connection(self.cursor)

    def add_source(self, source_table):
        """Read ``source_table``, derive the hub columns and replace the hub table."""
        print(f"Get data from source '{source_table}'")
        # read raw data
        source = pd.read_sql(f"""SELECT * FROM {source_table};""", con=self.db)
        # Use the connection this hub was given, not a module-level ``db``.
        primary_key = get_primary_keys(source_table, self.db)
        # generate columns
        self.df = source.rename(columns={primary_key: "BK"})
        self.df['RS'] = f"{self.source_db_path}_{source_table}"
        self.df['LDTS'] = dt.datetime.now()
        # apply(axis=1) does not yield a Series for an empty frame, hence the guard.
        self.df['HK'] = self.df.apply(
            lambda row: generate_hash_key(row[['BK','RS']]), axis=1
        ) if not self.df.empty else []
        drop_table(self.HUB_name, self.cursor)
        self.df[['HK', 'BK', 'RS', 'LDTS']].to_sql(
            name=self.HUB_name,
            con=self.db,
            if_exists="replace",
            index=False
        )
        print(f"""Table {self.HUB_name} created.""")

## Links

In [10]:
class Link:
    """Builds one link table from two key columns of a source table.

    The link holds ``HK`` (hash of the record source and both hub hash keys),
    ``HK_<origin0>``, ``HK_<origin1>``, ``RS`` and ``LDTS``.

    NOTE(review): ``RS`` is an integer counter here while ``Hub`` hashes a string
    record source, so the hub hash keys computed in this class presumably differ
    from the ``HK`` values stored in the hub tables - confirm before joining.
    """

    def __init__(self, LINK_name, origin0, origin0_FK, origin1, origin1_FK, db, source_db_path):
        """Read both key columns from ``origin0`` and create ``LINK_name`` right away.

        Parameters:
            LINK_name: name of the link table to (re)create.
            origin0: table both key columns are selected from.
            origin0_FK: first key column in ``origin0``.
            origin1: name used for the second hash-key column (``HK_<origin1>``).
            origin1_FK: second key column in ``origin0``.
            db: open sqlite3 connection used for reading and writing.
            source_db_path: stored on the instance; not used for hashing here.
        """
        # self.sources = {} # TODO 3 do we need multiple sources
        self.LINK_name = LINK_name
        self.source_db_path = source_db_path
        self.db = db
        self.RS = 0
        self.df = None
        self.cursor = get_cursor(db)
        try:
            self.setup(origin0, origin0_FK, origin1, origin1_FK)
        finally:
            # Release the cursor even when building the link fails.
            close_connection(self.cursor)

    def _hash_rows(self, columns):
        """Hash the given ``columns`` of every row of ``self.df``."""
        if self.df.empty:
            # apply(axis=1) does not yield a Series for an empty frame.
            return pd.Series([], index=self.df.index, dtype="object")
        return self.df.apply(lambda row: generate_hash_key(row[columns]), axis=1)

    def setup(self, origin0, origin0_FK, origin1, origin1_FK):
        """Derive the link columns and replace the link table."""
        print(f"Get data from source '{origin0}'")
        hash_key0 = f"HK_{origin0}"
        hash_key1 = f"HK_{origin1}"
        query_read = f""" 
            SELECT {origin0_FK},{origin1_FK}
            FROM {origin0}
        """
        self.df = pd.read_sql(query_read, con=self.db)

        self.RS = self.RS + 1
        self.df['RS'] = self.RS
        self.df[hash_key0] = self._hash_rows([origin0_FK, 'RS'])
        self.df[hash_key1] = self._hash_rows([origin1_FK, 'RS'])
        self.df = self.df.drop(columns=[origin0_FK, origin1_FK])
        self.df['HK'] = self._hash_rows(['RS', hash_key0, hash_key1])
        self.df['LDTS'] = dt.datetime.now()
        # DF to Table
        drop_table(self.LINK_name, self.cursor)
        self.df[['HK', hash_key0, hash_key1, 'RS', 'LDTS']].to_sql(
            name=self.LINK_name,
            con=self.db,
            if_exists='replace',
            index=False)
        print(f"""Table {self.LINK_name} created.""")

## Satellites

In [12]:
class Satelite:
    """Builds one satellite table holding all columns of a source table.

    Column order of the result: ``HK``, ``HK_<source_table>``, ``LDTS``, ``EDTS``,
    ``RS``, ``HD`` followed by every source column.

    NOTE(review): ``RS`` is the constant 1 here while ``Hub`` hashes a string
    record source, so ``HK_<source_table>`` presumably differs from the hub's
    ``HK`` - confirm before joining.
    """

    def __init__(self, SATELITE_name, source_table, db, source_db_path):
        """Create the table ``SATELITE_name`` from ``source_table`` right away.

        Parameters:
            SATELITE_name: name of the satellite table to (re)create.
            source_table: table the attributes are read from.
            db: open sqlite3 connection used for reading and writing.
            source_db_path: stored on the instance; not used for hashing here.
        """
        self.SATELITE_name = SATELITE_name
        self.source_db_path = source_db_path
        self.cursor = get_cursor(db)
        self.source_table = source_table
        self.df = None
        try:
            self.col_names = get_column_names(source_table, db)
            self.create_table(source_table, self.col_names, db)
        finally:
            # Release the cursor even when building the satellite fails.
            close_connection(self.cursor)

    @staticmethod
    def _hash_strings(values):
        """Hash every string of the Series ``values`` on its own."""
        return values.map(lambda text: generate_hash_key(pd.Series([text])))

    def create_table(self, source_table, column_names, db):
        """Derive the satellite frame and write it as table ``self.SATELITE_name``.

        Parameters:
            source_table: table to read.
            column_names: source columns copied into the satellite and hashed
                into the hash diff ``HD``.
            db: open sqlite3 connection.
        """
        drop_table(self.SATELITE_name, self.cursor)
        df_source = pd.read_sql(f'SELECT * FROM {source_table};', con=db)
        primary_key = get_primary_keys(source_table, db)
        hub_key_column = f"HK_{source_table}"
        attributes = list(column_names)

        # LDTS is the load time; EDTS is set to 12 hours after it.
        now = dt.datetime.now()
        load_time = now.strftime("%Y/%m/%d %H:%M:%S")
        end_time = (now + dt.timedelta(hours=12)).strftime("%Y/%m/%d %H:%M:%S")
        record_source = 1

        # Hub hash key: hash of "<primary key>_<RS>".
        hub_hashes = self._hash_strings(
            df_source[primary_key].map(lambda key: str(key) + '_' + str(record_source))
        )
        # Satellite hash key: hash of "<hub hash>_<RS>_<LDTS>".
        row_hashes = self._hash_strings(
            hub_hashes.map(
                lambda hub_hash: str(hub_hash) + '_' + str(record_source) + '_' + str(load_time)
            )
        )
        # Hash diff: one hash per row over all attribute values. The previous code
        # stringified whole columns, so every row received the same value.
        attribute_text = pd.Series(
            [
                "_".join(map(str, record))
                for record in df_source[attributes].itertuples(index=False, name=None)
            ],
            index=df_source.index,
            dtype="object",
        )
        hash_diffs = self._hash_strings(attribute_text)

        self.df = pd.DataFrame(
            {
                "HK": row_hashes,
                hub_key_column: hub_hashes,
                "LDTS": load_time,
                "EDTS": end_time,
                "RS": record_source,
                "HD": hash_diffs,
            },
            index=df_source.index,
        )
        # create attribute columns
        for attribute in attributes:
            self.df[attribute] = df_source[attribute]

        # load df as a table to SQL; attribute column types are left to pandas
        # (the former str(list) key in this mapping matched no column).
        self.df.to_sql(
            self.SATELITE_name,
            con=db,
            if_exists='replace',
            index=False,
            dtype={
                "HK": "TEXT NOT NULL PRIMARY KEY",
                hub_key_column: "TEXT",
                "LDTS": "DATETIME",
                "EDTS": "DATETIME",
                "RS": "INTEGER",
                "HD": "TEXT",
            },
        )

        print(f"Table '{self.SATELITE_name}' created.")

In [14]:
# One shared connection; the link and satellite cells below reuse ``db``.
db = get_db()
hub_sources = (
    'categories',
    'customer_demographics',
    'customers',
    'employees',
    'suppliers',
    'products',
    'region',
    'shippers',
    'orders',
    'territories',
)
# Build one hub per source table, named HUB_<table>.
for hub in hub_sources:
    hub_name = f"HUB_{hub}"
    tmp = Hub(hub_name, hub, db, connection_string)

Connection established!
Get data from source 'categories'
Get primary key: category_id
Table HUB_categories created.
Connection closed!
******************
Connection established!
Get data from source 'customer_demographics'
Get primary key: customer_type_id
Table HUB_customer_demographics created.
Connection closed!
******************
Connection established!
Get data from source 'customers'
Get primary key: customer_id
Table HUB_customers created.
Connection closed!
******************
Connection established!
Get data from source 'employees'
Get primary key: employee_id
Table HUB_employees created.
Connection closed!
******************
Connection established!
Get data from source 'suppliers'
Get primary key: supplier_id
Table HUB_suppliers created.
Connection closed!
******************
Connection established!
Get data from source 'products'
Get primary key: product_id
Table HUB_products created.
Connection closed!
******************
Connection established!
Get data from source 'region'


In [16]:
def _make_link(name, origin0, origin0_fk, origin1, origin1_fk):
    """Build a Link on the shared connection ``db`` with ``connection_string`` as source path."""
    return Link(name, origin0, origin0_fk, origin1, origin1_fk, db, connection_string)


LINK_territories_region = _make_link(
    'LINK_territories_region', 'territories', 'territory_id', 'region', 'region_id')
LINK_employees_territories = _make_link(
    'LINK_employees_territories', 'employee_territories', 'employee_id', 'territories', 'territory_id')
LINK_employees_employees = _make_link(
    'LINK_employees_employees', 'employees', 'employee_id', 'employees_a', 'reports_to')
LINK_employees_orders = _make_link(
    'LINK_employees_orders', 'orders', 'order_id', 'employees', 'employee_id')
LINK_shippers_orders = _make_link(
    'LINK_shippers_orders', 'orders', 'order_id', 'shippers', 'ship_via')
LINK_customers_orders = _make_link(
    'LINK_customers_orders', 'orders', 'order_id', 'customers', 'customer_id')
LINK_customer_customer_demo = _make_link(
    'LINK_customer_customer_demo', 'customer_customer_demo', 'customer_type_id', 'customers', 'customer_id')
LINK_order_details = _make_link(
    'LINK_order_details', 'order_details', 'order_id', 'products', 'product_id')
LINK_suppliers_products = _make_link(
    'LINK_suppliers_products', 'products', 'product_id', 'suppliers', 'supplier_id')
LINK_categories_products = _make_link(
    'LINK_categories_products', 'products', 'product_id', 'categories', 'category_id')

Connection established!
Get data from source 'territories'
Table LINK_territories_region created.
Connection closed!
******************
Connection established!
Get data from source 'employee_territories'
Table LINK_employees_territories created.
Connection closed!
******************
Connection established!
Get data from source 'employees'
Table LINK_employees_employees created.
Connection closed!
******************
Connection established!
Get data from source 'orders'
Table LINK_employees_orders created.
Connection closed!
******************
Connection established!
Get data from source 'orders'
Table LINK_shippers_orders created.
Connection closed!
******************
Connection established!
Get data from source 'orders'
Table LINK_customers_orders created.
Connection closed!
******************
Connection established!
Get data from source 'customer_customer_demo'
Table LINK_customer_customer_demo created.
Connection closed!
******************
Connection established!
Get data from source

In [18]:
# One satellite per source table, all written through the shared connection ``db``.
HSAT_territories = Satelite('HSAT_territories', 'territories', db, connection_string)
HSAT_region = Satelite('HSAT_region', 'region', db, connection_string)
HSAT_orders = Satelite('HSAT_orders', 'orders', db, connection_string)
HSAT_shippers = Satelite('HSAT_shippers', 'shippers', db, connection_string)
HSAT_customer_demographics = Satelite('HSAT_customer_demographics', 'customer_demographics', db, connection_string)
HSAT_customers = Satelite('HSAT_customers', 'customers', db, connection_string)
HSAT_categories = Satelite('HSAT_categories', 'categories', db, connection_string)
# LSAT_order_details = Satelite('LSAT_order_details', 'order_details', db, connection_string)
HSAT_employees = Satelite('HSAT_employees', 'employees', db, connection_string)
HSAT_suppliers = Satelite('HSAT_suppliers', 'suppliers', db, connection_string)
HSAT_products = Satelite('HSAT_products', 'products', db, connection_string)
# The variable used to be misspelled; keep the old name as an alias.
HSAT_roducts = HSAT_products


Connection established!
Get primary key: territory_id
Table 'HSAT_territories' created.
Connection closed!
******************
Connection established!
Get primary key: region_id
Table 'HSAT_region' created.
Connection closed!
******************
Connection established!
Get primary key: order_id
Table 'HSAT_orders' created.
Connection closed!
******************
Connection established!
Get primary key: shipper_id
Table 'HSAT_shippers' created.
Connection closed!
******************
Connection established!
Get primary key: customer_type_id
Table 'HSAT_customer_demographics' created.
Connection closed!
******************
Connection established!
Get primary key: customer_id
Table 'HSAT_customers' created.
Connection closed!
******************
Connection established!
Get primary key: category_id
Table 'HSAT_categories' created.
Connection closed!
******************
Connection established!
Get primary key: employee_id
Table 'HSAT_employees' created.
Connection closed!
******************
Connecti