# Interview Case Study - Requirements

### 01.Data Loading: Load the provided three files using either SQL or PySpark. Name each table with a raw_ prefix to differentiate between original and transformed data.

### 02.Data Review and Storage: Review the loaded data and assign appropriate data types based on your best judgment. Identify primary and foreign keys for each table. Store the transformed data using a store_ prefix.

### 03.Product Master Transformations: Perform the following transformations on the product master data and write the results into a table named publish_product: Replace NULL values in the Color field with N/A.

##### Enhance the ProductCategoryName field when it is NULL using the following logic:

##### If ProductSubCategoryName is in (‘Gloves’, ‘Shorts’, ‘Socks’, ‘Tights’, ‘Vests’), set ProductCategoryName to ‘Clothing’.

##### If ProductSubCategoryName is in (‘Locks’, ‘Lights’, ‘Headsets’, ‘Helmets’, ‘Pedals’, ‘Pumps’), set ProductCategoryName to ‘Accessories’.

##### If ProductSubCategoryName contains the word ‘Frames’ or is in (‘Wheels’, ‘Saddles’), set ProductCategoryName to ‘Components’.

### 04.Sales Order Transformations: 
##### Join SalesOrderDetail with SalesOrderHeader on SalesOrderId and apply the following transformations: Calculate LeadTimeInBusinessDays as the difference between OrderDate and ShipDate, excluding Saturdays and Sundays 
##### Calculate TotalLineExtendedPrice using the formula: OrderQty * (UnitPrice - UnitPriceDiscount).
##### Write the results into a table named publish_orders, including: All fields from SalesOrderDetail. All fields from SalesOrderHeader except SalesOrderId, and rename Freight to TotalOrderFreight.

### 05.Analysis Questions: Provide answers to the following questions based on the transformed data:
##### Which color generated the highest revenue each year? 
##### What is the average LeadTimeInBusinessDays by ProductCategoryName?

# -> 01.Data Loading:
##### 01.Load the provided three files using either SQL or PySpark.
##### 02.Name each table with a raw_ prefix to differentiate between original and transformed data.

In [0]:
%sql
DROP TABLE prod_data.raw.raw_products;
DROP TABLE prod_data.raw.raw_sales_order_detail;
DROP TABLE prod_data.raw.raw_sales_order_header;
DROP TABLE prod_data.store.store_products;
DROP TABLE prod_data.store.store_sales_order_header;
DROP TABLE prod_data.store.store_sales_order_detail;
DROP TABLE prod_data.publish.publish_products;
DROP TABLE prod_data.publish.publish_orders;

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS prod_data;
CREATE SCHEMA IF NOT EXISTS prod_data.raw;
CREATE VOLUME IF NOT EXISTS prod_data.raw.data;

In [0]:
%python

prod_data_path = "/Volumes/prod_data/raw/data/"

def read_csv_as_string(path):
    return (
        spark.read.format("csv")                 
            .option("header", True)                
            .option("inferSchema", False)           # Lets get everything as STRING so I can check each column later.
            .option("encoding", "utf-8")            # Ensures correct reading of international characters
            .load(path)                             # Load the file
    )

products_df = read_csv_as_string(prod_data_path + "products.csv")
sales_order_header_df = read_csv_as_string(prod_data_path + "sales_order_header.csv")
sales_order_detail_df = read_csv_as_string(prod_data_path + "sales_order_detail.csv")

In [0]:
display(products_df.limit(100))
display(sales_order_header_df.limit(100))
display(sales_order_detail_df.limit(100))

ProductID,ProductDesc,ProductNumber,MakeFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,Weight,WeightUnitMeasureCode,ProductCategoryName,ProductSubCategoryName
680,"HL Road Frame - Black, 58",FR-R92B-58,True,Black,500,375,1059.31,1431.5,58,CM,2.24,LB,,Road Frames
706,"HL Road Frame - Red, 58",FR-R92R-58,True,Red,500,375,1059.31,1431.5,58,CM,2.24,LB,,Road Frames
707,"Sport-100 Helmet, Red",HL-U509-R,False,Red,4,3,13.0863,34.99,,,,,,Helmets
708,"Sport-100 Helmet, Black",HL-U509,False,Black,4,3,13.0863,34.99,,,,,,Helmets
709,"Mountain Bike Socks, M",SO-B909-M,False,White,4,3,3.3963,9.5,M,,,,,Socks
710,"Mountain Bike Socks, L",SO-B909-L,False,White,4,3,3.3963,9.5,L,,,,,Socks
711,"Sport-100 Helmet, Blue",HL-U509-B,False,Blue,4,3,13.0863,34.99,,,,,,Helmets
712,AWC Logo Cap,CA-1098,False,Multi,4,3,6.9223,8.99,,,,,,Caps
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,False,Multi,4,3,38.4923,49.99,S,,,,,Jerseys
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,False,Multi,4,3,38.4923,49.99,M,,,,,Jerseys


SalesOrderID,OrderDate,ShipDate,OnlineOrderFlag,AccountNumber,CustomerID,SalesPersonID,Freight
43828,2021-06,2021-07-05,True,10-4030-027605,27605,,89.4568
43829,2021-06,2021-07-05,True,10-4030-027611,27611,,89.4568
43830,2021-06,2021-07-05,True,10-4030-016347,16347,,89.4568
43831,2021-06,2021-07-05,True,10-4030-011028,11028,,84.3748
43832,2021-06,2021-07-05,True,10-4030-013584,13584,,89.4568
43659,2021-05-31,2021-06-07,False,10-4020-000676,29825,279.0,616.0984
43660,2021-05-31,2021-06-07,False,10-4020-000117,29672,279.0,38.8276
43661,2021-05-31,2021-06-07,False,10-4020-000442,29734,282.0,985.553
43662,2021-05-31,2021-06-07,False,10-4020-000227,29994,282.0,867.2389
43663,2021-05-31,2021-06-07,False,10-4020-000510,29565,276.0,12.5838


SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount
43659,1,1,776,2024.994,0.0
43659,2,3,777,2024.994,0.0
43659,3,1,778,2024.994,0.0
43659,4,1,771,2039.994,0.0
43659,5,1,772,2039.994,0.0
43659,6,2,773,2039.994,0.0
43659,7,1,774,2039.994,0.0
43659,8,3,714,28.8404,0.0
43659,9,1,716,28.8404,0.0
43659,10,6,709,5.7,0.0


In [0]:
%python
products_df.write.format("delta").mode("overwrite").saveAsTable("prod_data.raw.raw_products")
sales_order_header_df.write.format("delta").mode("overwrite").saveAsTable("prod_data.raw.raw_sales_order_header")
sales_order_detail_df.write.format("delta").mode("overwrite").saveAsTable("prod_data.raw.raw_sales_order_detail")

# -> 02.Data Review and Storage:
##### 01.Review the loaded data and assign appropriate data types based on your best judgment.
##### 02.Identify primary and foreign keys for each table.
##### 03.Store the transformed data using a store_ prefix.

## -> 02.1 Defining data types for each table.
### Defining DATA types for each one of the tables and separating it into groups for TEST CASES.

In [0]:
# Int Columns
int_cols = ["ProductID", "SafetyStockLevel", "ReorderPoint"] 

# Decimal Columns
decimal_cols = ["StandardCost", "ListPrice", "Weight"]  # Defined as DECIMAL since we want to keep the precision of those columns tracked down.

# BOOLEAN columns
bool_cols = ["MakeFlag"]

# String columns
string_cols = [
    "ProductDesc",
    "ProductNumber",
    "Color",
    "Size",
    "SizeUnitMeasureCode",
    "WeightUnitMeasureCode",
    "ProductCategoryName",
    "ProductSubCategoryName",
]

In [0]:
# Int Columns
int_cols_header = [
    "SalesOrderID",
    "CustomerID",
    "SalesPersonID"
]

# Decimal Columns
decimal_cols_header = [
    "Freight"
]

# Boolean Columns
bool_cols_header = [
    "OnlineOrderFlag"
]

# String Columns
string_cols_header = [
    "AccountNumber"
]

# Date Columns
date_cols_header = [
    "ShipDate"
]

# Timestamp Columns
timestamp_cols_header = [
    "OrderDate"
]

In [0]:
# Int Columns
int_cols_detail = [
    "SalesOrderDetailID",
    "SalesOrderID",
    "OrderQty",
    "ProductID"
]

# Decimal Columns 
decimal_cols_detail_normal = ["UnitPrice"]

# Decimal Columns for UnityPriceDiscount which has a different format since it's a fraction representation.
decimal_cols_detail_unipd = ["UnitPriceDiscount"]

## -> 02.2 Test Cases
### Creating test cases to verify integrity and issues in each segment of group from the tables.

In [0]:
from pyspark.sql import functions as F

'''
#1 Test case to validate if ProductID has duplicates
'''
def duplicate_productid_condition(df, col="ProductID"):
    dup_ids = (df.groupBy(col).count().filter("count > 1").select(col)) 

    # Join to mark the duplicate rows
    return F.col(col).isin([row[col] for row in dup_ids.collect()])

'''
#2 Test case to validate if there is any INT column with issue:
'''
def invalid_int_condition(col):
    return (F.col(col).isNotNull() &
            ~F.col(col).rlike(r"^[0-9]+$"))
    
#3 Test case to validate if there is any DECIMAL column with issue:
def invalid_decimal_condition(col):
    return (F.col(col).isNotNull() &
            ~F.col(col).rlike(r"^[0-9]+([.,][0-9]+)?$"))

# 3.1 Test case to validate the column UNITYPRICEDESCOUNT because it looks like a % at first but reviewing it I beliave it's the actually value of discount.
def invalid_decimal_condition_unipd(col):
    valid_decimal_pattern = r"^[0-9]*([.,][0-9]+)?$"
    return (
        F.col(col).isNotNull() &
        ~F.col(col).rlike(valid_decimal_pattern)
    )
    
#4 Test case to validate if there is any BOOLEAN column with issue:
def invalid_bool_condition(col):
    valid = ["true","false"]
    return (F.col(col).isNotNull() &
            ~F.lower(F.col(col)).isin(valid))
    
#5 Test case to validate if there is any STRING column with SPACES issues:
def needs_trim_condition(col):
    return (F.col(col).isNotNull() &
            (F.col(col) != F.trim(F.col(col))))
    
# 6 Test case to validate DATE columns looking if they are not null and if spark can parse them as DATE
def invalid_date_condition(col):
    parsed = F.to_date(F.col(col))
    return F.col(col).isNotNull() & parsed.isNull()


# 7 Test case to validate DATE columns looking if they are not null and if spark can parse them as TIMESTAMP
def invalid_timestamp_condition(col):
    col_ref = F.col(col)
    parsed_ts = F.to_timestamp(col_ref)
    incomplete_ts = col_ref.rlike(r"^\d{4}-\d{2}$")
    return col_ref.isNotNull() & (parsed_ts.isNull() | incomplete_ts)

In [0]:
from pyspark.sql import functions as f

'''
This function will summarize and display the amount of erros when veryfy a table in the RAW catalog.
'''
def total_errors_from_products(df):
    total_rows = df.count()

    def print_metric(label, count):
        pct = (count / total_rows) * 100 if total_rows else 0
        print(f"{label}: {count} rows ({pct:.2f}%)")

    # 1) DUPLICATES (PK)
    duplicate_keys = (
        df.groupBy("ProductID").count().filter("count > 1").count()
    )

    # 2) INT COLUMNS
    int_error = None
    for c in int_cols:
        cond = invalid_int_condition(c)
        int_error = cond if int_error is None else (int_error | cond)

    # 3) DECIMAL COLUMNS
    decimal_error = None
    for c in decimal_cols:
        cond = invalid_decimal_condition(c)
        decimal_error = cond if decimal_error is None else (decimal_error | cond)

    # 4) BOOLEAN COLUMNS
    bool_error = None
    for c in bool_cols:
        cond = invalid_bool_condition(c)
        bool_error = cond if bool_error is None else (bool_error | cond)

    # 5) STRING COLUMNS
    trim_error = None
    for c in string_cols:
        cond = needs_trim_condition(c)
        trim_error = cond if trim_error is None else (trim_error | cond)

    # 3) Get all erros at once so we don't full_scan the table for each testcase.

    summary = df.select(
        int_error.cast("int").alias("int_error_int"),
        decimal_error.cast("int").alias("decimal_error_int"),
        bool_error.cast("int").alias("bool_error_int"),
        trim_error.cast("int").alias("trim_error_int")
    ).agg(
        F.sum("int_error_int").alias("int_errors"),
        F.sum("decimal_error_int").alias("decimal_errors"),
        F.sum("bool_error_int").alias("bool_errors"),
        F.sum("trim_error_int").alias("trim_errors")
    ).collect()[0]

    # 4) Display each metric for each test_case
    
    print("Test_Cases_Validations: RAW_products")
    print(f"Total rows: {total_rows}\n")
    print_metric("DUPLICATE ProductID (PK keys)", duplicate_keys)
    print_metric("INT errors", summary["int_errors"] or 0)
    print_metric("DECIMAL errors", summary["decimal_errors"] or 0)
    print_metric("BOOLEAN errors", summary["bool_errors"] or 0)
    print_metric("STRING issues", summary["trim_errors"] or 0)
    display(df.filter(trim_error))

In [0]:
from pyspark.sql import functions as F

'''
This function will summarize and display the amount of errors
when verifying the RAW_sales_order_header table.
'''
def total_errors_from_header(df):

    total_rows = df.count()

    def print_metric(label, count):
        pct = (count / total_rows) * 100 if total_rows else 0 #% errors
        print(f"{label}: {count} rows ({pct:.2f}%)")

    # 1) DUPLICATES (PK)
    duplicate_keys = (
        df.groupBy("SalesOrderID").count().filter("count > 1").count()
    )

    # 2) INT Columns
    int_error = None
    for c in int_cols_header:
        cond = invalid_int_condition(c)
        int_error = cond if int_error is None else (int_error | cond)

    # 3) DECIMAL Columns
    decimal_error = None
    for c in decimal_cols_header:
        cond = invalid_decimal_condition(c)
        decimal_error = cond if decimal_error is None else (decimal_error | cond)
    
    # 4) BOOLEAN Columns
    bool_error = None
    for c in bool_cols_header:
        cond = invalid_bool_condition(c)
        bool_error = cond if bool_error is None else (bool_error | cond)

    # 5) STRING Columns
    trim_error = None
    for c in string_cols_header:
        cond = needs_trim_condition(c)
        trim_error = cond if trim_error is None else (trim_error | cond)

    # 6) DATE Columns
    date_error = None
    for c in date_cols_header:
        cond = invalid_date_condition(c)
        date_error = cond if date_error is None else (date_error | cond)

    # 7) TIMESTAMP Columns
    timestamp_error = None
    for c in timestamp_cols_header:
        cond = invalid_timestamp_condition(c)
        timestamp_error = cond if timestamp_error is None else (timestamp_error | cond)

    # 8) Get all errors at once so we don't full_scan the table for each testcase.
    summary = df.select(
        int_error.cast("int").alias("int_error_int"),
        decimal_error.cast("int").alias("decimal_error_int"),
        bool_error.cast("int").alias("bool_error_int"),
        trim_error.cast("int").alias("trim_error_int"),
        date_error.cast("int").alias("date_error_int"),
        timestamp_error.cast("int").alias("timestamp_error_int")
    ).agg(
        F.sum("int_error_int").alias("int_errors"),
        F.sum("decimal_error_int").alias("decimal_errors"),
        F.sum("bool_error_int").alias("bool_errors"),
        F.sum("trim_error_int").alias("trim_errors"),
        F.sum("date_error_int").alias("date_errors"),
        F.sum("timestamp_error_int").alias("timestamp_errors")
    ).collect()[0]

    # 9) Display each metric for each test_case
    print("Test_Cases_Validations: RAW_sales_order_header")
    print(f"Total rows: {total_rows}\n")

    print_metric("DUPLICATE SalesOrderID (PK keys)", duplicate_keys)
    print_metric("INT errors", summary["int_errors"] or 0)
    print_metric("DECIMAL errors", summary["decimal_errors"] or 0)
    print_metric("BOOLEAN errors", summary["bool_errors"] or 0)
    print_metric("STRING issues", summary["trim_errors"] or 0)
    print_metric("DATE errors", summary["date_errors"] or 0)
    print_metric("TIMESTAMP errors", summary["timestamp_errors"] or 0)
    display(df.filter(timestamp_error))


In [0]:
from pyspark.sql import functions as F

'''
This function will summarize and display the amount of errors
when verifying the RAW_sales_order_detail table.
'''
def total_errors_from_detail(df):

    total_rows = df.count()

    def print_metric(label, count):
        pct = (count / total_rows) * 100 if total_rows else 0
        print(f"{label}: {count} rows ({pct:.2f}%)")

    # 1) DUPLICATES (PK)
    duplicate_keys = (
        df.groupBy("SalesOrderDetailID").count().filter("count > 1").count()
    )

    # 2) INT Columns
    int_error = None
    for c in int_cols_detail:
        cond = invalid_int_condition(c)
        int_error = cond if int_error is None else (int_error | cond)

    # 3) DECIMAL Columns (normal decimals)
    decimal_error = None
    for c in decimal_cols_detail_normal:   # <-- apenas ajuste do grupo
        cond = invalid_decimal_condition(c)
        decimal_error = cond if decimal_error is None else (decimal_error | cond)

    # 3.1) DECIMAL Columns (UnitPriceDiscount special case)
    decimal_error_unipd = None
    for c in decimal_cols_detail_unipd:     # <-- seu novo test case aplicado aqui
        cond = invalid_decimal_condition_unipd(c)
        decimal_error_unipd = cond if decimal_error_unipd is None else (decimal_error_unipd | cond)

    # 3) Get all erros at once so we don't full_scan the table for each testcase.
    summary = df.select(
        int_error.cast("int").alias("int_error_int"),
        decimal_error.cast("int").alias("decimal_error_int"),
        decimal_error_unipd.cast("int").alias("decimal_error_unipd_int")
    ).agg(
        F.sum("int_error_int").alias("int_errors"),
        F.sum("decimal_error_int").alias("decimal_errors"),
        F.sum("decimal_error_unipd_int").alias("decimal_unipd_errors")
    ).collect()[0]

    # 4) Display each metric for each test_case
    print("Test_Cases_Validations: RAW_sales_order_detail")
    print(f"Total rows: {total_rows}\n")

    print_metric("DUPLICATE SalesOrderDetailID (PK keys)", duplicate_keys)
    print_metric("INT errors", summary["int_errors"] or 0)
    print_metric("DECIMAL errors (normal)", summary["decimal_errors"] or 0)
    print_metric("DECIMAL errors (UnitPriceDiscount)", summary["decimal_unipd_errors"] or 0)


In [0]:
total_errors_from_products(products_df)

Test_Cases_Validations: RAW_products
Total rows: 303

DUPLICATE ProductID (PK keys): 8 rows (2.64%)
INT errors: 0 rows (0.00%)
DECIMAL errors: 0 rows (0.00%)
BOOLEAN errors: 0 rows (0.00%)
STRING issues: 198 rows (65.35%)


ProductID,ProductDesc,ProductNumber,MakeFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,Weight,WeightUnitMeasureCode,ProductCategoryName,ProductSubCategoryName
680,"HL Road Frame - Black, 58",FR-R92B-58,True,Black,500,375,1059.31,1431.5,58.0,CM,2.24,LB,,Road Frames
706,"HL Road Frame - Red, 58",FR-R92R-58,True,Red,500,375,1059.31,1431.5,58.0,CM,2.24,LB,,Road Frames
717,"HL Road Frame - Red, 62",FR-R92R-62,True,Red,500,375,868.6342,1431.5,62.0,CM,2.3,LB,,Road Frames
718,"HL Road Frame - Red, 44",FR-R92R-44,True,Red,500,375,868.6342,1431.5,44.0,CM,2.12,LB,,Road Frames
719,"HL Road Frame - Red, 48",FR-R92R-48,True,Red,500,375,868.6342,1431.5,48.0,CM,2.16,LB,,Road Frames
720,"HL Road Frame - Red, 52",FR-R92R-52,True,Red,500,375,868.6342,1431.5,52.0,CM,2.2,LB,,Road Frames
721,"HL Road Frame - Red, 56",FR-R92R-56,True,Red,500,375,868.6342,1431.5,56.0,CM,2.24,LB,,Road Frames
722,"LL Road Frame - Black, 58",FR-R38B-58,True,Black,500,375,204.6251,337.22,58.0,CM,2.46,LB,,Road Frames
723,"LL Road Frame - Black, 60",FR-R38B-60,True,Black,500,375,204.6251,337.22,60.0,CM,2.48,LB,,Road Frames
724,"LL Road Frame - Black, 62",FR-R38B-62,True,Black,500,375,204.6251,337.22,62.0,CM,2.5,LB,,Road Frames


In [0]:
total_errors_from_header(sales_order_header_df)

Test_Cases_Validations: RAW_sales_order_header
Total rows: 31465

DUPLICATE SalesOrderID (PK keys): 0 rows (0.00%)
INT errors: 0 rows (0.00%)
DECIMAL errors: 0 rows (0.00%)
BOOLEAN errors: 0 rows (0.00%)
STRING issues: 0 rows (0.00%)
DATE errors: 0 rows (0.00%)
TIMESTAMP errors: 5 rows (0.02%)


SalesOrderID,OrderDate,ShipDate,OnlineOrderFlag,AccountNumber,CustomerID,SalesPersonID,Freight
43828,2021-06,2021-07-05,True,10-4030-027605,27605,,89.4568
43829,2021-06,2021-07-05,True,10-4030-027611,27611,,89.4568
43830,2021-06,2021-07-05,True,10-4030-016347,16347,,89.4568
43831,2021-06,2021-07-05,True,10-4030-011028,11028,,84.3748
43832,2021-06,2021-07-05,True,10-4030-013584,13584,,89.4568


In [0]:
total_errors_from_detail(sales_order_detail_df)

Test_Cases_Validations: RAW_sales_order_detail
Total rows: 121317

DUPLICATE SalesOrderDetailID (PK keys): 0 rows (0.00%)
INT errors: 2 rows (0.00%)
DECIMAL errors (normal): 94 rows (0.08%)
DECIMAL errors (UnitPriceDiscount): 0 rows (0.00%)


In [0]:
%sql
SELECT 
  ProductID, 
  COUNT(*) AS ProductID_Duplicates 
FROM 
  prod_data.raw.raw_products 
GROUP BY 
  ProductID
HAVING 
  COUNT(*) > 1

-- SELECT *
-- FROM prod_data.raw.raw_products
-- WHERE 
--       ProductDesc IS NULL
--    OR ProductNumber IS NULL
--    OR StandardCost IS NULL
--    OR ListPrice IS NULL
--    OR Color IS NULL
--    OR ProductCategoryName IS NULL
--    OR ProductSubCategoryName IS NULL;
--LIMIT 100

ProductID,ProductID_Duplicates
713,2
714,2
715,2
716,2
881,2
882,2
883,2
884,2


In [0]:
%sql
SELECT
  *
FROM 
  prod_data.raw.raw_products
WHERE 
  ProductID = 713

ProductID,ProductDesc,ProductNumber,MakeFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,Weight,WeightUnitMeasureCode,ProductCategoryName,ProductSubCategoryName
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,False,Multi,4,3,38.4923,49.99,S,,,,,Jerseys
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,False,Multi,4,3,38.4923,49.99,S,,,,Clothing,Shirt


In [0]:
%sql
SELECT *
FROM prod_data.raw.raw_products
WHERE 1=1 
 --OR ProductDesc <> TRIM(ProductDesc)
 --OR ProductCategoryName <> TRIM(ProductCategoryName)
 --OR ProductSubCategoryName <> TRIM(ProductSubCategoryName)
 OR WeightUnitMeasureCode <> TRIM(WeightUnitMeasureCode)
 OR SizeUnitMeasureCode <> TRIM(SizeUnitMeasureCode)
LIMIT 100;

ProductID,ProductDesc,ProductNumber,MakeFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,Weight,WeightUnitMeasureCode,ProductCategoryName,ProductSubCategoryName
680,"HL Road Frame - Black, 58",FR-R92B-58,True,Black,500,375,1059.31,1431.5,58,CM,2.24,LB,,Road Frames
706,"HL Road Frame - Red, 58",FR-R92R-58,True,Red,500,375,1059.31,1431.5,58,CM,2.24,LB,,Road Frames
707,"Sport-100 Helmet, Red",HL-U509-R,False,Red,4,3,13.0863,34.99,,,,,,Helmets
708,"Sport-100 Helmet, Black",HL-U509,False,Black,4,3,13.0863,34.99,,,,,,Helmets
709,"Mountain Bike Socks, M",SO-B909-M,False,White,4,3,3.3963,9.5,M,,,,,Socks
710,"Mountain Bike Socks, L",SO-B909-L,False,White,4,3,3.3963,9.5,L,,,,,Socks
711,"Sport-100 Helmet, Blue",HL-U509-B,False,Blue,4,3,13.0863,34.99,,,,,,Helmets
712,AWC Logo Cap,CA-1098,False,Multi,4,3,6.9223,8.99,,,,,,Caps
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,False,Multi,4,3,38.4923,49.99,S,,,,,Jerseys
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,False,Multi,4,3,38.4923,49.99,M,,,,,Jerseys


### 2.11 Notes about data_validation
#### 01.raw_products
##### 1.1-8 duplicates on PRIMARY KEY for PRODUCTID with is 2.64% of the total table.
##### 1.2-No erros founded on INT, DECIMAL, BOOLEAN data fields.
##### 1.3-65.35% of STRING Columns with SPACES issues.
##### 1.3-Many columns with null values(must create another test case)
##### 1.4-Monetary values changed for DECIMAL
##### 1.5-Total of 295 distincts products.

#### 02.sales_order_header_df
##### 2.1- No Duplicates on PK
##### 2.2- No errors on INT,DECIMAL,BOOLEAN,STRING,DATA FIELDS
##### 2.3- There are 5 rows with ERROS on TIMESTAMP FIELD which represents 0.02% of the dataset. Those timestamp are returning as YYYY-MM with no valid format.
##### 2.4- A important point is that UnitPriceDiscount is actually represents a fraction of the discount, so the requirement is not correct when it says to
##### calculate like: TotalLineExtendedPrice using the formula: OrderQty * (UnitPrice - UnitPriceDiscount). This should probably be something like; OrderQty * UnitPrice * (1 - UnitPriceDiscount).

#### 03.sales_order_detail_df
##### 3.1 - There are 2 ERROS for INT COLUMNS
##### 3.2 - There are 94 errors/issues for DECIMAL COLUMNS which represents 0.08% of the total dataset. Those errors are different format within the decimal COLUMN
##### 3.3 - No other errors or attention points were found beyond that.

## ->02.3 Creating Store schema and tables
### Ingestion Raw -> Store

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS prod_data.store;

In [0]:
%sql
CREATE OR REPLACE TABLE prod_data.store.store_products (
  ProductID INT COMMENT "Primary key. Unique identifier for each product",
  ProductDesc STRING COMMENT "Normal description",
  ProductNumber STRING COMMENT "Alphanumeric column / SKU",
  MakeFlag BOOLEAN COMMENT "Internal flag",
  Color STRING COMMENT "Color of the product",
  SafetyStockLevel INT COMMENT "Minimum stock level required",
  ReorderPoint INT COMMENT "Reorder threshold",
  StandardCost DECIMAL(19,4) COMMENT "Internal product cost reference", --DECIMAL since is monetarial value
  ListPrice DECIMAL(19,4) COMMENT "Product price",                      -- DECIMAL since is monetarial value
  Size STRING COMMENT "Size label",
  SizeUnitMeasureCode STRING COMMENT "Unit of measure for Size",
  Weight DECIMAL(10,4) COMMENT "Product weight.",                       --TO DECIMAL and precision 4 since might be needed for any calculation.
  WeightUnitMeasureCode STRING COMMENT "Unit of measure for Weight",
  ProductCategoryName STRING COMMENT "Category name of a product",
  ProductSubCategoryName STRING COMMENT "SubCategory name of ap roduct", 
  CONSTRAINT PK_store_products PRIMARY KEY (ProductID)
)
USING DELTA;

In [0]:
%sql
--Ingestion from raw to store but also storing the data properly from RAW.
--REMOVING DUPLICATE ID's
--REMOVING UNECESSARY SPACES
INSERT INTO prod_data.store.store_products (
    ProductID,
    ProductDesc,
    ProductNumber,
    MakeFlag,
    Color,
    SafetyStockLevel,
    ReorderPoint,
    StandardCost,
    ListPrice,
    Size,
    SizeUnitMeasureCode,
    Weight,
    WeightUnitMeasureCode,
    ProductCategoryName,
    ProductSubCategoryName
)
WITH dedup_PRODUCTID AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY ProductID 
        ORDER BY
                CASE WHEN ProductCategoryName IS NOT NULL THEN 0 ELSE 1 END,    -- Keep Category if category is not null
                CASE WHEN ProductSubCategoryName IS NOT NULL THEN 0 ELSE 1 END  -- Keep SubCategory if subcategory is not null
        ) AS choose_keep_line
    FROM prod_data.raw.raw_products
)
SELECT
    ProductID,
    ProductDesc AS ProductDesc,
    ProductNumber AS ProductNumber,
    MakeFlag,
    Color AS Color,
    SafetyStockLevel,
    ReorderPoint,
    StandardCost,
    ListPrice,
    Size AS Size,
    SizeUnitMeasureCode AS SizeUnitMeasureCode,
    Weight,
    WeightUnitMeasureCode AS WeightUnitMeasureCode,
    TRIM(ProductCategoryName) AS ProductCategoryName,
    TRIM(ProductSubCategoryName) AS ProductSubCategoryName
FROM 
    dedup_PRODUCTID
WHERE 
    choose_keep_line = 1;

num_affected_rows,num_inserted_rows
295,295


In [0]:
%sql
CREATE OR REPLACE TABLE prod_data.store.store_sales_order_header (
  SalesOrderID INT COMMENT "Unique identifier for each sales order heade",
  OrderDate TIMESTAMP COMMENT "Creation time of an order",
  ShipDate DATE COMMENT "Date the order was shipped",
  OnlineOrderFlag BOOLEAN COMMENT "Identifier for online order or not",
  AccountNumber STRING COMMENT "Customer account",
  CustomerID INT COMMENT "Identifier of customer",
  SalesPersonID INT COMMENT "Identifier of SalesPerson",
  Freight DECIMAL(19,4) COMMENT "Freight cost associated with the order.",
  CONSTRAINT pk_store_sales_order_header PRIMARY KEY (SalesOrderID)
)
USING DELTA;

In [0]:
%sql
INSERT INTO prod_data.store.store_sales_order_header
SELECT 
  SalesOrderID,
  OrderDate,
  ShipDate,
  OnlineOrderFlag,
  AccountNumber,
  CustomerID,
  SalesPersonID,
  Freight
FROM 
  prod_data.raw.raw_sales_order_header;

num_affected_rows,num_inserted_rows
31465,31465


In [0]:
%sql
CREATE OR REPLACE TABLE prod_data.store.store_sales_order_detail (
  SalesOrderDetailID INT COMMENT "Unique order identifier",
  SalesOrderID INT COMMENT "Order reference",
  OrderQty INT COMMENT "Amount ordered",
  ProductID INT COMMENT "Product reference",
  UnitPrice DECIMAL(19,4) COMMENT "Base price.",             --Decimal with low precision(monetary value)
  UnitPriceDiscount DECIMAL(19,4) COMMENT "Discount price.", --Decimal with low precision(monetary value)
  
  CONSTRAINT PK_store_sales_order_detail PRIMARY KEY (SalesOrderDetailID),
  CONSTRAINT FK_detail_header FOREIGN KEY (SalesOrderID)
    REFERENCES prod_data.store.store_sales_order_header (SalesOrderID),
  CONSTRAINT FK_detail_product FOREIGN KEY (ProductID)
    REFERENCES prod_data.store.store_products (ProductID)
)
USING DELTA;

In [0]:
%sql
INSERT INTO prod_data.store.store_sales_order_detail
SELECT 
  SalesOrderDetailID,
  SalesOrderID,
  OrderQty,
  ProductID,
  UnitPrice,
  UnitPriceDiscount
FROM 
  prod_data.raw.raw_sales_order_detail

num_affected_rows,num_inserted_rows
121317,121317


In [0]:
%sql
DESCRIBE TABLE prod_data.store.store_products;
--DESCRIBE TABLE prod_data.store.store_sales_order_header
--DESCRIBE TABLE prod_data.store.store_sales_order_detail

col_name,data_type,comment
ProductID,int,Primary key. Unique identifier for each product
ProductDesc,string,Normal description
ProductNumber,string,Alphanumeric column / SKU
MakeFlag,boolean,Internal flag
Color,string,Color of the product
SafetyStockLevel,int,Minimum stock level required
ReorderPoint,int,Reorder threshold
StandardCost,"decimal(19,4)",Internal product cost reference
ListPrice,"decimal(19,4)",Product price
Size,string,Size label


# - 03.Product Master Transformations:
## Perform the following transformations on the product master data and write the results into a table named publish_product:
##### 01.Replace NULL values in the Color field with N/A.
##### 02.Enhance the ProductCategoryName field when it is NULL using the following logic:
##### 02.1.If ProductSubCategoryName is in (‘Gloves’, ‘Shorts’, ‘Socks’, ‘Tights’, ‘Vests’), set ProductCategoryName to ‘Clothing’.
##### 02.2.If ProductSubCategoryName is in (‘Locks’, ‘Lights’, ‘Headsets’, ‘Helmets’, ‘Pedals’, ‘Pumps’), set ProductCategoryName to ‘Accessories’.
##### 02.3.If ProductSubCategoryName contains the word ‘Frames’ or is in (‘Wheels’, ‘Saddles’), set ProductCategoryName to ‘Components’.

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS prod_data.publish;

In [0]:
%sql
CREATE OR REPLACE TABLE prod_data.publish.publish_products (
  ProductID INT COMMENT "Primary key. Unique identifier for each product",
  ProductDesc STRING COMMENT "Normal description",
  ProductNumber STRING COMMENT "Alphanumeric column / SKU",
  MakeFlag BOOLEAN COMMENT "Internal flag",
  Color STRING COMMENT "Color of the product",
  SafetyStockLevel INT COMMENT "Minimum stock level required",
  ReorderPoint INT COMMENT "Reorder threshold",
  StandardCost DECIMAL(19,4) COMMENT "Internal product cost reference", --Changed Data Type TO DECIMAL since is monetarial value.
  ListPrice DECIMAL(19,4) COMMENT "Product price", --Changed Data Type TO DECIMAL since the representation is a fraction.
  Size STRING COMMENT "Size label",
  SizeUnitMeasureCode STRING COMMENT "Unit of measure for Size",
  Weight DECIMAL(10,4) COMMENT "Product weight.", --Changed Data Type TO DECIMAL and precision 4 since might be needed for any calculation.
  WeightUnitMeasureCode STRING COMMENT "Unit of measure for Weight",
  ProductCategoryName STRING COMMENT "Category name of a product",
  ProductSubCategoryName STRING COMMENT "SubCategory name of ap roduct", 
  CONSTRAINT PK_store_products PRIMARY KEY (ProductID)
)
USING DELTA;

In [0]:
%sql
INSERT INTO prod_data.publish.publish_products
SELECT
  ProductID,
  ProductDesc,
  ProductNumber,
  MakeFlag,
  -- 01. Replace NULL Color with 'N/A'
  COALESCE(Color, 'N/A') AS Color,
  SafetyStockLevel,
  ReorderPoint,
  StandardCost,
  ListPrice,
  Size,
  SizeUnitMeasureCode,
  Weight,
  WeightUnitMeasureCode,
  -- 02. Enhance ProductCategoryName only when it is NULL
  CASE
    WHEN ProductCategoryName IS NOT NULL 
      THEN ProductCategoryName
    WHEN ProductSubCategoryName IN ('Gloves', 'Shorts', 'Socks', 'Tights', 'Vests')
      THEN 'Clothing'
    WHEN ProductSubCategoryName IN ('Locks', 'Lights', 'Headsets', 'Helmets', 'Pedals', 'Pumps')
      THEN 'Accessories'
    WHEN ProductSubCategoryName LIKE '%Frames%'
         OR ProductSubCategoryName IN ('Wheels', 'Saddles')
      THEN 'Components'
    ELSE 'Products_with_no_description'  -- Replacing with a new category instead of just NULL
  END AS ProductCategoryName,
ProductSubCategoryName
FROM 
  prod_data.store.store_products;

num_affected_rows,num_inserted_rows
295,295


In [0]:
%sql
SELECT *
FROM 
  prod_data.publish.publish_products
LIMIT 100

ProductID,ProductDesc,ProductNumber,MakeFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,Weight,WeightUnitMeasureCode,ProductCategoryName,ProductSubCategoryName
680,"HL Road Frame - Black, 58",FR-R92B-58,True,Black,500,375,1059.31,1431.5,58,CM,2.24,LB,Components,Road Frames
706,"HL Road Frame - Red, 58",FR-R92R-58,True,Red,500,375,1059.31,1431.5,58,CM,2.24,LB,Components,Road Frames
707,"Sport-100 Helmet, Red",HL-U509-R,False,Red,4,3,13.0863,34.99,,,,,Accessories,Helmets
708,"Sport-100 Helmet, Black",HL-U509,False,Black,4,3,13.0863,34.99,,,,,Accessories,Helmets
709,"Mountain Bike Socks, M",SO-B909-M,False,White,4,3,3.3963,9.5,M,,,,Clothing,Socks
710,"Mountain Bike Socks, L",SO-B909-L,False,White,4,3,3.3963,9.5,L,,,,Clothing,Socks
711,"Sport-100 Helmet, Blue",HL-U509-B,False,Blue,4,3,13.0863,34.99,,,,,Accessories,Helmets
712,AWC Logo Cap,CA-1098,False,Multi,4,3,6.9223,8.99,,,,,Products_with_no_description,Caps
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,False,Multi,4,3,38.4923,49.99,S,,,,Clothing,Shirt
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,False,Multi,4,3,38.4923,49.99,M,,,,Clothing,Shirt


# - 04.Sales Order Transformations:
## Join SalesOrderDetail with SalesOrderHeader on SalesOrderId and apply the following transformations:
##### 01.Calculate LeadTimeInBusinessDays as the difference between OrderDate and ShipDate, excluding Saturdays and Sundays.
##### 02.Calculate TotalLineExtendedPrice using the formula: OrderQty * (UnitPrice - UnitPriceDiscount).
##### 03.Write the results into a table named publish_orders, including:
##### 04.All fields from SalesOrderDetail.
##### 05.All fields from SalesOrderHeader except SalesOrderId, and rename Freight to TotalOrderFreight.

In [0]:
%sql
--store.Tables: SalesOrdeDetails X SalesOrderHeader
CREATE OR REPLACE TABLE prod_data.publish.publish_orders (
  -- From SalesOrderDetail
  SalesOrderDetailID INT COMMENT "Primary key. Unique identifier for each order line.",
  SalesOrderID INT COMMENT "FK reference to the sales order header.",
  OrderQty INT COMMENT "Quantity ordered.",
  ProductID INT COMMENT "FK reference to the product.",
  UnitPrice DECIMAL(19,4) COMMENT "Base unit price before discount.",
  UnitPriceDiscount DECIMAL(19,4) COMMENT "Discount applied on the unit price.",
  TotalLineExtendedPrice DECIMAL(19,4) COMMENT "Calculated:OrderQty * UnitPrice * (1 - UnitPriceDiscount).",

  -- From SalesOrderHeader
  OrderDate DATE COMMENT "Date the order was placed.",
  ShipDate DATE COMMENT "Date the order was shipped.",
  OnlineOrderFlag BOOLEAN COMMENT "Flag indicating if order was placed online.",
  AccountNumber STRING COMMENT "ERP customer account identifier.",
  CustomerID INT COMMENT "Customer identifier.",
  SalesPersonID INT COMMENT "Salesperson identifier.",
  TotalOrderFreight DECIMAL(19,4) COMMENT "Freight cost at order level.",

  -- Calculated(From requirement)
  LeadTimeInBusinessDays INT COMMENT "Business-day difference between OrderDate and ShipDate (excludes weekends).",
  CONSTRAINT pk_publish_orders PRIMARY KEY (SalesOrderDetailID)
)
USING DELTA;


In [0]:
%sql
INSERT INTO prod_data.publish.publish_orders
SELECT
  d.SalesOrderDetailID,
  d.SalesOrderID,
  d.OrderQty,
  d.ProductID,
  d.UnitPrice,
  d.UnitPriceDiscount,
  -- 02. TotalLineExtendedPrice = OrderQty * (UnitPrice - UnitPriceDiscount)

  CAST(d.OrderQty * d.UnitPrice * (1 - d.UnitPriceDiscount) AS DECIMAL(19,4)) AS TotalLineExtendedPrice,

  h.OrderDate,
  h.ShipDate,
  h.OnlineOrderFlag,
  h.AccountNumber,
  h.CustomerID,
  h.SalesPersonID,
  CAST(h.Freight AS DECIMAL(19,4)) AS TotalOrderFreight,

  -- 01. LeadTimeInBusinessDays (diferença em dias úteis entre OrderDate e ShipDate)
  SIZE(
    FILTER(
      SEQUENCE(h.OrderDate, h.ShipDate),
      d -> date_format(d, 'E') NOT IN ('Sat', 'Sun')
    )
  ) AS LeadTimeInBusinessDays
FROM 
  prod_data.store.store_sales_order_detail d
INNER JOIN prod_data.store.store_sales_order_header h ON d.SalesOrderID = h.SalesOrderID;

num_affected_rows,num_inserted_rows
121317,121317


In [0]:
%sql
SELECT *
FROM 
  prod_data.publish.publish_orders
LIMIT 100

SalesOrderDetailID,SalesOrderID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,TotalLineExtendedPrice,OrderDate,ShipDate,OnlineOrderFlag,AccountNumber,CustomerID,SalesPersonID,TotalOrderFreight,LeadTimeInBusinessDays
1,43659,1,776,2024.994,0.0,2024.994,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
2,43659,3,777,2024.994,0.0,6074.982,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
3,43659,1,778,2024.994,0.0,2024.994,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
4,43659,1,771,2039.994,0.0,2039.994,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
5,43659,1,772,2039.994,0.0,2039.994,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
6,43659,2,773,2039.994,0.0,4079.988,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
7,43659,1,774,2039.994,0.0,2039.994,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
8,43659,3,714,28.8404,0.0,86.5212,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
9,43659,1,716,28.8404,0.0,28.8404,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6
10,43659,6,709,5.7,0.0,34.2,2021-05-31,2021-06-07,False,10-4020-000676,29825,279,616.0984,6


# - 05.Data Analysis Questions:
## Provide answers to the following questions based on the transformed data:
##### 01.Which color generated the highest revenue each year?
##### 02.What is the average LeadTimeInBusinessDays by ProductCategoryName?

In [0]:
%sql
-- 01. Highest revenue color by year
WITH color_year_revenue AS (
  SELECT
    YEAR(o.OrderDate) AS order_year,
    p.Color,
    SUM(o.TotalLineExtendedPrice) AS total_revenue
  FROM 
    prod_data.publish.publish_orders o
    INNER JOIN prod_data.publish.publish_products p ON o.ProductID = p.ProductID
  GROUP BY 
    YEAR(o.OrderDate), p.Color
),
rank_highest_revenue_year AS (
  SELECT
    order_year,
    Color,
    total_revenue,
    ROW_NUMBER() OVER (PARTITION BY order_year ORDER BY total_revenue DESC) AS rk
  FROM 
    color_year_revenue
)
SELECT
  order_year,
  Color,
  total_revenue
FROM 
  rank_highest_revenue_year
WHERE 
  rk = 1
ORDER BY 
  total_revenue DESC, order_year DESC;

order_year,Color,total_revenue
2023,Black,15031199.7343
2022,Black,13917844.9058
2024,Yellow,6360243.0023
2021,Red,6019394.4108


In [0]:
%sql
-- 02. Average lead time (business days) by product category
SELECT
  p.ProductCategoryName,
  AVG(o.LeadTimeInBusinessDays) AS avg_LeadTimeInBusinessDays
FROM 
  prod_data.publish.publish_orders o
  INNER JOIN prod_data.publish.publish_products p ON o.ProductID = p.ProductID
GROUP BY 
  p.ProductCategoryName
ORDER BY 
  avg_LeadTimeInBusinessDays DESC;


ProductCategoryName,avg_LeadTimeInBusinessDays
Products_with_no_description,5.7202082171407325
Clothing,5.709380234505863
Accessories,5.702787804316105
Bikes,5.667897567632656
Components,5.667113624438874
