In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# Load the feature table (three-level name: catalog.schema.table) as a Spark DataFrame.
# SparkSession.table is the same lookup as spark.read.table.
df = spark.table("retail_catalog.retail_schema.features")

In [0]:
# Collect the entire Spark table onto the driver as a pandas DataFrame
# (everything is loaded into driver memory, so the table must fit there).
pd_df = df.toPandas()
pd_df.head(n=5)

In [0]:
# Per-row revenue: quantity multiplied by price.
pd_df['revenue'] = pd_df['quantity'].mul(pd_df['price'])
pd_df.head(n=5)

In [0]:
# Reference date for recency: the most recent invoice date in the dataset,
# so the customer(s) who bought on that date get recency 0.
# Series.max() skips missing dates; the builtin max() does not handle NaT reliably.
today_date = pd_df['invoice_date'].max()

# Calculate RFM values for each customer:
#   recency   - days between the customer's last invoice date and today_date (lower = more recent)
#   frequency - total quantity purchased across all of the customer's rows
#               NOTE(review): RFM frequency is conventionally the number of distinct purchases
#               (invoices); switch to a distinct-invoice count if the table has an invoice id.
#   monetary  - total spend: the sum of the per-row `revenue` column (quantity * price).
#               Summing `price` alone would ignore how many units were bought.
rfm_df = pd_df.groupby('customer_id').agg(
    recency=('invoice_date', lambda x: (today_date - x.max()).days),
    frequency=('quantity', 'sum'),
    monetary=('revenue', 'sum'),
).reset_index()

print("RFM Values for Each Customer:")
print(rfm_df)

In [0]:
# Create quintile labels (five of each, so every dimension is scored on the same 1-5 scale)
r_labels = list(range(5, 0, -1))  # Recency: 5 is best (smallest recency in days = most recent)
f_m_labels = list(range(1, 6))    # Frequency & Monetary: 5 is best (most frequent/highest spending)

# Assign 1-5 scores with pd.qcut() applied to the *ranks* of each metric, not to the raw values:
#  - pd.qcut on raw values raises "Bin edges must be unique" whenever many customers share a
#    value at a quintile boundary (typical for day-level recency or small purchase counts);
#  - duplicates='drop' avoids the error but silently returns fewer than 5 bins, so the
#    scores would no longer share a 1-5 scale.
# rank(method='first') makes every value distinct (ties broken by row order), so there are
# always 5 distinct bin edges when there are at least two customers. Trade-off: customers
# with identical values can land in adjacent quintiles.
rfm_df['R_Score'] = pd.qcut(rfm_df['recency'].rank(method='first'), q=5, labels=r_labels)
rfm_df['F_Score'] = pd.qcut(rfm_df['frequency'].rank(method='first'), q=5, labels=f_m_labels)
rfm_df['M_Score'] = pd.qcut(rfm_df['monetary'].rank(method='first'), q=5, labels=f_m_labels)

In [0]:
# Turn the F/M quintile categoricals into plain integers: .cat.codes numbers the
# categories from 0 in category order, so adding 1 makes the scores start at 1.
rfm_df = rfm_df.assign(
    F_Score_int=rfm_df['F_Score'].cat.codes + 1,
    M_Score_int=rfm_df['M_Score'].cat.codes + 1,
)

In [0]:
rfm_df.head(n=5)  # preview the first rows of the scored RFM table

In [0]:

# Overall RFM score: cast the R, F and M scores to plain integers and add them row by row.
rfm_df['rfm_score'] = rfm_df[['R_Score', 'F_Score_int', 'M_Score_int']].astype(int).sum(axis=1)

print("\nRFM Scores for Each Customer:")
print(rfm_df)

In [0]:
rfm_df['rfm_score'].agg('max')  # highest combined RFM score

In [0]:
rfm_df['rfm_score'].agg('min')  # lowest combined RFM score

In [0]:
# Define segmentation logic
def get_segment(score):
    """Return the customer segment name for a combined RFM score.

    The bounds are inclusive and checked from highest to lowest:
    >= 10 -> 'Champions', >= 7 -> 'Loyal Customers', >= 5 -> 'Potential'.
    Any other score falls through to 'At Risk'. That includes NaN, because
    every comparison against NaN is False.
    """
    for lower_bound, segment in ((10, 'Champions'), (7, 'Loyal Customers'), (5, 'Potential')):
        if score >= lower_bound:
            return segment
    return 'At Risk'

# Label every customer by passing their combined score through get_segment.
rfm_df['customer_segment'] = rfm_df['rfm_score'].map(get_segment)

print("\nFinal RFM Customer Segments:")
print(
    rfm_df[['customer_id', 'rfm_score', 'customer_segment']]
    .sort_values('rfm_score', ascending=False)
)

In [0]:
# Left-join the per-customer RFM metrics, scores and segment onto every row of pd_df.
rfm_score_df = pd_df.merge(rfm_df, how='left', on='customer_id')

print("Merged DataFrame:")
print(rfm_score_df.head())

In [0]:
import pandas as pd
from pyspark.sql import SparkSession

In [0]:
# Get the SparkSession. getOrCreate() returns the already-active session if there is one;
# the read cell at the top of the notebook already uses `spark`.
spark = SparkSession.builder.appName("pandas_to_spark").getOrCreate()

# pd.qcut without `labels` produces categorical columns whose categories are pandas Interval
# objects. Spark has no SQL type for Interval values, so createDataFrame cannot convert them.
# Store any interval-valued column as its string form (e.g. "(0.999, 3.0]"), working on a copy
# so rfm_score_df itself is not modified. Missing values in such a column become the string "nan".
rfm_export_df = rfm_score_df.copy()
for col_name in rfm_export_df.columns:
    col_dtype = rfm_export_df[col_name].dtype
    is_interval_categorical = isinstance(col_dtype, pd.CategoricalDtype) and isinstance(
        col_dtype.categories, pd.IntervalIndex
    )
    if is_interval_categorical or isinstance(col_dtype, pd.IntervalDtype):
        rfm_export_df[col_name] = rfm_export_df[col_name].astype(str)

# Convert the pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(rfm_export_df)

# Store the Spark DataFrame as a table, replacing any existing contents
spark_df.write.mode("overwrite").saveAsTable("retail_catalog.retail_schema.rfm_table")

In [0]:
# Alternative: write with mode("append") to add rows to an existing table instead of overwriting it
#spark_df.write.mode("append").saveAsTable("gold_layer.customer_data")