## Module 7 Activity 1

**Link to download data:**
GitHub Session7
<br>
**Perform the following tasks using PySpark:** 


1. Use the training dataset to create a decision tree model (Model 1) to predict a customer’s Income using Marital Status and Capital Gains and Losses.
2. Use the test dataset to evaluate Model 1. Construct a contingency table to compare the actual and predicted values for Income.
3. Use the training dataset to create a decision tree model (Model 2) to predict a customer’s Income using Marital Status and Age.
4. Use the test dataset to evaluate Model 2. Construct a contingency table to compare the actual and predicted values for Income.
5. Compare Model 1 and Model 2 in terms of accuracy, precision, recall and F1 score.

#### Importing Libraries

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import classification_report

#### Initialize Spark Session

In [3]:
spark = SparkSession.builder.appName("SupervisedLearning").getOrCreate()

#### Load the dataset

In [4]:
df1_training = spark.read.csv('DSPR_Data_Sets/adult_ch6_training.csv', header=True, inferSchema=True)
df1_testing = spark.read.csv('DSPR_Data_Sets/adult_ch6_test.csv', header=True, inferSchema=True)

In [5]:
df1_training.head(5)

[Row(age=52, Marital status='Never-married', Income='<=50K', Cap_Gains_Losses=0.02174),
 Row(age=37, Marital status='Divorced', Income='<=50K', Cap_Gains_Losses=0.0),
 Row(age=53, Marital status='Married', Income='<=50K', Cap_Gains_Losses=0.0),
 Row(age=45, Marital status='Married', Income='<=50K', Cap_Gains_Losses=0.0),
 Row(age=39, Marital status='Married', Income='<=50K', Cap_Gains_Losses=0.0)]

#### Handle missing data

In [6]:
df1_training = df1_training.dropna()
df1_testing = df1_testing.dropna()

#### Showing the type of each column

In [7]:
df1_training.printSchema()

root
 |-- age: integer (nullable = true)
 |-- Marital status: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Cap_Gains_Losses: double (nullable = true)



#### Create a list of the categorical columns (the input Marital status and the output Income)

In [8]:
categorical_cols = ['Marital status', 'Income']

#### StringIndexer: convert categorical string columns in a DataFrame into numerical indices

##### Training Data

In [9]:
# One StringIndexer per categorical column; the Pipeline fits them all in one pass
indexers = [StringIndexer(inputCol=col, outputCol=col + "Numeric") for col in categorical_cols]

pipeline = Pipeline(stages=indexers)
df1_training_encoded = pipeline.fit(df1_training).transform(df1_training)

In [10]:
# Show the dataset
df1_training_encoded.toPandas()

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses,Marital statusNumeric,IncomeNumeric
0,52,Never-married,<=50K,0.02174,1.0,0.0
1,37,Divorced,<=50K,0.00000,2.0,0.0
2,53,Married,<=50K,0.00000,0.0,0.0
3,45,Married,<=50K,0.00000,0.0,0.0
4,39,Married,<=50K,0.00000,0.0,0.0
...,...,...,...,...,...,...
18756,40,Divorced,<=50K,0.00000,2.0,0.0
18757,37,Married,<=50K,0.00000,0.0,0.0
18758,42,Married,<=50K,0.00000,0.0,0.0
18759,47,Divorced,<=50K,0.00000,2.0,0.0
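
The indices shown above are consistent with `StringIndexer`'s default ordering (`stringOrderType="frequencyDesc"`), in which the most frequent label receives index 0.0. The block below is a minimal pure-Python sketch of that rule, not Spark's implementation. The helper name `frequency_desc_index` is hypothetical, the alphabetical tie-break is an assumption of the sketch, and the counts are the row totals of the Marital status contingency table computed on the training data later in this notebook.

```python
def frequency_desc_index(counts):
    """Map each label to an index: most frequent label -> 0.0, next -> 1.0, ...

    Ties are broken alphabetically; that tie-break is an assumption of this sketch.
    """
    ordered = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return {label: float(i) for i, (label, _) in enumerate(ordered)}


# Training-set counts per marital status (<=50K count + >50K count)
marital_counts = {
    "Divorced": 2292 + 266,
    "Married": 5011 + 3859,
    "Never-married": 5885 + 287,
    "Separated": 559 + 38,
    "Widowed": 524 + 40,
}

marital_index = frequency_desc_index(marital_counts)
print(marital_index)
```

Under these counts, Married maps to 0.0, Never-married to 1.0 and Divorced to 2.0, which matches the `Marital statusNumeric` column in the output above.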


##### Testing Data

In [11]:
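# Fit the indexers on the training data and apply them to the test data,
# so that both datasets share the same label-to-index mapping
indexers = [StringIndexer(inputCol=col, outputCol=col + "Numeric") for col in categorical_cols]

pipeline = Pipeline(stages=indexers)
df1_testing_encoded = pipeline.fit(df1_training).transform(df1_testing)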

In [12]:
# Show the dataset
df1_testing_encoded.toPandas()

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses,Marital statusNumeric,IncomeNumeric
0,52,Married,<=50K,0.000000,0.0,0.0
1,69,Married,>50K,0.051781,0.0,1.0
2,51,Never-married,<=50K,0.000000,1.0,0.0
3,40,Divorced,>50K,0.000000,2.0,1.0
4,78,Married,>50K,0.000000,0.0,1.0
...,...,...,...,...,...,...
6150,36,Married,<=50K,0.000000,0.0,0.0
6151,46,Never-married,<=50K,0.000000,1.0,0.0
6152,41,Divorced,<=50K,0.000000,2.0,0.0
6153,46,Never-married,<=50K,0.000000,1.0,0.0


#### VectorAssembler: combining a given list of columns into a **single vector** column.

##### Training Data

In [13]:
# Define feature columns and assemble them as a vector
assembler = VectorAssembler(
    inputCols=['Marital statusNumeric', 'Cap_Gains_Losses'],
    outputCol='features')

df1_training_assembled = assembler.transform(df1_training_encoded)

Now all inputs (features) have been assembled into a single vector column named 'features'.

In [14]:
df1_training_assembled.toPandas()

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses,Marital statusNumeric,IncomeNumeric,features
0,52,Never-married,<=50K,0.02174,1.0,0.0,"[1.0, 0.02174]"
1,37,Divorced,<=50K,0.00000,2.0,0.0,"[2.0, 0.0]"
2,53,Married,<=50K,0.00000,0.0,0.0,"(0.0, 0.0)"
3,45,Married,<=50K,0.00000,0.0,0.0,"(0.0, 0.0)"
4,39,Married,<=50K,0.00000,0.0,0.0,"(0.0, 0.0)"
...,...,...,...,...,...,...,...
18756,40,Divorced,<=50K,0.00000,2.0,0.0,"[2.0, 0.0]"
18757,37,Married,<=50K,0.00000,0.0,0.0,"(0.0, 0.0)"
18758,42,Married,<=50K,0.00000,0.0,0.0,"(0.0, 0.0)"
18759,47,Divorced,<=50K,0.00000,2.0,0.0,"[2.0, 0.0]"


##### Testing Data

In [15]:
# Define feature columns and assemble them as a vector
assembler = VectorAssembler(
    inputCols=['Marital statusNumeric', 'Cap_Gains_Losses'],
    outputCol='features')

df1_testing_assembled = assembler.transform(df1_testing_encoded)

Now all inputs (features) have been assembled into a single vector column named 'features'.

In [16]:
df1_testing_assembled.toPandas()

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses,Marital statusNumeric,IncomeNumeric,features
0,52,Married,<=50K,0.000000,0.0,0.0,"(0.0, 0.0)"
1,69,Married,>50K,0.051781,0.0,1.0,"[0.0, 0.051781]"
2,51,Never-married,<=50K,0.000000,1.0,0.0,"[1.0, 0.0]"
3,40,Divorced,>50K,0.000000,2.0,1.0,"[2.0, 0.0]"
4,78,Married,>50K,0.000000,0.0,1.0,"(0.0, 0.0)"
...,...,...,...,...,...,...,...
6150,36,Married,<=50K,0.000000,0.0,0.0,"(0.0, 0.0)"
6151,46,Never-married,<=50K,0.000000,1.0,0.0,"[1.0, 0.0]"
6152,41,Divorced,<=50K,0.000000,2.0,0.0,"[2.0, 0.0]"
6153,46,Never-married,<=50K,0.000000,1.0,0.0,"[1.0, 0.0]"


From this point forward, we just need two columns:
1. **features** which includes all Inputs
2. **IncomeNumeric** which is the Output of the model

#### Filtering the Input and Output columns into a new dataframe

In [17]:
df1_training_assembled_filtered = df1_training_assembled.select("features", "IncomeNumeric")
df1_training_assembled_filtered.toPandas()

Unnamed: 0,features,IncomeNumeric
0,"[1.0, 0.02174]",0.0
1,"[2.0, 0.0]",0.0
2,"(0.0, 0.0)",0.0
3,"(0.0, 0.0)",0.0
4,"(0.0, 0.0)",0.0
...,...,...
18756,"[2.0, 0.0]",0.0
18757,"(0.0, 0.0)",0.0
18758,"(0.0, 0.0)",0.0
18759,"[2.0, 0.0]",0.0


In [18]:
df1_testing_assembled_filtered = df1_testing_assembled.select("features", "IncomeNumeric")
df1_testing_assembled_filtered.toPandas()

Unnamed: 0,features,IncomeNumeric
0,"(0.0, 0.0)",0.0
1,"[0.0, 0.051781]",1.0
2,"[1.0, 0.0]",0.0
3,"[2.0, 0.0]",1.0
4,"(0.0, 0.0)",1.0
...,...,...
6150,"(0.0, 0.0)",0.0
6151,"[1.0, 0.0]",0.0
6152,"[2.0, 0.0]",0.0
6153,"[1.0, 0.0]",0.0


### Building the MODEL

#### Train a Decision Tree model

In [19]:
dtc1 = DecisionTreeClassifier(featuresCol='features', labelCol="IncomeNumeric")
model1 = dtc1.fit(df1_training_assembled_filtered)

### Prediction using the Trained Model

In [20]:
# Predictions using test_data
predictions1 = model1.transform(df1_testing_assembled_filtered)

TP (True Positives): rows with Actual Label = 1 and Predicted Label = 1<br>
FP (False Positives): rows with Actual Label = 0 and Predicted Label = 1<br>
TN (True Negatives): rows with Actual Label = 0 and Predicted Label = 0<br>
FN (False Negatives): rows with Actual Label = 1 and Predicted Label = 0
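
As a minimal sketch of these four definitions, the function below tallies TP, FP, TN and FN from a list of (prediction, label) pairs in plain Python. The function name `confusion_counts` and the six sample pairs are hypothetical and chosen only for illustration; they are not taken from the model output.

```python
def confusion_counts(pairs, positive=1.0):
    """Count TP, FP, TN, FN from (prediction, label) pairs."""
    counts = {"TP": 0, "FP": 0, "TN": 0, "FN": 0}
    for prediction, label in pairs:
        if prediction == positive and label == positive:
            counts["TP"] += 1   # predicted 1, actually 1
        elif prediction == positive:
            counts["FP"] += 1   # predicted 1, actually 0
        elif label == positive:
            counts["FN"] += 1   # predicted 0, actually 1
        else:
            counts["TN"] += 1   # predicted 0, actually 0
    return counts


# Hypothetical (prediction, label) pairs
sample_pairs = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (0.0, 0.0)]
sample_counts = confusion_counts(sample_pairs)
print(sample_counts)
```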


In [21]:
# "Raw prediction" for each possible label. The meaning of a "raw" prediction may vary between algorithms, but it intuitively gives a measure of confidence in each possible label (where larger = more confident).
predictions1.toPandas()

Unnamed: 0,features,IncomeNumeric,rawPrediction,probability,prediction
0,"(0.0, 0.0)",0.0,"[4821.0, 2865.0]","[0.6272443403590945, 0.37275565964090557]",0.0
1,"[0.0, 0.051781]",1.0,"[4821.0, 2865.0]","[0.6272443403590945, 0.37275565964090557]",0.0
2,"[1.0, 0.0]",0.0,"[8951.0, 424.0]","[0.9547733333333334, 0.045226666666666665]",0.0
3,"[2.0, 0.0]",1.0,"[8951.0, 424.0]","[0.9547733333333334, 0.045226666666666665]",0.0
4,"(0.0, 0.0)",1.0,"[4821.0, 2865.0]","[0.6272443403590945, 0.37275565964090557]",0.0
...,...,...,...,...,...
6150,"(0.0, 0.0)",0.0,"[4821.0, 2865.0]","[0.6272443403590945, 0.37275565964090557]",0.0
6151,"[1.0, 0.0]",0.0,"[8951.0, 424.0]","[0.9547733333333334, 0.045226666666666665]",0.0
6152,"[2.0, 0.0]",0.0,"[8951.0, 424.0]","[0.9547733333333334, 0.045226666666666665]",0.0
6153,"[1.0, 0.0]",0.0,"[8951.0, 424.0]","[0.9547733333333334, 0.045226666666666665]",0.0


In [22]:
# Print Decision Tree rules
print(model1.toDebugString)


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_48be34c53a21, depth=4, numNodes=11, numClasses=2, numFeatures=2
  If (feature 0 in {1.0,2.0,3.0,4.0})
   If (feature 1 <= 0.053166000000000005)
    Predict: 0.0
   Else (feature 1 > 0.053166000000000005)
    If (feature 1 <= 0.15007500000000001)
     If (feature 0 in {3.0})
      Predict: 0.0
     Else (feature 0 not in {3.0})
      Predict: 1.0
    Else (feature 1 > 0.15007500000000001)
     Predict: 0.0
  Else (feature 0 not in {1.0,2.0,3.0,4.0})
   If (feature 1 <= 0.053166000000000005)
    Predict: 0.0
   Else (feature 1 > 0.053166000000000005)
    Predict: 1.0
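
The printed rules can be read as nested if/else statements. The sketch below transcribes them into a plain Python function, assuming feature 0 is `Marital statusNumeric` and feature 1 is `Cap_Gains_Losses` (the order of `inputCols` passed to the assembler). The function name `predict_model1` is hypothetical; the thresholds are copied from the debug string.

```python
def predict_model1(marital_idx, cap_gains_losses):
    """Hand transcription of the Model 1 decision tree rules."""
    low, high = 0.053166000000000005, 0.15007500000000001
    if marital_idx in {1.0, 2.0, 3.0, 4.0}:
        if cap_gains_losses <= low:
            return 0.0
        if cap_gains_losses <= high:
            # Only index 3.0 is predicted as 0.0 in this middle band
            return 0.0 if marital_idx == 3.0 else 1.0
        return 0.0
    # Marital index 0.0
    if cap_gains_losses <= low:
        return 0.0
    return 1.0


# Feature vectors taken from the first rows of the test predictions
for features in [(0.0, 0.0), (0.0, 0.051781), (1.0, 0.0), (2.0, 0.0)]:
    print(features, "->", predict_model1(*features))
```

All four of these rows yield 0.0, in line with the `prediction` column shown earlier for the same feature vectors.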



### Evaluate the performance of a binary classification model

**BinaryClassificationEvaluator:** This is an evaluator for binary classification, which expects two input columns: **raw prediction** and **label**.

Parameters:

- `rawPredictionCol="rawPrediction"`: tells the evaluator to read the model's raw prediction values from the "rawPrediction" column of the predictions DataFrame.
- `labelCol="IncomeNumeric"`: tells the evaluator that the true labels for the binary classification task are in the "IncomeNumeric" column.

`evaluate()` method:

- `evaluator1.evaluate(predictions1)`: this is where the actual evaluation happens. The `evaluate()` method computes the metric (Area Under ROC, by default) for the predictions DataFrame using the true labels and raw predictions.

**Area Under ROC:**

The code calculates the Area Under the Receiver Operating Characteristic (ROC) curve, which is a metric used to evaluate the performance of binary classification models. The value of Area Under ROC (often abbreviated as AUC) ranges between 0 and 1. A value of 0.5 indicates no discriminative power (i.e., the model is as good as random guessing), while a value of 1.0 indicates perfect classification. A higher AUC indicates a better model.
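
One way to build intuition for AUC: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The following is a small pure-Python sketch of that pairwise definition. It is not how Spark computes the metric internally, and the scores and labels are hypothetical toy values.

```python
def auc_by_pairs(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical scores (e.g. predicted probability of class 1) and true labels
toy_scores = [0.9, 0.8, 0.4, 0.4, 0.1]
toy_labels = [1, 0, 1, 0, 0]
toy_auc = auc_by_pairs(toy_scores, toy_labels)
print(toy_auc)
```

A model that gives every row the same score wins half of every tie and lands at 0.5, which is the "random guessing" baseline described above.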

In [23]:
evaluator1 = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="IncomeNumeric")
area_under_roc_1 = evaluator1.evaluate(predictions1)
print("Area Under ROC:", area_under_roc_1)

Area Under ROC: 0.7121148294890319


Spark's RDD-based `MulticlassMetrics` class does not take a DataFrame. It expects a Resilient Distributed Dataset (RDD) of (prediction, label) tuples, so the `predictions1` DataFrame, which holds both the predicted and the actual values, has to be converted first.

Each tuple in this RDD contains two elements: the **predicted value** (first element) and the **actual label** (second element).


In [24]:
#  Convert 'predictions' DataFrame to an RDD of (prediction, label) tuples

prediction_and_label_1 = predictions1.select("prediction", "IncomeNumeric").rdd.map(lambda row: (float(row["prediction"]), float(row["IncomeNumeric"])))
prediction_and_label_1

PythonRDD[143] at RDD at PythonRDD.scala:53

In [25]:
# Use 'collect' to show the contents of an RDD (this pulls every row to the driver)
for pred, label in prediction_and_label_1.collect():
    print(f"Prediction: {pred}, Actual Label: {label}")

Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 1.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 1.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 1.0, Actual Label: 1.0
Prediction: 1.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 0.0
Prediction: 0.0, Actual Label: 1.0
...

### Confusion Matrix

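The confusion matrix returned by `MulticlassMetrics.confusionMatrix()` has actual labels in rows and predicted labels in columns, ordered by label value. For labels 0 and 1 it therefore reads `[[TN, FP], [FN, TP]]`, where: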

- **TN (True Negative):** The number of actual negatives (0s) that were correctly predicted as negatives by the model.
- **FP (False Positive):** The number of actual negatives (0s) that were incorrectly predicted as positives (1s) by the model.
- **FN (False Negative):** The number of actual positives (1s) that were incorrectly predicted as negatives (0s) by the model.
- **TP (True Positive):** The number of actual positives (1s) that were correctly predicted as positives by the model.


###### Interpretation:

**High values of TP and TN, along with low values of FP and FN, generally indicate a good model.**

In [27]:
# Create a MulticlassMetrics object to develop the Confusion Matrix
metrics_1 = MulticlassMetrics(prediction_and_label_1)
confusion_matrix_1 = metrics_1.confusionMatrix()

In [28]:
# Print the confusion matrix
print("Confusion Matrix:")
print(confusion_matrix_1)

Confusion Matrix:
DenseMatrix([[4614.,   60.],
             [1124.,  357.]])


### Using Scikit-learn package to get a detailed classification report

In [29]:
# Convert 'predictions' DataFrame to a Pandas DataFrame
predictions_pd_1 = predictions1.select("prediction", "IncomeNumeric").toPandas()

In [30]:
# Calculate the classification report (true labels first, then predictions)
report1 = classification_report(predictions_pd_1["IncomeNumeric"], predictions_pd_1["prediction"])
print("Classification Report:")
print(report1)

Classification Report:
              precision    recall  f1-score   support

         0.0       0.80      0.99      0.89      4674
         1.0       0.86      0.24      0.38      1481

    accuracy                           0.81      6155
   macro avg       0.83      0.61      0.63      6155
weighted avg       0.82      0.81      0.76      6155



## Model 2

In [32]:
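# Assembler (training)
# Note: this cell uses 'age' and 'Cap_Gains_Losses' as inputs, whereas the task statement
# lists Marital Status and Age for Model 2. The outputs below correspond to the inputs used here.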
assembler = VectorAssembler(
    inputCols=['age', 'Cap_Gains_Losses'],
    outputCol='features')

df2_training_assembled = assembler.transform(df1_training_encoded)

# Assembler (testing)

df2_testing_assembled = assembler.transform(df1_testing_encoded)

#Filtering
df2_training_assembled_filtered = df2_training_assembled.select("features", "IncomeNumeric")
df2_testing_assembled_filtered = df2_testing_assembled.select("features", "IncomeNumeric")

# Model building and training
dtc2 = DecisionTreeClassifier(featuresCol='features', labelCol="IncomeNumeric")
model2 = dtc2.fit(df2_training_assembled_filtered)

# Predictions using test_data
predictions2 = model2.transform(df2_testing_assembled_filtered)

# Evaluation
evaluator2 = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="IncomeNumeric")
area_under_roc_2 = evaluator2.evaluate(predictions2)
print("Area Under ROC:", area_under_roc_2)

# RDD
prediction_and_label_2 = predictions2.select("prediction", "IncomeNumeric").rdd.map(lambda row: (float(row["prediction"]), float(row["IncomeNumeric"])))

# Confusion Matrix
metrics_2 = MulticlassMetrics(prediction_and_label_2)
confusion_matrix_2 = metrics_2.confusionMatrix()
print("Confusion Matrix:")
print(confusion_matrix_2)

# Classification Report:
predictions_pd_2 = predictions2.select("prediction", "IncomeNumeric").toPandas()
report2 = classification_report(predictions_pd_2["IncomeNumeric"], predictions_pd_2["prediction"])
print("Classification Report:")
print(report2)

Area Under ROC: 0.6495179851937117
Confusion Matrix:
DenseMatrix([[4656.,   18.],
             [ 533.,  948.]])




Classification Report:
              precision    recall  f1-score   support

         0.0       0.90      1.00      0.94      4674
         1.0       0.98      0.64      0.77      1481

    accuracy                           0.91      6155
   macro avg       0.94      0.82      0.86      6155
weighted avg       0.92      0.91      0.90      6155
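
To compare the two models directly, the metrics for the positive class (Income >50K, label 1.0) can be recomputed from the two confusion matrices printed above. This sketch assumes the `[[TN, FP], [FN, TP]]` layout (actual labels in rows, predicted labels in columns); the helper name `binary_metrics` is hypothetical.

```python
def binary_metrics(matrix):
    """Accuracy, precision, recall and F1 for the positive class of a 2x2 confusion matrix."""
    (tn, fp), (fn, tp) = matrix
    accuracy = (tp + tn) / (tn + fp + fn + tp)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Confusion matrices copied from the outputs of Model 1 and Model 2
model1_matrix = [[4614, 60], [1124, 357]]
model2_matrix = [[4656, 18], [533, 948]]

results = {
    "Model 1": binary_metrics(model1_matrix),
    "Model 2": binary_metrics(model2_matrix),
}
for name, metrics in results.items():
    print(name, {key: round(value, 2) for key, value in metrics.items()})
```

On these numbers, Model 2 scores higher than Model 1 on accuracy, precision, recall and F1 for the >50K class, even though its reported Area Under ROC (about 0.65) is lower than Model 1's (about 0.71).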



### Contingency Table

A contingency table is a table with at least two rows and two columns, used in statistics to present categorical data as frequency counts.
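
As a minimal sketch of the idea, a contingency table can be built by counting how often each (row category, column category) pair occurs. The example below uses only the standard library and the (Marital status, Income) values of the first five test rows displayed earlier; the variable names are hypothetical.

```python
from collections import Counter

# (Marital status, Income) for the first five rows of the test data shown earlier
rows = [
    ("Married", "<=50K"),
    ("Married", ">50K"),
    ("Never-married", "<=50K"),
    ("Divorced", ">50K"),
    ("Married", ">50K"),
]

cell_counts = Counter(rows)                        # frequency of each pair
row_labels = sorted({status for status, _ in rows})
col_labels = sorted({income for _, income in rows})

# Nested dict: table[row][column] -> count (0 when the pair never occurs)
table = {r: {c: cell_counts[(r, c)] for c in col_labels} for r in row_labels}

for r in row_labels:
    print(r, table[r])
```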

In [None]:
# Input: a categorical variable such as marital status
# Output: categorical such as Income

##### Training Data

In [33]:
df1_training_pd = df1_training.toPandas()
df1_training_pd

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses
0,52,Never-married,<=50K,0.02174
1,37,Divorced,<=50K,0.00000
2,53,Married,<=50K,0.00000
3,45,Married,<=50K,0.00000
4,39,Married,<=50K,0.00000
...,...,...,...,...
18756,40,Divorced,<=50K,0.00000
18757,37,Married,<=50K,0.00000
18758,42,Married,<=50K,0.00000
18759,47,Divorced,<=50K,0.00000


In [34]:
marital_status_ct = pd.crosstab(df1_training_pd['Marital status'],df1_training_pd['Income'],margins = False)
marital_status_ct

Marital status,Income <=50K,Income >50K
Divorced,2292,266
Married,5011,3859
Never-married,5885,287
Separated,559,38
Widowed,524,40


In [None]:
# Input: a non-categorical variable such as age
# Output: categorical such as Income

In [35]:
age_ct = pd.crosstab(df1_training_pd['age'],df1_training_pd['Income'],margins = False)
age_ct

age,Income <=50K,Income >50K
30,0,106
31,0,86
32,0,69
33,0,105
34,0,95
35,693,78
36,682,86
37,727,82
38,717,98
39,663,107


In [None]:
# Convert 'age' into categorical bins

In [36]:
import numpy as np
bins = [0, 20, 30, 40, 50, 60, 70, 80, np.inf]
names = ['<20','20-30', '30-40', '40-50', '50-60', '60-70', '70-80', '80+']

df1_training_pd['age_categorical'] = pd.cut(df1_training_pd['age'], bins, labels=names)
df1_training_pd

Unnamed: 0,age,Marital status,Income,Cap_Gains_Losses,age_categorical
0,52,Never-married,<=50K,0.02174,50-60
1,37,Divorced,<=50K,0.00000,30-40
2,53,Married,<=50K,0.00000,50-60
3,45,Married,<=50K,0.00000,40-50
4,39,Married,<=50K,0.00000,30-40
...,...,...,...,...,...
18756,40,Divorced,<=50K,0.00000,30-40
18757,37,Married,<=50K,0.00000,30-40
18758,42,Married,<=50K,0.00000,40-50
18759,47,Divorced,<=50K,0.00000,40-50
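
With its default `right=True`, `pd.cut` builds intervals that include their right edge, such as (20, 30] and (30, 40]. That is why age 40 is labelled '30-40' in the output above. The sketch below reproduces this binning rule with the standard library's `bisect` module; the function name `age_bin` is hypothetical, and the sketch is an illustration of the rule, not pandas' implementation.

```python
from bisect import bisect_left
import math

bins = [0, 20, 30, 40, 50, 60, 70, 80, math.inf]
names = ['<20', '20-30', '30-40', '40-50', '50-60', '60-70', '70-80', '80+']


def age_bin(age):
    """Return the label of the right-closed interval (lower, upper] containing age, else None."""
    i = bisect_left(bins, age) - 1
    if i < 0 or i >= len(names):
        return None  # at or below the lowest edge, where pd.cut would give NaN
    return names[i]


for age in (30, 31, 40, 52):
    print(age, "->", age_bin(age))
```

Boundary ages fall into the lower bin: 30 is labelled '20-30' and 40 is labelled '30-40'.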


In [37]:
age_categorical_ct = pd.crosstab(df1_training_pd['age_categorical'],df1_training_pd['Income'],margins = False)
age_categorical_ct

age_categorical,Income <=50K,Income >50K
20-30,0,106
30-40,4211,895
40-50,7152,853
50-60,2908,872
60-70,0,926
70-80,0,838


In [None]:
# Multiple Variables

In [38]:
multiple_ct = pd.crosstab([df1_training_pd['age_categorical'],df1_training_pd['Marital status']],df1_training_pd['Income'],margins = False)
multiple_ct

age_categorical,Marital status,Income <=50K,Income >50K
20-30,Divorced,0,10
20-30,Married,0,83
20-30,Never-married,0,9
20-30,Separated,0,4
30-40,Divorced,697,49
30-40,Married,1464,779
30-40,Never-married,1745,47
30-40,Separated,158,12
30-40,Widowed,147,8
40-50,Divorced,1142,48


In [39]:
# Stop the Spark session (if you're done with other tasks)
spark.stop()