1. Set Up the Snowflake Connector in PySpark:
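The Snowflake Spark Connector and the Snowflake JDBC driver are JVM packages, so Spark downloads them through spark.jars.packages when the session starts; only PySpark itself has to be installed with pip. The two Snowflake Python packages below are optional for this notebook, because the Spark read path does not use them.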
pip install pyspark
pip install snowflake-connector-python
pip install snowflake-sqlalchemy
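
Spark expects `spark.jars.packages` to be a single comma-separated string of Maven coordinates, and setting the same key twice on the session builder keeps only the last value. A minimal sketch of assembling that string, using the two coordinates from the session cell in this notebook (the helper name `join_packages` is hypothetical):

```python
def join_packages(coordinates):
    """Join Maven coordinates into one spark.jars.packages value."""
    # Drop empty entries and stray whitespace before joining.
    cleaned = [c.strip() for c in coordinates if c and c.strip()]
    return ",".join(cleaned)


# Coordinates taken from the session cell in this notebook.
packages = join_packages([
    "net.snowflake:spark-snowflake_2.12:2.10.1-spark_3.1",
    "net.snowflake:snowflake-jdbc:3.13.13",
])
print(packages)
```

The resulting string can then be passed once, as `.config("spark.jars.packages", packages)`.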


In [1]:
# configure the Snowflake Spark Connector in my PySpark environment.

from pyspark.sql import SparkSession
# Note: this wildcard import shadows Python built-ins such as max() and sum()
# with Spark column functions; later cells rely on the Spark versions.
from pyspark.sql.functions import *

# Initialize the Spark session with Snowflake connector
# jdbc_driver_path = "D:\Box\Apps\Spark\spark-3.5.3-bin-hadoop3\spark-snowflake_2.12-2.10.1-spark_3.1.jar,D:\Box\Apps\Spark\spark-3.5.3-bin-hadoop3\snowflake-jdbc-3.13.31.jar"
# .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.9.2") \
#    .config("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.13.14") \
# "sfWarehouse" : "<your_snowflake_warehouse>"

# spark.jars.packages takes one comma-separated list; setting the key twice keeps only the last value
spark = SparkSession.builder \
    .appName("TPCDS_SF10TCL Churn Analysis") \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.10.1-spark_3.1,"
            "net.snowflake:snowflake-jdbc:3.13.13") \
    .getOrCreate()

# Snowflake connection options
sfOptions = {
    "sfURL": "https://pdxuwmc-wb15506.snowflakecomputing.com",
    "sfUser": "Bob35",
    "sfPassword": "*******",
    "sfDatabase": "SNOWFLAKE_SAMPLE_DATA",
    "sfSchema": "TPCDS_SF10TCL",
    "sfRole": "ACCOUNTADMIN",
}

# Function to read Snowflake tables
def read_snowflake_table(table_name):
    return spark.read \
        .format("snowflake") \
        .options(**sfOptions) \
        .option("dbtable", table_name) \
        .load()
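
Credentials do not have to live in the notebook itself. A minimal sketch, assuming the connection details are supplied through environment variables; the `SNOWFLAKE_*` variable names and the helper `build_sf_options` are made up for this example, while the option keys are the ones used in the cell above:

```python
import os


def build_sf_options(env=os.environ):
    """Build Snowflake connector options from environment variables.

    The SNOWFLAKE_* variable names are assumptions made for this sketch.
    """
    required = {
        "sfURL": "SNOWFLAKE_URL",
        "sfUser": "SNOWFLAKE_USER",
        "sfPassword": "SNOWFLAKE_PASSWORD",
    }
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))

    options = {key: env[var] for key, var in required.items()}
    # Non-secret settings default to the values used in this notebook.
    options["sfDatabase"] = env.get("SNOWFLAKE_DATABASE", "SNOWFLAKE_SAMPLE_DATA")
    options["sfSchema"] = env.get("SNOWFLAKE_SCHEMA", "TPCDS_SF10TCL")
    return options
```

With the variables set, `sfOptions = build_sf_options()` would produce a dictionary that `read_snowflake_table` can use unchanged.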

2. Load the Tables from TPCDS_SF10TCL:
Now, we will load the specific tables (STORE_SALES, WEB_SALES, STORE_RETURNS, WEB_RETURNS, CUSTOMER, PROMOTION) from TPCDS_SF10TCL.

In [2]:
# Load tables from TPCDS_SF10TCL
store_sales = read_snowflake_table("STORE_SALES")
web_sales = read_snowflake_table("WEB_SALES")
store_returns = read_snowflake_table("STORE_RETURNS")
web_returns = read_snowflake_table("WEB_RETURNS")
customer = read_snowflake_table("CUSTOMER")
promotions = read_snowflake_table("PROMOTION")



3. Adjust Feature Extraction for TPCDS_SF10TCL:
Adjust the feature extraction to work with the specific schema. The logic stays the same as the previous explanation.

In [5]:
from pyspark.sql.functions import col, count, avg, when

# 1. Purchase Frequency: count number of transactions per customer
store_sales_freq = store_sales.groupBy("ss_customer_sk") \
    .agg(count("ss_ticket_number").alias("purchase_count_store"))

# show() must be called with parentheses; without them only the method object is printed
store_sales_freq.show()

web_sales_freq = web_sales.groupBy("ws_bill_customer_sk") \
    .agg(count("ws_order_number").alias("purchase_count_web"))

web_sales_freq.show()

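In TPC-DS, STORE_SALES holds one row per item on a ticket, so `count("ss_ticket_number")` counts line items rather than distinct transactions. A plain-Python sketch on made-up rows shows the difference; in PySpark the distinct variant would use `countDistinct` from `pyspark.sql.functions`.

```python
from collections import defaultdict

# Made-up (customer_sk, ticket_number) rows; one row per purchased item.
rows = [(1, 100), (1, 100), (1, 101), (2, 200)]

line_items = defaultdict(int)   # what count("ss_ticket_number") measures
tickets = defaultdict(set)      # distinct tickets per customer
for customer_sk, ticket_number in rows:
    line_items[customer_sk] += 1
    tickets[customer_sk].add(ticket_number)

distinct_tickets = {sk: len(seen) for sk, seen in tickets.items()}
print(dict(line_items))
print(distinct_tickets)
```

Customer 1 has three line items but only two tickets, so the choice of count changes the meaning of "purchase frequency".
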
In [12]:
# 2. Average Purchase Value: average net paid per sales row, per customer
# (both DataFrames are needed by the feature join further down)
store_avg_purchase = store_sales.groupBy("ss_customer_sk") \
    .agg(avg("ss_net_paid").alias("avg_store_purchase_value"))

web_avg_purchase = web_sales.groupBy("ws_bill_customer_sk") \
    .agg(avg("ws_net_paid").alias("avg_web_purchase_value"))

# Prepare for the return rate: attach each customer's purchase count to their return rows.
# sr_return_quantity / wr_return_quantity represent the number of returned items.
store_returns_agg = store_returns.join(store_sales_freq, store_returns["sr_customer_sk"] == store_sales_freq["ss_customer_sk"], "left")

web_returns_agg = web_returns.join(web_sales_freq, web_returns["wr_returning_customer_sk"] == web_sales_freq["ws_bill_customer_sk"], "left")

In [13]:
# 3. Return Rate: total returned quantity divided by purchase count, per customer
from pyspark.sql import functions as F  # explicit Spark sum()/max(), not the Python built-ins

store_returns_agg = store_returns_agg.groupBy("sr_customer_sk") \
    .agg(
        # purchase_count_store is constant per customer, so max() simply picks that value
        (F.sum("sr_return_quantity") / F.max("purchase_count_store")).alias("store_return_rate")
    )

web_returns_agg = web_returns_agg.groupBy("wr_returning_customer_sk") \
    .agg(
        # Same calculation for the web channel, using wr_return_quantity
        (F.sum("wr_return_quantity") / F.max("purchase_count_web")).alias("web_return_rate")
    )
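
The return rate above divides a customer's total returned quantity by that customer's purchase count. A small sketch with made-up numbers shows the arithmetic; returning `None` for a customer with no recorded purchases is a choice made for this sketch, not something the notebook prescribes.

```python
import math


def return_rate(returned_quantities, purchase_count):
    """Total returned quantity divided by purchase count; None if no purchases."""
    if not purchase_count:
        return None
    # math.fsum avoids clashing with Spark's sum() when pasted into this notebook.
    return math.fsum(returned_quantities) / purchase_count


print(return_rate([1, 2], 6))
print(return_rate([3], 0))
```

Because the numerator is a quantity and the denominator is a row count, the value can exceed 1 for customers who return many units of few items.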

In [17]:
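# Make Spark's Python workers use the same interpreter as the driver.
# These variables are generally read when the Spark context starts, so this cell
# is best run before the SparkSession is created.
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable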

In [20]:
# 4. Promotional Participation: check if customers participated in a promotion
promo_sales_store = store_sales.join(promotions, store_sales["ss_promo_sk"] == promotions["p_promo_sk"]) \
    .groupBy("ss_customer_sk").agg(count("ss_ticket_number").alias("store_promo_participation"))

promo_sales_web = web_sales.join(promotions, web_sales["ws_promo_sk"] == promotions["p_promo_sk"]) \
    .groupBy("ws_bill_customer_sk").agg(count("ws_order_number").alias("web_promo_participation"))

# Merge all features together with customer demographics

# Load customer_demographics to get demographic details (like gender)
customer_demographics = read_snowflake_table("CUSTOMER_DEMOGRAPHICS")

# Join customer and customer_demographics on the c_current_cdemo_sk key
customer_with_demo = customer.join(customer_demographics, customer["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], "left")

# customer_income_band = read_snowflake_table("income_band")



# Now proceed with joining the rest of the tables.
# Aggregates built from the same source table share a key column (e.g. ss_customer_sk),
# which can make column-reference join conditions ambiguous, so each key is renamed
# to c_customer_sk and the joins are done by column name.
def keyed(df, key):
    return df.withColumnRenamed(key, "c_customer_sk")

features = customer_with_demo \
    .join(keyed(store_sales_freq, "ss_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(web_sales_freq, "ws_bill_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(store_avg_purchase, "ss_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(web_avg_purchase, "ws_bill_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(store_returns_agg, "sr_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(web_returns_agg, "wr_returning_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(promo_sales_store, "ss_customer_sk"), "c_customer_sk", "left") \
    .join(keyed(promo_sales_web, "ws_bill_customer_sk"), "c_customer_sk", "left") \
    .select("c_customer_sk",
            "purchase_count_store",
            "purchase_count_web",
            "avg_store_purchase_value",
            "avg_web_purchase_value",
            "store_return_rate",
            "web_return_rate",
            "store_promo_participation",
            "web_promo_participation",
            "c_birth_year",
            "cd_gender")  # gender comes from customer_demographics
           # "c_income_band_sk")


In [None]:

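# 5. Add Churn Label: customers with no purchase in the last 180 days
import builtins  # Python's max(); the wildcard import above shadows it with Spark's max()
from pyspark.sql import functions as F

# Most recent sale date key across both channels
latest_store_purchase = store_sales.agg(F.max("ss_sold_date_sk")).collect()[0][0]
latest_web_purchase = web_sales.agg(F.max("ws_sold_date_sk")).collect()[0][0]
latest_purchase_date = builtins.max(latest_store_purchase, latest_web_purchase)
cutoff = latest_purchase_date - 180  # assumes date keys are consecutive day numbers

# Each customer's most recent purchase date key, per channel
last_store = store_sales.groupBy("ss_customer_sk") \
    .agg(F.max("ss_sold_date_sk").alias("last_store_date_sk")) \
    .withColumnRenamed("ss_customer_sk", "c_customer_sk")
last_web = web_sales.groupBy("ws_bill_customer_sk") \
    .agg(F.max("ws_sold_date_sk").alias("last_web_date_sk")) \
    .withColumnRenamed("ws_bill_customer_sk", "c_customer_sk")
features = features.join(last_store, "c_customer_sk", "left") \
    .join(last_web, "c_customer_sk", "left")

# Churned if neither channel has a purchase on or after the cutoff (null = never purchased)
features = features.withColumn("is_churn", F.when(
    (F.coalesce(F.col("last_store_date_sk"), F.lit(0)) < cutoff) &
    (F.coalesce(F.col("last_web_date_sk"), F.lit(0)) < cutoff),
    F.lit(1)
).otherwise(F.lit(0)))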
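
The 180-day rule can also be written as a plain function over a customer's most recent purchase date per channel, which makes the edge cases easy to check by hand. This is a sketch under the assumption that the date surrogate keys are consecutive day numbers; the function name and the sample values are made up.

```python
def is_churned(last_store_sk, last_web_sk, latest_sk, window_days=180):
    """Return 1 if the customer has no purchase within the window, else 0.

    Arguments are date surrogate keys, assumed to be consecutive day numbers.
    None means the customer never bought through that channel.
    """
    cutoff = latest_sk - window_days
    recent_store = last_store_sk is not None and last_store_sk >= cutoff
    recent_web = last_web_sk is not None and last_web_sk >= cutoff
    return 0 if (recent_store or recent_web) else 1


print(is_churned(None, None, 1000))  # never purchased anywhere
print(is_churned(900, None, 1000))   # recent store purchase
print(is_churned(700, 810, 1000))    # both channels older than the cutoff of 820
```

A customer counts as active if either channel shows a purchase on or after the cutoff, so a single recent web order is enough to avoid the churn label.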




In [None]:
# 6. Train Churn Prediction Model: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Define feature columns
feature_columns = ["purchase_count_store", "purchase_count_web", "avg_store_purchase_value", 
                   "avg_web_purchase_value", "store_return_rate", "web_return_rate", 
                   "store_promo_participation", "web_promo_participation"]

# VectorAssembler to combine feature columns into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Define the logistic regression model
lr = LogisticRegression(labelCol="is_churn", featuresCol="features")

# Create pipeline
pipeline = Pipeline(stages=[assembler, lr])

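# The left joins leave nulls for customers with no activity in a channel, and
# VectorAssembler raises an error on nulls by default, so fill them with 0 first
model_input = features.fillna(0, subset=feature_columns)

# Fit the model
model = pipeline.fit(model_input)

# Predictions (made on the training data, so they overstate real-world accuracy)
predictions = model.transform(model_input)
predictions.select("c_customer_sk", "is_churn", "prediction").show()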
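
The `prediction` column can be compared with `is_churn` to summarise model quality. A minimal sketch, assuming a sample of (label, prediction) pairs has already been brought into Python, for example by collecting a limited selection of those two columns; the helper name `churn_metrics` is hypothetical. Scores computed on the rows used for training tend to be optimistic, so a held-out split is worth considering.

```python
def churn_metrics(pairs):
    """Compute precision, recall, and accuracy from (label, prediction) pairs."""
    tp = fp = fn = tn = 0
    for label, prediction in pairs:
        if prediction == 1 and label == 1:
            tp += 1
        elif prediction == 1 and label == 0:
            fp += 1
        elif prediction == 0 and label == 1:
            fn += 1
        else:
            tn += 1
    total = tp + fp + fn + tn
    return {
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
        "accuracy": (tp + tn) / total if total else 0.0,
    }


# Made-up sample: labels are ints, predictions are floats as Spark ML emits them.
sample = [(1, 1.0), (1, 1.0), (0, 0.0), (0, 1.0)]
print(churn_metrics(sample))
```

For churn, recall on the churned class is often the number to watch, since a missed churner usually costs more than a false alarm.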

4. Adjustments for TPCDS_SF10TCL:
We used the TPCDS_SF10TCL schema for all the table references.
The rest of the logic remains the same: we aggregate features from customer data, calculate churn labels, and train a predictive model.

5. Churn Labeling:
For the churn label, customers who haven't made a purchase in the last 180 days are labeled as churned. This logic can be adjusted to fit specific business requirements.

6. Final Considerations:
This approach helps extract key customer features and build a predictive churn model using TPC-DS data. We can refine the model by tuning the feature set and model parameters further, or try other classification algorithms like Random Forest, Gradient Boosted Trees, etc.
