In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import ClusteringEvaluator, MulticlassClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("ECommerceCustomerAnalysis").getOrCreate()

# Load dataset
df = spark.read.csv("s3://ecommerce-project-new1/ecommerce_customer_data_custom_ratios.csv", header=True, inferSchema=True)

df

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1731022594447_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[Customer ID: int, Purchase Date: timestamp, Product Category: string, Product Price: int, Quantity: int, Total Purchase Amount: int, Payment Method: string, Customer Age: int, Returns: double, Customer Name: string, Age: int, Gender: string, Churn: int]

In [3]:
# Data Cleaning
# Drop duplicates
df = df.dropDuplicates()

# Drop unnecessary columns
df = df.drop("Customer Name", "Purchase Date", "Customer ID")

# Handling missing values
df = df.na.fill({"Returns": 0})  # Replace nulls in 'Returns' with 0

# Data type conversion if needed
df = df.withColumn("Churn", col("Churn").cast("int"))

# Filter only required columns and apply transformations if needed
df = df.withColumn("Total Purchase Amount", col("Total Purchase Amount").cast("float"))

df





VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[Product Category: string, Product Price: int, Quantity: int, Total Purchase Amount: float, Payment Method: string, Customer Age: int, Returns: double, Age: int, Gender: string, Churn: int]

In [4]:
# 1. Customer Segmentation (Clustering)
# Vectorize features for clustering
assembler = VectorAssembler(inputCols=["Product Price", "Quantity", "Total Purchase Amount", "Customer Age"], outputCol="features")
df_features = assembler.transform(df)

# Scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
df_scaled = scaler.fit(df_features).transform(df_features)

# K-Means clustering for customer segmentation
kmeans = KMeans(featuresCol="scaledFeatures", k=3)
kmeans_model = kmeans.fit(df_scaled)
clusters = kmeans_model.transform(df_scaled)

# Evaluate clustering
clustering_evaluator = ClusteringEvaluator()
silhouette_score = clustering_evaluator.evaluate(clusters)
print("Silhouette Score for KMeans Clustering:", silhouette_score)

# Optional: Show cluster centers for interpretation
print("Cluster Centers:", kmeans_model.clusterCenters())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Silhouette Score for KMeans Clustering: -0.023337180964140614
Cluster Centers: [array([1.676504  , 1.49696998, 1.88759033, 3.72829395]), array([1.80319786, 3.2073973 , 1.8913154 , 2.85587593]), array([1.92430883, 1.50394717, 1.88702729, 1.94603597])]

In [5]:
# 2. Churn Prediction (Predictive Modeling)
# Index categorical variables
indexer = StringIndexer(inputCols=["Product Category", "Payment Method", "Gender"], outputCols=["ProductCategoryIndex", "PaymentMethodIndex", "GenderIndex"])
df_indexed = indexer.fit(df).transform(df)

# Vectorize features for modeling
feature_columns = ["ProductCategoryIndex", "PaymentMethodIndex", "GenderIndex", "Product Price", "Quantity", "Total Purchase Amount", "Customer Age", "Returns"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_model = assembler.transform(df_indexed)

# Train-Test Split
train_df, test_df = df_model.randomSplit([0.8, 0.2], seed=42)

# Churn Prediction Model - Random Forest
rf = RandomForestClassifier(labelCol="Churn", featuresCol="features")
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)

# Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="Churn", metricName="accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print("Random Forest Churn Prediction Accuracy:", rf_accuracy)

# Logistic Regression for comparison
lr = LogisticRegression(labelCol="Churn", featuresCol="features")
lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df)

# Logistic Regression Evaluation
lr_accuracy = evaluator.evaluate(lr_predictions)
print("Logistic Regression Churn Prediction Accuracy:", lr_accuracy)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Random Forest Churn Prediction Accuracy: 0.7993213669035859
Logistic Regression Churn Prediction Accuracy: 0.7993213669035859

In [10]:
from pyspark.ml.classification import GBTClassifier

# Initialize Gradient-Boosted Tree model
gbt = GBTClassifier(labelCol="Churn", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

# Evaluate Gradient-Boosted Tree model
gbt_accuracy = evaluator.evaluate(gbt_predictions)
print("Gradient-Boosted Tree Churn Prediction Accuracy:", gbt_accuracy)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Gradient-Boosted Tree Churn Prediction Accuracy: 0.7993213669035859

In [11]:
from pyspark.ml.classification import LinearSVC

# Initialize SVM model
svm = LinearSVC(labelCol="Churn", featuresCol="features", maxIter=10, regParam=0.1)
svm_model = svm.fit(train_df)
svm_predictions = svm_model.transform(test_df)

# Evaluate SVM model
svm_accuracy = evaluator.evaluate(svm_predictions)
print("SVM Churn Prediction Accuracy:", svm_accuracy)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SVM Churn Prediction Accuracy: 0.7993213669035859

In [12]:
from pyspark.ml.classification import NaiveBayes

# Initialize Naive Bayes model
nb = NaiveBayes(labelCol="Churn", featuresCol="features", smoothing=1.0)
nb_model = nb.fit(train_df)
nb_predictions = nb_model.transform(test_df)

# Evaluate Naive Bayes model
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Naive Bayes Churn Prediction Accuracy:", nb_accuracy)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Naive Bayes Churn Prediction Accuracy: 0.794301891338393

In [13]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Define network layers for MLP
layers = [len(feature_columns), 20, 10, 2]  # Adjust based on your data

# Initialize MLP model
mlp = MultilayerPerceptronClassifier(labelCol="Churn", featuresCol="features", maxIter=100, layers=layers)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

# Evaluate MLP model
mlp_accuracy = evaluator.evaluate(mlp_predictions)
print("MLP Churn Prediction Accuracy:", mlp_accuracy)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

MLP Churn Prediction Accuracy: 0.7993213669035859

In [14]:
# Evaluation using F1-score instead of accuracy
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Churn", metricName="f1")

# Evaluate models on F1-score
rf_f1 = evaluator_f1.evaluate(rf_predictions)
print("Random Forest Churn Prediction F1 Score:", rf_f1)

gbt_f1 = evaluator_f1.evaluate(gbt_predictions)
print("GBT Churn Prediction F1 Score:", gbt_f1)

lr_f1 = evaluator_f1.evaluate(lr_predictions)
print("Logistic Regression Churn Prediction F1 Score:", lr_f1)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Random Forest Churn Prediction F1 Score: 0.71017291223314
GBT Churn Prediction F1 Score: 0.71017291223314
Logistic Regression Churn Prediction F1 Score: 0.71017291223314