In [None]:
# (1) Import the required Python dependencies
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# (2) Instantiate a Spark Context
# SparkContext.getOrCreate() is used instead of the SparkContext constructor so
# that re-running this cell in a live kernel reuses the active context rather
# than failing with "Cannot run multiple SparkContexts at once". On a fresh
# kernel a new context is created from `conf` exactly as before.
conf = SparkConf().setMaster("local").setAppName("CART - Congressional Voting")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
# (3) Load the Congressional Voting dataset (./data/congressional-voting-data/house-votes-84.data) into a Spark DataFrame
# The file is read without a header row and without schema inference; every
# column ("party" followed by the 16 remaining columns) is declared as a string.
column_names = [
    "party",
    "handicapped_infants",
    "water_project_cost_sharing",
    "adoption_of_the_budget_resolution",
    "physician_fee_freeze",
    "el_salvador_aid",
    "religious_groups_in_schools",
    "anti_satellite_test_ban",
    "aid_to_nicaraguan_contras",
    "mx_missile",
    "immigration",
    "synfuels_corporation_cutback",
    "education_spending",
    "superfund_right_to_sue",
    "crime",
    "duty_free_exports",
    "export_administration_act_south_africa",
]
schema = StructType([StructField(column_name, StringType()) for column_name in column_names])

csv_reader = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .schema(schema)
    .options(header = 'false', inferschema = 'false')
)
congressional_voting_df = csv_reader.load('./data/congressional-voting-data/house-votes-84.data')
congressional_voting_df.show(2)

In [None]:
# (4) Index the relevant categorical and label variables using a Pipeline of stages
categorical_columns = [
    'handicapped_infants',
    'water_project_cost_sharing',
    'adoption_of_the_budget_resolution',
    'physician_fee_freeze',
    'el_salvador_aid',
    'religious_groups_in_schools',
    'anti_satellite_test_ban',
    'aid_to_nicaraguan_contras',
    'mx_missile',
    'immigration',
    'synfuels_corporation_cutback',
    'education_spending',
    'superfund_right_to_sue',
    'crime',
    'duty_free_exports',
    'export_administration_act_south_africa',
]

# Each categorical column contributes a StringIndexer ("<column>Index")
# followed by a OneHotEncoder ("<column>classVec"), in column order.
pipeline_stages = []
for column in categorical_columns:
    indexer = StringIndexer(inputCol = column, outputCol = column + 'Index')
    one_hot = OneHotEncoder(inputCols = [indexer.getOutputCol()], outputCols = [column + "classVec"])
    pipeline_stages.extend([indexer, one_hot])

# The 'party' column is indexed into the 'label' column.
label_string_idx = StringIndexer(inputCol = 'party', outputCol = 'label')
pipeline_stages.append(label_string_idx)

# All one-hot encoded columns are assembled into a single 'features' vector column.
vector_assembler_inputs = [column + "classVec" for column in categorical_columns]
vector_assembler = VectorAssembler(inputCols = vector_assembler_inputs, outputCol = "features")
pipeline_stages.append(vector_assembler)

In [None]:
# (5) Generate Input Feature Vectors from the Raw Spark DataFrame by executing the previously constructed Pipeline
label_column = 'label'

# Fit the pipeline on the raw DataFrame, then apply it to that same DataFrame.
pipeline = Pipeline(stages = pipeline_stages)
pipeline_model = pipeline.fit(congressional_voting_df)
transformed_df = pipeline_model.transform(congressional_voting_df)

# Keep only the assembled features, the indexed label and the original party column.
congressional_voting_features_df = transformed_df.select(['features', label_column, 'party'])

# Preview the first 5 rows, transposed so each row is shown as a column.
preview_rows = congressional_voting_features_df.take(5)
pd.DataFrame(preview_rows, columns=congressional_voting_features_df.columns).T

In [None]:
# (6) Split the Raw Features and Labelled DataFrame into a Training DataFrame and a Test DataFrame
# 75% / 25% random split with a fixed seed; the cell displays the row counts of both parts.
split_weights = [0.75, 0.25]
train_df, test_df = congressional_voting_features_df.randomSplit(split_weights, seed=12345)
(train_df.count(), test_df.count())

In [None]:
# (7) Train a Classification Tree Model on the Training DataFrame
# The estimator reads the 'features' vector column and the column named by label_column.
decision_tree = DecisionTreeClassifier(
    labelCol = label_column,
    featuresCol = 'features',
)
decision_tree_model = decision_tree.fit(dataset = train_df)

In [None]:
# (8) Apply the Trained Classification Tree Model to the Test DataFrame to make predictions
test_decision_tree_predictions_df = decision_tree_model.transform(test_df)

# Show the model output columns next to the actual label and the input features.
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
decision_tree_output_columns = ["probability", "rawPrediction", "prediction", label_column, "features"]
test_decision_tree_predictions_df.select(decision_tree_output_columns).show()

In [None]:
# (9) Evaluate the performance of our Classification Tree Model on the Test DataFrame using Area under a ROC curve
evaluator_roc_area = BinaryClassificationEvaluator(
    rawPredictionCol = "rawPrediction",
    labelCol = label_column,
    metricName = "areaUnderROC",
)
decision_tree_roc_area = evaluator_roc_area.evaluate(test_decision_tree_predictions_df)
print("Area Under ROC Curve on Test Data = %g" % decision_tree_roc_area)

In [None]:
# (10) Visualise the Classification Tree
# Prints the fitted model's toDebugString text.
print(decision_tree_model.toDebugString)

In [None]:
# (11) Train a Random Forest Classifier Model on the Training DataFrame
# A fixed seed is passed explicitly so the trained forest (and the metrics
# computed from it) can be reproduced across kernel restarts; without it the
# estimator falls back to its default seed. The value matches the seed
# already used for the train/test split.
random_forest = RandomForestClassifier(featuresCol = 'features', labelCol = label_column, seed = 12345)
random_forest_model = random_forest.fit(train_df)

In [None]:
# (12) Apply the Trained Random Forest Classifier Model to the Test DataFrame to make predictions
test_random_forest_predictions_df = random_forest_model.transform(test_df)

# Show the model output columns next to the actual label and the input features.
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
random_forest_output_columns = ["probability", "rawPrediction", "prediction", label_column, "features"]
test_random_forest_predictions_df.select(random_forest_output_columns).show()

In [None]:
# (13) Evaluate the performance of our Random Forest Classifier Model on the Test DataFrame using Area under a ROC curve
evaluator_rf_roc_area = BinaryClassificationEvaluator(
    rawPredictionCol = "rawPrediction",
    labelCol = label_column,
    metricName = "areaUnderROC",
)
random_forest_roc_area = evaluator_rf_roc_area.evaluate(test_random_forest_predictions_df)
print("Area Under ROC Curve on Test Data = %g" % random_forest_roc_area)

In [None]:
# (14) Stop the Spark Context
# Shuts down the SparkContext `sc` that was instantiated in step (2).
sc.stop()