In [3]:
import pandas as pd
import pyodbc
import duckdb

import os
import sys

In [4]:
# Resolve the folders this notebook works with, relative to where it runs:
#   current_dir -> the notebook's working directory
#   parent_dir  -> one level above it
#   data_dir    -> the 'data' folder under parent_dir
current_dir = os.getcwd()
parent_dir = os.path.split(current_dir)[0]
data_dir = os.path.join(parent_dir, 'data')

In [5]:
# Make modules that live in parent_dir importable (the next cell imports
# `config` from there). Guarded so that re-running this cell does not keep
# appending the same entry to sys.path.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

In [6]:
import config as cfg

# 1. Extract
Extract the source tables from SQL Server

In [7]:
def extract_table(table_name):
    """Read every row of a SQL Server table into a pandas DataFrame.

    Connects to the server/database named in ``cfg.SERVER_NAME`` and
    ``cfg.DATABASE_NAME`` using ``Trusted_Connection=yes`` and runs
    ``SELECT * FROM <table_name>``.

    Parameters
    ----------
    table_name : str
        Name of the table to read, e.g. ``"dbo.dim_date"``. It is interpolated
        directly into the SQL text (identifiers cannot be bound as query
        parameters), so it must come from trusted code, never from user input.

    Returns
    -------
    pandas.DataFrame
        The full contents of the table.

    Raises
    ------
    pyodbc.Error
        If the connection cannot be established. Errors raised while running
        the query propagate unchanged.
    """
    # Step 1: Define the connection string
    conn_str = (
        'DRIVER={SQL Server};'
        f'SERVER={cfg.SERVER_NAME};'
        f'DATABASE={cfg.DATABASE_NAME};'
        'Trusted_Connection=yes;'
    )

    # Step 2: Build the query text
    query = f"SELECT * FROM {table_name}"

    # Step 3: Run the query. The connection is closed in `finally` so it is
    # released even when the query fails (previously it leaked on error).
    connection = pyodbc.connect(conn_str)
    try:
        return pd.read_sql(query, connection)
    finally:
        connection.close()

In [8]:
# Pull the four source tables into DataFrames (one SQL Server round trip each):
# the date dimension from the dbo schema, and the inventory, orders and
# fulfillment tables from the `original` schema.
df_dim_date = extract_table("dbo.dim_date")
df_inventory = extract_table("original.inventory")
df_orders = extract_table("original.orders")
df_fulfillment = extract_table("original.fulfillment")

  df = pd.read_sql(query, connection)


In [9]:
# Preview the first rows of the extracted orders table.
df_orders.head()

Unnamed: 0,order_id,order_item_id,order_year_month,order_year,order_month,order_day,order_time,order_quantity,product_department,product_category,...,customer_country,warehouse_country,shipment_year,shipment_month,shipment_day,shipment_mode,shipment_days_scheduled,gross_sales,discount_percent,profit
0,1,4381,202101,2021,1,1,2:48,1,Fan Shop,Water Sports,...,Nicaragua,USA,2021,1,6,Same Day,3,200,0.09,100
1,2,29,202101,2021,1,1,3:30,1,Apparel,Cleats,...,Brazil,Puerto Rico,2021,1,3,Standard Class,4,60,0.2,147
2,3,32,202101,2021,1,1,3:30,4,Footwear,Cardio Equipment,...,Brazil,Puerto Rico,2021,1,3,Standard Class,4,400,0.06,245
3,4,33,202101,2021,1,1,3:30,5,Fan Shop,Indoor/Outdoor Games,...,Brazil,Puerto Rico,2023,9,5,Standard Class,4,250,0.09,125
4,5,109801,202101,2021,1,1,4:12,5,Footwear,Electronics,...,Belarus,USA,2021,1,5,Standard Class,4,160,0.15,75


In [10]:
# Preview the first rows of the extracted inventory table.
df_inventory.head()

Unnamed: 0,product_name,year_month,warehouse_inventory,inventory_cost_per_unit
0,Perfect Fitness Perfect Rip Deck,202312,0,0.69517
1,Nike Men's Dri-FIT Victory Golf Polo,202312,2,1.29291
2,O'Brien Men's Neoprene Life Vest,202312,0,0.56531
3,Nike Men's Free 5.0+ Running Shoe,202312,1,1.26321
4,Under Armour Girls' Toddler Spine Surge Runni,202312,0,1.47648


In [11]:
# Preview the first rows of the extracted fulfillment table.
df_fulfillment.head()

Unnamed: 0,product_name,warehouse_order_fulfillment_days
0,Perfect Fitness Perfect Rip Deck,8.3
1,Nike Men's Dri-FIT Victory Golf Polo,6.6
2,O'Brien Men's Neoprene Life Vest,5.5
3,Nike Men's Free 5.0+ Running Shoe,9.4
4,Under Armour Girls' Toddler Spine Surge Runni,6.3


# 2. Transform 
Modelling with DuckDB

In [12]:
# Step 1: Create a DuckDB in-memory connection.
# It is used below to run the modelling SQL over the extracted DataFrames.
con = duckdb.connect()

In [13]:
# Remove any existing 'df_orders' registration before the frames are registered below.
# NOTE(review): the connection was only just created, so nothing should be
# registered yet on a fresh run — this looks like a leftover from interactive
# re-runs; confirm whether it is still needed.
con.unregister('df_orders')

<duckdb.duckdb.DuckDBPyConnection at 0x22204ba23b0>

In [14]:
# Step 2: Make the extracted DataFrames queryable by name from DuckDB SQL.
# Each register() call hands back the connection, so the three calls are chained
# and the cell still ends by displaying the connection object.
(
    con.register('df_fulfillment', df_fulfillment)
       .register('df_orders', df_orders)
       .register('df_inventory', df_inventory)
)

<duckdb.duckdb.DuckDBPyConnection at 0x22204ba23b0>

In [15]:
# Step 3: Define the product-dimension query (it is executed in the next cell).
# - Starts from every row of df_fulfillment and LEFT JOINs the distinct
#   (product_name, product_category, product_department) combinations found
#   in df_orders, matching on product_name.
# - Products with no match in df_orders get 'Unknown' for category and department.
# - ORDER BY 3,2 sorts by product_department, then product_category.
# NOTE(review): if a product_name occurs in df_orders with more than one
# category/department combination, the join returns several rows for that
# product — confirm whether dim_product is expected to be unique per product.
dim_product_query = """
WITH cte_fulfillment AS (
    SELECT * 
    FROM df_fulfillment
),

cte_product AS (
    SELECT DISTINCT 
        product_name, 
        product_category,
        product_department
    FROM df_orders
)
SELECT 
    cte_fulfillment.product_name,
    CASE WHEN product_category IS NULL THEN 'Unknown' ELSE product_category END AS product_category,
    CASE WHEN product_department IS NULL THEN 'Unknown' ELSE product_department END AS product_department,
    warehouse_order_fulfillment_days
FROM cte_fulfillment
LEFT JOIN cte_product
ON cte_fulfillment.product_name = cte_product.product_name
-- WHERE product_category IS NULL
ORDER BY 3,2
;
"""

In [16]:
# Step 4: Run the product-dimension query on the DuckDB connection and
# fetch the result into a pandas DataFrame.
df_dim_product = con.execute(dim_product_query).df()

In [17]:
# Preview the first rows of the product dimension.
df_dim_product.head()

Unnamed: 0,product_name,product_category,product_department,warehouse_order_fulfillment_days
0,Baby sweater,Baby,Apparel,7.9
1,Children's heaters,Children's Clothing,Apparel,2.7
2,Total Gym 1400,Cleats,Apparel,1.3
3,Perfect Fitness Perfect Rip Deck,Cleats,Apparel,8.3
4,Porcelain crafts,Crafts,Apparel,7.1


In [18]:
# Customer dimension, built the same way as dim_product: define the SQL here,
# execute it in the next cell.
# Takes the distinct (country, market, region) combinations from df_orders;
# for the country 'USA' the region is overridden with 'USA'.
dim_customer_query = """
SELECT distinct
    customer_country, 
    customer_market,
    CASE
        WHEN customer_country = 'USA' THEN 'USA'
    ELSE customer_region END AS customer_region
FROM df_orders
;
"""

In [19]:
# Run the customer-dimension query and fetch the result into a DataFrame.
df_dim_customer = con.execute(dim_customer_query).df()

In [20]:
# Display the customer dimension (the rendered output is truncated for long frames).
df_dim_customer

Unnamed: 0,customer_country,customer_market,customer_region
0,Australia,Pacific Asia,Oceania
1,Norway,Europe,Northern Europe
2,India,Pacific Asia,South Asia
3,Jamaica,LATAM,Caribbean
4,Trinidad and Tobago,LATAM,Caribbean
...,...,...,...
134,Mauritania,Africa,West Africa
135,Niger,Africa,West Africa
136,Rwanda,Africa,East Africa
137,Lesotho,Africa,Southern Africa


In [21]:
# Spot-check the CASE override in dim_customer_query: rows for country 'USA'
# should show 'USA' as their customer_region.
df_dim_customer.loc[df_dim_customer['customer_country']=='USA']

Unnamed: 0,customer_country,customer_market,customer_region
20,USA,North America,USA


In [22]:
# Shipment dimension, same pattern: the distinct (shipment_mode,
# shipment_days_scheduled) combinations from df_orders, sorted by the
# scheduled days (ORDER BY 2 = second selected column).
dim_shipment_query = """
SELECT distinct 
    shipment_mode, 
    shipment_days_scheduled
FROM df_orders
ORDER BY 2;
"""

In [23]:
# Run the shipment-dimension query and fetch the result into a DataFrame.
df_dim_shipment = con.execute(dim_shipment_query).df()

In [24]:
# Display the shipment dimension.
df_dim_shipment

Unnamed: 0,shipment_mode,shipment_days_scheduled
0,First Class,1
1,Second Class,2
2,Same Day,3
3,Standard Class,4


In [25]:
# Orders fact table, same pattern: define the SQL here, execute it in the next cell.
# - order_date and shipment_date are assembled from their separate
#   year / month / day columns and cast to DATE.
# - discount_percent: the placeholder value '  -  ' is replaced with '0'
#   before the column is cast to FLOAT.
# - The remaining columns are passed through from df_orders unchanged.
fact_orders_query = """
SELECT 
    order_id,
    CAST(CONCAT(order_year, '-', order_month, '-', order_day) AS DATE) AS order_date,
    order_time,
    order_quantity,
    product_name, 
    customer_country,
    CAST(CONCAT(shipment_year, '-', shipment_month, '-', shipment_day) AS DATE) AS shipment_date,
    shipment_mode,
    gross_sales,
    CAST(
        CASE 
            WHEN discount_percent = '  -  ' THEN '0'
            ELSE discount_percent
        END AS FLOAT
    ) AS discount_percent,
    profit
FROM 
    df_orders
"""


In [26]:
# Run the fact query and fetch the result into a DataFrame.
df_fact_orders = con.execute(fact_orders_query).df()

In [27]:
# Preview the first rows of the orders fact table.
df_fact_orders.head()

Unnamed: 0,order_id,order_date,order_time,order_quantity,product_name,customer_country,shipment_date,shipment_mode,gross_sales,discount_percent,profit
0,1,2021-01-01,2:48,1,Pelican Sunstream 100 Kayak,Nicaragua,2021-01-06,Same Day,200,0.09,100
1,2,2021-01-01,3:30,1,Perfect Fitness Perfect Rip Deck,Brazil,2021-01-03,Standard Class,60,0.2,147
2,3,2021-01-01,3:30,4,Nike Men's Free 5.0+ Running Shoe,Brazil,2021-01-03,Standard Class,400,0.06,245
3,4,2021-01-01,3:30,5,O'Brien Men's Neoprene Life Vest,Brazil,2023-09-05,Standard Class,250,0.09,125
4,5,2021-01-01,4:12,5,Under Armour Women's Ignite Slide,Belarus,2021-01-05,Standard Class,160,0.15,75


In [28]:
# Inspect the column dtypes of the fact table as returned by DuckDB.
df_fact_orders.dtypes

order_id                     int64
order_date          datetime64[us]
order_time                  object
order_quantity               int64
product_name                object
customer_country            object
shipment_date       datetime64[us]
shipment_mode               object
gross_sales                  int64
discount_percent           float32
profit                       int64
dtype: object

In [29]:
# Replace order_date with plain date values (via .dt.date); shipment_date is
# left as it came back from DuckDB.
# NOTE(review): this overwrites the column in place, so the dtypes displayed
# by the previous cell describe the frame *before* this conversion.
df_fact_orders['order_date'] = pd.to_datetime(df_fact_orders['order_date']).dt.date 

In [30]:
# Inspect the column dtypes of the shipment dimension.
df_dim_shipment.dtypes

shipment_mode              object
shipment_days_scheduled     int64
dtype: object

In [31]:
# Inspect the column dtypes of the customer dimension.
df_dim_customer.dtypes

customer_country    object
customer_market     object
customer_region     object
dtype: object

In [32]:
# Inspect the column dtypes of the date dimension as extracted from SQL Server.
df_dim_date.dtypes

date                      object
year                      object
quarter                   object
month                     object
month_name                object
month_abbreviation        object
year_month                object
day                       object
day_of_week               object
day_name                  object
day_abbreviation          object
week                      object
is_weekend                  bool
fiscal_year               object
fiscal_quarter            object
fiscal_month              object
is_last_day_of_month        bool
is_last_day_of_quarter      bool
is_last_day_of_year         bool
dtype: object

# 3. Load 


## Load data back to SQL Server

In [33]:
def get_column_types(df):
    """Return the SQL Server column type for each column of ``df``.

    Columns are classified by dtype *family* rather than by exact dtype name,
    so variants such as ``float32``, ``int32`` or ``datetime64[us]`` (all of
    which an exact-name lookup would miss and store as text) get a proper
    numeric / datetime column.

    Mapping:
        boolean           -> BIT
        integer           -> BIGINT
        floating point    -> FLOAT
        naive datetime64  -> DATETIME2
        anything else     -> NVARCHAR(MAX)

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns should be described.

    Returns
    -------
    list of str
        One SQL Server type name per column, in column order.
    """
    dtype_checks = pd.api.types

    def _sql_type(dtype):
        # Map a single pandas dtype to its SQL Server type name.
        if dtype_checks.is_bool_dtype(dtype):
            return 'BIT'
        if dtype_checks.is_integer_dtype(dtype):
            return 'BIGINT'
        if dtype_checks.is_float_dtype(dtype):
            return 'FLOAT'
        if dtype_checks.is_datetime64_dtype(dtype):
            # Any resolution (ns, us, ...); timezone-aware columns still fall
            # through to the text fallback below.
            return 'DATETIME2'
        return 'NVARCHAR(MAX)'

    return [_sql_type(dt) for dt in df.dtypes]


In [34]:
def load_table(df, table_name, schema_name):
    """Replace a SQL Server table with the contents of a DataFrame.

    Drops ``schema_name.table_name`` if it exists, recreates it with column
    types derived from ``get_column_types(df)``, and inserts every row of
    ``df``. The drop, create and inserts are committed together, so a failure
    part-way through rolls back and leaves the previous table in place.

    Note: a second function named ``load_table`` (the DuckDB loader) is defined
    later in this notebook and shadows this one once its cell has been run.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to load. Missing values (NaN / NaT / None) are sent as NULL.
    table_name : str
        Target table name, interpolated into the SQL text as-is.
    schema_name : str
        Target schema name, interpolated into the SQL text as-is.

    Returns
    -------
    None
        Prints a confirmation message on success.

    Raises
    ------
    pyodbc.Error
        If connecting fails or any statement fails (after rolling back).
    """
    def _quote(column):
        # Bracket-quote a column name so spaces / reserved words stay valid.
        return '[' + str(column).replace(']', ']]') + ']'

    def _to_sql_value(value):
        # Send pandas missing markers as NULL instead of NaN / NaT.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    # Define the connection string
    conn_str = (
        'DRIVER={SQL Server};'
        f'SERVER={cfg.SERVER_NAME};'
        f'DATABASE={cfg.DATABASE_NAME};'
        'Trusted_Connection=yes;'
    )

    qualified_name = f"{schema_name}.{table_name}"

    # Build all statements up front so nothing touches the database if the
    # frame itself is the problem.
    drop_sql = (
        f"IF OBJECT_ID('{qualified_name}', 'U') IS NOT NULL "
        f"DROP TABLE {qualified_name}"
    )
    column_defs = ', '.join(
        f'{_quote(col)} {dtype}'
        for col, dtype in zip(df.columns, get_column_types(df))
    )
    create_table_sql = f"CREATE TABLE {qualified_name} ({column_defs})"
    placeholders = ','.join(['?'] * len(df.columns))
    insert_sql = f"INSERT INTO {qualified_name} VALUES ({placeholders})"

    # itertuples keeps each column's own value type; iterrows would first
    # coerce every row to a single common dtype.
    rows = [
        tuple(_to_sql_value(value) for value in record)
        for record in df.itertuples(index=False, name=None)
    ]

    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(drop_sql)
            cursor.execute(create_table_sql)
            if rows:  # executemany rejects an empty parameter list
                cursor.executemany(insert_sql, rows)
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

    print(f"DataFrame loaded successfully into {schema_name}.{table_name}")

In [35]:
# Store both date columns as text before the fact table is loaded.
for date_col in ('order_date', 'shipment_date'):
    df_fact_orders[date_col] = df_fact_orders[date_col].astype(str)


In [36]:
# Load the fact table and every dimension into the dbo schema, in this order.
sql_server_loads = [
    (df_fact_orders, 'fact_orders'),
    (df_dim_customer, 'dim_customer'),
    (df_dim_shipment, 'dim_shipment'),
    (df_dim_product, 'dim_product'),
    (df_dim_date, 'dim_date'),
]
for frame, target_table in sql_server_loads:
    load_table(frame, target_table, 'dbo')

DataFrame loaded successfully into dbo.fact_orders
DataFrame loaded successfully into dbo.dim_customer
DataFrame loaded successfully into dbo.dim_shipment
DataFrame loaded successfully into dbo.dim_product
DataFrame loaded successfully into dbo.dim_date


## Load data to DuckDB

In [37]:
# Locate the 'db' folder that sits next to the notebook's working directory:
# working directory -> its parent -> <parent>/db
current_dir = os.getcwd()
parent_dir = os.path.split(current_dir)[0]
db_dir = os.path.join(parent_dir, 'db')

In [39]:
# Connect to DuckDB, specifying the database file to use inside db_dir.
# NOTE(review): this rebinds `con`, which until now referred to the in-memory
# connection used for the transform step; that connection is not closed first.
con = duckdb.connect(os.path.join(db_dir, 'swiftmart.db'))

In [41]:
# Create the schema if it doesn't exist
# NOTE(review): the load cell below writes every table to the 'main' schema,
# not 'dbo' — confirm which schema is intended.
con.execute("CREATE SCHEMA IF NOT EXISTS dbo")

<duckdb.duckdb.DuckDBPyConnection at 0x22204bc9df0>

In [42]:
def load_table(df, table_name, schema_name='', connection=None):
    """
    Load a DataFrame into a DuckDB table, replacing the table's contents each time.

    The table is rebuilt with ``CREATE OR REPLACE TABLE ... AS SELECT``, which
    both creates a missing table and fully replaces an existing one, so no
    separate truncate step is needed.

    Note: this definition shadows the SQL Server ``load_table`` defined earlier
    in this notebook.

    Parameters:
    - df: pandas DataFrame to load into the table
    - table_name: name of the table to load data into
    - schema_name: optional schema name; if provided, it is prefixed to the table name
    - connection: optional DuckDB connection to use; defaults to the
      notebook-level ``con``

    Returns:
    - None
    """
    # Construct the full table name with schema if provided
    target = f"{schema_name}.{table_name}" if schema_name else table_name

    # Fall back to the notebook's global connection when none is passed in
    db = con if connection is None else connection

    # Expose the DataFrame to SQL under a temporary name
    db.register('df_temp', df)
    try:
        # Create or replace the table with the DataFrame data
        db.execute(f"CREATE OR REPLACE TABLE {target} AS SELECT * FROM df_temp")
    finally:
        # Always drop the temporary registration, even if the load failed
        db.unregister('df_temp')


In [43]:
# Write the fact table and every dimension into the 'main' schema of the
# DuckDB file, in this order.
duckdb_loads = [
    (df_fact_orders, 'fact_orders'),
    (df_dim_customer, 'dim_customer'),
    (df_dim_shipment, 'dim_shipment'),
    (df_dim_product, 'dim_product'),
    (df_dim_date, 'dim_date'),
]
for frame, target_table in duckdb_loads:
    load_table(frame, target_table, 'main')

In [44]:
# List the tables that now exist in the 'main' schema (tables in other
# schemas are filtered out by the WHERE clause) to verify the load.
df_tables = con.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'").fetchdf()

In [45]:
# Display the list of loaded tables.
df_tables

Unnamed: 0,table_name
0,dim_customer
1,dim_date
2,dim_product
3,dim_shipment
4,fact_orders


In [50]:
# Close the connection to the DuckDB database file now that loading is done.
con.close()

## Export CSV files for business users

In [49]:
# Write each modelled table to data_dir as a CSV file.
# index=False keeps the DataFrame's row index out of the files; it would
# otherwise appear as an unnamed first column.
csv_exports = {
    'dim_product.csv': df_dim_product,
    'dim_date.csv': df_dim_date,
    'dim_customer.csv': df_dim_customer,
    'dim_shipment.csv': df_dim_shipment,
    'fact_orders.csv': df_fact_orders,
}
for file_name, frame in csv_exports.items():
    frame.to_csv(os.path.join(data_dir, file_name), index=False)