In [1]:
import pyodbc
import sqlite3 as sql
import pandas as pd
import warnings
import numpy as np
from datetime import datetime
import os
from dotenv import load_dotenv

warnings.simplefilter('ignore')

Load in .env file for safe connection

In [2]:
class EnvConfig:
    def __init__(self):
        load_dotenv()
    
    def get(self, key: str) -> str:

        value = os.getenv(key)
        if not value:
            print(f"Warning: {key} not found in environment")
        return value
    
    def is_configured(self) -> bool:
        required_keys = ["DB_SERVER", "DB_NAME_SDM", "DB_NAME_DWH", "DB_USER", "DB_PASSWORD"]
        return all(self.get(key) for key in required_keys)

Safe connection

In [3]:
env = EnvConfig()

SDM_CONN_PATH = (
    f"DRIVER={{SQL Server}};"
    f"SERVER={env.get('DB_SERVER')},1433;"
    f"DATABASE={env.get('DB_NAME_SDM')};"
    f"UID={env.get('DB_USER')};"
    f"PWD={env.get('DB_PASSWORD')}"
)

DWH_CONN_PATH = (
    f"DRIVER={{SQL Server}};"
    f"SERVER={env.get('DB_SERVER')},1433;"
    f"DATABASE={env.get('DB_NAME_DWH')};"
    f"UID={env.get('DB_USER')};"
    f"PWD={env.get('DB_PASSWORD')}"
)

FROM_DB = pyodbc.connect(SDM_CONN_PATH)
SDM_cursor = FROM_DB.cursor()

TO_DB = pyodbc.connect(DWH_CONN_PATH)
DWH_cursor = TO_DB.cursor()

Dataframes

In [4]:
def create_dataframes_sql(connection):
    dictionary : dict = {}
    query = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE';"
    key = "TABLE_NAME"
    
    tables = pd.read_sql(query, connection)
    
    for table in tables[key].tolist():
        dictionary[table] = pd.read_sql(f"SELECT * FROM {table}", connection)
    
    return dictionary

sdm_frames = create_dataframes_sql(FROM_DB)

Query's

In [5]:
from typing import Iterable

def query_remove(table_name : str):
    try:
        query = f"DELETE FROM {table_name}"
        DWH_cursor.execute(query)
        DWH_cursor.commit()
    except pyodbc.Error as e:
        print(f"ERROR: {table_name}: {e}")
        return table_name
    
    return None

def right_type(value, column_name, types):
    dtype = types[column_name]
    
    if pd.isna(value):
        return "NULL"
    
    if dtype == "object" or dtype == "string":
        value = value.replace("'", "''")
        return f"'{value}'"
    
    return f"{value}"
    

def create_add_query(row, types):
    query = ""
    columns = list(row.keys())
    
    for pos in range(len(columns)):
        column_name = columns[pos]
        value = row[column_name]
        
        if (pos == len(columns) - 1):
            query += f"{column_name}) VALUES ("
        else:
            query += f"{column_name}, "
    
    data = list(row)
    
    for pos in range(len(data)):
        column_name = columns[pos]
        value = row[column_name]
        
        if pos == len(columns) - 1:
            query += f"{right_type(value, column_name, types)})"
        else:
            query += f"{right_type(value, column_name, types)}, "
    
    return query

def query_add(table_name : str, table_data : pd.DataFrame):
    queries = []
    types = table_data.dtypes

    for index, row in table_data.iterrows():
        query : str = f"INSERT INTO {table_name} ("
        query += create_add_query(row, types)
        queries.append(query)
    
    return queries

Maak het DWH leeg

In [6]:
def empty_dwh():
    tables_dwh : list = [
        "DimProduct",
        "DimCustomer",
        "DimEmployee",
        "DimDate",
        "DimTerritory",
        "FactSales",
    ]

    while len(tables_dwh) > 0:
        for table in tables_dwh:
            try:
                temp = query_remove(table)
                
                if (temp == None):
                    tables_dwh.remove(table)
                    print(f"REMOVED {table}")
                else:
                    print(f"NOT YET REMOVED: {table}")
            except pyodbc.Error as e:
                print(e)
        
    print("All items are removed")

empty_dwh()

ERROR: DimProduct: ('23000', '[23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The DELETE statement conflicted with the REFERENCE constraint "FK__FactSales__Produ__4316F928". The conflict occurred in database "Datawarehouse_DEDS_P1", table "dbo.FactSales", column \'ProductID\'. (547) (SQLExecDirectW); [23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The statement has been terminated. (3621)')
NOT YET REMOVED: DimProduct
ERROR: DimCustomer: ('23000', '[23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The DELETE statement conflicted with the REFERENCE constraint "FK__FactSales__Custo__412EB0B6". The conflict occurred in database "Datawarehouse_DEDS_P1", table "dbo.FactSales", column \'CustomerID\'. (547) (SQLExecDirectW); [23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The statement has been terminated. (3621)')
NOT YET REMOVED: DimCustomer
ERROR: DimEmployee: ('23000', '[23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The DELETE statement conflicted with th

Datum converter

In [7]:
def convert_date(date : str):
    converted = pd.to_datetime(date)
    year = converted.year
    month = converted.month
    quarter = (month - 1 ) // 3 + 1
    converted = converted.strftime("%Y-%m-%d")
    
    return pd.DataFrame({
        "StartDate" : [converted], 
        "Year" : [year], 
        "Month" : [month], 
        "Quarter" : [quarter]}).astype({
            "Year": "Int64",
            "Month" : "Int64",
            "Quarter" : "Int64"
        })

Dataframes aanmaken

In [15]:
dates_dwh = pd.DataFrame({
    "StartDate" : [],
    "Year" : [],
    "Quarter" : [],
    "Month" : []
})

def create_territory():
    return sdm_frames["Territories"]


def create_product():
    global dates_dwh
    dim_product = pd.merge(sdm_frames["Production_Product"], sdm_frames["Sales_SalesOrderDetail"], left_on="ProductID", right_on="ProductID")

    sdm_frames["Suppliers"]["SupplierID"] = sdm_frames["Suppliers"]["SupplierID"].astype(str)
    dim_product["SupplierID"] = dim_product["SupplierID"].astype(str)

    dim_product = pd.merge(dim_product, sdm_frames["Suppliers"], left_on="SupplierID", right_on="SupplierID")
    dim_product = dim_product[["ProductID", "Name", "description", "Category", "Color", "UnitPrice", "UnitPriceDiscount", "CompanyName",
                               "quantity", "StandardCost"]].rename(columns={
                                   "StandardCost" : "Costs",
                                   "quantity" : "Quantity",
                                   "description" : "Description"
    })
    
    return dim_product

def create_employee():
    global dates_dwh
    dim_employee = sdm_frames["Employee"]
    dim_employee = dim_employee[["emp_id", "manager_id", "emp_lname", "Title", "start_date", "dept_id"]].rename(columns={
                                   "emp_id" : "EmpID",
                                   "emp_lname" : "EmpLName",
                                   "manager_id" : "ManagerID",
                                   "dept_id" : "DeptID",
                                   "start_date" : "StartDate"
    })

    for index, row in dim_employee.iterrows():
        date_details : pd.DataFrame = convert_date(row["StartDate"])
        dates_dwh = pd.concat([dates_dwh, date_details], ignore_index=True)
    
    return dim_employee

def create_customer():
    global dates_dwh
    dim_customer = pd.merge(sdm_frames["Customer"], sdm_frames["Sales_SalesOrderHeader"], left_on="id", right_on="CustomerID")
    dim_customer = dim_customer[["CustomerID", "fname", "lname", "address", "phone", "company_name", "SalesOrderID", "OrderDate",
                                 ]].rename(columns={
                                   "fname" : "Fname",
                                   "lname" : "Lname",
                                   "address" : "Address",
                                   "phone" : "Phone",
                                   "company_name" : "CompanyName"
    })

    for index, row in dim_customer.iterrows():
        date_details : pd.DataFrame = convert_date(row["OrderDate"])
        dates_dwh = pd.concat([dates_dwh, date_details], ignore_index=True)
    
    return dim_customer



territory_dwh, customer_dwh, employee_dwh, product_dwh = (
    create_territory(),
    create_customer(),
    create_employee(),
    create_product(),
)

def create_fact_sales ():
    global dates_dwh


    fact_sales = pd.merge(sdm_frames["Sales_SalesOrderDetail"], sdm_frames["Sales_SalesOrderHeader"], on="SalesOrderID")
    fact_sales = pd.merge(fact_sales, product_dwh, on="ProductID")
    fact_sales = pd.merge(fact_sales, customer_dwh, on="CustomerID")
    fact_sales = pd.merge(fact_sales, employee_dwh, left_on="SalesPersonID", right_on="EmpID")
    fact_sales = pd.merge(fact_sales, territory_dwh, on="TerritoryID")

    fact_sales["TotalSales"] = (fact_sales["UnitPrice_x"] * fact_sales["Quantity"]) - fact_sales["UnitPriceDiscount_x"]
    fact_sales["TotalProfits"] = ((fact_sales["UnitPrice_x"] - fact_sales["Costs"]) * fact_sales["Quantity"]) - fact_sales["UnitPriceDiscount_x"]

    for index, row in fact_sales.iterrows():
        date_details = convert_date(row["OrderDate_x"])
        dates_dwh = pd.concat([dates_dwh, date_details], ignore_index=True)

    fact_sales = fact_sales[[
        "SalesOrderID_x",
        "CustomerID",
        "EmpID",
        "ProductID",
        "TerritoryID",
        "OrderDate_x",
        "UnitPrice_x",
        "UnitPriceDiscount_x",
        "Quantity",
        "TotalSales",
        "TotalProfits"
        ]].rename(columns={
            "SalesOrderID_x" : "SalesOrderID",
            "OrderDate_x" : "OrderDate",
            "UnitPrice_x" : "UnitPrice",
            "UnitPriceDiscount_x" : "UnitPriceDiscount"
        })
    
    return fact_sales

fact_sales_dwh = create_fact_sales()
dates_dwh = dates_dwh.drop_duplicates()

fact_sales_dwh

Unnamed: 0,SalesOrderID,CustomerID,EmpID,ProductID,TerritoryID,OrderDate,UnitPrice,UnitPriceDiscount,Quantity,TotalSales,TotalProfits


In [11]:
empty_dwh()

tables = {
    "DimDate" : dates_dwh,
    "DimProduct" : product_dwh,
    "DimCustomer" : customer_dwh,
    "DimEmployee" : employee_dwh,
    "DimTerritory" : territory_dwh,
    "FactSales" : fact_sales_dwh
}
allowed = True

for key in tables:
    for query in query_add(key, tables[key]):
        try:
            DWH_cursor.execute(query)
        except pyodbc.Error as e:
            print(query)
            print(e)

if allowed:
    DWH_cursor.commit()
    print("ITEMS INSERTED")
else:
    DWH_cursor.rollback()
    print("NOT ALLOWED TO COMMIT")

REMOVED DimProduct
REMOVED DimEmployee
REMOVED DimTerritory
REMOVED DimCustomer
REMOVED FactSales
REMOVED DimDate
All items are removed
INSERT INTO DimCustomer (CustomerID, Fname, Lname, Address, Phone, CompanyName, SalesOrderID, OrderDate) VALUES ('ALFKI', NULL, NULL, 'Obere Str. 57', '030-0074321', 'Alfreds Futterkiste', 10692, '2023-10-03')
('23000', "[23000] [Microsoft][ODBC SQL Server Driver][SQL Server]Violation of PRIMARY KEY constraint 'PK__DimCusto__A4AE64B85BF655F0'. Cannot insert duplicate key in object 'dbo.DimCustomer'. The duplicate key value is (ALFKI). (2627) (SQLExecDirectW); [23000] [Microsoft][ODBC SQL Server Driver][SQL Server]The statement has been terminated. (3621)")
INSERT INTO DimCustomer (CustomerID, Fname, Lname, Address, Phone, CompanyName, SalesOrderID, OrderDate) VALUES ('ALFKI', NULL, NULL, 'Obere Str. 57', '030-0074321', 'Alfreds Futterkiste', 10702, '2023-10-13')
('23000', "[23000] [Microsoft][ODBC SQL Server Driver][SQL Server]Violation of PRIMARY KEY 