# Random Forest

In [1]:
from pyspark.sql.types import BooleanType
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import expr
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from helpers.helper_functions import translate_to_file_string
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
import pandas as pd
from IPython.display import display, HTML

#Change Column Names
def delete_space(df):
    names = df.schema.names
    for name in names:
        newName = name.replace(" ","")
        df = df.withColumnRenamed(name, newName)
    return df

In [2]:
inputFile = translate_to_file_string("../data/data.csv")

Spark session creation 

In [3]:
spark = (SparkSession
       .builder
       .appName("Modell_DecisionTree")
       .getOrCreate())

DataFrame creation using an ifered Schema 

In [4]:
# create a DataFrame using an ifered Schema 
df = spark.read.option("header", "true") \
       .option("inferSchema", "true") \
       .option("delimiter", ";") \
       .csv(inputFile) 

df = delete_space(df)
df_orig = df
df = df.where("MonthlyCharges Between 22 AND 95")
df = df.where("TotalCharges IS NOT NULL")

## Prepare training and test data.

### Use this for Pandas Dataframe

In [5]:
# #Create Pandas DataFrame
# df_pandas = df.toPandas()
# df_pandas_cat = df.toPandas()
# #Pandas Indexing Method to Integer or Category Datatype
# pandasCol = list(df_pandas)
# for col in pandasCol:
#     if df_pandas[col].dtypes=='object':
#         if not col == "Contract":
#             #Categorize
#             df_pandas_cat[col]= pd.Categorical(pd.factorize(df_pandas_cat[col])[0])
#             #ToInteger
#             df_pandas[col]= pd.factorize(df_pandas[col])[0]

# #Define whicht Columns should be normalized
# newCols = []
# for col in pandasCol:
#     if not col == "Contract":
#         newCols.append(col)

# #Normalize the selected Columns
# df_pandas[newCols]=(df_pandas[newCols]-df_pandas[newCols].min())/(df_pandas[newCols].max()-df_pandas[newCols].min())

# #Write Pandas Dataframe Back to Spark Dataframe
# df_temp = spark.createDataFrame(df_pandas)
# df = df_temp
# # Create Indexer for Contract (Still needed)
# contractIndexer = StringIndexer().setInputCol("Contract").setOutputCol("Contract_Int").fit(df)

# #Create FeatureCols 
# featureCols = df.columns.copy()
# featureCols.remove("Contract")
# featureCols.remove("CustomerID")

### Create Indexer

In [6]:
#Comment the Following if Pandas-Dataset is used
IDIndexer = StringIndexer().setInputCol("CustomerID").setOutputCol("CustomerID_Int").fit(df)
genderIndexer = StringIndexer().setInputCol("Gender").setOutputCol("Gender_Int").fit(df)
seniorIndexer = StringIndexer().setInputCol("SeniorCitizen").setOutputCol("SeniorCitizen_Int").fit(df)
partnerIndexer = StringIndexer().setInputCol("Partner").setOutputCol("Partner_Int").fit(df)
DependentsIndexer = StringIndexer().setInputCol("Dependents").setOutputCol("Dependents_Int").fit(df)
tenureIndexer = StringIndexer().setInputCol("Tenure").setOutputCol("Tenure_Int").fit(df)
phoneIndexer = StringIndexer().setInputCol("PhoneService").setOutputCol("PhoneService_Int").fit(df)
multipleIndexer = StringIndexer().setInputCol("MultipleLines").setOutputCol("MultipleLines_Int").fit(df)
internetIndexer = StringIndexer().setInputCol("InternetService").setOutputCol("InternetService_Int").fit(df)
onlineSecurityIndexer = StringIndexer().setInputCol("OnlineSecurity").setOutputCol("OnlineSecurity_Int").fit(df)
onlineBackupIndexer = StringIndexer().setInputCol("OnlineBackup").setOutputCol("OnlineBackup_Int").fit(df)
deviceIndexer = StringIndexer().setInputCol("DeviceProtection").setOutputCol("DeviceProtection_Int").fit(df)
techIndexer = StringIndexer().setInputCol("TechSupport").setOutputCol("TechSupport_Int").fit(df)
streamingTVIndexer = StringIndexer().setInputCol("StreamingTV").setOutputCol("StreamingTV_Int").fit(df)
streamingMoviesIndexer = StringIndexer().setInputCol("StreamingMovies").setOutputCol("StreamingMovies_Int").fit(df)
contractIndexer = StringIndexer().setInputCol("Contract").setOutputCol("Contract_Int").fit(df)
paperlessIndexer = StringIndexer().setInputCol("PaperlessBilling").setOutputCol("PaperlessBilling_Int").fit(df)
paymentIndexer = StringIndexer().setInputCol("PaymentMethod").setOutputCol("PaymentMethod_Int").fit(df)
monthlyIndexer = StringIndexer().setInputCol("MonthlyCharges").setOutputCol("MonthlyCharges_Int").fit(df)
totalIndexer = StringIndexer().setInputCol("TotalCharges").setOutputCol("TotalCharges_Int").fit(df)

### Identify the Columns that should be used for the Feature Vector

In [7]:
##Comment this if Pandas Dataset is used
featureCols = df.columns.copy()
for col in featureCols:
    if not col == "Tenure" and not col == "MonthlyCharges" and not col == "TotalCharges":
        featureCols.remove(col)
        colname = col +"_Int"
        featureCols = featureCols + [colname]
    else:
        featureCols.remove(col)
        featureCols = featureCols + [col]

featureCols.remove("Contract_Int")
featureCols.remove("CustomerID_Int")
featureCols.remove("Gender")
featureCols = featureCols + ["Gender_Int"]

### Create Assembler, FeatureIndexer, PredConverter and Scaler

In [8]:
assembler =  VectorAssembler(outputCol="features", inputCols=list(featureCols))

featureIndexer = VectorIndexer(inputCol="features",outputCol="indexedFeatures", maxCategories=6)
 
predConverter = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=contractIndexer.labels)

scaler = StandardScaler(inputCol="indexedFeatures", outputCol="scaledFeatures",withStd=True, withMean=False)

### Build the Decision Tree Model

In [9]:
dt = DecisionTreeClassifier(labelCol="Contract_Int", featuresCol="scaledFeatures")
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [ 10, 15 , 20 ]) \
				              .addGrid(dt.minInfoGain, [ 0.02, 0.01, 0.001]) \
				              .addGrid(dt.minInstancesPerNode, [5, 10, 15]) \
                              .addGrid(dt.maxBins, [5, 6, 9]) \
				              .build()

### Create Train and Test Datasets

In [10]:
splits = df.randomSplit([0.9, 0.1 ], 12345)
train = splits[0]
test = splits[1]

### Build the Pipeline

In [11]:
#Use This for Pandas-Dataframe
# pipeline = Pipeline(stages= [contractIndexer, assembler, featureIndexer, scaler, dt, predConverter])

In [12]:
#Use This for Spark Dataframe
pipeline = Pipeline(stages= [genderIndexer, seniorIndexer, partnerIndexer, DependentsIndexer, phoneIndexer, multipleIndexer, internetIndexer, onlineSecurityIndexer, onlineBackupIndexer, deviceIndexer, techIndexer, streamingTVIndexer, streamingMoviesIndexer, contractIndexer, paperlessIndexer, paymentIndexer, assembler, featureIndexer, scaler, dt, predConverter])

### Build Evaluator and CrossValidator and train the Model

In [13]:
evaluator =  MulticlassClassificationEvaluator(labelCol="Contract_Int", metricName="f1")

cv = CrossValidator(estimator=pipeline,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=5, parallelism=2)

cvModel = cv.fit(train)

In [14]:
#stages[19] for Spark Dataframe ;  stages[4] for Pandas Dataframe 
dtModel = cvModel.bestModel.stages[19]
print("Learned classification Random Forest model:\n",dtModel)
print("Best Params: \n", dtModel.explainParams())

Learned classification Random Forest model:
 DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ae16ad464861, depth=8, numNodes=109, numClasses=3, numFeatures=18
Best Params: 
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: scaledFeatures)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label colum

In [15]:
predictions = cvModel.transform(test)
predictions.select("prediction", "Contract_Int", "predictedLabel", "Contract", "features", "scaledFeatures").show()

+----------+------------+--------------+--------------+--------------------+--------------------+
|prediction|Contract_Int|predictedLabel|      Contract|            features|      scaledFeatures|
+----------+------------+--------------+--------------+--------------------+--------------------+
|       2.0|         2.0|      One year|      One year|[0.0,0.0,0.0,27.0...|[0.0,0.0,0.0,1.11...|
|       1.0|         1.0|      Two year|      Two year|[0.0,1.0,1.0,25.0...|[0.0,2.0066441334...|
|       0.0|         0.0|Month-to-month|Month-to-month|(18,[3,5,8,9,12,1...|(18,[3,5,8,9,12,1...|
|       1.0|         0.0|      Two year|Month-to-month|(18,[1,3,6,9,11,1...|(18,[1,3,6,9,11,1...|
|       1.0|         2.0|      Two year|      One year|[1.0,0.0,0.0,44.0...|[2.66845817562711...|
|       0.0|         1.0|Month-to-month|      Two year|[0.0,1.0,1.0,48.0...|[0.0,2.0066441334...|
|       0.0|         0.0|Month-to-month|Month-to-month|(18,[1,2,3,6,12,1...|(18,[1,2,3,6,12,1...|
|       0.0|        

In [16]:
new_df = predictions.groupBy("prediction").count()
new_df.show()
new_train_df = train.groupBy("Contract").count()
new_train_df.show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  248|
|       1.0|   83|
|       2.0|   61|
+----------+-----+

+--------------+-----+
|      Contract|count|
+--------------+-----+
|Month-to-month| 2369|
|      One year|  665|
|      Two year|  778|
+--------------+-----+



In [17]:
accuracy = evaluator.evaluate(predictions)
print("Test Error = " ,(1.0 - accuracy))

Test Error =  0.27964680521498697


In [18]:
predictionAndLabels = predictions.select("prediction", "Contract_Int").rdd.map(lambda p: [p[0], p[1]]) # Map to RDD prediction|label
metrics =  MulticlassMetrics(predictionAndLabels)

In [19]:
confusion = metrics.confusionMatrix()
print("Confusion matrix: \n" , confusion)

Confusion matrix: 
 DenseMatrix([[207.,   5.,  24.],
             [  8.,  54.,  11.],
             [ 33.,  24.,  26.]])


In [20]:
labels = predictionAndLabels.map(lambda x: x[1]).distinct().collect()
for label in  labels:
  print("Class %f precision = %f\n" % (label , metrics.precision(label)))
  print("Class %f recall = %f\n" % (label, metrics.recall(label)))
  print("Class %f F1 score = %f\n" % (label, metrics.fMeasure( label)))


Class 2.000000 precision = 0.426230

Class 2.000000 recall = 0.313253

Class 2.000000 F1 score = 0.361111

Class 1.000000 precision = 0.650602

Class 1.000000 recall = 0.739726

Class 1.000000 F1 score = 0.692308

Class 0.000000 precision = 0.834677

Class 0.000000 recall = 0.877119

Class 0.000000 F1 score = 0.855372



In [21]:
print("Weighted precision = %s\n" % metrics.weightedPrecision)
print("Weighted recall = %s\n" % metrics.weightedRecall)
print("Weighted F1 score = %s\n" % metrics.weightedFMeasure)
print("Weighted false positive rate = %s\n" % metrics.weightedFalsePositiveRate)

Weighted precision = 0.7139155511522558

Weighted recall = 0.7321428571428571

Weighted F1 score = <bound method MulticlassMetrics.weightedFMeasure of <pyspark.mllib.evaluation.MulticlassMetrics object at 0x7fb56f758150>>

Weighted false positive rate = 0.19914106928670036



In [22]:
print("Recall = %s" % metrics.recall(1.0))
print("Precision = %s" % metrics.precision(1.0))
print("Accuracy = %s" % metrics.accuracy) 
print("F1 = %s" % metrics.fMeasure(1.0))

Recall = 0.7397260273972602
Precision = 0.6506024096385542
Accuracy = 0.7321428571428571
F1 = 0.6923076923076923


In [23]:
#spark.stop()