<a href="https://colab.research.google.com/github/BxMild/Data-warehouse-2024/blob/main/653020211_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Data Warehouse**

ปีย์รดา ภู่ถนนนอก 653020211-6

In [None]:
import polars as pl
import pandas as pd
import duckdb as dd
from datetime import datetime
import dlt

# Pipeline

In [None]:
# dlt pipeline shared by every load cell below: it targets DuckDB and writes
# into the `stg_northwind` dataset.
pipeline = dlt.pipeline(
    pipeline_name="load_northwind",
    destination='duckdb',
    dataset_name="stg_northwind",
)
# No data is handed over in this first call; the tables are loaded further down.
pipeline.run()

# Data transformation, Staging area load, multidimensional data model for data warehouse.

Given the Northwind database diagram:

<img src="./figs/nortwind_diagram.jpg" alt="Northwind database diagram">

We need to create a data cube like this:


<img src="./figs/data_cube_diagram.jpg" alt="alt text" width="700" height="700">

In [None]:
from datetime import datetime
import polars as pl

def add_timestamp(df,colname):
    """Return ``df`` with an extra column holding the current timestamp.

    Parameters
    ----------
    df : polars.DataFrame
        Frame to stamp; the input itself is left untouched.
    colname : str
        Name of the timestamp column to add.

    Returns
    -------
    polars.DataFrame
        A new frame with column ``colname`` set, on every row, to the current
        local time rendered as a ``'%m/%d/%y %H/%M/%S'`` string.
    """
    current_timestamp = datetime.now().strftime('%m/%d/%y %H/%M/%S')
    # Return the stamped frame. The previous version built it but returned the
    # original `df`, so the timestamp column never reached any loaded table.
    return df.with_columns(pl.lit(current_timestamp).alias(colname))

def change_dtypedatetime(df,colname):
    """Convert the string column ``colname`` of ``df`` to a polars ``Date``.

    Values are parsed with the pattern ``'%m/%d/%Y %H:%M:%S'``; the parsed
    column replaces the original one under the same name.
    """
    source_format = '%m/%d/%Y %H:%M:%S'
    parsed_column = pl.col(colname).str.strptime(pl.Date, format=source_format)
    return df.with_columns(parsed_column)

def unique(df,colname):
    """Drop duplicate rows of ``df``, comparing rows on ``colname`` only."""
    deduplicated = df.unique(colname)
    return deduplicated

def sort(df,colname):
    """Return ``df`` ordered by the column(s) given in ``colname``."""
    ordered = df.sort(by=colname)
    return ordered

def rename_col(df,colname_dict):
    """Rename columns of ``df`` following an ``{old_name: new_name}`` mapping."""
    renamed = df.rename(colname_dict)
    return renamed

def exclude(df,colname):
    """Return ``df`` with every column kept except the one(s) in ``colname``."""
    kept_columns = pl.col('*').exclude(colname)
    return df.select(kept_columns)

## customer dimension table.

1. Read customer table from `customer.csv` file.

2. Create a timestamp column named `ingestion_timestamp`, name the table `stg_customer`, and load the data into the `stg_northwind` database.

3. Transform data as follows:
    - Sort by `id` and get unique `id`.
    - Rename `id` columns to `customer_id`.
    - Delete the `ingestion_timestamp` column and add a timestamp column named `insertion_timestamp`.


In [None]:
# Read the raw Northwind customer records from CSV.
df_customers = pl.read_csv("./data/northwind/customer.csv")

In [None]:
# Stamp the customers, keep one row per `id`, order by `id`, and expose the key
# as `customer_id`.
stg_customer = (
    add_timestamp(df_customers, 'insertion_timestamp')
    .unique('id')
    .sort(by='id')
    .rename({'id': 'customer_id'})
)
stg_customer.head()

customer_id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
i64,str,str,str,str,str,str,str,str,str,str,str,str,i64,str,str,str,str
1,"""Company A""","""Bedecs""","""Anna""",,"""Owner""","""(123)555-0100""",,,"""(123)555-0101""","""123 1st Street""","""Seattle""","""WA""",99999,"""USA""",,,
2,"""Company B""","""Gratacos Solsona""","""Antonio""",,"""Owner""","""(123)555-0100""",,,"""(123)555-0101""","""123 2nd Street""","""Boston""","""MA""",99999,"""USA""",,,
3,"""Company C""","""Axen""","""Thomas""",,"""Purchasing Representative""","""(123)555-0100""",,,"""(123)555-0101""","""123 3rd Street""","""Los Angelas""","""CA""",99999,"""USA""",,,
4,"""Company D""","""Lee""","""Christina""",,"""Purchasing Manager""","""(123)555-0100""",,,"""(123)555-0101""","""123 4th Street""","""New York""","""NY""",99999,"""USA""",,,
5,"""Company E""","""O’Donnell""","""Martin""",,"""Owner""","""(123)555-0100""",,,"""(123)555-0101""","""123 5th Street""","""Minneapolis""","""MN""",99999,"""USA""",,,


In [None]:
# Load the customer frame into table `stg_customer`; 'replace' overwrites any earlier load.
pipeline.run(stg_customer.to_pandas(), table_name='stg_customer', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340364.5035021': [{'started_at': DateTime(2024, 10, 19, 12, 19, 25, 161505, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 19, 25, 606454, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340364.5035021'], load_packages=[LoadPackageInfo(load_id='1729340364.5035021', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340364.5035021', state='loaded', schema=Schema load_northwind at 1451774886160, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 19, 25, 569619, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJ

---

## Employee dimension table.

1. Read employee table from `employees.csv` file.

2. Create a timestamp column named `ingestion_timestamp`, name the table `stg_employee`, and load the data into the `stg_northwind` database.

3. Transform data as follows:
    - Sort by `id` and get unique `id`.
    - Rename `id` columns to `employee_id`.
    - Delete the `ingestion_timestamp` column and add a timestamp column named `insertion_timestamp`.


In [None]:
# Read the raw Northwind employee records from CSV.
df_employees = pl.read_csv("./data/northwind/employees.csv")

In [None]:
# Stamp the employees, keep one row per `id`, order by `id`, and expose the key
# as `employee_id`.
stg_employee = (
    add_timestamp(df_employees, 'insertion_timestamp')
    .unique('id')
    .sort(by='id')
    .rename({'id': 'employee_id'})
)
stg_employee.head()

employee_id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
i64,str,str,str,str,str,str,str,str,str,str,str,str,i64,str,str,str,str
1,"""Northwind Traders""","""Freehafer""","""Nancy""","""nancy@northwindtraders.com""","""Sales Representative""","""(123)555-0100""","""(123)555-0102""",,"""(123)555-0103""","""123 1st Avenue""","""Seattle""","""WA""",99999,"""USA""","""#http://northwindtraders.com#""",,
2,"""Northwind Traders""","""Cencini""","""Andrew""","""andrew@northwindtraders.com""","""Vice President, Sales""","""(123)555-0100""","""(123)555-0102""",,"""(123)555-0103""","""123 2nd Avenue""","""Bellevue""","""WA""",99999,"""USA""","""http://northwindtraders.com#ht…","""Joined the company as a sales …",
3,"""Northwind Traders""","""Kotas""","""Jan""","""jan@northwindtraders.com""","""Sales Representative""","""(123)555-0100""","""(123)555-0102""",,"""(123)555-0103""","""123 3rd Avenue""","""Redmond""","""WA""",99999,"""USA""","""http://northwindtraders.com#ht…","""Was hired as a sales associate…",
4,"""Northwind Traders""","""Sergienko""","""Mariya""","""mariya@northwindtraders.com""","""Sales Representative""","""(123)555-0100""","""(123)555-0102""",,"""(123)555-0103""","""123 4th Avenue""","""Kirkland""","""WA""",99999,"""USA""","""http://northwindtraders.com#ht…",,
5,"""Northwind Traders""","""Thorpe""","""Steven""","""steven@northwindtraders.com""","""Sales Manager""","""(123)555-0100""","""(123)555-0102""",,"""(123)555-0103""","""123 5th Avenue""","""Seattle""","""WA""",99999,"""USA""","""http://northwindtraders.com#ht…","""Joined the company as a sales …",


In [None]:
# Load the employee frame into table `stg_employee`; 'replace' overwrites any earlier load.
pipeline.run(stg_employee.to_pandas(), table_name='stg_employee', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340365.747456': [{'started_at': DateTime(2024, 10, 19, 12, 19, 26, 387297, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 19, 26, 801815, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340365.747456'], load_packages=[LoadPackageInfo(load_id='1729340365.747456', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340365.747456', state='loaded', schema=Schema load_northwind at 1451792143440, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 19, 26, 764792, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJobIn

---

## Product dimension table.

1. Read product table from `products.csv` file.

2. Transform data as follows:
    - Filter out records with `supplier_ids` containing ';'.
    - Change the column type of `supplier_ids` column to `pl.Int64`.
    - Rename `supplier_ids` columns to `supplier_id`.
    - Rename `id` columns to `product_id`.
    - Create a timestamp column named `ingestion_timestamp`, name the table `stg_product`, and load the data into the `stg_northwind` database.

3. Read suppliers table from `suppliers.csv` file.

4. Create a timestamp column named `ingestion_timestamp`, rename `id` to `supplier_id`, name the table `stg_supplier`, and load the data into the `stg_northwind` database.

5. Transform data as follows:
    - Perform left join of `stg_product` table and the temporary table containing ['supplier_id','company'] obtained from `stg_supplier` table.
    - Select all columns exclude `supplier_id`.
    - get unique `product_id`.
    - Delete the `ingestion_timestamp` column and add a timestamp column named `insertion_timestamp`.
    - Name the result table `dim_product`.

In [None]:
# Read the raw Northwind product records from CSV.
df_products = pl.read_csv("./data/northwind/products.csv")

In [None]:
# Keep only products with a single supplier (no ';' in `supplier_ids`), turn
# that id into an integer, give both keys their staging names, then stamp.
stg_product = add_timestamp(
    df_products
    .filter(pl.col("supplier_ids").str.contains(";").not_())
    .with_columns(pl.col("supplier_ids").cast(pl.Int64))
    .rename({'supplier_ids': 'supplier_id', 'id': 'product_id'}),
    'insertion_timestamp',
)
stg_product.head()

supplier_id,product_id,product_code,product_name,description,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category,attachments
i64,i64,str,str,str,f64,f64,i64,i64,str,i64,i64,str,str
1,19,"""NWTBGM-19""","""Northwind Traders Chocolate Bi…",,6.9,9.2,5,20,"""10 boxes x 12 pieces""",0,5.0,"""Baked Goods & Mixes""",
1,21,"""NWTBGM-21""","""Northwind Traders Scones""",,7.5,10.0,5,20,"""24 pkgs. x 4 pieces""",0,5.0,"""Baked Goods & Mixes""",
6,91,"""NWTCFV-91""","""Northwind Traders Cherry Pie F…",,1.0,2.0,10,40,"""15.25 OZ""",0,,"""Canned Fruit & Vegetables""",
6,90,"""NWTCFV-90""","""Northwind Traders Pineapple""",,1.0,1.8,10,40,"""15.25 OZ""",0,,"""Canned Fruit & Vegetables""",
6,88,"""NWTCFV-88""","""Northwind Traders Pears""",,1.0,1.3,10,40,"""15.25 OZ""",0,,"""Canned Fruit & Vegetables""",


In [None]:
# Load the product frame into table `stg_product`; 'replace' overwrites any earlier load.
pipeline.run(stg_product.to_pandas(), table_name='stg_product', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340366.9642122': [{'started_at': DateTime(2024, 10, 19, 12, 19, 27, 571500, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 19, 27, 985834, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340366.9642122'], load_packages=[LoadPackageInfo(load_id='1729340366.9642122', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340366.9642122', state='loaded', schema=Schema load_northwind at 1451792427856, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 19, 27, 949211, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJ

In [None]:
# Read the raw Northwind supplier records from CSV.
df_suppliers = pl.read_csv('./data/northwind/suppliers.csv')

In [None]:
# Stamp the suppliers and expose `id` as `supplier_id`, the key used by the
# product join further down.
stg_supplier = (
    add_timestamp(df_suppliers, 'insertion_timestamp')
    .rename({'id': 'supplier_id'})
)
stg_supplier.head()

supplier_id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
i64,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
1,"""Supplier A""","""Andersen""","""Elizabeth A.""",,"""Sales Manager""",,,,,,,,,,,,
2,"""Supplier B""","""Weiler""","""Cornelia""",,"""Sales Manager""",,,,,,,,,,,,
5,"""Supplier E""","""Hernandez-Echevarria""","""Amaya""",,"""Sales Manager""",,,,,,,,,,,,
9,"""Supplier I""","""Sandberg""","""Mikael""",,"""Sales Manager""",,,,,,,,,,,,
10,"""Supplier J""","""Sousa""","""Luis""",,"""Sales Manager""",,,,,,,,,,,,


In [None]:
# Load the supplier frame into table `stg_supplier`; 'replace' overwrites any earlier load.
pipeline.run(stg_supplier.to_pandas(), table_name='stg_supplier', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340368.158272': [{'started_at': DateTime(2024, 10, 19, 12, 19, 28, 855949, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 19, 29, 288417, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340368.158272'], load_packages=[LoadPackageInfo(load_id='1729340368.158272', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340368.158272', state='loaded', schema=Schema load_northwind at 1451792205136, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 19, 29, 252067, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJobIn

In [None]:
# Build the product dimension: attach the supplier's company name to each
# product, keep one row per product, drop the join key, and stamp the insert time.
dim_products = (stg_product
                # Join only the key and the company name. Joining the whole
                # supplier table dragged every supplier contact column into the
                # dimension and produced an `attachments_right` name clash.
                .join(stg_supplier.select(['supplier_id', 'company']),
                      on='supplier_id', how='left', coalesce=True)
                .pipe(unique, 'product_id')
                .pipe(exclude, 'supplier_id')
                .pipe(add_timestamp, 'insertion_timestamp')
    )
dim_products.head()

product_id,product_code,product_name,description,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category,attachments,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments_right
i64,str,str,str,f64,f64,i64,i64,str,i64,i64,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
87,"""NWTB-87""","""Northwind Traders Tea""",,2.0,4.0,20,50,"""100 count per box""",0,,"""Beverages""",,"""Supplier G""","""Glasson""","""Stuart""",,"""Marketing Manager""",,,,,,,,,,,,
91,"""NWTCFV-91""","""Northwind Traders Cherry Pie F…",,1.0,2.0,10,40,"""15.25 OZ""",0,,"""Canned Fruit & Vegetables""",,"""Supplier F""","""Hayakawa""","""Satomi""",,"""Marketing Assistant""",,,,,,,,,,,,
89,"""NWTCFV-89""","""Northwind Traders Peaches""",,1.0,1.5,10,40,"""15.25 OZ""",0,,"""Canned Fruit & Vegetables""",,"""Supplier F""","""Hayakawa""","""Satomi""",,"""Marketing Assistant""",,,,,,,,,,,,
86,"""NWTBGM-86""","""Northwind Traders Cake Mix""",,10.5,15.99,10,20,"""4 boxes""",0,5.0,"""Baked Goods & Mixes""",,"""Supplier A""","""Andersen""","""Elizabeth A.""",,"""Sales Manager""",,,,,,,,,,,,
72,"""NWTD-72""","""Northwind Traders Mozzarella""",,26.1,34.8,10,40,"""24 - 200 g pkgs.""",0,10.0,"""Dairy `dl_northwind`.`products…",,"""Supplier E""","""Hernandez-Echevarria""","""Amaya""",,"""Sales Manager""",,,,,,,,,,,,


In [None]:
# Load the product dimension into table `dim_product`; 'replace' overwrites any earlier load.
pipeline.run(dim_products.to_pandas(), table_name='dim_product', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340369.4455614': [{'started_at': DateTime(2024, 10, 19, 12, 19, 30, 100241, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 19, 30, 516729, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340369.4455614'], load_packages=[LoadPackageInfo(load_id='1729340369.4455614', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340369.4455614', state='loaded', schema=Schema load_northwind at 1451774509712, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 19, 30, 482794, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJ

## create dim_date

In [None]:
import polars as pl
from datetime import datetime
import duckdb as dd

In [None]:
## 1.Setting start and end dates
start_date = pl.datetime(2006, 1, 1)
end_date = pl.datetime(2050, 1, 1)

## 2.Generate date range
date_range = pl.date_range(start=start_date, end=end_date, interval="1d", eager=True)

## 3.Convert to DataFrame and setting column name as 'date'
df_date = pl.DataFrame({
        "date": date_range
})

In [None]:
# Preview the first generated calendar days.
df_date.head()

date
datetime[μs]
2006-01-01 00:00:00
2006-01-02 00:00:00
2006-01-03 00:00:00
2006-01-04 00:00:00
2006-01-05 00:00:00


In [None]:
# Derive the calendar attributes of the date dimension from the daily `date`
# column. `order_date` is the join key used against the sales fact later on.
dim_date = (
    df_date
    .select(
        pl.col('date').dt.strftime("%m-%d-%Y").alias('date_id'),
        pl.col('date').dt.date().alias('order_date'),
        pl.col("date").dt.year().alias("year"),
        pl.col("date").dt.week().alias("year_week"),
        pl.col("date").dt.ordinal_day().alias("year_day"),
        # The fiscal year is taken to be the calendar year here.
        pl.col("date").dt.year().alias("fiscal_year"),
        pl.col("date").dt.quarter().alias("fiscal_qtr"),
        pl.col("date").dt.strftime("%B").alias("month_name"),
        # ISO weekday number as text ("1" = Monday ... "7" = Sunday). The
        # previous "%W" directive yields the week-of-year number instead, so
        # `week_day` did not hold a day of the week at all.
        pl.col("date").dt.strftime("%u").alias("week_day"),
        pl.col("date").dt.strftime("%A").alias("day_name"),
    )
    .with_columns(pl.col('day_name').is_in(['Saturday','Sunday']).alias("WeekendFlag"))
    # The load cell hands this frame to dlt as a pandas DataFrame.
    .to_pandas()
)

In [None]:
# Load the date dimension (already a pandas frame) into table `dim_date`; 'replace' overwrites any earlier load.
pipeline.run(dim_date, table_name='dim_date', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729342080.1794314': [{'started_at': DateTime(2024, 10, 19, 12, 48, 0, 913224, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 48, 1, 427233, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729342080.1794314'], load_packages=[LoadPackageInfo(load_id='1729342080.1794314', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729342080.1794314', state='loaded', schema=Schema load_northwind at 1452204915984, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 48, 1, 383392, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJobI

## Order dimension table.

In [None]:
# Read the raw order headers and their line items from CSV.
df_order = pl.read_csv('./data/northwind/orders.csv')
df_order_detail = pl.read_csv('./data/northwind/order_details.csv')

In [None]:
# Stage the order headers: stamp the load time, expose `id` as `order_id`,
# and order the rows by that key.
df_orders = (
    add_timestamp(df_order, 'ingestion_timestamp')
    .rename({'id':'order_id'})
    .sort(by='order_id')
)

In [None]:
# Stage the order lines: stamp the load time and expose `id` as `orderdetail_id`.
df_orders_details = (
    add_timestamp(df_order_detail, 'ingestion_timestamp')
    .rename({'id':'orderdetail_id'})
)

In [None]:
# Load the staged order headers into table `df_orders`; 'replace' overwrites any earlier load.
pipeline.run(df_orders.to_pandas(), table_name='df_orders', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340824.829875': [{'started_at': DateTime(2024, 10, 19, 12, 27, 5, 477330, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 27, 5, 868583, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340824.829875'], load_packages=[LoadPackageInfo(load_id='1729340824.829875', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340824.829875', state='loaded', schema=Schema load_northwind at 1452088381264, schema_update={'df_orders': {'name': 'df_orders', 'columns': {'order_id': {'name': 'order_id', 'nullable': True, 'data_type': 'bigint'}, 'employee_id':

In [None]:
# Load the staged order lines into table `df_orders_details`; 'replace' overwrites any earlier load.
pipeline.run(df_orders_details.to_pandas(), table_name='df_orders_details', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340827.9249744': [{'started_at': DateTime(2024, 10, 19, 12, 27, 8, 560325, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 27, 8, 953431, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340827.9249744'], load_packages=[LoadPackageInfo(load_id='1729340827.9249744', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340827.9249744', state='loaded', schema=Schema load_northwind at 1452071177168, schema_update={'df_orders_details': {'name': 'df_orders_details', 'columns': {'orderdetail_id': {'name': 'orderdetail_id', 'nullable': True, 'data_

- Read the `stg_orders` table in the `stg_northwind` schema from `load_northwind.duckdb` and name it `df_order`.
- Read the `stg_orders_details` table in the `stg_northwind` schema from `load_northwind.duckdb` and name it `df_order_detail`.
- From `df_order` table, select columns: ['order_id','customer_id','employee_id','shipper_id',
'order_date','shipped_date','paid_date']
- Change dtype of `order_date` string type to `Date` dtype.
- From `df_order_detail` table, select columns: ['order_id','product_id',
'status_id','purchase_order_id','inventory_id','quantity','unit_price','discount']

In [None]:
from lib_pipefn import pipe_nw,manage_duckdb

In [None]:
# Read the two loaded order tables back from the DuckDB file.
# NOTE(review): `read_duckdb_repo` comes from the local `lib_pipefn` package; it
# presumably returns polars DataFrames, since `.select(pl.col(...))` is applied
# to the results in the next cells — confirm.
stg_orders = manage_duckdb.read_duckdb_repo('load_northwind.duckdb','stg_northwind.df_orders')
stg_orders_details = manage_duckdb.read_duckdb_repo('load_northwind.duckdb','stg_northwind.df_orders_details')

In [None]:
# Keep the order keys and the three date columns, then parse `order_date`
# from text into a Date.
stg_order = change_dtypedatetime(
    stg_orders.select(
        ['order_id', 'customer_id', 'employee_id', 'shipper_id',
         'order_date', 'shipped_date', 'paid_date']
    ),
    'order_date',
)
stg_order

order_id,customer_id,employee_id,shipper_id,order_date,shipped_date,paid_date
i64,i64,i64,f64,date,str,str
30,27,9,2.0,2006-01-15,"""1/22/2006 0:00:00""","""1/15/2006 0:00:00"""
31,4,3,1.0,2006-01-20,"""1/22/2006 0:00:00""","""1/20/2006 0:00:00"""
32,12,4,2.0,2006-01-22,"""1/22/2006 0:00:00""","""1/22/2006 0:00:00"""
33,8,6,3.0,2006-01-30,"""1/31/2006 0:00:00""","""1/30/2006 0:00:00"""
34,4,9,3.0,2006-02-06,"""2/7/2006 0:00:00""","""2/6/2006 0:00:00"""
…,…,…,…,…,…,…
77,26,9,3.0,2006-06-05,"""6/5/2006 0:00:00""","""6/5/2006 0:00:00"""
78,29,1,2.0,2006-06-05,"""6/5/2006 0:00:00""","""6/5/2006 0:00:00"""
79,6,2,3.0,2006-06-23,"""6/23/2006 0:00:00""","""6/23/2006 0:00:00"""
80,4,2,,2006-04-25,,


In [None]:
# Keep only the order-line columns needed downstream.
stg_orders_details_s = stg_orders_details.select(
    ['order_id', 'product_id', 'status_id', 'purchase_order_id',
     'inventory_id', 'quantity', 'unit_price', 'discount']
)

In [None]:
# Persist the order staging tables `stg_orders` and `stg_orders_details`.
# NOTE(review): this loads the frames read back from DuckDB (`stg_orders`,
# `stg_orders_details`), not the column-selected `stg_order` /
# `stg_orders_details_s` built in the cells above — confirm that this is intended.
pipeline.run(stg_orders.to_pandas(), table_name='stg_orders', write_disposition='replace')
pipeline.run(stg_orders_details.to_pandas(), table_name='stg_orders_details', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729340874.8795478': [{'started_at': DateTime(2024, 10, 19, 12, 27, 55, 526467, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 12, 27, 55, 837336, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729340874.8795478'], load_packages=[LoadPackageInfo(load_id='1729340874.8795478', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729340874.8795478', state='loaded', schema=Schema load_northwind at 1452070827280, schema_update={}, completed_at=DateTime(2024, 10, 19, 12, 27, 55, 800297, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJ

## Create sales fact table and name it as 'fact_sales'

In [None]:
# Re-read the order staging tables written by the load cell above; they are the
# inputs of the sales fact join.
stg_orders = manage_duckdb.read_duckdb_repo('load_northwind.duckdb','stg_northwind.stg_orders')
stg_orders_details = manage_duckdb.read_duckdb_repo('load_northwind.duckdb','stg_northwind.stg_orders_details')

In [None]:
# Sales fact: every order header with its order lines attached (left join on
# `order_id`), then stamped with the insertion time.
fact_sales = add_timestamp(
    stg_orders.join(stg_orders_details, on='order_id', how='left', coalesce=True),
    'insertion_timestamp',
)
fact_sales.head()

order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,tax_rate,status_id,orderdetail_id,product_id,quantity,unit_price,discount,purchase_order_id,inventory_id,orderdetail_id_right,product_id_right,quantity_right,unit_price_right,discount_right,status_id_right,purchase_order_id_right,inventory_id_right
i64,i64,i64,str,str,f64,str,str,str,str,i64,str,i64,i64,str,str,i64,i64,i64,i64,i64,f64,i64,f64,f64,i64,i64,i64,f64,i64,i64,f64,f64
30,9,27,"""1/15/2006 0:00:00""","""1/22/2006 0:00:00""",2.0,"""Karen Toh""","""789 27th Street""","""Las Vegas""","""NV""",99999,"""USA""",200,0,"""Check""","""1/15/2006 0:00:00""",0,3,,,,,,,,28,80,30,3.5,0,2,,63.0
30,9,27,"""1/15/2006 0:00:00""","""1/22/2006 0:00:00""",2.0,"""Karen Toh""","""789 27th Street""","""Las Vegas""","""NV""",99999,"""USA""",200,0,"""Check""","""1/15/2006 0:00:00""",0,3,,,,,,,,27,34,100,14.0,0,2,96.0,83.0
31,3,4,"""1/20/2006 0:00:00""","""1/22/2006 0:00:00""",1.0,"""Christina Lee""","""123 4th Street""","""New York""","""NY""",99999,"""USA""",5,0,"""Credit Card""","""1/20/2006 0:00:00""",0,3,,,,,,,,31,80,10,3.5,0,2,,66.0
31,3,4,"""1/20/2006 0:00:00""","""1/22/2006 0:00:00""",1.0,"""Christina Lee""","""123 4th Street""","""New York""","""NY""",99999,"""USA""",5,0,"""Credit Card""","""1/20/2006 0:00:00""",0,3,,,,,,,,29,7,10,30.0,0,2,,64.0
31,3,4,"""1/20/2006 0:00:00""","""1/22/2006 0:00:00""",1.0,"""Christina Lee""","""123 4th Street""","""New York""","""NY""",99999,"""USA""",5,0,"""Credit Card""","""1/20/2006 0:00:00""",0,3,,,,,,,,30,51,10,53.0,0,2,,65.0


In [None]:
# Load the sales fact into table `fact_sales` ('replace' overwrites any earlier
# load). The table name was previously misspelled as 'face_sales'.
pipeline.run(fact_sales.to_pandas(), table_name='fact_sales', write_disposition='replace')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x0000015202725810>, metrics={'1729343212.7022474': [{'started_at': DateTime(2024, 10, 19, 13, 6, 53, 466841, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 10, 19, 13, 6, 53, 971173, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:///d:\\DW_Pipline-Main\\load_northwind.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='stg_northwind', loads_ids=['1729343212.7022474'], load_packages=[LoadPackageInfo(load_id='1729343212.7022474', package_path='C:\\Users\\Admin\\.dlt\\pipelines\\load_northwind\\load\\loaded\\1729343212.7022474', state='loaded', schema=Schema load_northwind at 1452224557072, schema_update={}, completed_at=DateTime(2024, 10, 19, 13, 6, 53, 927435, tzinfo=Timezone('UTC')), jobs={'new_jobs': [], 'completed_jobs': [LoadJobI

# Data cube

In [None]:
# `dim_date` was converted to pandas for loading; bring it back into polars for the joins below.
dim_date_polars = pl.from_pandas(dim_date)

In [None]:
# Compare the column dtypes of the three frames about to be joined, one list per line.
print(fact_sales.dtypes, stg_customer.dtypes, dim_date_polars.dtypes, sep='\n')


[Int64, Int64, Int64, String, String, Float64, String, String, String, String, Int64, String, Int64, Int64, String, String, Int64, Int64, Int64, Int64, Int64, Float64, Int64, Float64, Float64, Int64, Int64, Int64, Float64, Int64, Int64, Float64, Float64]
[Int64, String, String, String, String, String, String, String, String, String, String, String, String, Int64, String, String, String, String]
[String, Datetime(time_unit='ms', time_zone=None), Int32, Int8, Int16, Int32, Int8, String, String, String, Boolean]


In [None]:
# Parse `order_date` from text into a Datetime so it can be joined to the date
# dimension. The dtype guard keeps the cell re-runnable: once the column has
# been converted, a second `str.strptime` on it would raise.
if fact_sales.schema["order_date"] == pl.String:
    fact_sales = fact_sales.with_columns(
        pl.col("order_date").str.strptime(pl.Datetime, "%m/%d/%Y %H:%M:%S")
    )

In [None]:
# Align the time unit with `dim_date_polars.order_date`, which the dtype listing
# above shows as Datetime(ms); the join keys of the data cube must share a dtype.
fact_sales = fact_sales.with_columns(
    pl.col("order_date").cast(pl.Datetime(time_unit='ms'))
)

In [None]:
# Data cube base: the sales measures enriched with the customer attributes
# (via `customer_id`) and the calendar attributes (via `order_date`).
datacube1 = (
    fact_sales
    .select(['order_id', 'customer_id', 'order_date', 'unit_price', 'quantity'])
    .join(stg_customer, on='customer_id', how='left', coalesce=True)
    .join(dim_date_polars, on='order_date', how='left', coalesce=True)
)

In [None]:
# Roll the cube up to (city, fiscal_year, month_name) and total the ordered quantity.
# `group_by` replaces the deprecated `groupby` spelling, which triggered a
# deprecation warning in the cell output.
rollup_quantity = (
    datacube1
    .group_by(['city','fiscal_year','month_name'])
    .agg(pl.col('quantity').sum().alias('total_order_sales'))
)
print(rollup_quantity)

shape: (39, 4)
┌────────────────┬─────────────┬────────────┬───────────────────┐
│ city           ┆ fiscal_year ┆ month_name ┆ total_order_sales │
│ ---            ┆ ---         ┆ ---        ┆ ---               │
│ str            ┆ i32         ┆ str        ┆ i64               │
╞════════════════╪═════════════╪════════════╪═══════════════════╡
│ Memphis        ┆ 2006        ┆ June       ┆ 0                 │
│ New York       ┆ null        ┆ null       ┆ 0                 │
│ Los Angelas    ┆ 2006        ┆ February   ┆ 0                 │
│ Boise          ┆ 2006        ┆ March      ┆ 0                 │
│ Denver         ┆ 2006        ┆ June       ┆ 0                 │
│ …              ┆ …           ┆ …          ┆ …                 │
│ New York       ┆ 2006        ┆ February   ┆ 0                 │
│ Portland       ┆ 2006        ┆ June       ┆ 0                 │
│ New York       ┆ 2006        ┆ April      ┆ 0                 │
│ Salt Lake City ┆ 2006        ┆ June       ┆ 0              

  rollup_quantity = datacube1.groupby(['city','fiscal_year','month_name']).agg(pl.col('quantity').sum().alias('total_order_sales'))


# streamlit

Example code


```
import streamlit as st
import pandas as pd
import plotly.express as px

# Sample sales data with two measures (sales and quantity)
data = pd.DataFrame({
    'region': ['North America', 'North America', 'North America', 'South America', 'South America', 'Europe', 'Europe', 'Asia', 'Asia'],
    'country': ['USA', 'Canada', 'Mexico', 'Brazil', 'Argentina', 'Germany', 'France', 'China', 'India'],
    'product': ['A', 'A', 'B', 'B', 'C', 'C', 'D', 'D', 'E'],
    'sales': [100, 150, 200, 250, 300, 350, 400, 450, 500],
    'quantity': [10, 15, 20, 25, 30, 35, 40, 45, 50]
})

# Title of the dashboard
st.title('Sales Data Cube Viewer Dashboard')

# Sidebar filters: every value is selected by default
st.sidebar.header('Filter Options')
region_filter = st.sidebar.multiselect('Select Region(s):', options=data['region'].unique(), default=data['region'].unique())
country_filter = st.sidebar.multiselect('Select Country(s):', options=data['country'].unique(), default=data['country'].unique())
product_filter = st.sidebar.multiselect('Select Product(s):', options=data['product'].unique(), default=data['product'].unique())

# Applying filters to the data
filtered_data = data[
    (data['region'].isin(region_filter)) &
    (data['country'].isin(country_filter)) &
    (data['product'].isin(product_filter))
]

st.header('Sales Data Visualization')

# Aggregation
agg_data = filtered_data.groupby(['region', 'country', 'product']).sum().reset_index()

# Sunburst Chart for Sales
st.subheader('Sales by Region, Country, and Product (Sunburst Chart)')
sunburst_sales = px.sunburst(
    agg_data,
    path=['region', 'country', 'product'],
    values='sales',
    title='Sales by Region, Country, and Product'
)
st.plotly_chart(sunburst_sales)

# Treemap Chart for Quantity
st.subheader('Quantity by Region, Country, and Product (Treemap Chart)')
treemap_quantity = px.treemap(
    agg_data,
    path=['region', 'country', 'product'],
    values='quantity',
    title='Quantity by Region, Country, and Product'
)
st.plotly_chart(treemap_quantity)

# Bar Chart for both Sales and Quantity
st.subheader('Sales and Quantity by Country and Product')
bar_sales_quantity = px.bar(
    agg_data,
    x='country',
    y=['sales', 'quantity'],
    color='product',
    title='Sales and Quantity by Country and Product',
    barmode='group'
)
st.plotly_chart(bar_sales_quantity)

# Display filtered aggregated data table
st.header('Aggregated Filtered Data')
st.write(agg_data)

# The body of the `if` must be indented; without it the example is a syntax error.
if st.sidebar.button('Show Info'):
    st.sidebar.write('This is a demo of a hierarchical data visualization using Streamlit and Plotly, showing Sales and Quantity by Region and Product.')
```

In [None]:
import streamlit as st
import pandas as pd
import numpy as np

# Minimal Streamlit demo: two checkboxes, each echoing its choice when ticked.
st.title('My first web application Who :cat: ')
st.write('please selects the checkbox list:')
a1 = st.checkbox(label ='A')
a2 = st.checkbox(label ='B')
if a1:
    # Message typo fixed ("selrction") so it matches the wording used for B.
    st.write('my selection is A')
if a2:
    st.write('my selection is B')



# Learn more about Streamlit and apply it to my project from
https://docs.streamlit.io/

# Link to my GitHub
https://github.com/BxMild/Data-warehouse-2024