# Customer Segmentation ETL using Snowpark

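**Objective:** Build a feature set for customer segmentation by joining customer demographics with per-customer transaction aggregates (transaction count and total amount). The aggregation and join run in-database via Snowpark; the result is then exported to a local CSV file.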

In [None]:
import os

import pandas as pd
from snowflake.snowpark.functions import avg, coalesce, col, count, lit, sum
from snowflake.snowpark.session import Session

# Snowflake connection settings. The account, user and password are pulled
# from environment variables (each is None when its variable is unset); the
# role, warehouse, database and schema are fixed for this job.
connection_parameters = dict(
    account=os.environ.get('SNOWFLAKE_ACCOUNT'),
    user=os.environ.get('SNOWFLAKE_USER'),
    password=os.environ.get('SNOWFLAKE_PASSWORD'),
    role="ANALYST",
    warehouse="COMPUTE_WH",
    database="RETAIL_PROD",
    schema="MARKETING",
)

In [None]:
# Open the Snowpark session that the later cells use, printing a progress
# message before and after the connection is established.
print("Initializing Snowpark session...")
session_builder = Session.builder.configs(connection_parameters)
session = session_builder.create()
print("Session created successfully.")

In [None]:
# Bind the CUSTOMERS and TRANSACTIONS tables to Snowpark DataFrames,
# in that order, through the session opened earlier.
customers_sdf, transactions_sdf = (
    session.table(table_name) for table_name in ('CUSTOMERS', 'TRANSACTIONS')
)

print("Tables loaded into Snowpark DataFrames.")

In [None]:
# Feature Engineering: per-customer Frequency and Monetary features.
# NOTE: although this is framed as RFM, no Recency feature is derived here;
# only the transaction count and the summed AMOUNT per CUSTOMER_ID.
# `count` and `sum` below are the Snowpark column functions imported at the
# top of the file, not Python's builtins.
rfm_features = transactions_sdf.group_by("CUSTOMER_ID").agg(
    count("TRANSACTION_ID").alias("FREQUENCY"),
    sum("AMOUNT").alias("MONETARY"),
)

# Join features back to the customer table. The left join keeps customers
# that have no transactions; their aggregates come back NULL, so coalesce
# them to 0 to keep the exported feature columns free of missing values.
customer_features_sdf = customers_sdf.join(rfm_features, "CUSTOMER_ID", "left").select(
    "CUSTOMER_ID",
    "AGE",
    "LOCATION",
    coalesce(col("FREQUENCY"), lit(0)).alias("FREQUENCY"),
    coalesce(col("MONETARY"), lit(0)).alias("MONETARY"),
)

print("Feature engineering complete. Showing a preview:")
customer_features_sdf.show()

In [None]:
# Final Step: convert the feature DataFrame to pandas and save it as a CSV.
# The session is closed in a `finally` so it is released even when the
# pandas conversion or the CSV write raises.
try:
    print("Converting to Pandas DataFrame...")
    final_features_df = customer_features_sdf.to_pandas()

    print("Saving features to CSV for ML model consumption...")
    final_features_df.to_csv('customer_features.csv', index=False)

    print("ETL process complete.")
finally:
    session.close()