# Create DuckDB Data Working File

In this file, you will be guided through some initial work that has already been done. Each cell has at least one fill-in-the-blank for you to reason through.

By the end of this file we will have a data model with 2 fact tables and 3 dim tables, and we will talk about business use cases for them.

In [None]:
import pandas as pd
import duckdb
import pathlib
import os

# Define Data Path
try:
    # Running as a script: anchor on the directory that contains this file.
    workshop_root_dir = pathlib.Path(__file__).absolute().parent
except NameError:
    # __file__ is not defined in a notebook/REPL session; fall back to the
    # parent of the current working directory. Only NameError is caught so
    # that unrelated failures (or Ctrl-C) are not silently swallowed.
    workshop_root_dir = pathlib.Path().absolute().parent

data_path = workshop_root_dir / "data"

# Only switch the working directory when the expected data file is really there,
# so a wrong guess about the root directory leaves the session untouched.
if (data_path / "transactions_multithreaded.json").exists():
    os.chdir(workshop_root_dir)

# Read In Data
json_data = duckdb.sql(f"SELECT * FROM __('{data_path}/transactions_multithreaded.json')").fetchdf()


In [None]:
# Create Connection to load data into DB
con = duckdb.connect('duckdb.db', read_only=False)

# Load into PyRelation
relation = con.from_df(json_data)
# type(relation) # duckdb.PyRelation

# Create Schema for Landing Zone and Load data into Landing Zone
con.execute("""CREATE SCHEMA IF NOT EXISTS landing""")
con.execute("""CREATE TABLE IF NOT EXISTS ________ AS SELECT * FROM relation""")

In [None]:
# Display as pandas df
con.execute("""SELECT * FROM landing.transactions _____ 10000""").fetch_df()

In [None]:
# Evaluate the Products
con.execute("""SELECT ___ FROM landing.transactions LIMIT 10000""").fetchdf()

In [None]:
## Unpack LIST(STRUCT[]) type in the DF

# 3 things to fill in this cell #

unpacked_products_df = con.execute("""
            
            -- Filter to first 20k rows
            WITH filter AS (
            SELECT *, ___() OVER() AS row_num
            FROM landing.transactions
            ),
            
            -- Products col is of type LIST, need to extract (1-indexed)
            extract_list AS (
                SELECT *,
                    ___(Products, 1) AS Products_Extract
                FROM filter
                WHERE row_num <= 20000
                        )

            -- Now we can unpack the STRUCT type
            SELECT *,
                Products_Extract.___
            FROM extract_list""").fetch_arrow_table()

In [None]:
# Check it
unpacked_products_df.to_pandas().iloc[:, ____]

In [None]:
## We can access that above object as either a Python DF or SQL Object, wow, so cool!
import pandas.testing as pdt

# SQL
sql_pandas_df = con.execute("""SELECT * 
           FROM unpacked_products_df""").fetch_df()

# Python
pandas_df = unpacked_products_df.to_pandas()

# Test equality: pdt.assert_frame_equal() returns None when the frames match and
# raises AssertionError when they differ, so the "not equal" case has to be
# handled in an except clause rather than an else branch of an if statement.
try:
    pdt.assert_frame_equal(___, ___)
except AssertionError:
    print("Dataframes are not equal!")
else:
    print("Dataframes are equal!")

In [None]:
# Reorder columns
reordered_cols = con.execute("""SELECT Timestamp, TransactionID, UserID, SessionID,
                    ProductID, Quantity, Price,
                    TotalAmount, TaxAmount, ShippingAmount, Discounts, PaymentType,
                    ShippingAddress, BillingAddress, EventType
            FROM ___""").fetch_df()

reordered_cols

In [None]:
# First col I notice is a timestamp and we want to turn that to an appropriate datetime value
# reordered_cols
correct_time = con.execute("""SELECT ____(Timestamp, '%Y-%m-%dT%H:%M:%S') AS Timestamp, *
                              FROM reordered_cols""").fetch_df()
correct_time
# Standard SQL has no "select everything except this column" syntax, so the original Timestamp column is still there next to the parsed one; we'll just select the columns we want when we are ready


In [None]:
# Next up I see the string column ProductID, which we want to turn into an INT by parsing all digits after the _

# 2 fill in the blanks here # 

correct_product_id = con.execute("""SELECT *,
                                    CAST(
                                        ___(
                                            ___(ProductID, '_'),
                                            2) 
                                    AS INT) AS ProductID_int
            FROM correct_time""").fetch_df()
correct_product_id

## We've done a lot of the cleaning we want to do, now let's think more about our data model

- Once again we have the Product ID, which in this dataset doesn't carry much information about the products themselves.
- In the real world we would have a product catalog.
- That catalog would give us context about the products we are selling, and a Data Scientist might need an easy way to look that information up.
- We are going to create another table called `DIM_PRODUCTS` to represent that product catalog.

In [None]:
con.execute("""
            -- Creating it in the staging schema to be available to our data model
            CREATE SCHEMA IF NOT EXISTS staging;
            
            -- Creating table dim_products
            CREATE TABLE IF NOT EXISTS staging.___ AS
            SELECT DISTINCT(ProductID_int) AS Product_ID,
                       'Comment about product' AS Product_Info
                FROM correct_product_id""").fetch_df()


con.execute("""SELECT * FROM staging.___""").fetch_df()

In [None]:
# Since we are following the same process for PaymentType and EventType, let's create a function to do this, but they don't have unique IDs so we add in UUID
# This cell has 0 fill in the blanks so you can get the gist of what's going on

def create_dim_table(table_name: str, col_name: str) -> None:
    """
    Create (if needed) and fill a dim table in the staging schema, giving each
    distinct value of an attribute a numeric ID and a comment column.

    Use this when the attribute is not already an ID.

    Example: We have Mastercard, Visa, etc for Payment Types, but there's no Payment Type Code yet
    We will create that payment type code

    But, for PRODUCTs, they're integers, so we don't need to recreate a different ID for those products

    The distinct values are read from the `correct_product_id` DataFrame and the
    statements run on the module-level `con` connection. The resulting table has
    the columns `id`, `{col_name}` and `{col_name}_Info`.

    Both arguments are interpolated directly into the SQL text, so they must be
    trusted, valid SQL identifiers.

    Args:
        table_name (str): Name of the table to create inside the staging schema (e.g. 'dim_payment_type')
        col_name (str): Name of the column in correct_product_id whose distinct values become the rows
    """
    # ROW_NUMBER() is ordered by the attribute value so that the same value gets
    # the same id on every run; with an empty OVER () the numbering is arbitrary
    # and INSERT OR REPLACE could re-map codes that fact tables already reference.
    con.execute(f"""
                -- Creating it in the staging schema to be available to our data model
                CREATE SCHEMA IF NOT EXISTS staging;
                
                -- Creating table {table_name}
                CREATE TABLE IF NOT EXISTS staging.{table_name} (
                    id INT NOT NULL UNIQUE,
                    {col_name} VARCHAR,
                    {col_name}_Info VARCHAR
                );
                
                INSERT OR REPLACE INTO staging.{table_name}
                WITH base_table AS (
                            SELECT DISTINCT({col_name}) AS {col_name},
                                'Comment about {col_name}' AS {col_name}_Info
                    FROM correct_product_id
                    )
                    
                SELECT ROW_NUMBER() OVER (ORDER BY {col_name}) as id,
                        {col_name},
                        {col_name}_Info
                FROM base_table""")
 
# Payment Type   
con.execute("""DROP TABLE IF EXISTS staging.dim_payment_type""")
create_dim_table('dim_payment_type', 'PaymentType')
#con.execute("""SELECT * FROM staging.dim_payment_type""").fetch_df()

# Event Type
con.execute("""DROP TABLE IF EXISTS staging.dim_event_type""")
create_dim_table('dim_event_type', 'EventType')
con.execute("""SELECT * FROM staging.dim_event_type""").fetch_df()

In [None]:
# Display Table Info
con.execute("""SHOW ALL TABLES""").fetch_df()

In [None]:
# We have some tables that came from my dev, let's show another way to filter out table names we might not want
# https://duckdb.org/docs/archive/0.9.2/sql/information_schema
con.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_schema IN ('landing', 'staging')
            AND table_name NOT IN ('___', '___')""").fetch_df()

## Back to our Main Table

In [None]:
correct_product_id

## Caveat about our table: We have traits that pertain to the Line Items themselves (Product ID, QTY, and Price), and then other pieces that pertain to the Order itself (Discounts, Total Price, Billing Address, Shipping Address)

### So, we will split our table now into 2 different tables.

In [None]:
# Build the order-level rows for FCT_TRANSACTIONS.
# The ROW_NUMBER() ranking and the row_number = 1 filter are applied to the same
# rows. Joining the ranking back to correct_product_id on TransactionID alone
# would return every source row of a transaction again, so the filter would not
# remove any duplicates.
transactions_filtered_out_detail = con.execute("""
            -- Creating our FCT_TRANSACTIONS table
            WITH unique_transactions AS (
               SELECT Timestamp,
                      TransactionID,
                      UserID,
                      SessionID,
                      TotalAmount,
                      TaxAmount,
                      ShippingAmount,
                      Discounts,
                      PaymentType,
                      ShippingAddress,
                      BillingAddress,
                      EventType,
                      ROW_NUMBER() OVER (____ __ _______) AS row_number
               FROM correct_product_id
            ),
            
            final_table AS (
            SELECT Timestamp,
                   TransactionID,
                   UserID,
                   SessionID,
                   TotalAmount,
                   TaxAmount,
                   ShippingAmount,
                   Discounts,
                   PaymentType,
                   ShippingAddress,
                   BillingAddress,
                   EventType
                FROM unique_transactions
               WHERE row_number = 1
               )
               
             SELECT *
             FROM final_table""").fetch_df()


# Step 2: Create the staging.sample_fct_transactions table using the data from the DataFrame
# IF EXISTS keeps the first run (when the table has not been created yet) from
# failing, matching the other DROP statements in this notebook.
con.execute("""
            DROP TABLE IF EXISTS staging.sample_fct_transactions
            """)

# The DataFrame variable is referenced by name inside the SQL text.
con.execute("""
            CREATE TABLE IF NOT EXISTS staging.sample_fct_transactions AS
            SELECT *
            FROM transactions_filtered_out_detail""")

In [None]:
# Finish out adding codes to the transaction table, and we've got our staging table!
# (we've left address alone for now, this could be a good exercise to do on your own)

# Fill in the Aliases in the Joins #

# Swap the PaymentType / EventType text values for the ids of the matching dim rows.
# The dim tables are aliased in the joins (dim_payment, dim_event), so the select
# list has to use those aliases too; the un-aliased table names are no longer
# visible once an alias is given.
final_staging_transactions = con.execute("""
            with final_staging_transactions AS (
            SELECT fct_trans.Timestamp,
                fct_trans.TransactionID,
                fct_trans.UserID,
                fct_trans.SessionID,
                fct_trans.TotalAmount,
                fct_trans.TaxAmount,
                fct_trans.ShippingAmount,
                fct_trans.Discounts,
                fct_trans.ShippingAddress,
                fct_trans.BillingAddress,
                dim_payment.id AS PaymentTypeCode,
                dim_event.id AS EventTypeCode
            FROM staging.sample_fct_transactions fct_trans
            INNER JOIN staging.dim_payment_type dim_payment
              ON ___.PaymentType = ___.PaymentType
            INNER JOIN staging.dim_event_type dim_event
                ON ___.EventType = ___.EventType
            )
            
            SELECT *
            FROM final_staging_transactions""").fetch_df()

# Step 2: Create the staging.fct_transactions table using the data from the DataFrame
con.execute("""
            DROP TABLE IF EXISTS staging.fct_transactions
            """)
            
con.execute("""
            CREATE TABLE IF NOT EXISTS staging.fct_transactions AS
            SELECT *
            FROM final_staging_transactions""")

### Transaction Detail Table Creation

In [None]:
# Creating the transaction detail table

# Fill in after the OVER() in the ROW_NUMBER() function #

# Build the line-item rows (product, quantity, unit price) for the detail table.
# The ROW_NUMBER() ranking and the row_number = 1 filter are applied to the same
# rows. Joining the ranking back to correct_product_id on
# (TransactionID, ProductID_int) would return every matching source row again,
# so the filter would not remove any duplicates.
transaction_detail = con.execute("""
            -- Creating our FCT_TRANSACTION_DETAIL table
            WITH unique_transaction_products AS (
               SELECT TransactionID,
                      ProductID_int,
                      Quantity,
                      Price,
                      ROW_NUMBER() over (_____ __ ______, ______) AS row_number
               FROM correct_product_id
            ),
            
            final_table AS (
            SELECT TransactionID,
                   ProductID_int AS ProductID,
                   Quantity,
                   Price AS UnitPrice
                FROM unique_transaction_products
               WHERE row_number = 1
               )
               
               SELECT *
               FROM final_table""").fetch_df()

# Step 2: Create the staging.fct_transaction_detail table using the data from the DataFrame
con.execute("""
            DROP TABLE IF EXISTS staging.fct_transaction_detail
            """)

con.execute("""
            CREATE TABLE IF NOT EXISTS staging.fct_transaction_detail AS
            SELECT *
            FROM transaction_detail""")

### Evaluate what tables we have in our environment

In [None]:
con.execute("""SHOW ALL TABLES;""").fetch_df()

In [None]:
# We have some tables that came from my dev, let's show another way to filter out table names we might not want
# https://duckdb.org/docs/archive/0.9.2/sql/information_schema
con.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_schema IN ('landing', 'staging')
            AND table_name NOT IN ('sample_fct_transaction_detail', 'sample_fct_transactions')""").fetch_df()

## See the slides from today
- https://www.canva.com/design/DAF6qdYt1uI/EDC80b79R8FiUZ7HNi6_ig/view?utm_content=DAF6qdYt1uI&utm_campaign=designshare&utm_medium=link&utm_source=editor