<a href="https://colab.research.google.com/github/floflokie/TP_BigData/blob/main/Chapter_3_Spark_MLib_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TP : Spark MLlib

Prise en main de Spark MLlib via un use case simple de détection de churn.

In [1]:
import pandas as pd
import numpy as np

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Generate fake customer churn data
N_CUSTOMERS = 10_000  # number of synthetic customers (rows) in the dataset
SEED = 42  # fixed seed: Restart & Run All regenerates the exact same dataset


def make_churn_data(n_customers=N_CUSTOMERS, seed=SEED):
    """Build a synthetic customer-churn table.

    Each column is drawn independently of the others, ``churn`` included,
    so the features carry no real signal about the target.

    Parameters
    ----------
    n_customers : int
        Number of rows to generate; ``customer_id`` runs from 1 to
        ``n_customers``.
    seed : int
        Seed for the NumPy random generator. The same seed and row count
        always yield the same table.

    Returns
    -------
    pandas.DataFrame
        Columns, in order: customer_id, age, contract_type, monthly_charges,
        total_charges, internet_service, churn.
    """
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        'customer_id': np.arange(1, n_customers + 1),
        'age': rng.integers(18, 70, n_customers),  # upper bound is exclusive: ages 18..69
        'contract_type': rng.choice(['Month-to-month', 'One year', 'Two year'], n_customers),
        'monthly_charges': np.round(rng.uniform(20, 100, n_customers), 2),
        'total_charges': np.round(rng.uniform(500, 5000, n_customers), 2),
        # 'None' is the literal string (a category), not a missing value
        'internet_service': rng.choice(['DSL', 'Fiber optic', 'None'], n_customers),
        'churn': rng.choice([0, 1], n_customers),  # Binary target variable
    })


df = make_churn_data()

In [2]:
# Preview the first rows of the generated pandas DataFrame
df.head()

Unnamed: 0,customer_id,age,contract_type,monthly_charges,total_charges,internet_service,churn
0,1,42,Month-to-month,84.03,943.4,Fiber optic,0
1,2,18,Month-to-month,50.62,4606.9,Fiber optic,0
2,3,34,Month-to-month,75.26,1829.1,,1
3,4,34,Two year,98.2,4799.44,,0
4,5,30,Month-to-month,85.72,3663.69,DSL,1


In [3]:
# Summary statistics (count, mean, std, min, quartiles, max) of the numeric columns
df.describe()

Unnamed: 0,customer_id,age,monthly_charges,total_charges,churn
count,10000.0,10000.0,10000.0,10000.0,10000.0
mean,5000.5,43.3566,59.818043,2743.736487,0.4935
std,2886.89568,14.849063,23.000524,1304.653987,0.499983
min,1.0,18.0,20.01,501.45,0.0
25%,2500.75,31.0,40.02,1618.755,0.0
50%,5000.5,43.0,59.92,2752.06,0.0
75%,7500.25,56.0,79.78,3874.9575,1.0
max,10000.0,69.0,99.99,4999.44,1.0


TODO: convert the pandas DataFrame to a Spark DataFrame

In [4]:
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-running session when there is one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("Churn prediction")
    .getOrCreate()
)

In [5]:
# Convert the pandas DataFrame into a Spark DataFrame (no explicit schema is passed)
customer_df = spark.createDataFrame(df)

In [6]:
# Encode each categorical string column as a numeric index column
contract_indexer = StringIndexer(
    inputCol="contract_type",
    outputCol="contract_index",
)
internet_indexer = StringIndexer(
    inputCol="internet_service",
    outputCol="internet_index",
)

# Gather the numeric and indexed columns into the single "features" vector column
assembler = VectorAssembler(
    inputCols=[
        "age",
        "contract_index",
        "monthly_charges",
        "total_charges",
        "internet_index",
    ],
    outputCol="features",
)

# The indexers come first: the assembler consumes the columns they produce
pipeline = Pipeline(
    stages=[
        contract_indexer,
        internet_indexer,
        assembler,
    ]
)

TODO: apply pipeline to data (fit, transform)

In [7]:
# Fit the preprocessing stages on the whole dataset, then apply them to that same dataset
pipeline_model = pipeline.fit(customer_df)
customer_df_transformed = pipeline_model.transform(customer_df)

In [8]:
# Classifier configuration: reads the assembled "features" vector, predicts the "churn" label
lr = LogisticRegression(labelCol="churn", featuresCol="features")

# Random train/test split with weights 0.8 / 0.2; the fixed seed is passed to the split
train, test = customer_df_transformed.randomSplit(weights=[0.8, 0.2], seed=1234)

# Train on the training portion only
lr_model = lr.fit(train)

# Score the held-out rows and display a few predictions next to the true label
predictions = lr_model.transform(test)
(
    predictions
    .select("customer_id", "features", "churn", "prediction")
    .show(n=5)
)

+-----------+--------------------+-----+----------+
|customer_id|            features|churn|prediction|
+-----------+--------------------+-----+----------+
|          2|[18.0,0.0,50.62,4...|    0|       0.0|
|          5|[30.0,0.0,85.72,3...|    1|       0.0|
|         22|[66.0,2.0,54.15,1...|    1|       0.0|
|         25|[39.0,0.0,55.43,1...|    1|       1.0|
|         26|[65.0,1.0,74.35,3...|    0|       1.0|
+-----------+--------------------+-----+----------+
only showing top 5 rows



TODO: experiment with other models, such as RandomForestClassifier, and compare metrics like accuracy or precision.

https://spark.apache.org/docs/latest/ml-guide