# Olist Customer Churn Analysis
This notebook performs churn prediction using PySpark and Logistic Regression.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Reuse the active Spark session if there is one; otherwise start a new one
# under this application name.
session_builder = SparkSession.builder.appName("Olist Churn Analysis")
spark = session_builder.getOrCreate()

## Load Datasets

In [None]:
# Load the three Olist CSV extracts used by the rest of the notebook.
#
# The data directory used to be repeated as an absolute, machine-specific path
# in every read call. It is now resolved once: set the OLIST_DATA_DIR
# environment variable to point somewhere else, or leave it unset to keep the
# original location (the resulting file paths are then unchanged).
import os

_DEFAULT_DATA_DIR = "D:/Krishna/LPU/SEM 7/INT315/ProjectINT315/ProjectData"
# `or` (rather than a .get default) also covers the variable being set but empty.
DATA_DIR = (os.environ.get("OLIST_DATA_DIR") or _DEFAULT_DATA_DIR).rstrip("/\\")


def _read_olist_csv(file_name):
    """Read one CSV from DATA_DIR, using its header row and inferring column types."""
    return spark.read.csv(f"{DATA_DIR}/{file_name}", header=True, inferSchema=True)


orders = _read_olist_csv("olist_orders_dataset.csv")
order_items = _read_olist_csv("olist_order_items_dataset.csv")
customers = _read_olist_csv("olist_customers_dataset.csv")

## Preprocessing

In [None]:
# Discard rows with nulls in the columns the later joins and aggregations rely on.
orders = orders.na.drop(subset=['customer_id', 'order_id', 'order_status'])
order_items = order_items.na.drop(subset=['order_id', 'price'])
customers = customers.na.drop(subset=['customer_id'])

# Restrict orders to the three statuses this analysis works with.
kept_statuses = ["delivered", "canceled", "unavailable"]
orders = orders.where(col("order_status").isin(kept_statuses))

## Join Datasets

In [None]:
# Rename the item-side key so the joined frame keeps both key columns under
# distinct names ("order_id" from orders, "order_id_item" from order_items).
order_items = order_items.withColumnRenamed("order_id", "order_id_item")

# Inner-join orders to their items, then attach the customer attributes.
same_order = orders.order_id == order_items.order_id_item
data = (
    orders.join(order_items, on=same_order, how="inner")
    .join(customers, on="customer_id", how="inner")
)

## Feature Engineering

In [None]:
# Build the per-customer features and the churn label.
#
# `data` is orders joined to order_items, so it holds one row per joined item
# row (presumably several per order — confirm against the dataset). Aggregating
# it directly made "order_count" a count of item rows and "avg_order_price" an
# average item price. Collapse to one row per order first so both features
# describe orders, as their names say. If every order has exactly one item row
# the results are identical to the previous single-step aggregation.
order_totals = (
    data.groupBy("customer_id", "customer_state", "order_id")
    .agg({"price": "sum"})  # dict-style agg names its output column "sum(price)"
    .withColumnRenamed("sum(price)", "order_total")
)

customer_orders = order_totals.groupBy("customer_id", "customer_state").agg(
    count("order_id").alias("order_count"),          # one row per order -> number of orders
    avg("order_total").alias("avg_order_price")      # mean of per-order totals
)

# Label: 0 when the customer has at least one "delivered" order, 1 otherwise.
# The inner when() yields NULL for non-delivered rows, and count() skips NULLs,
# so the count is the number of delivered orders for that customer.
churn_data = orders.groupBy("customer_id").agg(
    when(count(when(col("order_status") == "delivered", True)) > 0, 0).otherwise(1).alias("churned")
)

# Keep only customers that have both features and a label.
final_data = customer_orders.join(churn_data, on="customer_id", how="inner")

## Encode Categorical Features

In [None]:
# Step 1: map each customer_state string to a numeric index.
state_indexer = StringIndexer(inputCol="customer_state", outputCol="customer_state_index")
state_indexer_model = state_indexer.fit(final_data)
final_data = state_indexer_model.transform(final_data)

# Step 2: expand the numeric index into a one-hot vector column.
encoder = OneHotEncoder(
    inputCols=["customer_state_index"],
    outputCols=["customer_state_vec"],
)
encoder_model = encoder.fit(final_data)
final_data = encoder_model.transform(final_data)

## Assemble Features

In [None]:
# Pack the numeric features and the one-hot state vector into the single
# "features" column that the classifier reads.
feature_columns = ["order_count", "avg_order_price", "customer_state_vec"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Only the feature vector and the label are carried forward.
with_features = assembler.transform(final_data)
assembled_data = with_features.select("features", "churned")

## Train Test Split & Model Training

In [None]:
# 80/20 random split with a fixed seed.
split_weights = [0.8, 0.2]
train_data, test_data = assembled_data.randomSplit(split_weights, seed=42)

# Fit logistic regression on the training part, then score the held-out part.
lr = LogisticRegression(labelCol="churned", featuresCol="features")
model = lr.fit(train_data)
predictions = model.transform(test_data)

## Model Evaluation

In [None]:
# Set up one evaluator per metric, all reading the "churned" label column.
evaluator_auc = BinaryClassificationEvaluator(labelCol="churned", metricName="areaUnderROC")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="churned", metricName="accuracy")
# NOTE(review): Spark's "f1" metric appears to be weighted across classes rather
# than specific to the churn class — confirm against the evaluator docs.
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="churned", metricName="f1")

# Score the held-out predictions.
auc = evaluator_auc.evaluate(predictions)
accuracy = evaluator_acc.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

# Report the metrics followed by the fitted model parameters.
report_lines = [
    f"Model Accuracy : {accuracy:.2f}",
    f"Model F1 Score : {f1_score:.2f}",
    f"Model AUC      : {auc:.2f}",
    f"Coefficients   : {model.coefficients}",
    f"Intercept      : {model.intercept}",
]
print("\n".join(report_lines))

## Visualizations

In [None]:
# Bring the (prediction, label) pairs to the driver as a pandas DataFrame.
pdf = predictions.select("prediction", "churned").toPandas()

# Count rows per (actual, predicted) pair; rows = actual, columns = predicted.
pair_sizes = pdf.groupby(["churned", "prediction"]).size()
counts = pair_sizes.unstack(fill_value=0)

# Grouped bar chart: one group per actual class, one bar per predicted class.
ax = counts.plot.bar(figsize=(7, 5))
ax.set_title("Actual vs Predicted Churn")
ax.set_xlabel("Actual Churned")
ax.set_ylabel("Count")
ax.legend(title="Predicted")
plt.show()

In [None]:
# Bring the probability vectors and labels to the driver.
pdf_probs = predictions.select("probability", "churned").toPandas()

# Element 1 of each probability vector is taken as the churn probability.
pdf_probs["prob_churn"] = [float(vec[1]) for vec in pdf_probs["probability"]]

# Histogram with a KDE overlay of the churn probability, coloured by actual label.
fig, ax = plt.subplots(figsize=(7, 5))
sns.histplot(data=pdf_probs, x="prob_churn", hue="churned", kde=True, bins=20, ax=ax)
ax.set_title("Predicted Churn Probability Distribution")
ax.set_xlabel("Probability of Churn")
ax.set_ylabel("Count")
plt.show()