# Theoretical

**ETL (Extract, Transform, Load)** is a process used in data warehousing that involves:
- **Extracting** data from various sources
- **Transforming** data to fit operational needs, which can include cleansing, organising and consolidating the data.
- **Loading** data into the final target database
- ![image.png](attachment:d6130475-2101-4461-84aa-d89ecdb5ca02.png)

**Data Quality** ensure data is fit for decision-making purposes.
- Poor quality => incorrect conclusions and poor business decisions.
- ![image.png](attachment:08a530af-fd3a-476f-a05e-4ab5dfdae30f.png)

**ETL lifecycle**
- set up data sources
- establish data extraction rules
- planning for data transformation (i.e cleansing & confirming)
- loading data into data warehousing structures like fact and dimension tables.
![image.png](attachment:aeb63924-9baa-4515-81b2-5cd401300e75.png)

**Logical data map** outlines flow from source to destination and helps understand planning in ETL process
![image.png](attachment:5b224c7e-a0ca-4bc7-95d2-8c53c9c23198.png)
![image.png](attachment:bcc47891-b27f-4844-81f7-1521959d0d12.png)

**Data staging area** temporary holding pens for data before its precessed and loaded into warehouse
- crucial to perform transformation efficiently and effectively w/o taxing operational systems.

OLTP what is it? OLAP what is it? OLP what is it?????

# Practical
1. Select Tables:
   - Choose 2 fact tables and 2 dimension tables from AdventureworksDW database
2. Analyze Relationships:
   - Determine relationships between fact and dimension tables. Identify which columns link the tables together (foreign keys in fact tables that reference primary keys in
     the dimension tables)
3. Plan Data Transformation:
   - Decide how to transform the data. Include:
         - Aggregate data from fact table
         - Cleansing data in dimension tables
         - Convert data types
         - Creating calculated columns
4. Develop Mappings:
   - Map the columns from Adventureworks database (database A) to AdventureworksDW target structure
     (database B). Consider which columns are necessary for analysis and how they correspond between 2
     databases.
5. Write Transformation Code:
   - ETL Process:
         - Extract data from source table
         - Transform data according to the mappings and transformation plan
         - Load data into destination table

Your task: Select any TWO fact tables and TWO dimension tables from the AdventureworksDW database, then think about which tables/columns you should select and how should you transform these tables/columns from Adventureworks to your selected tables in AdventureworksDW. 

This will demonstrate the core concepts of the ETL process. 

You have data in database A in format A.

You want to load them into database B in format B

The missing chunk above is the ETL process

You will need to find out the mappings between data (tables/columns) from database A and data(tables/columns) from database B. This is your practice task.

Then translate the mappings into Python/SQL code or Excel operations: read data out from database A => do transformation => put data back to database B

The most important part is to understand how the mappings work.

# FactAdditionalInternationalProductDescription

![image.png](attachment:989fdb96-5fb4-402f-a316-eb5b13596639.png)

![image.png](attachment:645c2575-4159-48c9-ab26-ce201489f508.png)

## Mapping 
- **Product.ProductID** directly maps to **FactAdditionalInternationalProductDescription.ProductKey**
- **ProductModelProductDescriptionCulture** (junction table; associates products with descriptions + cultures. Connection:
    - **ProductModelID** connects to **Product.ProductModelID**
    - **ProductDescriptionID** connects to **ProductDescription.ProductDescriptionID**
    - **CultureID** connects to **FactAdditionalInternationalProductDescription.CultureName**

In [1]:
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/Adventureworks')

# Chosen Fact Table: FactAdditionalInternationalProductDescription

# Extract from 3 tables: 'Product', 'ProductDescription', 'ProductModelProductDescriptionCulture'
df_product = pd.read_sql("SELECT * FROM production.product", engine)
df_product_description = pd.read_sql("SELECT * FROM production.productdescription", engine)
df_product_model_product_description_culture = pd.read_sql("SELECT * FROM production.productmodelproductdescriptionculture", engine)


In [6]:
# Merged those 3 table
df_joined = df_product.merge(df_product_model_product_description_culture, on='productmodelid')\
                      .merge(df_product_description, on='productdescriptionid')

# No trailing spaces/Cleaning
df_joined['cultureid'] = df_joined['cultureid'].str.strip()

# Mapping culture IDs to names => culturename column
culture_mapping = {
    'zh-cht': 'Traditional Chinese',
    'fr': 'French',
    'ar': 'Arabic',
    'he': 'Hebrew',
    'th': 'Thai',
    'en': 'English'
}
df_joined['culturename'] = df_joined['cultureid'].map(culture_mapping)

# Check any NaN values in the culturename column after mapping
if df_joined['culturename'].isnull().any():
    raise ValueError("There's NaN values in the 'culturename' column after mapping. Check culture_mapping.")

# Select + Rename to match schema of destination table
df_transformed = df_joined[['productid', 'culturename', 'description']]
df_transformed.columns = ['productkey', 'culturename', 'productdescription']

In [None]:
# Running THIS CELL AGAIN would cause DUPLICATION!!!!!
engine_dw = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/AdventureworksDW')

# Loading data into Fact Table
df_transformed.to_sql('factadditionalinternationalproductdescription', engine_dw, if_exists='append', index=False)

In [7]:
# Checking: Find rows with null culturename after mapping
null_culturename_rows = df_joined[df_joined['culturename'].isnull()]
print(null_culturename_rows)


Empty DataFrame
Columns: [productid, name, productnumber, makeflag, finishedgoodsflag, color, safetystocklevel, reorderpoint, standardcost, listprice, size, sizeunitmeasurecode, weightunitmeasurecode, weight, daystomanufacture, productline, class, style, productsubcategoryid, productmodelid, sellstartdate, sellenddate, discontinueddate, rowguid_x, modifieddate_x, productdescriptionid, cultureid, modifieddate_y, description, rowguid_y, modifieddate, culturename]
Index: []

[0 rows x 32 columns]


In [8]:
# Checking: see if it has been implemented properly
import pandas as pd
import sqlalchemy
engine_dw = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/AdventureworksDW')

query = """
SELECT *
FROM factadditionalinternationalproductdescription
WHERE culturename = 'French'
"""

# Use pandas to execute the query and load the results into a DataFrame
df_query_results = pd.read_sql_query(query, engine_dw)

# Display the first few rows of the DataFrame to verify the data
print(df_query_results.head())

   productkey culturename                                 productdescription
0         680      French  Notre cadre en aluminium plus léger et de qual...
1         712      French  Style classique avec une visière relevable ; t...
2         706      French  Notre cadre en aluminium plus léger et de qual...
3         707      French  Légère, aérée, taille unique, avec une visière...
4         708      French  Légère, aérée, taille unique, avec une visière...


# FactInternetSalesReason
![image.png](attachment:907ac40a-415a-40dd-9684-96f586fa243c.png)

In [52]:
import pandas as pd
import sqlalchemy

engine_src = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/Adventureworks')
engine_dw = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/AdventureworksDW')

# Extract data from source tables
df_salesorderheader = pd.read_sql("SELECT * FROM sales.salesorderheader", engine_src)
df_salesorderdetail = pd.read_sql("SELECT * FROM sales.salesorderdetail", engine_src)
df_salesorderdetail['salesorderlinenumber'] = df_salesorderdetail.sort_values(by=['salesorderdetailid']).groupby('salesorderid').cumcount() + 1
df_salesreason = pd.read_sql("SELECT * FROM sales.salesreason", engine_src)
df_salesorderheadersalesreason = pd.read_sql("SELECT * FROM sales.salesorderheadersalesreason", engine_src)

# Transform
df_reason_details = df_salesorderheadersalesreason.merge(df_salesreason, left_on='salesreasonid', right_on='salesreasonid')
df_order_reasons = df_reason_details.merge(df_salesorderheader, left_on='salesorderid', right_on='salesorderid')
df_final = df_order_reasons.merge(df_salesorderdetail, left_on=['salesorderid', 'salesorderid'], right_on=['salesorderid', 'salesorderid'], suffixes=('_header', '_detail'))

# Update salesordernumber to match destination format by adding 'SO' prefix
df_final['salesordernumber'] = 'SO' + df_final['salesorderid'].astype(str)

# Select + Rename columns match destination table
df_fact_internet_sales_reason = df_final[['salesordernumber', 'salesorderlinenumber', 'salesreasonid']]
df_fact_internet_sales_reason.columns = ['salesordernumber', 'salesorderlinenumber', 'salesreasonkey']

# Check for existing rows in the destination fact table to avoid duplicates
existing_rows = pd.read_sql("SELECT salesordernumber, salesorderlinenumber, salesreasonkey FROM factinternetsalesreason", engine_dw)

# Perform an anti-join to find the rows in df_fact_internet_sales_reason that are not in existing_rows
df_new_rows = pd.merge(
    df_fact_internet_sales_reason, 
    existing_rows, 
    on=['salesordernumber', 'salesorderlinenumber', 'salesreasonkey'], 
    how='left', 
    indicator=True
).query('_merge == "left_only"').drop(columns='_merge')

# Load only new data into the FactInternetSalesReason table in the destination database
df_new_rows.to_sql('factinternetsalesreason', engine_dw, if_exists='append', index=False)

0

In [56]:
import pandas as pd
import sqlalchemy

# Define the high-value threshold
high_value_threshold = 8000

# Establish connections to the source and destination databases
engine_src = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/Adventureworks')
engine_dw = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/AdventureworksDW')

# Extract data from source tables
df_salesorderheader = pd.read_sql("SELECT * FROM sales.salesorderheader", engine_src)
df_salesorderdetail = pd.read_sql("SELECT * FROM sales.salesorderdetail", engine_src)
df_salesreason = pd.read_sql("SELECT * FROM sales.salesreason", engine_src)
df_salesorderheadersalesreason = pd.read_sql("SELECT * FROM sales.salesorderheadersalesreason", engine_src)

# Transform
# Calculate new fields in salesorderheader
df_salesorderheader['order_processing_time'] = df_salesorderheader['shipdate'] - df_salesorderheader['orderdate']
df_salesorderheader['high_value_order'] = (df_salesorderheader['totaldue'] > high_value_threshold).astype(int)

# Calculate average unit price discount in salesorderdetail
df_avg_discount = df_salesorderdetail.groupby('salesorderid')['unitpricediscount'].mean().reset_index()
df_avg_discount.columns = ['salesorderid', 'average_discount']

# Merge df_salesorderheader and df_avg_discount into df_salesorderdetail
df_salesorderdetail = df_salesorderdetail.merge(df_salesorderheader[['salesorderid', 'order_processing_time', 'high_value_order']], on='salesorderid', how='left')
df_salesorderdetail = df_salesorderdetail.merge(df_avg_discount, on='salesorderid', how='left')

# Update salesordernumber to match destination format by adding 'SO' prefix
df_salesorderdetail['salesordernumber'] = 'SO' + df_salesorderdetail['salesorderid'].astype(str)

# Select and rename columns to match the schema of the destination table
df_fact_internet_sales_reason = df_salesorderdetail[['salesordernumber', 'salesorderdetailid', 'average_discount', 'order_processing_time', 'high_value_order']]
df_fact_internet_sales_reason.columns = ['salesordernumber', 'salesorderlinenumber', 'average_discount', 'order_processing_duration', 'is_high_value_order']

# Get a list of existing records in the destination fact table
existing_records_query = """
SELECT salesordernumber, salesorderlinenumber, salesreasonkey
FROM factinternetsalesreason
"""
existing_records = pd.read_sql(existing_records_query, engine_dw)

# Perform an anti-join to find the records in df_fact_internet_sales_reason that are not in existing_records
new_records = pd.merge(
    df_fact_internet_sales_reason, 
    existing_records, 
    on=['salesordernumber', 'salesorderlinenumber'], 
    how='left', 
    indicator=True
).query('_merge == "left_only"').drop(columns='_merge')

# Load the new data into the FactInternetSalesReason table in the destination database
new_records.to_sql('factinternetsalesreason', engine_dw, if_exists='append', index=False)


  new_records.to_sql('factinternetsalesreason', engine_dw, if_exists='append', index=False)


ProgrammingError: (psycopg2.errors.UndefinedColumn) column "average_discount" of relation "factinternetsalesreason" does not exist
LINE 1: ...esreason (salesordernumber, salesorderlinenumber, average_di...
                                                             ^

[SQL: INSERT INTO factinternetsalesreason (salesordernumber, salesorderlinenumber, average_discount, order_processing_duration, is_high_value_order, salesreasonkey) VALUES (%(salesordernumber__0)s, %(salesorderlinenumber__0)s, %(average_discount__0)s, %(or ... 177154 characters truncated ... t__999)s, %(order_processing_duration__999)s, %(is_high_value_order__999)s, %(salesreasonkey__999)s)]
[parameters: {'average_discount__0': 0.0, 'salesorderlinenumber__0': 1, 'salesreasonkey__0': None, 'is_high_value_order__0': 1, 'salesordernumber__0': 'SO43659', 'order_processing_duration__0': 604800000000000, 'average_discount__1': 0.0, 'salesorderlinenumber__1': 2, 'salesreasonkey__1': None, 'is_high_value_order__1': 1, 'salesordernumber__1': 'SO43659', 'order_processing_duration__1': 604800000000000, 'average_discount__2': 0.0, 'salesorderlinenumber__2': 3, 'salesreasonkey__2': None, 'is_high_value_order__2': 1, 'salesordernumber__2': 'SO43659', 'order_processing_duration__2': 604800000000000, 'average_discount__3': 0.0, 'salesorderlinenumber__3': 4, 'salesreasonkey__3': None, 'is_high_value_order__3': 1, 'salesordernumber__3': 'SO43659', 'order_processing_duration__3': 604800000000000, 'average_discount__4': 0.0, 'salesorderlinenumber__4': 5, 'salesreasonkey__4': None, 'is_high_value_order__4': 1, 'salesordernumber__4': 'SO43659', 'order_processing_duration__4': 604800000000000, 'average_discount__5': 0.0, 'salesorderlinenumber__5': 6, 'salesreasonkey__5': None, 'is_high_value_order__5': 1, 'salesordernumber__5': 'SO43659', 'order_processing_duration__5': 604800000000000, 'average_discount__6': 0.0, 'salesorderlinenumber__6': 7, 'salesreasonkey__6': None, 'is_high_value_order__6': 1, 'salesordernumber__6': 'SO43659', 'order_processing_duration__6': 604800000000000, 'average_discount__7': 0.0, 'salesorderlinenumber__7': 8, 'salesreasonkey__7': None, 'is_high_value_order__7': 1, 'salesordernumber__7': 'SO43659', 'order_processing_duration__7': 604800000000000, 'average_discount__8': 0.0, 'salesorderlinenumber__8': 9 ... 5900 parameters truncated ... 'salesordernumber__991': 'SO43892', 'order_processing_duration__991': 604800000000000, 'average_discount__992': 0.0, 'salesorderlinenumber__992': 993, 'salesreasonkey__992': None, 'is_high_value_order__992': 1, 'salesordernumber__992': 'SO43892', 'order_processing_duration__992': 604800000000000, 'average_discount__993': 0.0, 'salesorderlinenumber__993': 994, 'salesreasonkey__993': None, 'is_high_value_order__993': 1, 'salesordernumber__993': 'SO43892', 'order_processing_duration__993': 604800000000000, 'average_discount__994': 0.0, 'salesorderlinenumber__994': 995, 'salesreasonkey__994': None, 'is_high_value_order__994': 1, 'salesordernumber__994': 'SO43892', 'order_processing_duration__994': 604800000000000, 'average_discount__995': 0.0, 'salesorderlinenumber__995': 996, 'salesreasonkey__995': None, 'is_high_value_order__995': 1, 'salesordernumber__995': 'SO43892', 'order_processing_duration__995': 604800000000000, 'average_discount__996': 0.0, 'salesorderlinenumber__996': 997, 'salesreasonkey__996': None, 'is_high_value_order__996': 0, 'salesordernumber__996': 'SO43893', 'order_processing_duration__996': 604800000000000, 'average_discount__997': 0.0, 'salesorderlinenumber__997': 998, 'salesreasonkey__997': None, 'is_high_value_order__997': 1, 'salesordernumber__997': 'SO43894', 'order_processing_duration__997': 604800000000000, 'average_discount__998': 0.0, 'salesorderlinenumber__998': 999, 'salesreasonkey__998': None, 'is_high_value_order__998': 1, 'salesordernumber__998': 'SO43894', 'order_processing_duration__998': 604800000000000, 'average_discount__999': 0.0, 'salesorderlinenumber__999': 1000, 'salesreasonkey__999': None, 'is_high_value_order__999': 1, 'salesordernumber__999': 'SO43894', 'order_processing_duration__999': 604800000000000}]
(Background on this error at: https://sqlalche.me/e/20/f405)

In [55]:
import pandas as pd
import sqlalchemy

# Connection to source database (already defined)
engine_src = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/Adventureworks')

# Connection to destination database
engine_dw = sqlalchemy.create_engine('postgresql://postgres:postgres@pgdb:5432/AdventureworksDW')

print(df_fact_internet_sales_reason.head())
print("########")
print(df_salesorderdetail.head())
print("########")
print(existing_rows.head())
print("########")
print(df_final.head())

query = """
SELECT *
FROM factinternetsales
WHERE salesordernumber = 'SO43659';
"""

# Execute the query
df_check = pd.read_sql(query, engine_dw)

# Display the results
print(df_check)


  salesordernumber  salesorderlinenumber  salesreasonkey
0          SO43697                     1               5
1          SO43697                     1               9
2          SO43702                     1               5
3          SO43702                     1               9
4          SO43703                     1               5
########
   salesorderid  salesorderdetailid carriertrackingnumber  orderqty  \
0         43659                   1          4911-403C-98         1   
1         43659                   2          4911-403C-98         3   
2         43659                   3          4911-403C-98         1   
3         43659                   4          4911-403C-98         1   
4         43659                   5          4911-403C-98         1   

   productid  specialofferid  unitprice  unitpricediscount  \
0        776               1   2024.994                0.0   
1        777               1   2024.994                0.0   
2        778               1   2024.