In [None]:
# Third-party packages used by the notebook
import streamlit as st
import pandas as pd

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, when, lit
# Snowpark column types and the functions module under its usual alias F
from snowflake.snowpark.types import DoubleType
import snowflake.snowpark.functions as F

# Reuse the Snowpark session supplied by the notebook runtime rather than
# opening a new connection; later cells read tables through `session`.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:
-- Session context for the cells below: compute runs on BILLION_RECORDS_WH.
USE WAREHOUSE BILLION_RECORDS_WH;
-- A fully qualified USE SCHEMA makes COMPANY_TB_TEST the current database and
-- SILVER the current schema in one statement.
-- (Earlier runs pointed at COMPANY_TB_TEST.TPCDS_SF10TCL instead.)
USE SCHEMA COMPANY_TB_TEST.SILVER;

In [None]:
# Fetch the notebook's Snowpark session and turn on the SQL simplifier
session = get_active_session()
session.sql_simplifier_enabled = True

# Report the context this session is running in. Each getter is invoked only
# when its line is printed, so the call/print interleaving is unchanged.
print('Connection Established with the following parameters:')
context_report = (
    ('User      : {}', session.get_current_user),
    ('Role      : {}', session.get_current_role),
    ('Database  : {}', session.get_current_database),
    ('Schema    : {}', session.get_current_schema),
    ('Warehouse : {}', session.get_current_warehouse),
)
for line_template, read_current in context_report:
    print(line_template.format(read_current()))

In [None]:
#df_table = session.table("BILLION_RECORDS_TABLE1")
#df_table.write.mode('overwrite').save_as_table('BILLION_RECORDS_TABLE1_TRANSFORMED')

In [None]:
-- Remove the outputs of previous runs so both builds start from scratch.
-- IF EXISTS keeps this cell from failing on a first run (or after a manual
-- cleanup), when one or both tables are not there yet.
DROP TABLE IF EXISTS COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V2;
DROP TABLE IF EXISTS COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V3;

In [None]:


# Build BILLION_RECORDS_ENRICHED_V2 with Snowpark: the sales fact table is
# left-joined to the customer and date dimensions, a handful of derived
# metrics are added, and the result is written to the SILVER schema.

# Source tables: the fact table is read from BRONZE, the two dimensions from
# the TPCDS_SF10TCL schema.
billion_records_df = session.table("COMPANY_TB_TEST.BRONZE.BILLION_RECORDS_TABLE1")
customer_df = session.table("COMPANY_TB_TEST.TPCDS_SF10TCL.CUSTOMER")
date_dim_df = session.table("COMPANY_TB_TEST.TPCDS_SF10TCL.DATE_DIM")

# Fact-table columns carried through unchanged, in output order.
sales_column_names = [
    "SS_SOLD_DATE_SK",
    "SS_SOLD_TIME_SK",
    "SS_ITEM_SK",
    "SS_CUSTOMER_SK",
    "SS_CDEMO_SK",
    "SS_HDEMO_SK",
    "SS_ADDR_SK",
    "SS_STORE_SK",
    "SS_PROMO_SK",
    "SS_TICKET_NUMBER",
    "SS_QUANTITY",
    "SS_WHOLESALE_COST",
    "SS_LIST_PRICE",
    "SS_SALES_PRICE",
    "SS_EXT_DISCOUNT_AMT",
    "SS_EXT_SALES_PRICE",
    "SS_EXT_WHOLESALE_COST",
    "SS_EXT_LIST_PRICE",
    "SS_EXT_TAX",
    "SS_COUPON_AMT",
    "SS_NET_PAID",
    "SS_NET_PAID_INC_TAX",
    "SS_NET_PROFIT",
]

# Gross profit feeds both the GROSS_PROFIT column and PROFIT_CATEGORY, so the
# expression is built once and reused.
gross_profit = billion_records_df["SS_SALES_PRICE"] - billion_records_df["SS_WHOLESALE_COST"]

# A CASE with no ELSE yields NULL, so a zero (or NULL) quantity becomes NULL
# and the division below returns NULL instead of raising. This matches the
# NULLIF(br.SS_QUANTITY, 0) used by the SQL build of
# BILLION_RECORDS_ENRICHED_V3, so both tables agree on zero-quantity rows.
quantity_or_null = when(
    billion_records_df["SS_QUANTITY"] != 0,
    billion_records_df["SS_QUANTITY"],
)

transformed_df = (
    billion_records_df
    .join(customer_df, billion_records_df["SS_CUSTOMER_SK"] == customer_df["C_CUSTOMER_SK"], how="left")
    .join(date_dim_df, billion_records_df["SS_SOLD_DATE_SK"] == date_dim_df["D_DATE_SK"], how="left")
    .select(
        *[billion_records_df[column_name] for column_name in sales_column_names],
        customer_df["C_CUSTOMER_ID"],
        customer_df["C_FIRST_NAME"],
        customer_df["C_LAST_NAME"],
        date_dim_df["D_DATE"],
        date_dim_df["D_DAY_NAME"],
        date_dim_df["D_QUARTER_NAME"],
        # Derived metrics
        gross_profit.alias("GROSS_PROFIT"),
        (billion_records_df["SS_EXT_DISCOUNT_AMT"] + billion_records_df["SS_COUPON_AMT"]).alias("TOTAL_DISCOUNT"),
        (billion_records_df["SS_QUANTITY"] * billion_records_df["SS_SALES_PRICE"]).alias("TOTAL_REVENUE"),
        (billion_records_df["SS_EXT_SALES_PRICE"] / quantity_or_null).alias("AVERAGE_ITEM_PRICE"),
        # Profit buckets: > 500 is high, 100..500 (inclusive) is medium,
        # everything else is low. NOTE: a NULL gross profit matches neither
        # branch and therefore also lands in 'Low Profit'.
        when(gross_profit > 500, lit('High Profit'))
        .when(gross_profit.between(100, 500), lit('Medium Profit'))
        .otherwise(lit('Low Profit')).alias("PROFIT_CATEGORY"),
        # A discount larger than half of the list price counts as high.
        when(billion_records_df["SS_EXT_DISCOUNT_AMT"] > billion_records_df["SS_LIST_PRICE"] * 0.5, lit('High Discount'))
        .otherwise(lit('Regular Discount')).alias("DISCOUNT_CATEGORY"),
    )
)

# Materialize the result, replacing any table left over from an earlier run.
transformed_df.write.mode("overwrite").save_as_table("COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V2")




In [None]:
-- Row count of the Snowpark-built table. The name is fully qualified so the
-- check reads the table written above even if the session's current schema
-- is switched away from SILVER.
SELECT COUNT(*) FROM COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V2;

In [None]:
-- Build BILLION_RECORDS_ENRICHED_V3 in plain SQL: the sales fact table is
-- left-joined to the customer and date dimensions and extended with derived
-- metrics and two category labels.
CREATE OR REPLACE TABLE COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V3 AS
SELECT
    -- Fact-table columns, passed through unchanged
      sales.SS_SOLD_DATE_SK
    , sales.SS_SOLD_TIME_SK
    , sales.SS_ITEM_SK
    , sales.SS_CUSTOMER_SK
    , sales.SS_CDEMO_SK
    , sales.SS_HDEMO_SK
    , sales.SS_ADDR_SK
    , sales.SS_STORE_SK
    , sales.SS_PROMO_SK
    , sales.SS_TICKET_NUMBER
    , sales.SS_QUANTITY
    , sales.SS_WHOLESALE_COST
    , sales.SS_LIST_PRICE
    , sales.SS_SALES_PRICE
    , sales.SS_EXT_DISCOUNT_AMT
    , sales.SS_EXT_SALES_PRICE
    , sales.SS_EXT_WHOLESALE_COST
    , sales.SS_EXT_LIST_PRICE
    , sales.SS_EXT_TAX
    , sales.SS_COUPON_AMT
    , sales.SS_NET_PAID
    , sales.SS_NET_PAID_INC_TAX
    , sales.SS_NET_PROFIT
    -- Customer attributes (NULL when the left join finds no match)
    , cust.C_CUSTOMER_ID
    , cust.C_FIRST_NAME
    , cust.C_LAST_NAME
    -- Calendar attributes of the sale date (NULL when no match)
    , dt.D_DATE
    , dt.D_DAY_NAME
    , dt.D_QUARTER_NAME
    -- Derived metrics
    , (sales.SS_SALES_PRICE - sales.SS_WHOLESALE_COST) AS GROSS_PROFIT
    , (sales.SS_EXT_DISCOUNT_AMT + sales.SS_COUPON_AMT) AS TOTAL_DISCOUNT
    , (sales.SS_QUANTITY * sales.SS_SALES_PRICE) AS TOTAL_REVENUE
    -- NULLIF turns a zero quantity into NULL, so the division returns NULL
    -- instead of failing
    , (sales.SS_EXT_SALES_PRICE / NULLIF(sales.SS_QUANTITY, 0)) AS AVERAGE_ITEM_PRICE
    -- Profit bucket: above 500 is high, 100 through 500 is medium, rest is low
    , CASE
          WHEN (sales.SS_SALES_PRICE - sales.SS_WHOLESALE_COST) > 500 THEN 'High Profit'
          WHEN (sales.SS_SALES_PRICE - sales.SS_WHOLESALE_COST) BETWEEN 100 AND 500 THEN 'Medium Profit'
          ELSE 'Low Profit'
      END AS PROFIT_CATEGORY
    -- Discount bucket: more than half of the list price counts as high
    , CASE
          WHEN sales.SS_EXT_DISCOUNT_AMT > sales.SS_LIST_PRICE * 0.5 THEN 'High Discount'
          ELSE 'Regular Discount'
      END AS DISCOUNT_CATEGORY
FROM COMPANY_TB_TEST.BRONZE.BILLION_RECORDS_TABLE1 AS sales
LEFT JOIN COMPANY_TB_TEST.TPCDS_SF10TCL.CUSTOMER AS cust
    ON sales.SS_CUSTOMER_SK = cust.C_CUSTOMER_SK
LEFT JOIN COMPANY_TB_TEST.TPCDS_SF10TCL.DATE_DIM AS dt
    ON sales.SS_SOLD_DATE_SK = dt.D_DATE_SK;

In [None]:
-- Row count of the SQL-built table. The name is fully qualified so the check
-- reads the table created above even if the session's current schema is
-- switched away from SILVER.
SELECT COUNT(*) FROM COMPANY_TB_TEST.SILVER.BILLION_RECORDS_ENRICHED_V3;

In [None]:
# Write the transformed data back to a new table
#transformed_df.write.save_as_table("BILLION_RECORDS_TRANSFORMED",mode="overwrite")