# Load Data

In [None]:
import pandas as pd

# Location of the user-churn CSV file.
CHURN_CSV_URL = 'https://storage.googleapis.com/datalynn-datasets/User_churn.csv'

# Read the CSV into a pandas DataFrame; the bare name on the last line
# makes the notebook render the frame as the cell output.
churn_table = pd.read_csv(CHURN_CSV_URL)
churn_table

Unnamed: 0,customer_id,age,gender,income,loyalty,churn,creation_time
0,2824,19,Female,52098,4,0,2020-12-27
1,9935,23,Female,24165,1,0,2021-08-22
2,4811,50,Male,93563,4,1,2021-08-26
3,8359,55,Female,126093,1,0,2022-10-14
4,6574,35,Male,48221,6,0,2020-12-07
...,...,...,...,...,...,...,...
495,9231,38,Female,55446,3,0,2022-03-16
496,4588,19,Female,27679,8,1,2021-09-20
497,3615,24,Male,106096,4,1,2023-05-29
498,1916,32,Female,66504,3,0,2021-09-28


# Data Manipulation
**Q1** In the context of customer churn analysis, you are tasked with understanding the gender distribution among churned customers. How would you calculate the number of male and female customers who have churned, and what insights can be derived from this analysis to inform business decisions?





In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Obtain a Spark session (an already-running one is reused if present).
spark = SparkSession.builder.getOrCreate()

# Read the churn CSV: the first line supplies the column names and the
# column types are inferred from the values.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('https://storage.googleapis.com/datalynn-datasets/User_churn.csv')
)

In [None]:
# Keep only the rows whose churn flag equals 1.
churned_customers = data.where(data["churn"] == 1)

# Count the churned rows for each gender value.
gender_distribution = (
    churned_customers
    .groupBy("gender")
    .count()
)

# Print the per-gender churn counts.
gender_distribution.show()


+------+-----+
|gender|count|
+------+-----+
|Female|  130|
|  Male|  120|
+------+-----+



**Q2** In the context of customer churn analysis, your task is to analyze and classify departing customers into different loyalty groups and count the number of customers in each group. By doing so, you can gain insights into customer loyalty patterns and identify segments of customers who are more likely to churn.

In [None]:
from pyspark.sql.functions import when, col

# Restrict the analysis to rows flagged as churned.
churned_customers = data.where(data["churn"] == 1)

# Loyalty bands: below 4 is "Low", from 4 up to (but excluding) 7 is
# "Medium", and any row matching neither condition is labelled "High".
loyalty_score = col("loyalty")
is_low = loyalty_score < 4
is_medium = (loyalty_score >= 4) & (loyalty_score < 7)
loyalty_band = when(is_low, "Low").when(is_medium, "Medium").otherwise("High")

loyalty_groups = churned_customers.withColumn("loyalty_group", loyalty_band)

# Tally the churned customers per band and print the table.
loyalty_counts = loyalty_groups.groupBy("loyalty_group").count()
loyalty_counts.show()

+-------------+-----+
|loyalty_group|count|
+-------------+-----+
|         High|  105|
|          Low|   76|
|       Medium|   69|
+-------------+-----+



**Q3** In the context of customer churn analysis, you are tasked with calculating and comparing the average income of male customers across age groups. How can you determine the average income for each age group among male customers, and how does income relate to churn?


In [None]:
from pyspark.sql.functions import avg

# Age buckets: under 30, 30 through 50 inclusive, and every remaining row
# labelled "50+".
age_col = data["age"]
age_bucket = (
    when(age_col < 30, "<30")
    .when((age_col >= 30) & (age_col <= 50), "30-50")
    .otherwise("50+")
)
data_with_age_group = data.withColumn("age_group", age_bucket)

# Keep only the rows whose gender is "Male".
male_customers = data_with_age_group.where(data_with_age_group["gender"] == "Male")

# Mean income for every (churn, age_group) combination among male customers.
average_income_per_age_group = (
    male_customers
    .groupBy("churn", "age_group")
    .agg(avg("income").alias("avg_income"))
)

# Print the resulting table.
average_income_per_age_group.show()

+-----+---------+-----------------+
|churn|age_group|       avg_income|
+-----+---------+-----------------+
|    1|      <30|88189.93548387097|
|    0|      50+|94896.17391304347|
|    0|    30-50|87789.12195121951|
|    1|      50+|84595.86842105263|
|    0|      <30|87366.91304347826|
|    1|    30-50|77410.11764705883|
+-----+---------+-----------------+



**Q4** When performing user churn analysis, how can you use Spark's window functions to calculate the number of churned users per month? What insights can we gain by analyzing the monthly churn counts, and how can we use this information to improve customer retention and business decisions?

In [None]:
from pyspark.sql.functions import year, month, col, to_date
# Aliased so that Python's builtin sum() is not shadowed in the notebook namespace.
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.window import Window

# Parse creation_time as a date so that year() and month() can be applied to it.
data = data.withColumn("creation_time", to_date(col("creation_time"), "yyyy-MM-dd"))

# One window partition per calendar month of creation_time.
# The window is deliberately left unordered: with an orderBy() the default
# frame becomes "unbounded preceding .. current row", which turns the sum into
# a running total and produces several different churn_count values for the
# same month instead of a single monthly total.
window_spec = Window.partitionBy(year("creation_time"), month("creation_time"))

# Attach to every row the total of the churn column over its whole month.
data_with_churn_count = data.withColumn("churn_count", spark_sum("churn").over(window_spec))

# Reduce to one (year, month, churn_count) row per month.
churn_count_per_month = data_with_churn_count.select(
    year("creation_time").alias("year"),
    month("creation_time").alias("month"),
    "churn_count",
).distinct()

# Sort chronologically by year, then month.
churn_count_per_month = churn_count_per_month.orderBy("year", "month")

# Display the churn count per month.
churn_count_per_month.show()

+----+-----+-----------+
|year|month|churn_count|
+----+-----+-----------+
|2020|    6|          1|
|2020|    6|          5|
|2020|    6|          4|
|2020|    6|          0|
|2020|    6|          6|
|2020|    6|          3|
|2020|    7|          5|
|2020|    7|          1|
|2020|    7|          3|
|2020|    7|          2|
|2020|    7|          4|
|2020|    7|          6|
|2020|    8|          9|
|2020|    8|          5|
|2020|    8|          4|
|2020|    8|          2|
|2020|    8|          8|
|2020|    8|          3|
|2020|    8|          6|
|2020|    8|          7|
+----+-----+-----------+
only showing top 20 rows



# Machine learning
**Q5** As a data scientist, you have been tasked with developing a predictive model to identify customers who are likely to churn. How would you approach this task, considering the business problem of customer churn and the available dataset? Explain the steps you would take, including feature engineering, data preprocessing, model training, and evaluation. Also, discuss the potential business impact of accurately predicting customer churn and how it can inform retention strategies and drive business growth.

### Data Preprocessing

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Label-index the gender column. Note this is string indexing, not one-hot
# encoding: each distinct gender string is mapped to a numeric index that is
# stored in the new gender_index column.
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# Remove any gender_index left over from a previous run of this cell.
# StringIndexer refuses to run when its output column already exists, so
# without this the cell fails when re-executed. Dropping a column that is
# not present is a no-op.
data_without_index = data.drop("gender_index")

gender_indexer_model = gender_indexer.fit(data_without_index)

# `data` keeps its name because the modeling cell reads gender_index from it.
data = gender_indexer_model.transform(data_without_index)
data.show()

+-----------+---+------+------+-------+-----+-------------+------------+
|customer_id|age|gender|income|loyalty|churn|creation_time|gender_index|
+-----------+---+------+------+-------+-----+-------------+------------+
|       2824| 19|Female| 52098|      4|    0|   2020-12-27|         0.0|
|       9935| 23|Female| 24165|      1|    0|   2021-08-22|         0.0|
|       4811| 50|  Male| 93563|      4|    1|   2021-08-26|         1.0|
|       8359| 55|Female|126093|      1|    0|   2022-10-14|         0.0|
|       6574| 35|  Male| 48221|      6|    0|   2020-12-07|         1.0|
|       7224| 24|Female|131082|      6|    1|   2020-08-28|         0.0|
|       8527| 52|  Male|147548|      7|    0|   2022-01-22|         1.0|
|       6925| 54|  Male|112349|      2|    0|   2021-09-10|         1.0|
|       5741| 23|  Male|133571|      2|    1|   2021-12-22|         1.0|
|       8428| 58|Female| 41319|      6|    1|   2021-08-04|         0.0|
|       5374| 62|  Male| 99840|      3|    0|   202

### Modeling and evaluation

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Columns fed to the model.
feature_columns = ["age", "gender_index", "income", "loyalty"]

# Seeded random split with weights 0.8 (train) and 0.2 (test).
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Two-stage pipeline: pack the feature columns into a single "features"
# vector, then fit a gradient-boosted tree classifier on the churn label.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
gbt = GBTClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, gbt])

# Train on the training split.
model = pipeline.fit(train_data)

# The fitted classifier is the last pipeline stage; report its importance
# score for each feature column, in the order the columns were assembled.
importances = model.stages[-1].featureImportances
for position, feature in enumerate(feature_columns):
    importance = importances[position]
    print(f"Feature: {feature}\t Importance: {importance}")

# Score the held-out split.
predictions = model.transform(test_data)

# Evaluator comparing the "prediction" column against the churn label;
# accuracy is its configured metric.
evaluator = MulticlassClassificationEvaluator(labelCol="churn", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Re-use the same evaluator for the remaining metrics by overriding
# metricName on each evaluate() call.
weightedPrecision, weightedRecall, weightedF1 = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("weightedPrecision", "weightedRecall", "f1")
)

# Print the summary of weighted metrics.
print("Classification Report:")
print("Precision (weighted):", weightedPrecision)
print("Recall (weighted):", weightedRecall)
print("F1-Score (weighted):", weightedF1)

Feature: age	 Importance: 0.36933213212248683
Feature: gender_index	 Importance: 0.06212151804300232
Feature: income	 Importance: 0.3775107229711441
Feature: loyalty	 Importance: 0.1910356268633667
Accuracy: 0.6081081081081081
Classification Report:
Precision (weighted): 0.6096822096822097
Recall (weighted): 0.6081081081081081
F1-Score (weighted): 0.6078933728248797


In [None]:
# Stop the Spark session now that the analysis is finished; cells that use
# `spark` or DataFrames derived from it cannot be re-run after this point
# without creating the session again.
spark.stop()