In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from snowflake.snowpark.functions import col, count, sum as sum_, count_distinct, concat, lit, when, max as max_
from datetime import datetime, timedelta
from faker import Faker
import pandas as pd
import random
import time


# Connect to Snowflake.
# Secrets come from the environment (or an interactive prompt) instead of the
# notebook source, which is routinely shared and committed. The password that
# was previously hard-coded here should be treated as exposed and rotated.
# NOTE: it was previously passed under "authenticator", which names an
# authentication *method* (e.g. "snowflake", "externalbrowser"), not a secret;
# it now goes under "password" and the connector's default authenticator is used.
import os
from getpass import getpass

connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "FVQCWWK-IJB71419"),
    "user": os.environ.get("SNOWFLAKE_USER", "LALIT"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: "),
    "role": os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_LEARNING_WH"),
    "database": os.environ.get("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARNING_DB"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "ODS"),
}

session = Session.builder.configs(connection_parameters).create()

# Inclusive date window applied to ODS.SALES in read_data().
START_DATE = datetime(2024, 6, 7)
END_DATE = datetime(2025, 6, 7)

In [None]:
def create_schemas(session, database="SNOWFLAKE_LEARNING_DB", schemas=("STG", "DWH")):
    """Ensure the pipeline's target schemas exist, creating any that are missing.

    Existence is checked with ``SHOW SCHEMAS LIKE`` rather than by probing a table
    inside the schema: the old probe reported "Created schema" whenever the schema
    existed but the probe table did not yet, and its bare ``except:`` also hid
    unrelated failures (permissions, network) behind a "does not exist" answer.

    Args:
        session: Active Snowpark session.
        database: Database that owns the schemas (default ``SNOWFLAKE_LEARNING_DB``).
        schemas: Schema names to ensure (default ``("STG", "DWH")``). Note that
            ``_`` and ``%`` act as wildcards in the LIKE pattern.

    Raises:
        Exception: Any error from Snowflake is printed and re-raised.
    """
    start_time = time.time()

    try:
        for schema_name in schemas:
            existing = session.sql(
                f"SHOW SCHEMAS LIKE '{schema_name}' IN DATABASE {database}"
            ).collect()
            if existing:
                print(f"Schema {schema_name} already exists")
            else:
                # IF NOT EXISTS keeps this safe if another session created it meanwhile.
                session.sql(f"CREATE SCHEMA IF NOT EXISTS {database}.{schema_name}").collect()
                print(f"Created schema {schema_name}")

        print(f"create_schemas completed in {time.time() - start_time:.2f} seconds")

    except Exception as e:
        print(f"Error creating schemas: {str(e)}")
        raise

create_schemas(session)

In [None]:
def read_data(session):
    """Load the ODS source tables as lazy Snowpark DataFrames and log their shape.

    SALES is restricted to START_DATE <= DATE <= END_DATE (module-level constants)
    and QUANTITY > 0; the other tables are returned unfiltered.

    Args:
        session: Active Snowpark session.

    Returns:
        Tuple ``(sales_df, dealers_df, products_df, dp_mapping_df, ec_club_df)``.

    Raises:
        Exception: Any error is printed and re-raised.
    """
    start_time = time.time()
    try:
        sales_df = session.table("SNOWFLAKE_LEARNING_DB.ODS.SALES").filter(
            (col("DATE") >= START_DATE) & (col("DATE") <= END_DATE) &
            (col("QUANTITY") > 0)
        )
        dealers_df = session.table("SNOWFLAKE_LEARNING_DB.ODS.DEALERS")
        products_df = session.table("SNOWFLAKE_LEARNING_DB.ODS.PRODUCTS")
        dp_mapping_df = session.table("SNOWFLAKE_LEARNING_DB.ODS.DP_MAPPING")
        ec_club_df = session.table("SNOWFLAKE_LEARNING_DB.ODS.EC_CLUB")

        # Log schemas and sample data for debugging
        print(f"SALES schema (Snowpark): {sales_df.columns}")
        print(f"DEALERS schema (Snowpark): {dealers_df.columns}")
        print(f"PRODUCTS schema (Snowpark): {products_df.columns}")
        print(f"DP_MAPPING schema (Snowpark): {dp_mapping_df.columns}")
        print(f"EC_CLUB schema (Snowpark): {ec_club_df.columns}")

        # Column names together with their Snowflake types. The previous version ran
        # DESCRIBE TABLE and printed the *result frame's* header (name, type, kind,
        # null?, ...), which is identical for every table and told us nothing.
        for label, frame in (("SALES", sales_df), ("DEALERS", dealers_df)):
            typed_columns = [(field.name, str(field.datatype)) for field in frame.schema.fields]
            print(f"{label} table columns (Snowflake): {typed_columns}")

        # Log sample data
        sales_sample = sales_df.limit(5).to_pandas()
        dealers_sample = dealers_df.limit(5).to_pandas()
        print(f"SALES sample data:\n{sales_sample}")
        print(f"DEALERS sample data:\n{dealers_sample}")

        print(f"read_data completed in {time.time() - start_time:.2f} seconds")
        return sales_df, dealers_df, products_df, dp_mapping_df, ec_club_df
    except Exception as e:
        print(f"Error reading data: {str(e)}")
        raise

sales_df, dealers_df, products_df, dp_mapping_df, ec_club_df = read_data(session)

In [None]:
def clean_and_join_data(sales_df, dealers_df, products_df, dp_mapping_df, ec_club_df):
    """Join filtered sales lines with dealer, vertical and product attributes.

    Steps:
      1. LEFT join SALES -> DEALERS on DEALER_NO (keeps sales with unknown dealers).
      2. LEFT join -> DP_MAPPING on DEALER_NO = DN_NUMBER to attach VERTICAL.
         NOTE(review): if DN_NUMBER is not unique in DP_MAPPING this fans out rows.
      3. INNER join -> PRODUCTS on SKU, keeping only products with IS_TRADING = 1.
         Only IS_TRADING is filtered; IS_FINISHED is not consulted -- confirm intent.
    The result is cached (or, if caching fails, persisted to STG.TEMP_PROCESSED_DF).

    Args:
        sales_df, dealers_df, products_df, dp_mapping_df: Outputs of ``read_data``.
        ec_club_df: Accepted for interface compatibility; currently unused.

    Returns:
        Snowpark DataFrame with one row per surviving sales line.

    Raises:
        Exception: Any error is printed and re-raised.
    """
    start_time = time.time()
    try:
        # Log input table row counts and samples
        print(f"SALES row count: {sales_df.count()}")
        print(f"DEALERS row count: {dealers_df.count()}")
        print(f"PRODUCTS row count: {products_df.count()}")
        print(f"SALES sample:\n{sales_df.limit(5).to_pandas()}")
        print(f"DEALERS sample:\n{dealers_df.limit(5).to_pandas()}")
        print(f"PRODUCTS sample:\n{products_df.limit(5).to_pandas()}")

        # Join sales with dealers
        processed_df = sales_df.join(
            dealers_df,
            sales_df["DEALER_NO"] == dealers_df["DEALER_NO"],
            "left"
        ).select(
            sales_df["DEALER_NO"].alias("DEALER_NO"),
            sales_df["SKU"].alias("SKU"),
            sales_df["DATE"].alias("DATE"),
            sales_df["QUANTITY"].alias("QUANTITY"),
            sales_df["AMOUNT"].alias("AMOUNT"),
            dealers_df["DEALER_NAME"].alias("DEALER_NAME"),
            dealers_df["REGION"].alias("REGION"),
            dealers_df["ADDRESS"].alias("ADDRESS"),
            dealers_df["ANNUAL_REVENUE"].alias("ANNUAL_REVENUE"),
            dealers_df["DEALER_TYPE"].alias("DEALER_TYPE"),
            dealers_df["EC_CLUB"].alias("EC_CLUB")
        )

        print(f"processed_df schema after sales-dealers join: {processed_df.columns}")
        print(f"processed_df row count after sales-dealers join: {processed_df.count()}")

        # Join with dp_mapping
        processed_df = processed_df.join(
            dp_mapping_df,
            processed_df["DEALER_NO"] == dp_mapping_df["DN_NUMBER"],
            "left"
        ).select(
            processed_df["*"],
            dp_mapping_df["VERTICAL"].alias("VERTICAL")
        )
        print(f"processed_df schema after dp_mapping join: {processed_df.columns}")
        print(f"processed_df row count after dp_mapping join: {processed_df.count()}")

        # Check SKU overlap
        sales_skus = processed_df.select(col("SKU")).distinct()
        products_skus = products_df.select(col("SKU")).distinct()
        common_skus = sales_skus.join(products_skus, sales_skus["SKU"] == products_skus["SKU"], "inner")
        print(f"Common SKU count: {common_skus.count()}")
        unmatched_sales_skus = sales_skus.join(products_skus, sales_skus["SKU"] == products_skus["SKU"], "left_anti")
        print(f"Unmatched SKU in processed_df sample:\n{unmatched_sales_skus.limit(5).to_pandas()}")

        # Join with products, keeping trading products only. This was a LEFT join
        # followed by IS_TRADING = 1, which already discarded unmatched rows (their
        # IS_TRADING is NULL), so an INNER join states the same thing directly.
        # IS_TRADING is qualified with products_df so the reference is unambiguous.
        # (The former trailing .drop("IS_TRADING", "IS_FINISHED") was a no-op:
        # neither column is in the select list.)
        processed_df = processed_df.join(
            products_df,
            processed_df["SKU"] == products_df["SKU"],
            "inner"
        ).filter(
            products_df["IS_TRADING"] == 1
        ).select(
            processed_df["DEALER_NO"].alias("DEALER_NO"),
            processed_df["SKU"].alias("SKU"),
            processed_df["DATE"].alias("DATE"),
            processed_df["QUANTITY"].alias("QUANTITY"),
            processed_df["AMOUNT"].alias("AMOUNT"),
            processed_df["DEALER_NAME"].alias("DEALER_NAME"),
            processed_df["REGION"].alias("REGION"),
            processed_df["ADDRESS"].alias("ADDRESS"),
            processed_df["ANNUAL_REVENUE"].alias("ANNUAL_REVENUE"),
            processed_df["DEALER_TYPE"].alias("DEALER_TYPE"),
            processed_df["EC_CLUB"].alias("EC_CLUB"),
            processed_df["VERTICAL"].alias("VERTICAL"),
            products_df["PRODUCT_NAME"].alias("PRODUCT_NAME"),
            products_df["PROD_CATEGORY"].alias("PROD_CATEGORY"),
            products_df["PROD_RANGE"].alias("PROD_RANGE"),
            products_df["PROD_SUBCATEGORY"].alias("PROD_SUBCATEGORY")
        )

        print(f"processed_df schema after products join: {processed_df.columns}")
        print(f"processed_df row count after products join: {processed_df.count()}")

        # Cache to optimize subsequent operations
        try:
            processed_df = processed_df.cache_result()
            print("Successfully cached processed_df")
        except Exception as cache_e:
            print(f"cache_result failed: {str(cache_e)}. Persisting to temporary table.")
            # Relies on the notebook-level `session` created in the connection cell.
            processed_df.write.mode("overwrite").save_as_table("SNOWFLAKE_LEARNING_DB.STG.TEMP_PROCESSED_DF")
            processed_df = session.table("SNOWFLAKE_LEARNING_DB.STG.TEMP_PROCESSED_DF")
            print("Loaded processed_df from temporary table")

        print(f"clean_and_join_data completed in {time.time() - start_time:.2f} seconds")
        return processed_df

    except Exception as e:
        print(f"Error cleaning and joining data: {str(e)}")
        raise

processed_df = clean_and_join_data(sales_df, dealers_df, products_df, dp_mapping_df, ec_club_df)

In [None]:

def compute_features(processed_df):
    """Derive dealer-level features and dealer x product matrices from sales lines.

    Args:
        processed_df: Output of ``clean_and_join_data`` -- one row per sales line
            carrying dealer attributes and the product hierarchy.

    Returns:
        Tuple of four Snowpark DataFrames:
          - dealer_features: one row per DEALER_NO with sales totals and averages.
          - product_combinations: (DEALER_NO, PRODUCT_COMBINATION) per distinct
            category/range/subcategory the dealer bought.
          - purchase_matrix: (DEALER_NO, SKU, PURCHASED) for every pair bought;
            PURCHASED is always 1 because a grouped pair has at least one row.
          - sku_plus_one_matrix: (DEALER_NO, PRODUCT_COMBINATION, PURCHASED=1).
        PRODUCT_COMBINATION is NULL if any of its three components is NULL
        (CONCAT propagates NULL).

    Raises:
        Exception: Any error is printed and re-raised.
    """
    start_time = time.time()
    try:
        dealer_features = processed_df.group_by(col("DEALER_NO")).agg(
            sum_(col("AMOUNT")).alias("TOTAL_SALES"),
            # Counts sales *lines*; there is no invoice id among the visible columns.
            count("*").alias("TOTAL_INVOICES"),
            count_distinct(col("SKU")).alias("UNIQUE_SKUS"),
            # ANNUAL_REVENUE is a dealer attribute repeated on every sales line by
            # the dealer join; summing it multiplied it by the line count. Take the
            # per-dealer value instead, like the other dealer attributes below.
            max_(col("ANNUAL_REVENUE")).alias("ANNUAL_REVENUE"),
            count_distinct(col("DEALER_TYPE")).alias("DEALER_TYPE_COUNT"),
            count_distinct(col("EC_CLUB")).alias("EC_CLUB_COUNT"),
            max_(col("DEALER_NAME")).alias("DEALER_NAME"),
            max_(col("REGION")).alias("REGION"),
            max_(col("ADDRESS")).alias("ADDRESS")
        ).with_column(
            "AVERAGE_BILL_VALUE", when(col("TOTAL_INVOICES") > 0, col("TOTAL_SALES") / col("TOTAL_INVOICES")).otherwise(0)
        ).with_column(
            "AVERAGE_SKUS_PER_INVOICE", when(col("TOTAL_INVOICES") > 0, col("UNIQUE_SKUS") / col("TOTAL_INVOICES")).otherwise(0)
        )

        product_combinations = processed_df.group_by(
            col("DEALER_NO"), col("PROD_CATEGORY"), col("PROD_RANGE"), col("PROD_SUBCATEGORY")
        ).agg(count("*").alias("PURCHASE_COUNT")).select(
            col("DEALER_NO"),
            concat(col("PROD_CATEGORY"), lit("_"), col("PROD_RANGE"), lit("_"), col("PROD_SUBCATEGORY")).alias("PRODUCT_COMBINATION")
        )

        purchase_matrix = processed_df.group_by("DEALER_NO", "SKU").agg(
            count("*").alias("PURCHASE_COUNT")
        ).select(
            col("DEALER_NO"), col("SKU"), when(col("PURCHASE_COUNT") > 0, 1).otherwise(0).alias("PURCHASED")
        )

        sku_plus_one_matrix = processed_df.group_by(
            col("DEALER_NO"), col("PROD_CATEGORY"), col("PROD_RANGE"), col("PROD_SUBCATEGORY")
        ).agg(
            count("*").alias("PURCHASE_COUNT")
        ).select(
            col("DEALER_NO"),
            concat(col("PROD_CATEGORY"), lit("_"), col("PROD_RANGE"), lit("_"), col("PROD_SUBCATEGORY")).alias("PRODUCT_COMBINATION"),
            when(col("PURCHASE_COUNT") > 0, 1).otherwise(0).alias("PURCHASED")
        )

        print(f"compute_features completed in {time.time() - start_time:.2f} seconds")
        return dealer_features, product_combinations, purchase_matrix, sku_plus_one_matrix
    except Exception as e:
        print(f"Error computing features: {str(e)}")
        raise

dealer_features, product_combinations, purchase_matrix, sku_plus_one_matrix = compute_features(processed_df)

In [None]:
def save_transformed_tables(session, processed_df, dealer_features, product_combinations, purchase_matrix, sku_plus_one_matrix):
    """Persist the transformed DataFrames to STG and rebuild STG.DEALERS.

    Each DataFrame overwrites its STG table, in the order listed below. Then
    STG.DEALERS is recreated as ODS.DEALERS left-joined to STG.DEALER_FEATURES,
    with the feature columns defaulting to 0 for dealers that have no sales.
    Errors are printed and re-raised.
    """
    began = time.time()
    targets = (
        (processed_df, "PROCESSED_SALES"),
        (dealer_features, "DEALER_FEATURES"),
        (product_combinations, "DEALER_PRODUCT_COMBINATIONS"),
        (purchase_matrix, "PURCHASE_MATRIX"),
        (sku_plus_one_matrix, "SKU_PLUS_ONE_MATRIX"),
    )
    try:
        for frame, table_name in targets:
            frame.write.mode("overwrite").save_as_table(f"SNOWFLAKE_LEARNING_DB.STG.{table_name}")
        print("Saved transformed tables to STG schema")

        session.sql("""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.DEALERS AS
            SELECT 
                d.DEALER_NO, 
                d.DEALER_NAME, 
                d.REGION, 
                d.ADDRESS, 
                d.ANNUAL_REVENUE, 
                d.DEALER_TYPE, 
                d.EC_CLUB,
                COALESCE(f.TOTAL_SALES, 0) AS TOTAL_SALES,
                COALESCE(f.TOTAL_INVOICES, 0) AS TOTAL_INVOICES,
                COALESCE(f.AVERAGE_BILL_VALUE, 0) AS AVERAGE_BILL_VALUE,
                COALESCE(f.AVERAGE_SKUS_PER_INVOICE, 0) AS AVERAGE_SKUS_PER_INVOICE
            FROM SNOWFLAKE_LEARNING_DB.ODS.DEALERS d
            LEFT JOIN SNOWFLAKE_LEARNING_DB.STG.DEALER_FEATURES f
            ON d.DEALER_NO = f.DEALER_NO
        """).collect()
        print("Updated DEALERS table in STG schema")
        elapsed = time.time() - began
        print(f"save_transformed_tables completed in {elapsed:.2f} seconds")
    except Exception as err:
        print(f"Error saving transformed tables: {str(err)}")
        raise

save_transformed_tables(session, processed_df, dealer_features, product_combinations, purchase_matrix, sku_plus_one_matrix)


In [None]:

def create_dwh_table_and_validate(session, processed_df, dealer_features):
    """Build DWH.RECOMMENDATION_INPUT and print basic data-quality checks.

    RECOMMENDATION_INPUT holds one row per processed sales line. Each row carries
    a PRODUCT_COMBINATION (category_range_subcategory) plus the dealer metrics
    from ``dealer_features``, left-joined on DEALER_NO. Validation then prints:
    duplicated (DEALER_NO, SKU, DATE) groups in STG.PROCESSED_SALES, NULL counts
    for DEALER_NO / SKU / AMOUNT in ``processed_df``, and a 10-row sample of the
    new table. Errors are printed and re-raised.
    """
    started = time.time()

    def upper_cased(frame):
        # Re-select every column under its upper-case name.
        return frame.select(*[col(name).alias(name.upper()) for name in frame.schema.names])

    try:
        # Force all column names to uppercase
        sales_uc = upper_cased(processed_df)
        features_uc = upper_cased(dealer_features)

        product_combination = concat(
            col("PROD_CATEGORY"), lit("_"), col("PROD_RANGE"), lit("_"), col("PROD_SUBCATEGORY")
        ).alias("PRODUCT_COMBINATION")
        line_columns = (
            [col(name) for name in ("DEALER_NO", "SKU", "PRODUCT_NAME")]
            + [product_combination]
            + [col(name) for name in ("QUANTITY", "AMOUNT", "DEALER_NAME", "REGION", "ADDRESS",
                                      "ANNUAL_REVENUE", "DEALER_TYPE", "EC_CLUB", "VERTICAL")]
        )
        metric_columns = [
            col(name) for name in ("DEALER_NO", "TOTAL_SALES", "TOTAL_INVOICES",
                                   "AVERAGE_BILL_VALUE", "AVERAGE_SKUS_PER_INVOICE")
        ]

        sales_lines = sales_uc.select(*line_columns)
        dealer_metrics = features_uc.select(*metric_columns)
        dwh_input = sales_lines.join(dealer_metrics, on="DEALER_NO", how="left")

        # Save table
        dwh_input.write.mode("overwrite").save_as_table("SNOWFLAKE_LEARNING_DB.DWH.RECOMMENDATION_INPUT")
        print("Saved RECOMMENDATION_INPUT table to DWH schema")

        # Duplicate check
        duplicates = session.sql("""
            SELECT DEALER_NO, SKU, DATE, COUNT(*)
            FROM SNOWFLAKE_LEARNING_DB.STG.PROCESSED_SALES
            GROUP BY DEALER_NO, SKU, DATE
            HAVING COUNT(*) > 1
        """).collect()
        message = (
            f"Duplicates found in STG.PROCESSED_SALES:\n{duplicates}"
            if duplicates
            else "No duplicates found in STG.PROCESSED_SALES"
        )
        print(message)

        # Null check: COUNT ignores the NULLs WHEN yields for non-matching rows.
        null_counts = sales_uc.select(
            count(when(col("DEALER_NO").is_null(), 1)).alias("NULL_DEALER_NO"),
            count(when(col("SKU").is_null(), 1)).alias("NULL_SKU"),
            count(when(col("AMOUNT").is_null(), 1)).alias("NULL_AMOUNT")
        ).collect()[0]
        print(f"Null counts in PROCESSED_SALES:\n{null_counts}")

        # Sample output
        sample_dwh = session.table("SNOWFLAKE_LEARNING_DB.DWH.RECOMMENDATION_INPUT").limit(10).to_pandas()
        print("Sample of DWH.RECOMMENDATION_INPUT table:\n" + str(sample_dwh))

        print(f"create_dwh_table_and_validate completed in {time.time() - started:.2f} seconds")
    except Exception as err:
        print(f"Error creating DWH table or validating: {str(err)}")
        raise

create_dwh_table_and_validate(session, processed_df, dealer_features)


In [None]:
def _scalar_count(session, table_name):
    """Return ``COUNT(*)`` of ``table_name`` as a Python int.

    Centralised because one call site used ``.count()`` on the COUNT query itself,
    which always returns 1 (the number of result rows), not the table's row count.
    """
    return session.sql(f"SELECT COUNT(*) AS CNT FROM {table_name}").collect()[0]["CNT"]


def _sql_literal(value):
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def collaborative_filtering(session, min_dealer_purchases=3, min_sku_dealers=3,
                            min_similarity=0.2, drop_intermediate_tables=False):
    """Build user-user and item-item collaborative-filtering recommendations.

    Reads STG.PURCHASE_MATRIX. It keeps dealers with at least
    ``min_dealer_purchases`` purchased SKUs and SKUs bought by at least
    ``min_sku_dealers`` dealers. From that filtered matrix it computes cosine
    similarity between dealers and between SKUs. It then scores SKUs a dealer
    has not bought yet and writes the enriched result to
    DWH.COLLABORATIVE_RECOMMENDATIONS.

    Args:
        session: Active Snowpark session. Its current schema is switched to STG.
        min_dealer_purchases: Dealer activity threshold (default 3).
        min_sku_dealers: SKU popularity threshold (default 3).
        min_similarity: Keep only pairs whose cosine similarity exceeds this (default 0.2).
        drop_intermediate_tables: Drop the STG.TEMP_* tables at the end (default False,
            i.e. keep them for inspection, as before).

    Raises:
        ValueError: If PURCHASE_MATRIX, or the matrix after activity filtering, is empty.
        Exception: Any other error is printed and re-raised.
    """
    start_time = time.time()
    try:
        # Coerce thresholds before interpolating them into SQL.
        min_dealer_purchases = int(min_dealer_purchases)
        min_sku_dealers = int(min_sku_dealers)
        min_similarity = float(min_similarity)

        # Switch to STG schema
        session.sql("USE SCHEMA SNOWFLAKE_LEARNING_DB.STG").collect()
        print("Switched to STG schema")

        # Load and sanity-check purchase matrix
        purchase_matrix = session.table("SNOWFLAKE_LEARNING_DB.STG.PURCHASE_MATRIX")
        print(f"Purchase matrix schema: {purchase_matrix.columns}")
        row_count = purchase_matrix.count()
        print(f"Purchase matrix row count: {row_count}")
        if row_count == 0:
            raise ValueError("PURCHASE_MATRIX is empty. Check clean_and_join_data and compute_features.")

        # Filter active dealers and popular SKUs
        session.sql(f"""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ACTIVE_DEALERS AS
            SELECT DEALER_NO
            FROM SNOWFLAKE_LEARNING_DB.STG.PURCHASE_MATRIX
            GROUP BY DEALER_NO
            HAVING SUM(PURCHASED) >= {min_dealer_purchases}
        """).collect()
        session.sql(f"""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ACTIVE_SKUS AS
            SELECT SKU
            FROM SNOWFLAKE_LEARNING_DB.STG.PURCHASE_MATRIX
            GROUP BY SKU
            HAVING COUNT(DISTINCT DEALER_NO) >= {min_sku_dealers}
        """).collect()
        session.sql("""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX AS
            SELECT PM.DEALER_NO, PM.SKU, PM.PURCHASED
            FROM SNOWFLAKE_LEARNING_DB.STG.PURCHASE_MATRIX PM
            INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_ACTIVE_DEALERS AD ON PM.DEALER_NO = AD.DEALER_NO
            INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_ACTIVE_SKUS ASK ON PM.SKU = ASK.SKU
        """).collect()
        filtered_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX")
        print(f"Filtered purchase matrix row count: {filtered_count}")
        if filtered_count == 0:
            # Without this guard the pivot SQL below is generated with an empty column
            # list ("SELECT DEALER_NO, FROM ...") and fails with an opaque syntax error.
            raise ValueError(
                "No purchases left after activity filtering; "
                "lower min_dealer_purchases / min_sku_dealers."
            )

        # --- User-User Collaborative Filtering ---
        print("Starting pre-aggregation")
        session.sql("""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_PRE_PIVOT AS
            SELECT DEALER_NO, SKU, SUM(PURCHASED) AS PURCHASED
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX
            GROUP BY DEALER_NO, SKU
        """).collect()
        print("Pre-aggregation completed")

        # Pivot to a dealer x SKU matrix. Columns get positional aliases (SKU_0, SKU_1, ...):
        # always valid identifiers, unique, and identical in every SQL fragment below.
        # Aliases derived from raw values broke on punctuation and non-string values.
        print("Starting user pivot")
        sku_list = session.sql("SELECT DISTINCT SKU FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PRE_PIVOT").to_pandas()["SKU"].tolist()
        sku_aliases = [(sku, f"SKU_{i}") for i, sku in enumerate(sku_list)]
        pivot_columns = ", ".join(
            f"SUM(CASE WHEN SKU = {_sql_literal(sku)} THEN PURCHASED ELSE 0 END) AS {alias}"
            for sku, alias in sku_aliases
        )
        session.sql(f"""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX AS
            SELECT DEALER_NO, {pivot_columns}
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PRE_PIVOT
            GROUP BY DEALER_NO
        """).collect()
        # No COALESCE pass is needed (the former UPDATE rewrote the whole table for
        # nothing): SUM(CASE ... ELSE 0 END) over a non-empty group is never NULL.
        pivot_cols = session.table("SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX")
        pivot_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX")
        print(f"User pivot matrix schema: {pivot_cols.columns[:10]}... (total {len(pivot_cols.columns)} columns)")
        print(f"User pivot matrix row count: {pivot_count}")

        # Normalize vectors
        print("Starting user normalization")
        norm_cols = " + ".join(f"POWER({alias}, 2)" for _, alias in sku_aliases)
        session.sql(f"""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_NORM AS
            SELECT DEALER_NO, SQRT({norm_cols}) AS NORM
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX
            WHERE SQRT({norm_cols}) > 0
        """).collect()
        norm_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_NORM")
        print(f"User norm row count: {norm_count}")

        # Compute dealer cosine similarity (upper triangle only: DEALER_A < DEALER_B).
        # The dot product MUST be parenthesised: "a*b + c*d / n" divides only the last
        # term, which is what the previous version computed.
        print("Starting dealer similarity")
        dot_product = " + ".join(f"A.{alias} * B.{alias}" for _, alias in sku_aliases)
        session.sql(f"""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_DEALER_PAIRS AS
            SELECT DEALER_A, DEALER_B, SIMILARITY
            FROM (
                SELECT
                    A.DEALER_NO AS DEALER_A,
                    B.DEALER_NO AS DEALER_B,
                    ({dot_product}) / (NA.NORM * NB.NORM) AS SIMILARITY
                FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX A
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_PIVOT_MATRIX B ON A.DEALER_NO < B.DEALER_NO
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_NORM NA ON A.DEALER_NO = NA.DEALER_NO
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_NORM NB ON B.DEALER_NO = NB.DEALER_NO
            )
            WHERE SIMILARITY > {min_similarity}
        """).collect()
        dealer_pairs_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_DEALER_PAIRS")
        print(f"Dealer pairs row count: {dealer_pairs_count}")

        # Generate user-user recommendations. Pairs are stored once (A < B), so both
        # directions are expanded here; otherwise a dealer could only learn from
        # neighbours with a larger DEALER_NO.
        print("Starting user-user recommendations")
        session.sql("""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_USER_RECOMMENDATIONS AS
            SELECT 
                DP.DEALER_A AS DEALER_NO,
                PM.SKU,
                SUM(DP.SIMILARITY) AS RECOMMENDATION_SCORE,
                'USER_USER' AS RECOMMENDATION_TYPE
            FROM (
                SELECT DEALER_A, DEALER_B, SIMILARITY FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_DEALER_PAIRS
                UNION ALL
                SELECT DEALER_B, DEALER_A, SIMILARITY FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_DEALER_PAIRS
            ) DP
            INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX PM ON DP.DEALER_B = PM.DEALER_NO
            WHERE PM.PURCHASED = 1
            GROUP BY DP.DEALER_A, PM.SKU
            HAVING SUM(DP.SIMILARITY) > 0
        """).collect()
        # Anti-join: drop SKUs the dealer already bought.
        session.sql("""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_USER_USER_RECOMMENDATIONS AS
            SELECT 
                UR.DEALER_NO, UR.SKU, UR.RECOMMENDATION_SCORE, UR.RECOMMENDATION_TYPE
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_USER_RECOMMENDATIONS UR
            LEFT JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX PM
            ON UR.DEALER_NO = PM.DEALER_NO AND UR.SKU = PM.SKU AND PM.PURCHASED = 1
            WHERE PM.DEALER_NO IS NULL
        """).collect()
        user_rec_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_USER_USER_RECOMMENDATIONS")
        print(f"User-user recommendations count: {user_rec_count}")

        # --- Item-Item Collaborative Filtering ---
        # Same positional-alias scheme (D_0, D_1, ...). Previously the pivot stripped
        # 'DEALER_NO' from the value while norm/dot-product stripped 'DEALER_', so the
        # generated column names did not match.
        print("Starting item pivot")
        dealer_list = session.sql("SELECT DISTINCT DEALER_NO FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PRE_PIVOT").to_pandas()["DEALER_NO"].tolist()
        dealer_aliases = [(dealer, f"D_{i}") for i, dealer in enumerate(dealer_list)]
        item_pivot_columns = ", ".join(
            f"SUM(CASE WHEN DEALER_NO = {_sql_literal(dealer)} THEN PURCHASED ELSE 0 END) AS {alias}"
            for dealer, alias in dealer_aliases
        )
        session.sql(f"""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_PIVOT AS
            SELECT SKU, {item_pivot_columns}
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_PRE_PIVOT
            GROUP BY SKU
        """).collect()
        item_pivot_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_PIVOT")
        print(f"Item pivot matrix row count: {item_pivot_count}")

        print("Starting item normalization")
        item_norm_cols = " + ".join(f"POWER({alias}, 2)" for _, alias in dealer_aliases)
        session.sql(f"""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_NORM AS
            SELECT SKU, SQRT({item_norm_cols}) AS norm_column
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_PIVOT
            WHERE SQRT({item_norm_cols}) > 0
        """).collect()
        item_norm_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_NORM")
        print(f"Item norm count: {item_norm_count}")

        print("Starting SKU similarity")
        item_dot_product = " + ".join(f"A.{alias} * B.{alias}" for _, alias in dealer_aliases)
        session.sql(f"""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_SKU_PAIRS AS
            SELECT SKU_A, SKU_B, SIMILARITY
            FROM (
                SELECT
                    A.SKU AS SKU_A,
                    B.SKU AS SKU_B,
                    ({item_dot_product}) / (NA.norm_column * NB.norm_column) AS SIMILARITY
                FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_PIVOT A
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_PIVOT B ON A.SKU < B.SKU
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_NORM NA ON A.SKU = NA.SKU
                INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_NORM NB ON B.SKU = NB.SKU
            )
            WHERE SIMILARITY > {min_similarity}
        """).collect()
        sku_pairs_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_SKU_PAIRS")
        print(f"SKU pairs row count: {sku_pairs_count}")

        # Item-item recommendations: "bought SKU_A -> recommend SKU_B", in both
        # directions of each stored (SKU_A < SKU_B) pair.
        print("Starting item-item recommendations")
        session.sql("""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_RECOMMENDATIONS AS
            SELECT 
                PM.DEALER_NO,
                SP.SKU_B AS SKU,
                SUM(SP.SIMILARITY) AS RECOMMENDATION_SCORE,
                'ITEM_ITEM' AS RECOMMENDATION_TYPE
            FROM (
                SELECT SKU_A, SKU_B, SIMILARITY FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_SKU_PAIRS
                UNION ALL
                SELECT SKU_B, SKU_A, SIMILARITY FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_SKU_PAIRS
            ) SP
            INNER JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX PM ON SP.SKU_A = PM.SKU
            WHERE PM.PURCHASED = 1
            GROUP BY PM.DEALER_NO, SP.SKU_B
            HAVING SUM(SP.SIMILARITY) > 0
        """).collect()
        session.sql("""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_ITEM_RECOMMENDATIONS AS
            SELECT 
                IR.DEALER_NO, IR.SKU, IR.RECOMMENDATION_SCORE, IR.RECOMMENDATION_TYPE
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_RECOMMENDATIONS IR
            LEFT JOIN SNOWFLAKE_LEARNING_DB.STG.TEMP_FILTERED_PURCHASE_MATRIX PM
            ON IR.DEALER_NO = PM.DEALER_NO AND IR.SKU = PM.SKU AND PM.PURCHASED = 1
            WHERE PM.DEALER_NO IS NULL
        """).collect()
        item_rec_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_ITEM_RECOMMENDATIONS")
        print(f"Item-item recommendations row count: {item_rec_count}")

        # Combine recommendations
        session.sql("""
            CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_LEARNING_DB.STG.TEMP_COMBINED_RECOMMENDATIONS AS
            SELECT DEALER_NO, SKU, RECOMMENDATION_SCORE, RECOMMENDATION_TYPE
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_USER_USER_RECOMMENDATIONS
            UNION
            SELECT DEALER_NO, SKU, RECOMMENDATION_SCORE, RECOMMENDATION_TYPE
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_ITEM_ITEM_RECOMMENDATIONS
        """).collect()
        combined_count = _scalar_count(session, "SNOWFLAKE_LEARNING_DB.STG.TEMP_COMBINED_RECOMMENDATIONS")
        print(f"Combined recommendations row count: {combined_count}")

        # Enrich with metadata
        print("Starting metadata enrichment")
        session.sql("""
            CREATE OR REPLACE TABLE SNOWFLAKE_LEARNING_DB.DWH.COLLABORATIVE_RECOMMENDATIONS AS
            SELECT 
                CR.DEALER_NO,
                CR.SKU,
                CR.RECOMMENDATION_SCORE,
                CR.RECOMMENDATION_TYPE,
                P.PRODUCT_NAME,
                P.PROD_CATEGORY,
                P.PROD_SUBCATEGORY,
                D.DEALER_NAME,
                D.REGION,
                D.ADDRESS
            FROM SNOWFLAKE_LEARNING_DB.STG.TEMP_COMBINED_RECOMMENDATIONS CR
            LEFT JOIN SNOWFLAKE_LEARNING_DB.ODS.PRODUCTS P ON CR.SKU = P.SKU
            LEFT JOIN SNOWFLAKE_LEARNING_DB.STG.DEALERS D ON CR.DEALER_NO = D.DEALER_NO
            ORDER BY CR.DEALER_NO, CR.RECOMMENDATION_SCORE DESC
        """).collect()
        print("Saved COLLABORATIVE_RECOMMENDATIONS table to DWH schema")

        # Validation
        print("Starting validation")
        validation_stats = session.sql("""
            SELECT 
                COUNT(DISTINCT DEALER_NO) AS DEALER_COUNT,
                COUNT(DISTINCT SKU) AS SKU_COUNT,
                MIN(RECOMMENDATION_SCORE) AS MIN_SCORE,
                MAX(RECOMMENDATION_SCORE) AS MAX_SCORE,
                AVG(RECOMMENDATION_SCORE) AS AVG_SCORE
            FROM SNOWFLAKE_LEARNING_DB.DWH.COLLABORATIVE_RECOMMENDATIONS
        """).to_pandas()
        print(f"Recommendation validation stats:\n{validation_stats}")

        # Sample output
        sample_recommendations = session.sql("""
            SELECT * FROM SNOWFLAKE_LEARNING_DB.DWH.COLLABORATIVE_RECOMMENDATIONS
            LIMIT 10
        """).to_pandas()
        print(f"Sample of COLLABORATIVE_RECOMMENDATIONS:\n{sample_recommendations}")

        # Optional clean-up (off by default so intermediates stay inspectable).
        if drop_intermediate_tables:
            for table_name in (
                "TEMP_PRE_PIVOT", "TEMP_PIVOT_MATRIX", "TEMP_ITEM_PIVOT",
                "TEMP_ACTIVE_DEALERS", "TEMP_ACTIVE_SKUS", "TEMP_FILTERED_PURCHASE_MATRIX",
                "TEMP_NORM", "TEMP_DEALER_PAIRS", "TEMP_USER_RECOMMENDATIONS",
                "TEMP_USER_USER_RECOMMENDATIONS", "TEMP_ITEM_NORM", "TEMP_SKU_PAIRS",
                "TEMP_ITEM_RECOMMENDATIONS", "TEMP_ITEM_ITEM_RECOMMENDATIONS",
                "TEMP_COMBINED_RECOMMENDATIONS",
            ):
                session.sql(f"DROP TABLE IF EXISTS SNOWFLAKE_LEARNING_DB.STG.{table_name}").collect()
            print("Cleaned up temporary tables")

        print(f"collaborative_filtering completed in {time.time() - start_time:.2f} seconds")
    except Exception as e:
        print(f"Error in collaborative filtering: {str(e)}")
        raise
    finally:
        print("End of code.....")

collaborative_filtering(session)

In [None]:
def create_dashboard_view(session, min_score=0.5):
    """Create DWH.RECOMMENDATION_DASHBOARD: recommendations plus dealer-level metrics.

    Each row of DWH.COLLABORATIVE_RECOMMENDATIONS with a score above ``min_score``
    is joined to dealer-level metrics taken from DWH.RECOMMENDATION_INPUT.

    The metrics used to be joined on (DEALER_NO, SKU). Recommended SKUs are by
    construction ones the dealer has *not* bought, so that join never matched and
    every metric column was NULL. A match would also have fanned out over sales
    lines. The join is now on DEALER_NO against one aggregated row per dealer.

    Args:
        session: Active Snowpark session.
        min_score: Only recommendations scoring strictly above this are shown
            (default 0.5).

    Raises:
        Exception: Any error is printed and re-raised.
    """
    start_time = time.time()
    try:
        # Coerced to float so only a numeric literal reaches the SQL text.
        min_score = float(min_score)
        # TOTAL_SALES / TOTAL_INVOICES / AVERAGE_BILL_VALUE are dealer-level values
        # repeated on every line, so MAX recovers them. NOTE(review): if a dealer
        # maps to several VERTICALs, MAX picks one of them -- confirm that is acceptable.
        session.sql(f"""
            CREATE OR REPLACE VIEW SNOWFLAKE_LEARNING_DB.DWH.RECOMMENDATION_DASHBOARD AS
            SELECT
                r.DEALER_NO,
                r.DEALER_NAME,
                r.REGION,
                r.ADDRESS,
                r.SKU,
                r.PRODUCT_NAME,
                r.PROD_CATEGORY,
                r.PROD_SUBCATEGORY,
                r.RECOMMENDATION_SCORE,
                r.RECOMMENDATION_TYPE,
                i.TOTAL_SALES,
                i.TOTAL_INVOICES,
                i.AVERAGE_BILL_VALUE,
                i.VERTICAL
            FROM SNOWFLAKE_LEARNING_DB.DWH.COLLABORATIVE_RECOMMENDATIONS r
            LEFT JOIN (
                SELECT
                    DEALER_NO,
                    MAX(TOTAL_SALES) AS TOTAL_SALES,
                    MAX(TOTAL_INVOICES) AS TOTAL_INVOICES,
                    MAX(AVERAGE_BILL_VALUE) AS AVERAGE_BILL_VALUE,
                    MAX(VERTICAL) AS VERTICAL
                FROM SNOWFLAKE_LEARNING_DB.DWH.RECOMMENDATION_INPUT
                GROUP BY DEALER_NO
            ) i
            ON r.DEALER_NO = i.DEALER_NO
            WHERE r.RECOMMENDATION_SCORE > {min_score}
        """).collect()
        print("Created RECOMMENDATION_DASHBOARD view in DWH schema")

        sample_view = session.sql("SELECT * FROM SNOWFLAKE_LEARNING_DB.DWH.RECOMMENDATION_DASHBOARD LIMIT 10").to_pandas()
        print(f"Sample of RECOMMENDATION_DASHBOARD view:\n{sample_view}")

        print(f"create_dashboard_view completed in {time.time() - start_time:.2f} seconds")
    except Exception as e:
        print(f"Error creating dashboard view: {str(e)}")
        raise

create_dashboard_view(session)


In [None]:
# from snowflake.snowpark.functions import col, sum as sum_, count_distinct, coalesce

# def prepare_clustering_features(session):
#     start_time = time.time()
#     try:
#         # Load dealer features and purchase matrix
#         dealer_features = session.table("SNOWFLAKE_LEARNING_DB.STG.DEALER_FEATURES")
#         purchase_matrix = session.table("SNOWFLAKE_LEARNING_DB.STG.PURCHASE_MATRIX")

#         # Create dealer-SKU purchase vector
#         dealer_sku_vector = purchase_matrix.group_by("DEALER_NO").pivot("SKU").agg(sum_("PURCHASED"))
#         for c in dealer_sku_vector.columns[1:]:
#             dealer_sku_vector = dealer_sku_vector.with_column(c, coalesce(col(c), lit(0)))

#         # Combine with dealer features
#         clustering_features = dealer_features.select(
#             col("DEALER_NO"),
#             col("TOTAL_SALES"),
#             col("TOTAL_INVOICES"),
#             col("AVERAGE_BILL_VALUE"),
#             col("AVERAGE_SKUS_PER_INVOICE")
#         ).join(
#             dealer_sku_vector,
#             "DEALER_NO",
#             "left"
#         )
#         for c in clustering_features.columns[1:]:
#             clustering_features = clustering_features.with_column(c, coalesce(col(c), lit(0)))

#         # Save to DWH
#         clustering_features.write.mode("overwrite").save_as_table("SNOWFLAKE_LEARNING_DB.DWH.CLUSTERING_FEATURES")
#         print("Saved CLUSTERING_FEATURES table to DWH schema")
#         print(f"Clustering features row count: {clustering_features.count()}")

#         # Sample output
#         sample_features = clustering_features.limit(10).to_pandas()
#         print(f"Sample of CLUSTERING_FEATURES:\n{sample_features}")

#         print(f"prepare_clustering_features completed in {time.time() - start_time:.2f} seconds")
#     except Exception as e:
#         print(f"Error preparing clustering features: {str(e)}")
#         raise

# prepare_clustering_features(session)
