# Classification in PySpark
> Now that you are familiar with getting data into Spark, you'll move on to building two types of classification model: Decision Trees and Logistic Regression.

Credit: This notebook is inspired by the DataCamp machine learning with Spark course.

# Setting up PySpark in Colab
Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to install Java.


In [None]:
!apt-get install openjdk-8-jdk-headless

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 42 not upgraded.
Need to get 36.5 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u312-b07-0ubuntu1~18.04 [28.2 MB]
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jdk-headless amd64 8u312-b07-0ubuntu1~18.

Now download Spark 3.2.1 built for Hadoop 2.7.

In [None]:
!wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz


--2022-05-19 12:46:15--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 272637746 (260M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop2.7.tgz’


2022-05-19 12:46:58 (6.14 MB/s) - ‘spark-3.2.1-bin-hadoop2.7.tgz’ saved [272637746/272637746]



Next, we just need to extract that archive.


In [None]:
!tar xf /content/spark-3.2.1-bin-hadoop2.7.tgz


The last things we need to install are the findspark library, which locates Spark on the system and makes it importable as a regular library, and PySpark itself.


In [None]:
!pip install -q findspark


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=982aa254ead564a54b1c1cf1f3b56b55edf98fb3abc3c2a435efb4736e3e8772
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"
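
Before handing over to findspark, it can help to confirm that both variables point at real directories. The check below is a small sketch that is not part of the original notebook; `missing_dirs` is a hypothetical helper name chosen for illustration.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point to a directory."""
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

problems = missing_dirs(os.environ)
if problems:
    print("Check these variables before calling findspark.init():", problems)
else:
    print("JAVA_HOME and SPARK_HOME both point to existing directories.")
```

An empty list means both paths exist; any name in the list is worth fixing before Spark is started.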

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

In [None]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop2.7'

With the dependencies installed and the environment variables set, we can now import PySpark in the Colab environment, along with NumPy and pandas.

In [None]:
import pyspark
import numpy as np
import pandas as pd

# Problem description

In this exercise, you will develop two classifiers to perform a binary classification task. The objective is to predict whether a user's `avg_price` is greater than 10 (label 1) or not (label 0).

## Data Preparation

### Removing columns and rows
The data come from a CSV file of user session records (`combined-data.csv`). You're going to develop a model which will predict the label for a given record.

In this exercise you need to trim those data down by:

1. removing an uninformative column (`userSessionId`) and
2. removing rows which do not have purchase information (`count_buyId`).

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('flamingo').getOrCreate()

# Read data from CSV file
flamingo_df = spark.read.csv('./combined-data.csv', sep=',', header=True, inferSchema=True, nullValue='NA')

flamingo_df.show(5)


+------+-------------+---------+------------+----------------+----------+-----------+---------+
|userId|userSessionId|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|avg_price|
+------+-------------+---------+------------+----------------+----------+-----------+---------+
|   812|         5648|        1|     android|              69|         8|       NULL|     NULL|
|  1658|         5649|        1|      iphone|              31|         5|       NULL|     NULL|
|  1589|         5650|        1|      iphone|              26|         2|       NULL|     NULL|
|  1863|         5651|        1|     android|              35|         4|       NULL|     NULL|
|   937|         5652|        1|     android|              39|         0|          1|     1.00|
+------+-------------+---------+------------+----------------+----------+-----------+---------+
only showing top 5 rows



# Total number of records

In [None]:
flamingo_df.count()
flamingo_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- userSessionId: integer (nullable = true)
 |-- teamLevel: integer (nullable = true)
 |-- platformType: string (nullable = true)
 |-- count_gameclicks: integer (nullable = true)
 |-- count_hits: integer (nullable = true)
 |-- count_buyId: string (nullable = true)
 |-- avg_price: string (nullable = true)



# Data dictionary:

In [None]:
# Remove the uninformative 'userSessionId' column
flamingo_drop_column = flamingo_df.drop('userSessionId')

# Number of records with a null 'count_buyId' (not displayed unless printed)
flamingo_drop_column.filter('count_buyId IS NULL').count()

# Remove records where 'count_buyId' holds the literal string "NULL"
flamingo_valid_buys = flamingo_drop_column.filter(flamingo_drop_column.count_buyId != "NULL")

# Remove records with missing values in any column and get the number of remaining rows
flamingo_none_missing = flamingo_valid_buys.dropna()
flamingo_none_missing.show(5)
print(flamingo_none_missing.count())

+------+---------+------------+----------------+----------+-----------+---------+
|userId|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|avg_price|
+------+---------+------------+----------------+----------+-----------+---------+
|   937|        1|     android|              39|         0|          1|     1.00|
|  1623|        1|      iphone|             129|         9|          1|    10.00|
|    83|        1|     android|             102|        14|          1|     5.00|
|   121|        1|     android|              39|         4|          1|     3.00|
|   462|        1|     android|              90|        10|          1|     3.00|
+------+---------+------------+----------------+----------+-----------+---------+
only showing top 5 rows

1411


### Column manipulation

In [None]:
from pyspark.sql.types import IntegerType

# Create 'label' column: 1 if avg_price is greater than 10, otherwise 0
flamingo_none_missing = flamingo_none_missing.withColumn('label', (flamingo_none_missing.avg_price > 10).cast('integer'))

# Cast the string columns 'count_buyId' and 'avg_price' to integers
flamingo_none_missing = flamingo_none_missing.withColumn("count_buyId", flamingo_none_missing["count_buyId"].cast(IntegerType()))
flamingo_none_missing = flamingo_none_missing.withColumn("avg_price", flamingo_none_missing["avg_price"].cast(IntegerType()))

# Check first five records
flamingo_none_missing.show(5)

+------+---------+------------+----------------+----------+-----------+---------+-----+
|userId|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|avg_price|label|
+------+---------+------------+----------------+----------+-----------+---------+-----+
|   937|        1|     android|              39|         0|          1|        1|    0|
|  1623|        1|      iphone|             129|         9|          1|       10|    0|
|    83|        1|     android|             102|        14|          1|        5|    0|
|   121|        1|     android|              39|         4|          1|        3|    0|
|   462|        1|     android|              90|        10|          1|        3|    0|
+------+---------+------------+----------------+----------+-----------+---------+-----+
only showing top 5 rows



### Categorical columns


In [None]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='platformType', outputCol='platformType_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flamingo_none_missing)

# Indexer creates a new column with numeric index values
flamingo_indexed = indexer_model.transform(flamingo_none_missing)
flamingo_indexed.show(2)

+------+---------+------------+----------------+----------+-----------+---------+-----+----------------+
|userId|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|avg_price|label|platformType_idx|
+------+---------+------------+----------------+----------+-----------+---------+-----+----------------+
|   937|        1|     android|              39|         0|          1|        1|    0|             1.0|
|  1623|        1|      iphone|             129|         9|          1|       10|    0|             0.0|
+------+---------+------------+----------------+----------+-----------+---------+-----+----------------+
only showing top 2 rows
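
By default, `StringIndexer` orders categories by descending frequency, so the most common value receives index 0.0. The output above, where `iphone` maps to 0.0 and `android` to 1.0, is consistent with `iphone` being the most frequent platform. The sketch below reproduces that ordering rule in plain Python on made-up values; `frequency_index` is a hypothetical helper, not part of PySpark.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Made-up platform values; the real column has its own distribution.
platforms = ["iphone", "android", "iphone", "windows", "iphone", "android"]
mapping = frequency_index(platforms)
print(mapping)  # {'iphone': 0.0, 'android': 1.0, 'windows': 2.0}
```

The indices are only identifiers. Their numeric order carries no meaning beyond frequency rank, which tree models can tolerate but linear models generally cannot.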



In [None]:
from pyspark.ml.feature import VectorAssembler

coef_var = [
    'userId',
    'teamLevel',
    'platformType_idx',
    'count_hits',
    'count_buyId', 
    'count_gameclicks',
]
# Create an assembler object
assembler = VectorAssembler(inputCols=coef_var, outputCol='features')

# Consolidate predictor columns
flamingo_assembled = assembler.transform(flamingo_indexed)

# Check the resulting column
flamingo_assembled.select('features', 'avg_price').show(5, truncate=False)

+------------------------------+---------+
|features                      |avg_price|
+------------------------------+---------+
|[937.0,1.0,1.0,0.0,1.0,39.0]  |1        |
|[1623.0,1.0,0.0,9.0,1.0,129.0]|10       |
|[83.0,1.0,1.0,14.0,1.0,102.0] |5        |
|[121.0,1.0,1.0,4.0,1.0,39.0]  |3        |
|[462.0,1.0,1.0,10.0,1.0,90.0] |3        |
+------------------------------+---------+
only showing top 5 rows
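
Conceptually, the assembler reads the listed columns in order and packs their values into one numeric vector per row. The plain-Python sketch below shows that idea for the first record above; `assemble` is a hypothetical helper used only for illustration.

```python
feature_names = ["userId", "teamLevel", "platformType_idx",
                 "count_hits", "count_buyId", "count_gameclicks"]

def assemble(row, names):
    """Collect the named values of a row into one list of floats, in order."""
    return [float(row[name]) for name in names]

# First record from the indexed table above, written as a plain dict.
first_row = {"userId": 937, "teamLevel": 1, "platformType": "android",
             "count_gameclicks": 39, "count_hits": 0, "count_buyId": 1,
             "avg_price": 1, "label": 0, "platformType_idx": 1.0}

print(assemble(first_row, feature_names))  # [937.0, 1.0, 1.0, 0.0, 1.0, 39.0]
```

The order of the input list determines the position of each value in the vector, which matters later when interpreting feature importances or coefficients.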



## Decision Tree


### Train/test split
To objectively assess a Machine Learning model you need to be able to test it on an independent set of data. You can't use the same data that you used to train the model: of course the model will perform (relatively) well on those data!

You will split the data into two components:

- training data (used to train the model) and
- testing data (used to test the model).

In [None]:
# Split into training and test sets in an 80:20 ratio
flamingo_train, flamingo_test = flamingo_assembled.randomSplit([0.8, 0.2], seed=123)

flamingo_train.describe().show()

# Check that training set has around 80% of records
training_ratio = flamingo_train.count() / flamingo_assembled.count()
print(training_ratio)

+-------+------------------+-----------------+------------+------------------+------------------+------------------+---------------+-------------------+------------------+
|summary|            userId|        teamLevel|platformType|  count_gameclicks|        count_hits|       count_buyId|      avg_price|              label|  platformType_idx|
+-------+------------------+-----------------+------------+------------------+------------------+------------------+---------------+-------------------+------------------+
|  count|              1125|             1125|        1125|              1125|              1125|              1125|           1125|               1125|              1125|
|   mean|1203.6542222222222|            4.848|        null|142.60622222222221| 16.52177777777778| 1.696888888888889|          7.104|0.26222222222222225|0.9644444444444444|
| stddev| 692.1448203919557|1.791391037862423|        null|116.22403875897975|13.129817077203429|0.9065004034282496|6.4571425872021| 0.44003

In [None]:
flamingo_test.describe().show()

+-------+-----------------+------------------+------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|           userId|         teamLevel|platformType|  count_gameclicks|        count_hits|       count_buyId|         avg_price|              label|  platformType_idx|
+-------+-----------------+------------------+------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|              286|               286|         286|               286|               286|               286|               286|                286|               286|
|   mean| 1190.93006993007|  4.77972027972028|        null| 155.5734265734266| 17.68881118881119|1.6293706293706294|7.1573426573426575|0.25874125874125875|1.0419580419580419|
| stddev|692.5280324971557|1.7938710251340138|        null|132.48613244930695|15.149466344118979|0.8760222834490293| 6.837953
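
The split is random per row, which is why the training share comes out near 80% (1125 of 1411 records here) rather than exactly 80%. The sketch below illustrates the idea in plain Python: each row gets a random draw and lands in the bucket whose cumulative weight range contains it. It uses Python's own random generator, so its counts will not match Spark's, and `random_split` is a hypothetical stand-in rather than the Spark implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one bucket by comparing a random draw with cumulative weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total  # normalise so the bounds end at 1.0
        bounds.append(running)
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last bucket catches any draw not claimed earlier.
            if draw < upper or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets

train, test = random_split(range(1411), [0.8, 0.2], seed=123)
print(len(train), len(test), len(train) / 1411)
```

Fixing the seed makes the split repeatable, which keeps later evaluation numbers comparable between runs.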

### Build a Decision Tree
Now that you've split the data into training and testing sets, you can use the training set to fit a Decision Tree model.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object (it reads the 'features' and 'label' columns by default)
# and fit it to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flamingo_train.select('features', 'avg_price', 'label'))

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flamingo_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+-----------------------------------------+
|label|prediction|probability                              |
+-----+----------+-----------------------------------------+
|0    |0.0       |[0.92152466367713,0.07847533632286996]   |
|0    |0.0       |[0.5443037974683544,0.45569620253164556] |
|0    |0.0       |[0.9952380952380953,0.004761904761904762]|
|1    |0.0       |[0.5443037974683544,0.45569620253164556] |
|1    |1.0       |[0.3695652173913043,0.6304347826086957]  |
+-----+----------+-----------------------------------------+
only showing top 5 rows
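
With default settings, the `prediction` column is the class whose entry in `probability` is largest. The sketch below checks this against the five rows shown above in plain Python; `predict` is a hypothetical helper written for illustration.

```python
# Probability vectors copied from the first five predictions above.
probabilities = [
    [0.92152466367713, 0.07847533632286996],
    [0.5443037974683544, 0.45569620253164556],
    [0.9952380952380953, 0.004761904761904762],
    [0.5443037974683544, 0.45569620253164556],
    [0.3695652173913043, 0.6304347826086957],
]

def predict(probs):
    """Return the index of the largest probability as a float class label."""
    return float(max(range(len(probs)), key=lambda i: probs[i]))

print([predict(p) for p in probabilities])  # [0.0, 0.0, 0.0, 0.0, 1.0]
```

The second and fourth rows share the same probabilities, which suggests they fall into the same leaf of the tree. One is labelled 0 and the other 1, so the model cannot get both right.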



### Evaluate the Decision Tree
You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.

A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:

- True Negatives (TN) — model predicts negative outcome & known outcome is negative
- True Positives (TP) — model predicts positive outcome & known outcome is positive
- False Negatives (FN) — model predicts negative outcome but known outcome is positive
- False Positives (FP) — model predicts positive outcome but known outcome is negative.

In [None]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()

print(TN)
print(TP)
print(FN)
print(FP)

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   24|
|    0|       0.0|  185|
|    1|       1.0|   50|
|    0|       1.0|   27|
+-----+----------+-----+

185
50
24
27
0.8216783216783217
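
Accuracy alone can hide how the model treats the positive class, which makes up only about a quarter of the records. The same four counts also yield precision, recall and F1, as sketched below in plain Python using the numbers printed above. These extra metrics are an addition for illustration and are not computed in the original notebook.

```python
TN, TP, FN, FP = 185, 50, 24, 27  # counts from the confusion matrix above

accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)  # share of predicted positives that are correct
recall = TP / (TP + FN)     # share of actual positives that are found
f1 = 2 * precision * recall / (precision + recall)

print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
# accuracy=0.822 precision=0.649 recall=0.676 f1=0.662
```

Here roughly two thirds of the positive predictions are correct and roughly two thirds of the actual positives are found, a weaker picture than the 82% accuracy suggests.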
