In [15]:
# Suppress warnings 
import warnings
warnings.filterwarnings('ignore')

import plotly.express as px

# Spark Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum
from pyspark.sql.types import FloatType, IntegerType, StringType
from pyspark.sql.functions import regexp_replace
import webbrowser
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from copy import copy
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
import logging

In [30]:
# Show INFO-level (and more severe) records from Python's logging module
logging.basicConfig(level=logging.INFO)

# Reuse the active Spark session if one exists, otherwise start one named "SparkEDA"
spark = (
    SparkSession.builder
    .appName("SparkEDA")
    .getOrCreate()
)

# Load the churn CSV: the first row holds the column names and Spark infers the types
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('Telco-Customer-Churn.csv')
)

In [31]:
# Strip every whitespace character from 'TotalCharges', then cast to float.
# Whitespace-only entries become empty strings, and the cast is what turns
# those into nulls (they show up as missing values in the check further down).
df = df.withColumn(
    'TotalCharges',
    regexp_replace('TotalCharges', r'\s+', '').cast(FloatType()),
)

# Disabled experiment, kept for reference: encode the Yes/No columns as 0/1.
# columns_to_convert = ['Partner', 'Dependents', 'PhoneService', 'PaperlessBilling', 'Churn']

# for item in columns_to_convert:
#     df = df.withColumn(item, when(col(item) == 'No', 0).otherwise(1).cast(IntegerType()))

# Function to check Spark DataFrame
def check_spark_df(dataframe, head=10):
    """Print a quick overview of a Spark DataFrame.

    Three sections are written to stdout, each introduced by a banner line:
    the first ``head`` rows (untruncated), the schema, and the output of
    ``describe()``.

    Args:
        dataframe: Object exposing ``show``, ``printSchema`` and ``describe``.
        head: Number of rows to display in the first section.

    Returns:
        None.
    """
    rule = '#' * 20

    def _banner(title):
        # Same layout for every section: rule, title, rule
        print(rule, title, rule)

    _banner('Head')
    dataframe.show(head, truncate=False)

    _banner('Schema')
    dataframe.printSchema()

    _banner('Summary Statistics')
    summary = dataframe.describe()
    summary.show()

# Inspect the loaded frame: first 10 rows (the default), schema and describe() output
check_spark_df(df)

#################### Head ####################
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines   |InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|7590-VHVEG|Female|0            |Yes    |No        |1     |No          |No phone service|DSL            

In [18]:
# Function to check missing values in Spark DataFrame
def missing_values_table_spark(dataframe, return_cols=True):
    """Report which columns of a Spark DataFrame contain nulls.

    Null counts for all columns are gathered in a single aggregation instead
    of one ``filter(...).count()`` job per column. For the columns that have
    nulls, a one-row table is then shown with two values per column:

    * ``<col>_missing_count`` - number of null rows
    * ``<col>_total_rows``    - number of NON-null rows (the alias is kept for
      backward compatibility with existing output, despite its name)

    Columns are referenced through ``dataframe[...]`` rather than the
    module-level ``col`` helper, so the function keeps working even if a later
    cell rebinds the name ``col``.

    Args:
        dataframe: Spark DataFrame to inspect.
        return_cols: When True, return the list of column names with nulls.

    Returns:
        ``[]`` when no column has nulls (a message is printed instead of a
        table); otherwise the list of columns with nulls if ``return_cols`` is
        True, else ``None``.
    """
    column_names = dataframe.columns
    if not column_names:
        print("No missing values found.")
        return []

    # One Spark job for every column's null count.
    null_counts = dataframe.agg(*[
        spark_sum(dataframe[name].isNull().cast("int")).alias(name)
        for name in column_names
    ]).first()

    # A sum over zero rows is null (None), which is treated the same as 0 here.
    missing_cols = [
        name for name, n_null in zip(column_names, null_counts) if n_null
    ]

    if not missing_cols:
        print("No missing values found.")
        return []

    missing_count_expr = [
        spark_sum(dataframe[name].isNull().cast("int")).alias(f"{name}_missing_count")
        for name in missing_cols
    ]
    total_rows_expr = [
        spark_sum(dataframe[name].isNotNull().cast("int")).alias(f"{name}_total_rows")
        for name in missing_cols
    ]

    dataframe.agg(*missing_count_expr, *total_rows_expr).show()

    if return_cols:
        return missing_cols
    return None

# Show the null / non-null counts and keep the names of the columns that contain nulls
na_columns_spark = missing_values_table_spark(df, return_cols=True)

+--------------------------+-----------------------+
|TotalCharges_missing_count|TotalCharges_total_rows|
+--------------------------+-----------------------+
|                        11|                   7032|
+--------------------------+-----------------------+



In [32]:
# Replace the null 'TotalCharges' values with 0; no other column is touched
df = df.fillna(0, subset=["TotalCharges"])

In [20]:
# Select the numeric feature columns. 'float' has to be listed as well:
# TotalCharges was cast to FloatType earlier and Spark reports that dtype as
# 'float', so filtering on ('int', 'double') alone silently dropped it.
numeric_cols = [
    col_name
    for (col_name, col_type) in df.dtypes
    if col_type in ('int', 'bigint', 'float', 'double') and col_name != "Churn"
]
numeric_cols.append("Churn")

# Encode the target as 1/0 on a frame used ONLY for the correlation.
# Two fixes compared with the previous version:
#   * the comparison value is "Yes" (the raw data is capitalised); comparing
#     against "yes" never matched and produced an all-zero target;
#   * `df` itself is no longer overwritten, so the modelling cells below still
#     see the original Churn labels regardless of execution order.
if dict(df.dtypes)["Churn"] == "string":
    churn_flag = when(df["Churn"] == "Yes", 1).otherwise(0)
else:
    # Already numeric (e.g. encoded by an earlier run) - use it as is.
    churn_flag = df["Churn"].cast("int")

corr_input = df.select(*numeric_cols[:-1], churn_flag.alias("Churn"))

# Pairwise correlations, computed once on the driver with pandas
full_correlation = corr_input.toPandas().corr()

# Absolute correlation of every numeric column with the target
correlation_with_target = full_correlation.loc[:, 'Churn'].abs()

# Get the top 15 variables with the highest absolute correlations
top_corr_vars = correlation_with_target.sort_values(ascending=False).head(15).index

# Correlation matrix restricted to the selected variables. pandas computes
# correlations pairwise, so slicing the full matrix gives the same values as
# recomputing on the subset and avoids a second toPandas() round trip.
correlation_matrix = full_correlation.loc[top_corr_vars, top_corr_vars]

# Create a heatmap using Plotly
fig = px.imshow(
    correlation_matrix,
    x=correlation_matrix.columns,
    y=correlation_matrix.columns,
    zmin=-1,
    zmax=1,
    color_continuous_scale='RdBu_r',
    title='Top 15 Correlations with Target Heatmap (Numeric Columns Only)'
)

# Adjust the figure size
fig.update_layout(width=1600, height=1600)

# Write the figure to a standalone HTML file ...
html_path = "correlation_heatmap.html"
fig.write_html(html_path)

# ... and open it in a new browser tab (returns True when a browser was launched)
webbrowser.open_new_tab(html_path)

True

# Feature Extraction

In [33]:
# --- Tenure band -----------------------------------------------------------
# Bucket 'tenure' into 12-unit bands labelled as years. Bands are closed on
# the right; the first band additionally contains tenure == 0. Values outside
# [0, 72] (or null) get a null label.
tenure_bins = [0, 12, 24, 36, 48, 60, 72]
tenure_labels = ["0-1 Year", "1-2 Year", "2-3 Year", "3-4 Year", "4-5 Year", "5-6 Year"]

tenure = col("tenure")
tenure_band = when((tenure >= tenure_bins[0]) & (tenure <= tenure_bins[1]), tenure_labels[0])
for lower, upper, band_label in zip(tenure_bins[1:-1], tenure_bins[2:], tenure_labels[1:]):
    tenure_band = tenure_band.when((tenure > lower) & (tenure <= upper), band_label)
df = df.withColumn("NEW_TENURE_YEAR", tenure_band.otherwise(None))

# --- Contract engagement ---------------------------------------------------
# Customers on a one- or two-year contract are flagged as engaged
on_long_contract = col("Contract").isin(["One year", "Two year"])
df = df.withColumn("NEW_Engaged", when(on_long_contract, 1).otherwise(0))

# --- Backup / protection / support ----------------------------------------
# 1 when at least one of OnlineBackup, DeviceProtection, TechSupport is
# anything other than "No".
# NOTE(review): any non-"No" value counts, not only "Yes", and the column name
# 'noProt' suggests the opposite meaning - confirm this is intended.
has_any_protection = (
    (col("OnlineBackup") != "No")
    | (col("DeviceProtection") != "No")
    | (col("TechSupport") != "No")
)
df = df.withColumn("NEW_noProt", when(has_any_protection, 1).otherwise(0))

# --- Non-senior customers without a long contract --------------------------
not_engaged_and_young = (col("NEW_Engaged") == 0) & (col("SeniorCitizen") == 0)
df = df.withColumn("NEW_Young_Not_Engaged", when(not_engaged_and_young, 1).otherwise(0))

# --- Number of services ----------------------------------------------------
# Counts the listed service columns whose value is exactly "Yes"
service_columns = ['PhoneService', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies']
service_flags = [when(col(service_col) == "Yes", 1).otherwise(0) for service_col in service_columns]
df = df.withColumn("NEW_TotalServices", sum(service_flags))

# --- Any streaming service -------------------------------------------------
# Customers who subscribe to at least one streaming service
streams_anything = (col("StreamingTV") == "Yes") | (col("StreamingMovies") == "Yes")
df = df.withColumn("NEW_FLAG_ANY_STREAMING", when(streams_anything, 1).otherwise(0))

# --- Automatic payment -----------------------------------------------------
pays_automatically = col("PaymentMethod").isin(["Bank transfer (automatic)", "Credit card (automatic)"])
df = df.withColumn("NEW_FLAG_AutoPayment", when(pays_automatically, 1).otherwise(0))

# --- Average charge per tenure unit ---------------------------------------
# The 0.1 added to tenure keeps the denominator non-zero when tenure is 0
df = df.withColumn("NEW_AVG_Charges", col("TotalCharges") / (col("tenure") + 0.1))

# --- Gender x SeniorCitizen category --------------------------------------
# Any combination not listed below (including nulls) yields a null category
is_male = col("gender") == "Male"
is_female = col("gender") == "Female"
is_senior = col("SeniorCitizen") == 1
is_not_senior = col("SeniorCitizen") == 0
sex_age_category = (
    when(is_male & is_not_senior, "youngmale")
    .when(is_male & is_senior, "oldmale")
    .when(is_female & is_not_senior, "youngfemale")
    .when(is_female & is_senior, "oldfemale")
    .otherwise(None)
)
df = df.withColumn("new_sex_cat", sex_age_category)

In [34]:
# Number of rows in each tenure band, sorted by band label
value_counts_df = (
    df.groupBy("NEW_TENURE_YEAR")
    .count()
    .sort("NEW_TENURE_YEAR")
)

# Display the per-band counts
value_counts_df.show()

+---------------+-----+
|NEW_TENURE_YEAR|count|
+---------------+-----+
|       0-1 Year| 2186|
|       1-2 Year| 1024|
|       2-3 Year|  832|
|       3-4 Year|  762|
|       4-5 Year|  832|
|       5-6 Year| 1407|
+---------------+-----+



In [36]:
# Drop the "customerID" identifier and a "converted" column in one call.
# NOTE(review): no visible cell creates "converted"; Spark's drop() ignores
# column names that are not in the schema, so that part is a no-op if absent.
df = df.drop("customerID", "converted")

In [38]:
# Identify categorical and numerical feature columns.
# The target 'Churn' is excluded from BOTH lists: if it has already been
# encoded as a number it would otherwise land in `numerical_cols` and leak the
# label into the feature vector.
categorical_cols = [
    name for (name, data_type) in df.dtypes
    if data_type == 'string' and name != 'Churn'
]
numerical_cols = [
    name for (name, data_type) in df.dtypes
    if data_type != 'string' and name != 'Churn'
]

# One-hot encoding for categorical features: index the strings, then encode.
# The loop variable is deliberately NOT called `col`; a module-level `for col in ...`
# rebinds the imported pyspark.sql.functions.col to a plain string and breaks
# every cell that uses col(...) afterwards.
stages = []

for cat_col in categorical_cols:
    string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "_index")
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[cat_col + "_encoded"])
    stages += [string_indexer, encoder]

# Assemble the encoded categoricals and the numeric columns into one vector
assembler_inputs = [name + "_encoded" for name in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]

# Label indexing: map the Churn values to numeric indices in a "label" column
label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
stages += [label_indexer]

# Combine all stages into a pipeline
pipeline = Pipeline(stages=stages)

# Fit and transform the data using the pipeline.
# NOTE(review): the indexers are fitted on the full data set before the split
# below, so category/label statistics from the test rows are seen at fit time.
encoded_df = pipeline.fit(df).transform(df)

# Train-test split (80/20, fixed seed)
(train_data, test_data) = encoded_df.randomSplit([0.8, 0.2], seed=123)

In [33]:
def base_models():
    """Fit a default Random Forest and a default GBT classifier.

    Each classifier is wrapped in a single-stage ``Pipeline`` and fitted on
    the module-level ``train_data`` frame, reading the ``features`` and
    ``label`` columns. No hyperparameters are tuned.

    Returns:
        Tuple ``(model_rf, model_gbt)`` of fitted pipeline models, in that order.
    """
    # Random Forest first, GBT second - the fit order matches the return order
    estimators = (
        RandomForestClassifier(labelCol="label", featuresCol="features"),
        GBTClassifier(labelCol="label", featuresCol="features"),
    )

    fitted_rf, fitted_gbt = (
        Pipeline(stages=[estimator]).fit(train_data) for estimator in estimators
    )

    return fitted_rf, fitted_gbt

# Fit the untuned baseline models
base_model_rf, base_model_gbt = base_models()

# Score the held-out test split with each model
predictions_base_rf = base_model_rf.transform(test_data)
predictions_base_gbt = base_model_gbt.transform(test_data)

# Preview the first rows of each prediction frame
preview_columns = ["features", "label", "prediction", "probability"]
for heading, scored in (
    ("Base Random Forest Predictions:", predictions_base_rf),
    ("Base GBT Predictions:", predictions_base_gbt),
):
    print(heading)
    scored.select(*preview_columns).show()

# Evaluate both models with area under the ROC curve
evaluator = BinaryClassificationEvaluator(labelCol="label")
roc_auc = {evaluator.metricName: "areaUnderROC"}

auc_rf = evaluator.evaluate(predictions_base_rf, roc_auc)
print(f"Area Under ROC RF: {auc_rf}")

auc_gbt = evaluator.evaluate(predictions_base_gbt, roc_auc)
print(f"Area Under ROC GBT: {auc_gbt}")

Area Under ROC RF: 0.8334650212606687
Area Under ROC GBT: 0.8395521224281862


In [41]:
# List every parameter (name and value) of the baseline Random Forest,
# which is the only stage of its pipeline model
params = base_model_rf.stages[0].extractParamMap()
for param in params:
    print(f"{param.name}: {params[param]}")

bootstrap: True
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: gini
labelCol: label
leafCol: 
maxBins: 32
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
numTrees: 20
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: -4713376035466859559
subsamplingRate: 1.0


In [35]:
def hyperparameter_tuning(fold=3):
    """Grid-search a Random Forest and a GBT classifier with cross-validation.

    Both classifiers read the ``features`` / ``label`` columns, are wrapped in
    a single-stage ``Pipeline`` and are tuned over the same search space
    (2 x 2 x 4 x 4 = 64 parameter combinations per model), scored by area
    under the ROC curve. The searches are fitted on the module-level
    ``train_data`` frame, Random Forest first.

    Args:
        fold: Number of cross-validation folds.

    Returns:
        Tuple ``(model_rf, model_gbt)`` of fitted ``CrossValidator`` models.
    """
    rf_classifier = RandomForestClassifier(labelCol="label", featuresCol="features")
    gbt_classifier = GBTClassifier(labelCol="label", featuresCol="features")

    # Shared search space
    ensemble_sizes = [50, 100]                # numTrees (RF) / maxIter (GBT)
    depth_values = list(range(10, 20, 5))     # -> [10, 15]
    bin_values = list(range(4, 20, 4))        # -> [4, 8, 12, 16]
    min_leaf_values = [2, 4, 6, 8]            # minInstancesPerNode

    def _build_search(classifier, size_param):
        # Assemble the parameter grid and the cross-validator for one classifier;
        # `size_param` is the classifier's ensemble-size parameter.
        grid = (ParamGridBuilder()
                .addGrid(size_param, ensemble_sizes)
                .addGrid(classifier.maxDepth, depth_values)
                .addGrid(classifier.maxBins, bin_values)
                .addGrid(classifier.minInstancesPerNode, min_leaf_values)
                .build())
        return CrossValidator(estimator=Pipeline(stages=[classifier]),
                              estimatorParamMaps=grid,
                              evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                              numFolds=fold)

    search_rf = _build_search(rf_classifier, rf_classifier.numTrees)
    search_gbt = _build_search(gbt_classifier, gbt_classifier.maxIter)

    # Run the searches: Random Forest first, then GBT
    tuned_rf = search_rf.fit(train_data)
    tuned_gbt = search_gbt.fit(train_data)

    return tuned_rf, tuned_gbt

# Run the cross-validated grid search for both model families
model_rf, model_gbt = hyperparameter_tuning()

# Score the held-out test split with each tuned model
predictions_rf = model_rf.transform(test_data)
predictions_gbt = model_gbt.transform(test_data)

# Preview the first rows of each prediction frame
preview_columns = ["features", "label", "prediction", "probability"]
for heading, scored in (
    ("Random Forest Predictions:", predictions_rf),
    ("GBT Predictions:", predictions_gbt),
):
    print(heading)
    scored.select(*preview_columns).show()

# Evaluate both tuned models with area under the ROC curve
evaluator = BinaryClassificationEvaluator(labelCol="label")
roc_auc = {evaluator.metricName: "areaUnderROC"}

auc_rf = evaluator.evaluate(predictions_rf, roc_auc)
print(f"Area Under ROC RF: {auc_rf}")

auc_gbt = evaluator.evaluate(predictions_gbt, roc_auc)
print(f"Area Under ROC GBT: {auc_gbt}")

Area Under ROC RF: 0.8407633048125
Area Under ROC GBT: 0.8365410606422903


In [42]:
# Pull the winning GBT pipeline out of the cross-validation result; the
# classifier is its only stage.
# NOTE(review): the saved output of this cell lists values (e.g.
# minInstancesPerNode 14, maxIter 10) that are not in the grid defined in
# hyperparameter_tuning - it looks like it came from an earlier run; re-run to confirm.
bestModel = model_gbt.bestModel
bestParams = bestModel.stages[0].extractParamMap()

# Print each parameter name with its selected value
for param in bestParams:
    print(f"{param.name}: {bestParams[param]}")

cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: label
leafCol: 
lossType: logistic
maxBins: 8
maxDepth: 5
maxIter: 10
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 14
minWeightFractionPerNode: 0.0
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 3859683544350349931
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01


In [40]:
# Pull the winning Random Forest pipeline out of the cross-validation result;
# the classifier is its only stage.
# NOTE(review): the saved output of this cell shows minInstancesPerNode 10,
# which is not in the grid defined in hyperparameter_tuning - it looks like it
# came from an earlier run; re-run to confirm.
bestModel = model_rf.bestModel
bestParams = bestModel.stages[0].extractParamMap()

# Print each parameter name with its selected value
for param in bestParams:
    print(f"{param.name}: {bestParams[param]}")

bootstrap: True
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: gini
labelCol: label
leafCol: 
maxBins: 8
maxDepth: 15
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 10
minWeightFractionPerNode: 0.0
numTrees: 50
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: -4713376035466859559
subsamplingRate: 1.0


In [60]:
# Re-imported here so this cell works even if an earlier cell rebound `col`.
from pyspark.sql.functions import col, expr, when

# Give each model's hard prediction its own column name
predictions_rf = predictions_rf.withColumnRenamed("prediction", "prediction_rf")
predictions_gbt = predictions_gbt.withColumnRenamed("prediction", "prediction_gbt")

# Combine the two models row by row.
# The previous version inner-joined the two prediction frames on "features".
# Feature vectors are not unique row keys: test rows with identical features
# were multiplied by the join and could be paired with another row's GBT
# prediction. Instead, the GBT model scores the rows that already carry the
# RF prediction, so every test row appears exactly once.
# RF's rawPrediction/probability are dropped first because the GBT model
# writes columns with the same names.
rf_scored = predictions_rf.drop("rawPrediction", "probability")

combined_predictions = (
    model_gbt.transform(rf_scored)
    .withColumnRenamed("prediction", "prediction_gbt")
    .select(
        "features",
        "label",
        # Mean of the two hard 0/1 predictions: 0.0, 0.5 or 1.0
        ((col("prediction_rf") + col("prediction_gbt")) / 2.0).alias("combined_prediction"),
    )
    .withColumn(
        "final_prediction",
        # 0.5 rounds up, so the final label is 1 when either model predicts 1
        when(col("combined_prediction") < 0.5, 0).otherwise(1)
    )
)

# Show the combined predictions with the final prediction
combined_predictions.select("features", "label", "combined_prediction", "final_prediction").show(100)

+--------------------+-----+-------------------+----------------+
|            features|label|combined_prediction|final_prediction|
+--------------------+-----+-------------------+----------------+
|(45,[1,2,7,8,10,1...|  0.0|                1.0|               1|
|(45,[1,2,7,8,10,1...|  1.0|                1.0|               1|
|(45,[1,2,7,8,10,1...|  0.0|                0.5|               1|
|(45,[1,2,7,8,10,1...|  0.0|                0.5|               1|
|(45,[1,2,3,4,7,8,...|  0.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  0.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  0.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  1.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  0.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  1.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  1.0|                1.0|               1|
|(45,[1,2,3,4,7,8,...|  1.0|                1.0|               1|
|(45,[1,2,