In [34]:
# (1) Import the required Python dependencies
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# (2) Instantiate a Spark Context
conf = SparkConf().setMaster("spark://192.168.56.10:7077").setAppName("CART - Congressional Voting")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [35]:
# (3) Load the Congressional Voting dataset (data/congressional-voting-data/house-votes-84.csv) into a Spark DataFrame
schema = StructType([
    StructField("party", StringType()),
    StructField("handicapped_infants", StringType()),
    StructField("water_project_cost_sharing", StringType()),
    StructField("adoption_of_the_budget_resolution", StringType()),
    StructField("physician_fee_freeze", StringType()),
    StructField("el_salvador_aid", StringType()),
    StructField("religious_groups_in_schools", StringType()),
    StructField("anti_satellite_test_ban", StringType()),
    StructField("aid_to_nicaraguan_contras", StringType()),
    StructField("mx_missile", StringType()),
    StructField("immigration", StringType()),
    StructField("synfuels_corporation_cutback", StringType()),
    StructField("education_spending", StringType()),
    StructField("superfund_right_to_sue", StringType()),
    StructField("crime", StringType()),
    StructField("duty_free_exports", StringType()),
    StructField("export_administration_act_south_africa", StringType())
])

congressional_voting_df = sqlContext.read.format('com.databricks.spark.csv').schema(schema).options(header = 'false', inferschema = 'false').load('/data/workspaces/jillur.quddus/jupyter/notebooks/Machine-Learning-with-Apache-Spark-QuickStart-Guide/chapter04/data/congressional-voting-data/house-votes-84.data')
congressional_voting_df.show(2)

+----------+-------------------+--------------------------+---------------------------------+--------------------+---------------+---------------------------+-----------------------+-------------------------+----------+-----------+----------------------------+------------------+----------------------+-----+-----------------+--------------------------------------+
|     party|handicapped_infants|water_project_cost_sharing|adoption_of_the_budget_resolution|physician_fee_freeze|el_salvador_aid|religious_groups_in_schools|anti_satellite_test_ban|aid_to_nicaraguan_contras|mx_missile|immigration|synfuels_corporation_cutback|education_spending|superfund_right_to_sue|crime|duty_free_exports|export_administration_act_south_africa|
+----------+-------------------+--------------------------+---------------------------------+--------------------+---------------+---------------------------+-----------------------+-------------------------+----------+-----------+----------------------------+--------

In [36]:
# (4) Index the relevant categorical and label variables using a Pipeline of stages
categorical_columns = ['handicapped_infants', 'water_project_cost_sharing', 'adoption_of_the_budget_resolution', 'physician_fee_freeze', 'el_salvador_aid', 'religious_groups_in_schools', 'anti_satellite_test_ban', 'aid_to_nicaraguan_contras', 'mx_missile', 'immigration', 'synfuels_corporation_cutback', 'education_spending', 'superfund_right_to_sue', 'crime', 'duty_free_exports', 'export_administration_act_south_africa']
pipeline_stages = []
for categorial_column in categorical_columns:
    string_indexer = StringIndexer(inputCol = categorial_column, outputCol = categorial_column + 'Index')
    encoder = OneHotEncoderEstimator(inputCols = [string_indexer.getOutputCol()], outputCols=[categorial_column + "classVec"])
    pipeline_stages += [string_indexer, encoder]
    
label_string_idx = StringIndexer(inputCol = 'party', outputCol = 'label')
pipeline_stages += [label_string_idx]
vector_assembler_inputs = [c + "classVec" for c in categorical_columns]
vector_assembler = VectorAssembler(inputCols = vector_assembler_inputs, outputCol = "features")
pipeline_stages += [vector_assembler]

In [37]:
# (5) Generate Input Feature Vectors from the Raw Spark DataFrame by executing the previously constructed Pipeline
pipeline = Pipeline(stages = pipeline_stages)
pipeline_model = pipeline.fit(congressional_voting_df)
label_column = 'label'
congressional_voting_features_df = pipeline_model.transform(congressional_voting_df).select(['features', label_column, 'party'])
pd.DataFrame(congressional_voting_features_df.take(5), columns=congressional_voting_features_df.columns).transpose()

Unnamed: 0,0,1,2,3,4
features,"(1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, ...","(1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, ...","(0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, ...","(1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, ...","(0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, ..."
label,1,1,0,0,0
party,republican,republican,democrat,democrat,democrat


In [38]:
# (6) Split the Raw Features and Labelled DataFrame into a Training DataFrame and a Test DataFrame
train_df, test_df = congressional_voting_features_df.randomSplit([0.75, 0.25], seed=12345)
train_df.count(), test_df.count()

(322, 113)

In [39]:
# (7) Train a Classification Tree Model on the Training DataFrame
decision_tree = DecisionTreeClassifier(featuresCol = 'features', labelCol = label_column)
decision_tree_model = decision_tree.fit(train_df)

In [40]:
# (8) Apply the Trained Classification Tree Model to the Test DataFrame to make predictions
test_decision_tree_predictions_df = decision_tree_model.transform(test_df)
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
test_decision_tree_predictions_df.select("probability", "rawPrediction", "prediction", label_column, "features").show()

TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: 
+-----------+-------------+----------+-----+--------------------+
|probability|rawPrediction|prediction|label|            features|
+-----------+-------------+----------+-----+--------------------+
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,8,10...|
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,8,12...|
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|  [1.0,0.0]|  [167.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|  [0.0,1.0]|   [0.0,96.0]|       1.0|  0.0|(32,[0,2,4,7,8,10...|
|  [0.0,1.0]|   [0.0,96.0]|       1.0|  0.0|(32,[0,2,4,7,8,10...|
|  [1.0,0.0]|   [12.0,0.0]|       0.0|  0.0|(32,[0,2,5,6,8,10...|
|  [0.0,1.0]|   [0.0,96.0]|       1.0|  1.0|(32,[0,2,5,7,8,10...|
|  [0.0,1.0]|   [0.0,96.0]|       1.0|  1.0|(32,[0,2,5,7,8,10...|
|  [0.0,1.0]|   [0.0,96.0]| 

In [41]:
# (9) Evaluate the performance of our Classification Tree Model on the Test DataFrame using Area under a ROC curve
evaluator_roc_area = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction", labelCol = label_column, metricName = "areaUnderROC")
print("Area Under ROC Curve on Test Data = %g" % evaluator_roc_area.evaluate(test_decision_tree_predictions_df))

Area Under ROC Curve on Test Data = 0.909553


In [42]:
# (10) Visualise the Classification Tree
print(str(decision_tree_model.toDebugString))

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4342b3d31685b6648513) of depth 5 with 31 nodes
  If (feature 7 in {0.0})
   If (feature 6 in {1.0})
    If (feature 5 in {0.0})
     Predict: 0.0
    Else (feature 5 not in {0.0})
     If (feature 11 in {0.0})
      Predict: 0.0
     Else (feature 11 not in {0.0})
      If (feature 2 in {1.0})
       Predict: 0.0
      Else (feature 2 not in {1.0})
       Predict: 1.0
   Else (feature 6 not in {1.0})
    If (feature 20 in {0.0})
     If (feature 10 in {1.0})
      Predict: 0.0
     Else (feature 10 not in {1.0})
      If (feature 1 in {1.0})
       Predict: 0.0
      Else (feature 1 not in {1.0})
       Predict: 0.0
    Else (feature 20 not in {0.0})
     Predict: 1.0
  Else (feature 7 not in {0.0})
   If (feature 31 in {1.0})
    If (feature 16 in {1.0})
     Predict: 0.0
    Else (feature 16 not in {1.0})
     If (feature 4 in {1.0})
      If (feature 12 in {0.0})
       Predict: 0.0
      Else (feature 12 not in {0.0})
    

In [43]:
# (11) Train a Random Forest Classifier Model on the Training DataFrame
random_forest = RandomForestClassifier(featuresCol = 'features', labelCol = label_column)
random_forest_model = random_forest.fit(train_df)

In [44]:
# (12) Apply the Trained Random Forest Classifier Model to the Test DataFrame to make predictions
test_random_forest_predictions_df = random_forest_model.transform(test_df)
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
test_random_forest_predictions_df.select("probability", "rawPrediction", "prediction", label_column, "features").show()

TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: 
+--------------------+--------------------+----------+-----+--------------------+
|         probability|       rawPrediction|prediction|label|            features|
+--------------------+--------------------+----------+-----+--------------------+
|[0.99444444444444...|[19.8888888888888...|       0.0|  0.0|(32,[0,2,4,6,8,10...|
|         [0.95,0.05]|          [19.0,1.0]|       0.0|  0.0|(32,[0,2,4,6,8,12...|
|[0.99166666666666...|[19.8333333333333...|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|[0.97142857142857...|[19.4285714285714...|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|           [1.0,0.0]|          [20.0,0.0]|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|[0.99166666666666...|[19.8333333333333...|       0.0|  0.0|(32,[0,2,4,6,9,10...|
|[0.26964285714285...|[5.39285714285714...|       1.0|  0.0|(32,[0,2,4,7,8,10...|
|[0.42361111111111...|[8.47222222222222...|       1.0|  0.0|(32,[0,2,4,7,8,10...|
|[0.90348837209302...|[18.0697674418604...|       

In [45]:
# (13) Evaluate the performance of our Random Forest Classifier Model on the Test DataFrame using Area under a ROC curve
evaluator_rf_roc_area = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction", labelCol = label_column, metricName = "areaUnderROC")
print("Area Under ROC Curve on Test Data = %g" % evaluator_rf_roc_area.evaluate(test_random_forest_predictions_df))

Area Under ROC Curve on Test Data = 0.974593


In [46]:
# (14) Stop the Spark Context
sc.stop()