In [1]:
import os 
import duckdb 
from dotenv import load_dotenv

In [2]:
# instanstiating the dotenv to have an access to env vars 
load_dotenv()

# setting up the paths 
base_path = os.getenv('base_path')
database = os.getenv('database_path')

In [3]:
# setting up the database path 
database_path = f"{database}/database.db"

# connect to the databse file
ddb = duckdb.connect(database_path)

In [4]:
# create view for the order items dataset 
ddb.execute(
    f'''
        CREATE VIEW IF NOT EXISTS view_order_items AS 
        SELECT *
        FROM read_csv_auto('{base_path}/datasets/olist_order_items_dataset.csv');
    '''
)

<duckdb.duckdb.DuckDBPyConnection at 0x110afa670>

In [5]:
# breaking down the creation of fct_table 
# 1.1
ddb.execute(
    '''
        CREATE TEMPORARY TABLE tmp_orders_data AS
        SELECT
            o.OrderId,
            vo.customer_id,
            vo.order_purchase_timestamp,
            o.OrderStatus
        FROM
            dim_orders o
        LEFT JOIN view_orders_data vo ON o.OrderId = vo.order_id;
    '''
)

<duckdb.duckdb.DuckDBPyConnection at 0x110afa670>

In [6]:
# 1.2 
ddb.execute(
    '''
        CREATE TEMPORARY TABLE tmp_order_items AS
        SELECT
            voi.order_id,
            voi.product_id,
            voi.seller_id,
            voi.price,
            voi.freight_value
        FROM
            view_order_items voi
        LEFT JOIN dim_products p ON voi.product_id = p.ProductId
        LEFT JOIN dim_sellers s ON voi.seller_id = s.SellerId;
    '''
)

<duckdb.duckdb.DuckDBPyConnection at 0x110afa670>

In [7]:
# 1.3
ddb.execute(
    '''
        CREATE TEMPORARY TABLE tmp_payments_reviews AS
        SELECT
            pay.OrderId,
            pay.PaymentType,
            vr.review_score
        FROM
            dim_payments pay
        LEFT JOIN view_reviews_data vr ON pay.OrderId = vr.order_id;
    '''
)

<duckdb.duckdb.DuckDBPyConnection at 0x110afa670>

In [8]:
# 1.4 (Final)
ddb.execute(
    '''
        CREATE TABLE IF NOT EXISTS fct_sales AS
        SELECT
            o.OrderId,
            o.customer_id,
            i.product_id,
            i.seller_id,
            p.PaymentType,
            o.order_purchase_timestamp,
            o.OrderStatus,
            i.price,
            i.freight_value,
            p.review_score
        FROM
            tmp_orders_data o
        LEFT JOIN tmp_order_items i ON o.OrderId = i.order_id
        LEFT JOIN tmp_payments_reviews p ON o.OrderId = p.OrderId;
    '''
)

<duckdb.duckdb.DuckDBPyConnection at 0x110afa670>

In [9]:
ddb.sql(
    '''
        SELECT *
        FROM fct_sales;
    '''
)

┌──────────────────────────────────┬──────────────────────────────────┬──────────────────────────────────┬──────────────────────────────────┬─────────────┬──────────────────────────┬─────────────┬────────┬───────────────┬──────────────┐
│             OrderId              │           customer_id            │            product_id            │            seller_id             │ PaymentType │ order_purchase_timestamp │ OrderStatus │ price  │ freight_value │ review_score │
│             varchar              │             varchar              │             varchar              │             varchar              │   varchar   │        timestamp         │   varchar   │ double │    double     │    int64     │
├──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼─────────────┼──────────────────────────┼─────────────┼────────┼───────────────┼──────────────┤
│ 972f5a89813198a05f8dd4463d82f3fc │ 50c7d896c15275b

In [13]:
# showing all names of the fct_sales 
ddb.sql(
    '''
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'fct_sales';
    '''
)

┌───────────────────┬───────────┐
│    column_name    │ data_type │
│      varchar      │  varchar  │
├───────────────────┼───────────┤
│ OrderId           │ VARCHAR   │
│ CustomerID        │ VARCHAR   │
│ ProductId         │ VARCHAR   │
│ SellerId          │ VARCHAR   │
│ PaymentType       │ VARCHAR   │
│ OrderPurchaseDate │ TIMESTAMP │
│ OrderStatus       │ VARCHAR   │
│ Price             │ DOUBLE    │
│ FreightValue      │ DOUBLE    │
│ ReviewScore       │ BIGINT    │
├───────────────────┴───────────┤
│ 10 rows             2 columns │
└───────────────────────────────┘

In [12]:
# changing names of some columns into appropriate name 
ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME customer_id to CustomerID;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME product_id to ProductId;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME seller_id to SellerId;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME order_purchase_timestamp to OrderPurchaseDate;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME price to Price;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME freight_value to FreightValue;
    '''
)

ddb.sql(
    '''
        ALTER TABLE fct_sales 
        RENAME review_score to ReviewScore;
    '''
)

In [15]:
# Checking null values in fct_sales table 
ddb.sql(
    '''
        SELECT
            COUNT(*) AS total_rows,
            COUNT(CASE WHEN OrderId IS NULL THEN 1 END) AS OrderId,
            COUNT(CASE WHEN CustomerID IS NULL THEN 1 END) AS CustomerID,
            COUNT(CASE WHEN ProductId IS NULL THEN 1 END) AS ProductId,
            COUNT(CASE WHEN SellerId IS NULL THEN 1 END) AS SellerId,
            COUNT(CASE WHEN PaymentType IS NULL THEN 1 END) AS PaymentType,
            COUNT(CASE WHEN OrderPurchaseDate IS NULL THEN 1 END) AS OrderPurchaseDate,
            COUNT(CASE WHEN OrderStatus IS NULL THEN 1 END) AS OrderStatus,
            COUNT(CASE WHEN Price IS NULL THEN 1 END) AS Price,
            COUNT(CASE WHEN FreightValue IS NULL THEN 1 END) AS FreightValue,
            COUNT(CASE WHEN ReviewScore IS NULL THEN 1 END) AS ReviewScore,
            
        FROM 
            fct_sales;
    '''
)

┌────────────┬─────────┬────────────┬───────────┬──────────┬─────────────┬───────────────────┬─────────────┬───────┬──────────────┬─────────────┐
│ total_rows │ OrderId │ CustomerID │ ProductId │ SellerId │ PaymentType │ OrderPurchaseDate │ OrderStatus │ Price │ FreightValue │ ReviewScore │
│   int64    │  int64  │   int64    │   int64   │  int64   │    int64    │       int64       │    int64    │ int64 │    int64     │    int64    │
├────────────┼─────────┼────────────┼───────────┼──────────┼─────────────┼───────────────────┼─────────────┼───────┼──────────────┼─────────────┤
│   15670827 │       0 │          0 │       833 │      833 │         423 │                 0 │           0 │   833 │          833 │      131232 │
└────────────┴─────────┴────────────┴───────────┴──────────┴─────────────┴───────────────────┴─────────────┴───────┴──────────────┴─────────────┘

In [17]:
# showing all names of the fct_sales 
ddb.sql(
    '''
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'fct_sales';
    '''
)

┌───────────────────┬───────────┐
│    column_name    │ data_type │
│      varchar      │  varchar  │
├───────────────────┼───────────┤
│ OrderId           │ VARCHAR   │
│ CustomerID        │ VARCHAR   │
│ ProductId         │ VARCHAR   │
│ SellerId          │ VARCHAR   │
│ PaymentType       │ VARCHAR   │
│ OrderPurchaseDate │ TIMESTAMP │
│ OrderStatus       │ VARCHAR   │
│ Price             │ DOUBLE    │
│ FreightValue      │ DOUBLE    │
│ ReviewScore       │ INTEGER   │
├───────────────────┴───────────┤
│ 10 rows             2 columns │
└───────────────────────────────┘

In [16]:
# changing data type - review score 
ddb.sql(
    '''
        ALTER TABLE fct_sales 
        ALTER COLUMN ReviewScore SET DATA TYPE INT;
    '''
)

In [18]:
# Define the output directory and ensure it exists
output_dir = "/Users/macintoshcider/Documents/Programming/Python/ETL/climate-commerce/analytics"
output_file = os.path.join(output_dir, "fct_sales.csv")
ddb.execute(
    f"""
        COPY fct_sales TO '{output_file}' (FORMAT CSV, HEADER);
    """
)
print(f"Data successfully exported to {output_file}")

Data successfully exported to /Users/macintoshcider/Documents/Programming/Python/ETL/climate-commerce/analytics/fct_sales.csv


In [19]:
# finally close the connection instance 
ddb.close()