<a href="https://colab.research.google.com/github/KeoratileShongoane/AML/blob/main/Spark_(1)_(2).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

***
# **Data Science in Practice**: Assignment 2
#### __Name:__ Keoratile Shongoane
#### __Student number:__ 1389986

***




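This notebook uses PySpark to predict `income_class` from the categorical attributes in `income.csv`: work class, education, marital status, occupation, relationship, race, sex and citizenship. Rows with missing values (`?`) are dropped and the categories are index-encoded. A Random Forest and a Decision Tree classifier are then trained on a 70/30 train–test split. The two models are compared on accuracy, weighted precision, weighted recall, F1 and their confusion matrices.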

# Table of Contents

- [Imports](#imports)
- [1. Starting Spark Session](#1-starting-spark-session)
- [2. Preprocessing](#2-preprocessing)
  - [2.1 Importing Data](#21-importing-data)
  - [2.2 Viewing data types](#22-viewing-data-types)
  - [2.3 Dropping missing values](#23-dropping-missing-values)
  - [2.4 Transposing Data](#24-transposing-data)
  - [2.5 Calculating Descriptive statistics](#25-calculating-descriptive-statistics)
  - [2.6 Encoding](#26-encoding)
  - [2.7 Transforming Features](#27-transforming-features)
  - [2.8 Encoding Target Column](#28-encoding-target-column)
  - [2.9 Train-Test Split](#29-train-test-split)
- [3. Model Training](#3-model-training)
  - [3.1 Random Forest Classifier](#31-random-forest-classifier)
  - [3.2 Decision Tree Classifier](#32-decision-tree-classifier)
- [4. Model Evaluation](#4-model-evaluation)
  - [4.1 Random Forest Classifier](#41-random-forest-classifier)
  - [4.2 Decision Tree Classifier](#42-decision-tree-classifier)

# __Imports__

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fe6453232e7c469302aad7951c4b0551538524bfb8da305fadc208e15662208d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


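The cell below imports the PySpark SQL and ML components used throughout the notebook (session, feature encoders, classifiers and evaluators), together with NumPy and pandas. The Spark session itself is started in Section 1.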

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession, column
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import numpy as np
import pandas as pd



# __1. Starting Spark Session__

In [None]:
# Start Spark, or attach to the session already running in this kernel;
# 'ml-income' is the application name reported by Spark.
spark = (
    SparkSession.builder
    .appName('ml-income')
    .getOrCreate()
)

# __2. Preprocessing__

### __2.1 Importing Data__

In [None]:
import os

# Location of the income dataset. Defaults to 'income.csv' in the working
# directory (on Colab, upload it to /content first). Set the INCOME_CSV
# environment variable to read it from elsewhere instead of hard-coding a
# machine-specific absolute path.
DATA_PATH = os.environ.get('INCOME_CSV', 'income.csv')

# '?' marks missing values in this file, so read it as null; trim whitespace
# around every field and let Spark infer the column types.
df = spark.read.csv(
    DATA_PATH,
    header=True,
    nullValue='?',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    inferSchema=True,
)


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/income.csv.

### __2.2 Viewing data types__

In [None]:
# Print the column names and the types Spark inferred (inferSchema=True) so they
# can be checked before encoding.
df.printSchema()

### __2.3 Dropping missing values__

In [None]:
# Remove every row that has a null in any column ('?' entries were read as null).
df = df.dropna(how='any')
df.show(20)

### __2.4 Transposing Data__

In [None]:

# Peek at the first five rows with one line per column, so wide records stay readable.
first_rows = df.take(5)
pd.DataFrame(first_rows, columns=df.columns).T

### __2.5 Calculating Descriptive statistics__

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every numeric column.
# inferSchema can produce 'int', 'bigint' or 'double' (and decimals), so match
# all numeric Spark SQL type names rather than only 'int'; otherwise such
# columns would silently disappear from the summary.
NUMERIC_DTYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
numeric_features = [
    name for name, dtype in df.dtypes
    if dtype in NUMERIC_DTYPES or dtype.startswith('decimal')
]
df.select(numeric_features).describe().toPandas().transpose()

### __2.6 Encoding__

In [None]:
# Index each categorical string column into a numeric "label_<column>" column
# so the tree-based models can consume it.
categorical_cols = ["workclass", "education", "marital_status", "occupation",
                    "relationship", "race", "sex", "citizenship"]
label_stringIdx = StringIndexer(
    inputCols=categorical_cols,
    outputCols=[f"label_{col}" for col in categorical_cols],
)
indexer_model = label_stringIdx.fit(df)
df = indexer_model.transform(df)
df.show()

### __2.7 Transforming Features__

In [None]:
# Combine the indexed categorical columns into the single vector column
# "features" that the Spark ML classifiers read.
# NOTE(review): despite its name, `numericCols` holds only the StringIndexer
# outputs from 2.6; the raw numeric columns summarised in 2.5 are not part of
# the feature vector — confirm this is intended.
numericCols = [
    f"label_{name}"
    for name in ("workclass", "education", "marital_status", "occupation",
                 "relationship", "race", "sex", "citizenship")
]
assembler = VectorAssembler(outputCol="features", inputCols=numericCols)
df = assembler.transform(df)
df.show()

### __2.8 Encoding Target Column__

In [None]:
# Encode the string target income_class as the numeric "label" column the
# classifiers and evaluators below refer to.
labels = StringIndexer(outputCol="label", inputCol="income_class")
label_model = labels.fit(df)
df = label_model.transform(df)
df.show()

In [None]:
# Optional: uncomment to inspect the first 100 encoded rows as a pandas DataFrame.
# pd.DataFrame(df.take(100),columns=df.columns)

### __2.9 Train-Test Split__

In [None]:
# Split the encoded data 70/30 into training and test sets. A fixed seed makes
# the split (and therefore every metric reported below) reproducible across
# kernel restarts.
SEED = 42
train, test = df.randomSplit([0.7, 0.3], seed=SEED)


In [None]:
# Report how many rows landed on each side of the split.
for split_name, split_df in (("Training", train), ("Test", test)):
    print(f"{split_name} Dataset Count: {split_df.count()}")

# __3. Model Training__

#### Here, I train two models: a `Random Forest Classifier` and a `Decision Tree Classifier`.

### __3.1 Random Forest Classifier__

In [None]:
# Random forest on the assembled features.
# maxBins must be at least the number of categories of any categorical feature
# (Spark raises otherwise); 41 presumably covers the column with the most
# distinct values (likely citizenship) — TODO confirm if the data changes.
# The seed is set explicitly because PySpark's default seed is derived from
# Python's hash of the class name, which can differ between interpreter
# sessions; without it the bootstrap samples (and the metrics) may vary per run.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxBins=41, seed=42)
modelrf = rf.fit(train)

In [None]:
# Random forest predictions on the held-out test set. Show the predicted class
# next to the true label for the first 25 rows, so the output actually
# displays the predictions.
predictionsrf = modelrf.transform(test)
predictionsrf.select("features", "label", "prediction").show(25)

### __3.2 Decision Tree Classifier__

In [None]:
# Single decision tree on the same features. maxBins=41 mirrors the random
# forest cell: for categorical features Spark requires maxBins to be at least
# the largest number of categories (presumably citizenship here — TODO confirm).
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxBins=41,
)
modeldt = dt.fit(train)

In [None]:
# Decision tree predictions on the held-out test set. Show the predicted class
# next to the true label for the first 25 rows, so the output actually
# displays the predictions.
predictionsdt = modeldt.transform(test)
predictionsdt.select("features", "label", "prediction").show(25)

# __4. Model Evaluation__

### __4.1 Random Forest Classifier__

In [None]:
# Score the random-forest predictions on the test set. Spark's "f1" metric is
# the label-weighted F1, in line with the weighted precision/recall used here.
(evaluator_accuracy_rf,
 evaluator_precision_rf,
 evaluator_recall_rf,
 evaluator_f1_rf) = (
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

accuracy_rf, precision_rf, recall_rf, f1_rf = (
    evaluator.evaluate(predictionsrf)
    for evaluator in (evaluator_accuracy_rf, evaluator_precision_rf,
                      evaluator_recall_rf, evaluator_f1_rf)
)

print("RandomForest Metrics:\n")
for caption, value in (
    ("Accuracy", accuracy_rf),
    ("Precision", precision_rf),
    ("Recall", recall_rf),
    ("F1 Score", f1_rf),
    ("Test Error", 1.0 - accuracy_rf),
):
    print(f"{caption} = {value}")


In [None]:
# Confusion matrix for the random forest. The RDD-based MulticlassMetrics takes
# (prediction, label) pairs; its matrix has true labels in rows and predicted
# labels in columns, both ordered by ascending label index.
preds = predictionsrf.select(
    'prediction',
    F.col('label').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

### __4.2 Decision Tree Classifier__

In [None]:

# Score the decision-tree predictions on the test set. Spark's "f1" metric is
# the label-weighted F1, in line with the weighted precision/recall used here.
(evaluator_accuracy_dt,
 evaluator_precision_dt,
 evaluator_recall_dt,
 evaluator_f1_dt) = (
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

accuracy_dt, precision_dt, recall_dt, f1_dt = (
    evaluator.evaluate(predictionsdt)
    for evaluator in (evaluator_accuracy_dt, evaluator_precision_dt,
                      evaluator_recall_dt, evaluator_f1_dt)
)

print("DecisionTree Metrics:\n")
for caption, value in (
    ("Accuracy", accuracy_dt),
    ("Precision", precision_dt),
    ("Recall", recall_dt),
    ("F1 Score", f1_dt),
    ("Test Error", 1.0 - accuracy_dt),
):
    print(f"{caption} = {value}")


In [None]:
# Confusion matrix for the decision tree. The RDD-based MulticlassMetrics takes
# (prediction, label) pairs; its matrix has true labels in rows and predicted
# labels in columns, both ordered by ascending label index.
preds = predictionsdt.select(
    'prediction',
    F.col('label').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())