**Author:** Cainã Max Couto da Silva  
**LinkedIn:** [@cmcouto-silva](https://www.linkedin.com/in/cmcouto-silva/)

---

In this final notebook, we replicate the scikit-learn pipeline using SparkML.  
The overall pattern is similar, but the details differ.

# **Setup**

## Spark Session / UI

In [1]:
!pip install -q pyngrok # for accessing Spark UI
!pip install -q pyspark # for Spark session

In [2]:
# Create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.ui.port', '4050').getOrCreate()

# Show session
display(spark)

Uncomment and run the two cells below if you want to access your session's Spark UI.

In [3]:
# Log in to https://dashboard.ngrok.com/get-started/setup to get your own token
# ngrok_token = 'YOUR_TOKEN_HERE'

In [4]:
# # Expose the local Spark UI (port 4050) through ngrok
# get_ipython().system_raw(f'ngrok authtoken {ngrok_token}')
# get_ipython().system_raw('ngrok http 4050 &')
# !sleep 3
# print('Spark UI URL:')
# !curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

## Importing libraries

In [5]:
import numpy as np
import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

## Loading dataset

In [6]:
# Load data
data_url = 'https://raw.githubusercontent.com/cmcouto-silva/datasets/main/datasets/telco_churn.csv'
df = pd.read_csv(data_url, index_col='CustomerID')
display(df)

CustomerID,Count,Country,State,City,Zip Code,Lat Long,Latitude,Longitude,Gender,Senior Citizen,...,Contract,Paperless Billing,Payment Method,Monthly Charges,Total Charges,Churn Label,Churn Value,Churn Score,CLTV,Churn Reason
3668-QPYBK,1,United States,California,Los Angeles,90003,"33.964131, -118.272783",33.964131,-118.272783,Male,No,...,Month-to-month,Yes,Mailed check,53.85,108.15,Yes,1,86,3239,Competitor made better offer
9237-HQITU,1,United States,California,Los Angeles,90005,"34.059281, -118.30742",34.059281,-118.307420,Female,No,...,Month-to-month,Yes,Electronic check,70.70,151.65,Yes,1,67,2701,Moved
9305-CDSKC,1,United States,California,Los Angeles,90006,"34.048013, -118.293953",34.048013,-118.293953,Female,No,...,Month-to-month,Yes,Electronic check,99.65,820.50,Yes,1,86,5372,Moved
7892-POOKP,1,United States,California,Los Angeles,90010,"34.062125, -118.315709",34.062125,-118.315709,Female,No,...,Month-to-month,Yes,Electronic check,104.80,3046.05,Yes,1,84,5003,Moved
0280-XJGEX,1,United States,California,Los Angeles,90015,"34.039224, -118.266293",34.039224,-118.266293,Male,No,...,Month-to-month,Yes,Bank transfer (automatic),103.70,5036.30,Yes,1,89,5340,Competitor had better devices
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2569-WGERO,1,United States,California,Landers,92285,"34.341737, -116.539416",34.341737,-116.539416,Female,No,...,Two year,Yes,Bank transfer (automatic),21.15,1419.40,No,0,45,5306,
6840-RESVB,1,United States,California,Adelanto,92301,"34.667815, -117.536183",34.667815,-117.536183,Male,No,...,One year,Yes,Mailed check,84.80,1990.50,No,0,59,2140,
2234-XADUH,1,United States,California,Amboy,92304,"34.559882, -115.637164",34.559882,-115.637164,Female,No,...,One year,Yes,Credit card (automatic),103.20,7362.90,No,0,71,5560,
4801-JZAZL,1,United States,California,Angelus Oaks,92305,"34.1678, -116.86433",34.167800,-116.864330,Female,No,...,Month-to-month,Yes,Electronic check,29.60,346.45,No,0,59,2793,


In [7]:
# List features and target
NUMERIC_FEATURES = [
    'Tenure Months',
    'Monthly Charges',
    'Total Charges',
    'CLTV'
]

CATEGORICAL_FEATURES = [
    'Senior Citizen',
    'Partner',
    'Dependents',
    'Multiple Lines',
    'Internet Service',
    'Online Security',
    'Online Backup',
    'Device Protection',
    'Tech Support',
    'Streaming TV',
    'Streaming Movies',
    'Contract',
    'Paperless Billing',
    'Payment Method'
]

FEATURES = NUMERIC_FEATURES + CATEGORICAL_FEATURES
TARGET = 'Churn Value'

In [8]:
# Create a spark dataframe
sdf = spark.createDataFrame(df)
print(f'Num. partitions: {sdf.rdd.getNumPartitions()}')
sdf.show(5)

Num. partitions: 2
+-----+-------------+----------+-----------+--------+--------------------+---------+-----------+------+--------------+-------+----------+-------------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------------+-----------------+--------------------+---------------+-------------+-----------+-----------+-----------+----+--------------------+
|Count|      Country|     State|       City|Zip Code|            Lat Long| Latitude|  Longitude|Gender|Senior Citizen|Partner|Dependents|Tenure Months|Phone Service|Multiple Lines|Internet Service|Online Security|Online Backup|Device Protection|Tech Support|Streaming TV|Streaming Movies|      Contract|Paperless Billing|      Payment Method|Monthly Charges|Total Charges|Churn Label|Churn Value|Churn Score|CLTV|        Churn Reason|
+-----+-------------+----------+-----------+--------+--------------------+---------+-----------+------+--------

# **PySpark Pipeline**

In [9]:
# Filter features and target
data = (
    sdf[[*NUMERIC_FEATURES, *CATEGORICAL_FEATURES, TARGET]]
    .withColumnRenamed(TARGET, 'label')
)

# Split data into train and test sets
train, test = data.randomSplit([.8, .2])

## Categorical features

In [10]:
# StringIndexer: Converts categorical columns to indices.
indexer = StringIndexer(
    inputCols=CATEGORICAL_FEATURES,
    outputCols=[f'cat_{col}' for col in CATEGORICAL_FEATURES]
)

# OneHotEncoder: Encodes categorical feature indices to one-hot vectors.
encoder = OneHotEncoder(
    inputCols=[f'cat_{col}' for col in CATEGORICAL_FEATURES],
    outputCols=[f'vec_{col}' for col in CATEGORICAL_FEATURES]
)

# Pipeline for processing categorical features:
categorical_preprocessor = Pipeline(stages=[
    indexer,
    encoder
])

In [11]:
# Show transformation
categorical_preprocessor.fit(train).transform(train).show(5)

+-------------+---------------+-------------+----+--------------+-------+----------+--------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-----------------+--------------------+-----+------------------+-----------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+------------+---------------------+------------------+------------------+-------------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+-------------+---------------------+------------------+
|Tenure Months|Monthly Charges|Total Charges|CLTV|Senior Citizen|Partner|Dependents|Multiple Lines|Internet Service|    Online Security|      Online Backup|  Device Protection|       Tech Su

In [12]:
# Show transformation for a specific categorical variable
(
  categorical_preprocessor.fit(train).transform(train)
  .select('Contract','cat_Contract','vec_Contract')
  .drop_duplicates()
  .show(5)
)

+--------------+------------+-------------+
|      Contract|cat_Contract| vec_Contract|
+--------------+------------+-------------+
|Month-to-month|         0.0|(2,[0],[1.0])|
|      Two year|         1.0|(2,[1],[1.0])|
|      One year|         2.0|    (2,[],[])|
+--------------+------------+-------------+
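
The output above reflects two defaults that are easy to miss. `StringIndexer` orders labels by descending frequency, so the most common contract gets index 0.0, and `OneHotEncoder` drops the last category (`dropLast=True`), which is why `One year` becomes the all-zero sparse vector `(2,[],[])`. The plain-Python sketch below mimics that behaviour on a made-up list of contracts; the helper names and toy counts are illustrative assumptions, not Spark APIs.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """Dense one-hot vector with num_categories - 1 slots; the last category maps to all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[int(index)] = 1.0
    return vector

# Toy data (hypothetical counts, chosen only to reproduce the ordering shown above)
contracts = ['Month-to-month'] * 5 + ['Two year'] * 3 + ['One year'] * 2

mapping = string_index(contracts)
encoded = {label: one_hot_drop_last(idx, len(mapping)) for label, idx in mapping.items()}

for label, idx in mapping.items():
    print(f'{label:<15} {idx}  {encoded[label]}')
```

Dropping one category avoids perfectly collinear columns, which matters for linear models such as the logistic regression used later in this notebook.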



## Numerical features

In [13]:
# Imputer for mean imputation
numeric_imputer = Imputer(
    inputCols=NUMERIC_FEATURES,
    outputCols=[f'{col}_imputed' for col in NUMERIC_FEATURES],
    strategy='mean'
)

# Assembler to combine the imputed numerical features into a single vector
numeric_assembler = VectorAssembler(
    inputCols=[f'{col}_imputed' for col in NUMERIC_FEATURES],  # use the imputer's output columns
    outputCol='numeric_features'
)


# Scaler to standardize the features
numeric_scaler = StandardScaler(
    inputCol='numeric_features',
    outputCol='scaled_numeric_features',
    withMean=True, # default: withMean=False
    withStd=True
)

# Numerical preprocessing pipeline
numeric_preprocessor = Pipeline(stages=[
    numeric_imputer,
    numeric_assembler,
    numeric_scaler
])

# Show transformation
numeric_preprocessor.fit(train).transform(train).show(5)

+-------------+---------------+-------------+----+--------------+-------+----------+--------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-----------------+--------------------+-----+---------------------+-----------------------+---------------------+------------+--------------------+-----------------------+
|Tenure Months|Monthly Charges|Total Charges|CLTV|Senior Citizen|Partner|Dependents|Multiple Lines|Internet Service|    Online Security|      Online Backup|  Device Protection|       Tech Support|       Streaming TV|   Streaming Movies|      Contract|Paperless Billing|      Payment Method|label|Tenure Months_imputed|Monthly Charges_imputed|Total Charges_imputed|CLTV_imputed|    numeric_features|scaled_numeric_features|
+-------------+---------------+-------------+----+--------------+-------+----------+--------------+----------------+-------------------+------------------
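
To make the numeric stages concrete, here is a plain-Python sketch of what mean imputation followed by standardization computes for a single column. It assumes, as Spark's `StandardScaler` documentation describes, that scaling uses the corrected sample standard deviation (dividing by n - 1); the toy values and function names are illustrative only.

```python
import math
from statistics import mean, stdev

def is_missing(value):
    """Treat None and NaN as missing, as Spark's Imputer does by default."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def impute_mean(values):
    """Replace missing entries with the mean of the observed ones."""
    observed = [v for v in values if not is_missing(v)]
    fill_value = mean(observed)
    return [fill_value if is_missing(v) else v for v in values]

def standardize(values, with_mean=True, with_std=True):
    """Center and/or scale a column, using the sample standard deviation (n - 1)."""
    center = mean(values) if with_mean else 0.0
    scale = stdev(values) if with_std else 1.0
    return [(v - center) / scale for v in values]

# Toy column with one missing value (hypothetical numbers)
total_charges = [100.0, None, 300.0, 200.0]

imputed = impute_mean(total_charges)
scaled = standardize(imputed)

print(imputed)
print([round(v, 4) for v in scaled])
```

In the real pipeline the fill value, mean, and standard deviation are learned from the data passed to `fit` and then reused unchanged when transforming the test set.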

## Categorical and numerical preprocessors

In [15]:
# Creating a list of feature column names after preprocessing (both categorical and numerical)
feature_cols = [f'vec_{col}' for col in CATEGORICAL_FEATURES] + ['scaled_numeric_features']

# VectorAssembler: Combines a given list of columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Pipeline: Combines numeric and categorical preprocessors with the assembler into a single pipeline
preprocessor = Pipeline(stages=[
    numeric_preprocessor,
    categorical_preprocessor,
    assembler
])

# Applying preprocessor pipeline to the training data
preprocessor.fit(train).transform(train).show(5)

+-------------+---------------+-------------+----+--------------+-------+----------+--------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-----------------+--------------------+-----+---------------------+-----------------------+---------------------+------------+--------------------+-----------------------+------------------+-----------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+------------+---------------------+------------------+------------------+-------------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+-------------+---------------------+------------------+--------------------+
|Tenure Months|Monthly Charges|Total Charge
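
The final `VectorAssembler` concatenates its input columns, in the order given by `inputCols`, into one vector per row. The sketch below shows the idea in plain Python, expanding sparse vectors of the form `(size,[indices],[values])` before concatenating them; the helper names and the example row are made up for illustration.

```python
def to_dense(size, indices, values):
    """Expand a sparse vector (size, indices, values) into a dense list."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense

def assemble(columns):
    """Concatenate several vectors into a single feature vector, preserving order."""
    features = []
    for column in columns:
        features.extend(column)
    return features

# One hypothetical preprocessed row
row = {
    'vec_Contract': to_dense(2, [0], [1.0]),   # three categories, last one dropped
    'vec_Partner': to_dense(1, [], []),        # two categories, last one dropped
    'scaled_numeric_features': [-1.2, 0.3, -0.9, 0.1],
}

feature_cols = ['vec_Contract', 'vec_Partner', 'scaled_numeric_features']
features = assemble([row[name] for name in feature_cols])
print(features)
```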

## Model Pipeline

In [16]:
# Instantiate logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Build the model pipeline
model_pipeline = Pipeline(stages=[preprocessor, lr])

# Show pipeline type
type(model_pipeline)

pyspark.ml.pipeline.Pipeline

In [17]:
# Train the model
trained_model_pipeline = model_pipeline.fit(train) # fit() returns a new PipelineModel; model_pipeline itself is not modified

# Show trained pipeline
type(trained_model_pipeline)

pyspark.ml.pipeline.PipelineModel

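Unlike scikit-learn, where `predict` returns a separate array, the `transform` method of a fitted SparkML pipeline returns the input DataFrame with the `rawPrediction`, `probability`, and `prediction` columns appended.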

In [18]:
# Get prediction on test set
trained_model_pipeline.transform(test).show(5)

+-------------+---------------+-------------+----+--------------+-------+----------+--------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-----------------+----------------+-----+---------------------+-----------------------+---------------------+------------+--------------------+-----------------------+------------------+-----------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+------------+---------------------+------------------+------------------+-------------+--------------+------------------+--------------------+-------------------+-----------------+---------------------+----------------+----------------+--------------------+-------------+---------------------+------------------+--------------------+--------------------+--------------------+------
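
Among the columns that `transform` appends are `rawPrediction`, `probability`, and `prediction`. For binary logistic regression they are related roughly as sketched below in plain Python: the raw margin is passed through a sigmoid to get the class-1 probability, which is then compared against a threshold (0.5 by default). The coefficients and feature values here are invented for illustration.

```python
import math

def predict_row(features, coefficients, intercept, threshold=0.5):
    """Mimic the three output columns of a binary logistic regression model for one row."""
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    prob_positive = 1.0 / (1.0 + math.exp(-margin))
    return {
        'rawPrediction': [-margin, margin],
        'probability': [1.0 - prob_positive, prob_positive],
        'prediction': 1.0 if prob_positive > threshold else 0.0,
    }

# Hypothetical feature vector and model parameters
result = predict_row([1.0, 0.0, -1.2], [0.8, -0.5, 0.4], intercept=-0.1)

for column, value in result.items():
    print(column, value)
```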

In [19]:
# Pipeline steps (or stages) are accessible through .stages
trained_model_pipeline.stages[-1].summary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x7c3e631fefe0>

In [20]:
# Get training accuracy
trained_model_pipeline.stages[-1].summary.accuracy

0.8098541444325863

In [21]:
# Describe model parameters
print( trained_model_pipeline.stages[-1].explainParams() )

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

# **PySpark Tuning**

## Train Validation Split

In [22]:
# Creating a parameter grid for model tuning: setting different values for regularization parameter
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01]) # lr.regParam values to try: 0.1 and 0.01
    .build()
)

# Setting up Train-Validation Split for hyperparameter tuning
tvs = TrainValidationSplit(
    estimator=model_pipeline,                    # Model pipeline to be tuned
    estimatorParamMaps=paramGrid,                # Parameter grid to use in tuning
    evaluator=BinaryClassificationEvaluator(),   # Binary evaluator (default metric: areaUnderROC)
    trainRatio=0.8                               # Ratio of data split: 80% training, 20% validation
)

# Fitting the Train-Validation Split to find the best model parameters
tvs_model = tvs.fit(train)

# Show train-validation-split (tvs) model data type
type(tvs_model)

pyspark.ml.tuning.TrainValidationSplitModel
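
`ParamGridBuilder` expands the values passed to each `addGrid` call into every combination, and the tuner fits the pipeline once per combination (once per combination and fold when cross-validating) before refitting the best setting on all the data it was given. A rough plain-Python sketch of that bookkeeping follows; the second hyperparameter in the larger grid is a hypothetical addition, not part of the grid used in this notebook.

```python
from itertools import product

def build_grid(param_values):
    """Return one dict per combination of the supplied hyperparameter values."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*param_values.values())]

def count_fits(grid, num_folds=1):
    """Fits needed: one per combination per fold, plus the final refit of the best setting."""
    return len(grid) * num_folds + 1

notebook_grid = build_grid({'regParam': [0.1, 0.01]})
larger_grid = build_grid({'regParam': [0.1, 0.01], 'elasticNetParam': [0.0, 0.5, 1.0]})

print(notebook_grid)
print(len(larger_grid), 'combinations in the larger grid')
print(count_fits(notebook_grid), 'fits with a single train/validation split')
print(count_fits(notebook_grid, num_folds=5), 'fits with 5-fold cross-validation')
```

The fit count grows multiplicatively with each added hyperparameter, which is worth keeping in mind because every fit here reruns the whole preprocessing pipeline.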

In [23]:
# Retrieve the best model
best_model = tvs_model.bestModel

In [24]:
# Get regularization param of the best model
best_model.stages[-1].getRegParam()

0.01
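
The best model is chosen by the evaluator's metric. `BinaryClassificationEvaluator` uses the area under the ROC curve by default (`metricName='areaUnderROC'`). One way to read that number: it is the probability that a randomly chosen positive row receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on invented scores; Spark derives the value from the ROC curve itself, so small numeric differences from this pairwise version are possible.

```python
def area_under_roc(labels, scores):
    """Pairwise AUC: share of (positive, negative) pairs ranked correctly, ties counting half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for pos_score in positives:
        for neg_score in negatives:
            if pos_score > neg_score:
                wins += 1.0
            elif pos_score == neg_score:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels and predicted churn probabilities
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.4, 0.35, 0.2, 0.8]

auc = area_under_roc(labels, scores)
print(round(auc, 4))
```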

## Cross-validation

In [25]:
# Using the same model pipeline
model_pipeline = Pipeline(stages=[preprocessor, lr])

In [26]:
# Setting up CrossValidator for hyperparameter tuning
cross_validator = CrossValidator(
    estimator=model_pipeline,                     # Model pipeline to be tuned
    estimatorParamMaps=paramGrid,                 # Parameter grid to use in tuning
    evaluator=BinaryClassificationEvaluator(),    # Binary evaluator (default metric: areaUnderROC)
    numFolds=5                                    # Number of folds for cross-validation
)

# Fitting the CrossValidator to find the best model parameters
cv_model = cross_validator.fit(train)

# Show cross-validation (cv) model data type
type(cv_model)

pyspark.ml.tuning.CrossValidatorModel

In [27]:
# Retrieve the best model
best_model = cv_model.bestModel

# Get regularization param of the best model
best_model.stages[-1].getRegParam()

0.01
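
With `numFolds=5`, `CrossValidator` partitions the training data into five folds and, for each parameter setting, trains on four folds and evaluates on the remaining one, averaging the metric across folds. The sketch below illustrates the fold bookkeeping in plain Python. It assigns rows to folds deterministically for clarity, whereas Spark assigns them at random, so fold sizes there are only approximately equal.

```python
def k_fold_indices(num_rows, num_folds):
    """Yield (training, validation) row indices, holding out each fold exactly once."""
    folds = [list(range(start, num_rows, num_folds)) for start in range(num_folds)]
    for held_out in range(num_folds):
        validation = folds[held_out]
        training = sorted(
            index
            for fold_id, fold in enumerate(folds)
            if fold_id != held_out
            for index in fold
        )
        yield training, validation

# Ten hypothetical rows split into five folds
splits = list(k_fold_indices(num_rows=10, num_folds=5))

for training, validation in splits:
    print(f'train={training} validation={validation}')
```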

Thank you for attending this session on notebooks.  
Happy studying and best wishes for all your future endeavors!
