**Load Libraries**

In [1]:
!pip install pyspark
import pyspark.sql.functions as f
from pyspark.ml.feature import PCA, VectorAssembler,StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as f
import pyspark.sql.functions as f
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import sklearn.metrics
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import  LogisticRegression
import re
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor,DecisionTreeRegressor
from functools import reduce
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.functions import regexp_replace
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd




**Data Manipulation**

In [2]:
spark = SparkSession\
    .builder.appName("MachineLearningTesting")\
    .master("local[*]")\
    .getOrCreate()
df_pyspark=spark.read.csv('churn.csv',header=True,inferSchema=True)

oldcolumns=df_pyspark.columns
newcolumns=pd.Series(df_pyspark.columns).str.lower().str.replace(' ','_').tolist()
df_pyspark=reduce(lambda df,
      i: df.withColumnRenamed(oldcolumns[i],newcolumns[i]),range(len(oldcolumns)),df_pyspark)
mapping={'account_length':'account_len',"int'l_plan":'intl_plan','eve_mins':'evening_mins','eve_calls':'evening_calls',
         'eve_charge': 'evening_charge', 'churn?':'churn'}

newcol=[mapping.get(x,x) for x in df_pyspark.columns]
df_pyspark.toDF(*newcol).toPandas()
df_pyspark=df_pyspark.toDF(*newcol)

# convert strings to upper and making any neccesary correction

df_pyspark=df_pyspark.withColumn("churn",f.regexp_replace("churn","\.",""))

upperlist=[col[0] for col in df_pyspark.dtypes if col[1]=='string']

df_pyspark=reduce(lambda df,colname:
   df.withColumn(colname,f.upper(f.col(colname))),upperlist,df_pyspark)


**PCA**

In [3]:
col=[item[0] for item in df_pyspark.dtypes if not(item[1].startswith('string'))]

featureassembler=VectorAssembler(inputCols=col,outputCol='features')
assembler_df=featureassembler.transform(df_pyspark)

scaler=StandardScaler(inputCol='features',outputCol='features_scaled',withStd=True,withMean=False)
scaled_df=scaler.fit(assembler_df).transform(assembler_df)

pca=PCA(k=len(col),inputCol='features',outputCol='pca_features')
model=pca.fit(scaled_df)

print(model.explainedVariance.round(3)*100)
print(model.explainedVariance.cumsum().round(3)*100)

pca=PCA(k=4,inputCol='features',outputCol='pca_features')
model=pca.fit(scaled_df)

pca_df=model.transform(scaled_df)

pca_df.select('pca_features').show(truncate=False)


[23.5 20.1 19.6 13.9 12.2  3.1  3.1  2.9  1.4  0.1  0.   0.   0.   0.
  0.   0. ]
[ 23.5  43.7  63.3  77.1  89.3  92.4  95.5  98.4  99.9  99.9 100.  100.
 100.  100.  100.  100. ]
+-------------------------------------------------------------------------------+
|pca_features                                                                   |
+-------------------------------------------------------------------------------+
|[-278.5709226981437,27.07709503672126,297.2297740248364,-404.50495418263625]   |
|[-173.8461564451706,22.11452207304285,309.12035879832683,-405.0323256391566]   |
|[-251.93720525128546,9.717034506165433,187.1094726516783,-403.08195917163334]  |
|[-306.26069317050496,-59.265720530748446,181.35476455020364,-403.8758691875634]|
|[-175.55925259576688,20.44659897036715,227.15102808832444,-408.1380434549122]  |
|[-235.02778511191295,71.78089389732655,277.68974689091715,-498.8641288738459]  |
|[-235.19597517888946,173.69741231275447,356.5419980946255,-497.0944537111983]  |


In [5]:
pca_df=pca_df.withColumn('total_charge',
                         sum([f.col(colname) for colname in df_pyspark.columns if '_charge' in colname]))

df=pca_df.join(df_pyspark.groupBy('phone').pivot('state').agg(f.count('*')).fillna(0),
             ['phone'],how='inner')
state_cols=[x[0] for x in df_pyspark.select('state').distinct().collect()]

listcol=['pca_features']
listcol.extend(state_cols)

featureassembler=VectorAssembler(inputCols=listcol,outputCol='final_features')
df=featureassembler.transform(df)

**Linear Regression**

In [9]:
(train_data,test_data)= df.randomSplit([0.7,0.3])
lm = LinearRegression(featuresCol='final_features', labelCol='total_charge')
model = lm.fit(train_data)

def modelsummary(model):
    print("##", "-------------------------------------------------")
    print("##", "  Estimate  | Std.Error  | t-statistic  | p-value")
    coef = np.append(list(model.coefficients), model.intercept)
    summary = model.summary  # Corrected this line

    for i in range(len(summary.pValues)):  # Corrected this line
        print("##", '{:9.2f}'.format(coef[i]),
              '{:12.2f}'.format(summary.coefficientStandardErrors[i]),  # Corrected this line
              '{:12.2f}'.format(summary.tValues[i]),  # Corrected this line
              '{:12.2f}'.format(summary.pValues[i]))  # Corrected this line

    print("##", '--------------')

    print("##", "Mean squared error: %.5f" % summary.meanSquaredError,
          "RMSE: %1.5f" % summary.rootMeanSquaredError)
    print("##", 'Multiple R-squared: %1.5f' % summary.r2)

modelsummary(model)

# Make predictions on the test data
predictions = model.transform(test_data)
predictions.select('features', 'total_charge', 'prediction')

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol='total_charge',
                                predictionCol="prediction",
                                metricName='rmse')

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

## -------------------------------------------------
##   Estimate  | Std.Error  | t-statistic  | p-value
##     -0.17         0.00      -595.49         0.00
##      0.04         0.00       134.64         0.00
##      0.08         0.00       248.74         0.00
##     -0.00         0.00        -5.77         0.00
##   1339.45   1478342.00         0.00         1.00
##   1339.34   1478342.00         0.00         1.00
##   1339.16   1478342.00         0.00         1.00
##   1339.46   1478342.00         0.00         1.00
##   1339.45   1478342.00         0.00         1.00
##   1339.35   1478342.00         0.00         1.00
##   1339.30   1478342.00         0.00         1.00
##   1339.46   1478342.00         0.00         1.00
##   1339.22   1478342.00         0.00         1.00
##   1339.36   1478342.00         0.00         1.00
##   1339.49   1478342.00         0.00         1.00
##   1339.29   1478342.00         0.00         1.00
##   1339.32   1478342.00         0.00         1.00
##   1339.

**DecisionTreeRegressor**

In [None]:
listcol=[col for col in df_pyspark.columns if '_charge' in col]
df_pyspark=df_pyspark.withColumn('total_charge',sum([f.col(colname) for colname in listcol]))

listcol=[x for x in df_pyspark.columns if re.compile('^.*_calls|mins$').search(x)]
featureassembler=VectorAssembler(inputCols=listcol,outputCol='features_modeling')
df_pyspark=featureassembler.transform(df_pyspark)


In [42]:
train_data,test_data=df_pyspark.randomSplit([0.7,0.3])


dt=DecisionTreeRegressor(featuresCol='features_modeling',labelCol='total_charge',
                         maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))
model=dt.fit(train_data)

predictions=model.transform(test_data)
evaluator=RegressionEvaluator(labelCol='total_charge',predictionCol='prediction',metricName='mae')
evaluator.evaluate(predictions)

2.6142729908474713

In [43]:
y_true = predictions.select("total_charge").toPandas()
y_pred = predictions.select("prediction").toPandas()

r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print(f'r2_score: {r2_score:.3f}\n')

listcol=[x for x in df_pyspark.columns if re.compile('^.*_calls|mins$').search(x)]

imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

r2_score: 0.900

day_mins : 0.8315
evening_mins : 0.1636
night_mins : 0.0049
custserv_calls : 0.0000
intl_calls : 0.0000
intl_mins : 0.0000
night_calls : 0.0000
evening_calls : 0.0000
day_calls : 0.0000


**RandomForestRegressor**

In [41]:
rf=RandomForestRegressor(featuresCol='features_modeling',labelCol='total_charge',
                         maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))

model=rf.fit(train_data)
predictions=model.transform(test_data)

evaluator=RegressionEvaluator(labelCol='total_charge',predictionCol="prediction",metricName='mae')
evaluator.evaluate(predictions)

imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

day_mins : 0.7737
evening_mins : 0.1698
night_mins : 0.0346
day_charge : 0.0052
intl_mins : 0.0051
evening_charge : 0.0045
night_charge : 0.0034
intl_charge : 0.0025
total_charge : 0.0012


**GBTRegressor**

In [40]:
rf=GBTRegressor(featuresCol='features_modeling',labelCol='total_charge',
                         maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))
model=rf.fit(train_data)
predictions=model.transform(test_data)
evaluator=RegressionEvaluator(labelCol='total_charge',predictionCol='prediction')
evaluator.evaluate(predictions)

imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

day_mins : 0.5455
night_mins : 0.2400
evening_mins : 0.1658
intl_mins : 0.0370
night_charge : 0.0030
evening_charge : 0.0024
day_charge : 0.0023
intl_charge : 0.0021
total_charge : 0.0019


LogisticRegression

In [29]:
listcol=[x for x in df_pyspark.columns if re.compile('^._calls|_mins|_charge').search(x)]
featureassembler=VectorAssembler(inputCols=listcol,outputCol='features_clf')
df_pyspark=featureassembler.transform(df_pyspark)
df_pyspark=df_pyspark.withColumn('churn',f.when(f.col('churn').isin('TRUE'), 1).when(f.col('churn').isin('FALSE'), 0))

In [30]:
logr=LogisticRegression(featuresCol='features_clf',labelCol='churn')
(train_data,test_data)=df_pyspark.randomSplit([0.7,0.3],seed=42)
model=logr.fit(train_data)
predictions=model.transform(test_data)
predictions.select('features_clf','churn','prediction').show()

df_pyspark.groupBy('churn').count().withColumn('PRC_count',f.col('count')/df_pyspark.count()).show()



evaluator=MulticlassClassificationEvaluator(labelCol='churn',predictionCol='prediction',
                                            metricName='accuracy')

multiclass_metrics=['accuracy','weightedRecall','weightedPrecision','f1']
for metric in multiclass_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName:metric})
    print(f'Metrics {metric}: {result:.4f}')

evaluator=BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')
binary_metrics=['areaUnderROC','areaUnderPR']
for metric in binary_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Binary {metric}: {result:.4f}')


predictions.groupBy('prediction').count().withColumn('PRC',f.col('count')/predictions.count()).show()

trainingSummary=model.summary
trainingSummary.roc.show(5)
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head(5)
print(maxFMeasure)

+--------------------+-----+----------+
|        features_clf|churn|prediction|
+--------------------+-----+----------+
|[159.3,27.08,125....|    0|       0.0|
|[217.0,36.89,152....|    0|       0.0|
|[148.3,25.21,181....|    0|       0.0|
|[139.3,23.68,178....|    0|       0.0|
|[252.4,42.91,187....|    0|       0.0|
|[185.0,31.45,232....|    0|       0.0|
|[262.3,44.59,198....|    0|       0.0|
|[190.3,32.35,194....|    0|       0.0|
|[150.0,25.5,159.4...|    0|       0.0|
|[238.4,40.53,246....|    0|       0.0|
|[213.6,36.31,175....|    0|       0.0|
|[103.0,17.51,242....|    0|       0.0|
|[100.1,17.02,233....|    0|       0.0|
|[148.4,25.23,193....|    0|       0.0|
|[245.2,41.68,159....|    0|       0.0|
|[181.5,30.86,205....|    0|       0.0|
|[178.4,30.33,168....|    0|       0.0|
|[182.3,30.99,169....|    0|       0.0|
|[175.7,29.87,187....|    0|       0.0|
|[181.1,30.79,314....|    0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows

+-----+-----+-

**DecisionTreeClassifier**

In [31]:
dtre=DecisionTreeClassifier(featuresCol='features_clf',labelCol='churn',
                            maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))
model=dtre.fit(train_data)
predictions=model.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='churn',predictionCol='prediction',metricName='accuracy')
multiclass_metrics=['accuracy','weightedRecall','weightedPrecision','f1']

for metric in multiclass_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Metric {metric}: {result}')

evaluator=BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')
binary_metrics=['areaUnderROC','areaUnderPR']
print('\n')
for metric in binary_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Binary {metric}: {result:.4f}')

Metric accuracy: 0.8888888888888888
Metric weightedRecall: 0.8888888888888888
Metric weightedPrecision: 0.8763145861911293
Metric f1: 0.8766343807509956


Binary areaUnderROC: 0.6904
Binary areaUnderPR: 0.5303


In [32]:
imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

total_charge : 0.9937
day_mins : 0.0063
intl_charge : 0.0000
intl_mins : 0.0000
night_charge : 0.0000
night_mins : 0.0000
evening_charge : 0.0000
evening_mins : 0.0000
day_charge : 0.0000


**RandomForestClassifier**

In [38]:
rf=RandomForestClassifier(featuresCol='features_clf',labelCol='churn',
                            maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))
model=rf.fit(train_data)
predictions=model.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='churn',predictionCol='prediction',metricName='accuracy')
multiclass_metrics=['accuracy','weightedRecall','weightedPrecision','f1']

for metric in multiclass_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Metric {metric}: {result}')

evaluator=BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')
binary_metrics=['areaUnderROC','areaUnderPR']
print('\n')
for metric in binary_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Binary {metric}: {result:.4f}')

Metric accuracy: 0.8814814814814815
Metric weightedRecall: 0.8814814814814815
Metric weightedPrecision: 0.867886612869853
Metric f1: 0.8570728232397171


Binary areaUnderROC: 0.6280
Binary areaUnderPR: 0.5230


In [39]:
imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

total_charge : 0.4161
day_mins : 0.2906
day_charge : 0.2030
evening_mins : 0.0258
evening_charge : 0.0237
night_charge : 0.0189
intl_charge : 0.0091
intl_mins : 0.0073
night_mins : 0.0055


**GBTClassifier**

In [45]:
gbt=GBTClassifier(featuresCol='features_clf',labelCol='churn',
                            maxDepth=5,minInstancesPerNode=round(df_pyspark.count()*0.01))
model=gbt.fit(train_data)
predictions=model.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='churn',predictionCol='prediction',metricName='accuracy')
multiclass_metrics=['accuracy','weightedRecall','weightedPrecision','f1']

for metric in multiclass_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Metric {metric}: {result}')

evaluator=BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')
binary_metrics=['areaUnderROC','areaUnderPR']
print('\n')
for metric in binary_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Binary {metric}: {result:.4f}')

Metric accuracy: 0.8867724867724868
Metric weightedRecall: 0.8867724867724868
Metric weightedPrecision: 0.8731796335195858
Metric f1: 0.8718288489477318


Binary areaUnderROC: 0.6739
Binary areaUnderPR: 0.5253


In [46]:
imp=model.featureImportances
indices=np.argsort(imp)[::-1]
sort_imp=sorted(imp,reverse=True)
for name,value in zip(np.array(listcol)[indices],sort_imp):
    print(name,':',f'{value:.4f}')

custserv_calls : 0.3488
intl_mins : 0.2272
evening_mins : 0.1438
night_mins : 0.1208
day_mins : 0.1024
night_calls : 0.0413
evening_calls : 0.0158
intl_calls : 0.0000
day_calls : 0.0000


**Naive** **Bayes**

In [47]:
nb = NaiveBayes(featuresCol='features_clf', labelCol='churn')

model=nb.fit(train_data)
predictions=model.transform(test_data)
evaluator=MulticlassClassificationEvaluator(labelCol='churn',predictionCol='prediction',metricName='accuracy')
multiclass_metrics=['accuracy','weightedRecall','weightedPrecision','f1']

for metric in multiclass_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Metric {metric}: {result}')

evaluator=BinaryClassificationEvaluator(labelCol='churn',rawPredictionCol='prediction')
binary_metrics=['areaUnderROC','areaUnderPR']
print('\n')
for metric in binary_metrics:
    result=evaluator.evaluate(predictions,{evaluator.metricName: metric})
    print(f'Binary {metric}: {result:.4f}')

Metric accuracy: 0.6634920634920635
Metric weightedRecall: 0.6634920634920635
Metric weightedPrecision: 0.7966778439107283
Metric f1: 0.7099680611268521


Binary areaUnderROC: 0.6016
Binary areaUnderPR: 0.1996


**K-Means**

In [49]:
col=[item[0] for item in df_pyspark.dtypes if not(item[1].startswith('string'))]

featureassembler=VectorAssembler(inputCols=col,outputCol='features_clust')
df_pyspark=featureassembler.transform(df_pyspark)
sil=[]
for k in range(5,10):
    kmeans=KMeans().setK(k).setSeed(42).setFeaturesCol('features_clust').setPredictionCol('cluster')
    model=kmeans.fit(df_pyspark)
    predictions=model.transform(df_pyspark)
    evaluator=ClusteringEvaluator(featuresCol='features_clust',predictionCol='cluster')
    silhouette=evaluator.evaluate(predictions)
    sil.append(silhouette)

In [50]:
print(sil)

[0.2548164267567587, 0.24565456894744797, 0.24805341316205598, 0.23715204757475922, 0.23605261908185068]


**Gaussian Mixture Model**

In [None]:
sil=[]
for k in range(5, 10):
    gmm = GaussianMixture(k=k, seed=42, featuresCol='features_clust', predictionCol='cluster')
    model = gmm.fit(df_pyspark)
    predictions = model.transform(df_pyspark)
    evaluator = ClusteringEvaluator(featuresCol='features_clust', predictionCol='cluster')
    silhouette = evaluator.evaluate(predictions)
    sil.append(silhouette)

print(sil)