# Big Data Real-Time Analytics with Python and Spark

## Chapter 14 - Apache Spark Machine Learning

### Lab 5 - Machine Learning with PySpark - Part 2

![Lab5.png](attachment:Lab5.png)

## Multiclass Classification

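We will use multiclass classification with a decision tree to build a model that predicts the outcome of a soccer match, which has three possible outcomes: win, loss, or draw. In the dataset the outcomes are stored in Portuguese: `vitoria` (win), `derrota` (loss), and `empate` (draw).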

In [1]:
# Python version
from platform import python_version
print('The version of python used in this notebook is: ', python_version())

The version of python used in this notebook is:  3.8.13


In [2]:
# Import findspark and initialize it so that PySpark can be imported
import findspark
findspark.init()

In [3]:
# Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
# Package versions used in this notebook
%reload_ext watermark
%watermark -a 'Bianca Amorim' --iversions

Author: Bianca Amorim

pyspark  : 3.3.1
findspark: 2.0.1



## Loading Dataset

In [5]:
# Create Spark Context
sc = SparkContext(appName = 'Lab5')


In [6]:
# Only show log messages at ERROR level or above
sc.setLogLevel('ERROR')

In [7]:
# Spark Session - used to work with Spark DataFrames (reuses the existing SparkContext)
spSession = SparkSession.builder.master('local').getOrCreate()

In [8]:
# Load the dataset as an RDD of text lines
football_time_data = sc.textFile('datasets/dataset2.csv')

In [9]:
# Cache the RDD in memory so later actions do not re-read the file
football_time_data.cache()

datasets/dataset2.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [10]:
football_time_data.count()

151

In [11]:
football_time_data.take(5)

['media_faltas_sofridas,media_faltas_recebidas,media_cartoes_recebidos,media_chutes_a_gol,resultado',
 '4.8,3,1.4,0.3,vitoria',
 '5.1,3.8,1.6,0.2,vitoria',
 '4.6,3.2,1.4,0.2,vitoria',
 '5.3,3.7,1.5,0.2,vitoria']

In [12]:
# Remove the header line (the only line containing the column name media_faltas_sofridas)
football_time_data_2 = football_time_data.filter(lambda x: 'media_faltas_sofridas' not in x)
football_time_data_2.count()

150
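
Filtering on the text `media_faltas_sofridas` works because no data row contains that column name. A slightly more general pattern takes the first line as the header and drops every line equal to it; in Spark this is typically written as `header = rdd.first()` followed by `rdd.filter(lambda x: x != header)`. The plain-Python sketch below shows the same logic on a hypothetical three-line sample, so it runs without a Spark installation.

```python
# Hypothetical sample standing in for the lines of datasets/dataset2.csv
lines = [
    "media_faltas_sofridas,media_faltas_recebidas,media_cartoes_recebidos,media_chutes_a_gol,resultado",
    "4.8,3,1.4,0.3,vitoria",
    "5.1,3.8,1.6,0.2,vitoria",
]

# The first line is the header
header = lines[0]

# Keep every line that is not identical to the header
data_lines = [line for line in lines if line != header]

print(len(data_lines))  # 2
```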

## Cleaning and Transforming Dataset

In [13]:
# Split each line into a list of column values
football_time_data_3 = football_time_data_2.map(lambda l: l.split(','))

In [14]:
# Map each list to a Row with English column names (resultado -> score) and float features
football_time_data_4 = football_time_data_3.map(lambda p: Row(average_fouls_suffered = float(p[0]),
                                                             average_fouls_received = float(p[1]),
                                                             average_cards_received = float(p[2]),
                                                             average_shots_on_goal = float(p[3]),
                                                             score = p[4]))
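
The lambda above can also be written as a named function, which is easier to test in isolation. The sketch below is plain Python and uses a `namedtuple` called `Match` (a hypothetical name, not part of the notebook) as a stand-in for `pyspark.sql.Row`; a function like `parse_line` could be passed to `rdd.map` in place of the lambda.

```python
from collections import namedtuple

# Hypothetical typed record mirroring the Row built in the cell above
Match = namedtuple(
    "Match",
    ["average_fouls_suffered", "average_fouls_received",
     "average_cards_received", "average_shots_on_goal", "score"],
)

def parse_line(line):
    """Split one CSV line and convert the four numeric columns to float."""
    p = line.split(",")
    return Match(float(p[0]), float(p[1]), float(p[2]), float(p[3]), p[4])

record = parse_line("4.8,3,1.4,0.3,vitoria")
print(record)
```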

In [15]:
# Convert the RDD to a Spark DataFrame
df_time = spSession.createDataFrame(football_time_data_4)
df_time.cache()

DataFrame[average_fouls_suffered: double, average_fouls_received: double, average_cards_received: double, average_shots_on_goal: double, score: string]

In [16]:
df_time.take(5)

[Row(average_fouls_suffered=4.8, average_fouls_received=3.0, average_cards_received=1.4, average_shots_on_goal=0.3, score='vitoria'),
 Row(average_fouls_suffered=5.1, average_fouls_received=3.8, average_cards_received=1.6, average_shots_on_goal=0.2, score='vitoria'),
 Row(average_fouls_suffered=4.6, average_fouls_received=3.2, average_cards_received=1.4, average_shots_on_goal=0.2, score='vitoria'),
 Row(average_fouls_suffered=5.3, average_fouls_received=3.7, average_cards_received=1.5, average_shots_on_goal=0.2, score='vitoria'),
 Row(average_fouls_suffered=5.1, average_fouls_received=3.5, average_cards_received=1.4, average_shots_on_goal=0.2, score='vitoria')]

In [17]:
# Create a numeric index for the target (label) column
stringIndexer = StringIndexer(inputCol = 'score', outputCol = 'idx_score')

In [18]:
# Fit the StringIndexer (learns the mapping from each label to an index)
si_model = stringIndexer.fit(df_time)

In [19]:
# Apply the fitted indexer to add the idx_score column
df_time_final = si_model.transform(df_time)

In [20]:
df_time_final.select('score', 'idx_score').distinct().collect()

[Row(score='derrota', idx_score=0.0),
 Row(score='vitoria', idx_score=2.0),
 Row(score='empate', idx_score=1.0)]
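
The mapping above follows `StringIndexer`'s default `stringOrderType='frequencyDesc'`: the most frequent label receives index 0.0. According to the Spark 3.x documentation, labels with equal frequency are then ordered alphabetically, which is why `derrota`, `empate` and `vitoria` get 0.0, 1.0 and 2.0. The sketch below reproduces that rule in plain Python, assuming 50 rows per class, which is consistent with the mean (1.0) and standard deviation (about 0.819) of `idx_score` reported by `describe()` in the next section. The function name `string_index` is hypothetical.

```python
from collections import Counter

def string_index(labels):
    """Mimic the 'frequencyDesc' ordering: the most frequent label gets 0.0,
    and labels with equal counts are ordered alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

# Assumption: the three classes are equally frequent (50 rows each)
labels = ["vitoria"] * 50 + ["empate"] * 50 + ["derrota"] * 50
print(string_index(labels))  # {'derrota': 0.0, 'empate': 1.0, 'vitoria': 2.0}
```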

## Exploratory Data Analysis

In [21]:
# Descriptive Statistics
df_time_final.describe().show()

+-------+----------------------+----------------------+----------------------+---------------------+-------+------------------+
|summary|average_fouls_suffered|average_fouls_received|average_cards_received|average_shots_on_goal|  score|         idx_score|
+-------+----------------------+----------------------+----------------------+---------------------+-------+------------------+
|  count|                   150|                   150|                   150|                  150|    150|               150|
|   mean|     5.843333333333332|    3.0573333333333337|     3.758000000000001|   1.1993333333333331|   null|               1.0|
| stddev|    0.8280661279778625|   0.43586628493669793|    1.7652982332594667|   0.7622376689603465|   null|0.8192319205190404|
|    min|                   4.3|                   2.0|                   1.0|                  0.1|derrota|               0.0|
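|    max|                   7.9|                   4.4|                   6.9|                  2.5|vitoria|               2.0|
+-------+----------------------+----------------------+----------------------+---------------------+-------+------------------+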
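
`describe()` reports the count, mean, sample standard deviation (divisor n - 1), minimum and maximum of each column; for the string column `score` the mean and standard deviation are `null`, and min/max are compared alphabetically. A plain-Python sketch of the numeric summary, applied to the first five values of `average_fouls_suffered` shown earlier (the function name `describe` is hypothetical):

```python
import statistics

def describe(values):
    """Summary in the style of DataFrame.describe(); stddev is the
    sample standard deviation (n - 1 in the denominator)."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
    }

# First five values of average_fouls_suffered from the take(5) output
sample = [4.8, 5.1, 4.6, 5.3, 5.1]
print(describe(sample))
```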

In [23]:
# Pearson correlation between idx_score and every numeric (non-string) column
for col_name, col_type in df_time_final.dtypes:
    if col_type != 'string':
        print('Correlation of idx_score variable with:', col_name, df_time_final.stat.corr('idx_score', col_name))

Correlation of idx_score variable with: average_fouls_suffered -0.46003915650023686
Correlation of idx_score variable with: average_fouls_received 0.6183715308237437
Correlation of idx_score variable with: average_cards_received -0.6492418307641741
Correlation of idx_score variable with: average_shots_on_goal -0.5803770334306264
Correlation of idx_score variable with: idx_score 1.0
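
`DataFrame.stat.corr` computes the Pearson correlation coefficient (currently the only method it supports). Because `idx_score` encodes unordered categories, the sign and size of these correlations depend on the arbitrary 0/1/2 assignment made by `StringIndexer`, so they are at best a rough guide to feature relevance. The sketch below implements Pearson's formula in plain Python and shows, on hypothetical toy data, that relabeling the same classes changes the result.

```python
import math

def pearson(x, y):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    ss_x = sum((a - mean_x) ** 2 for a in x)
    ss_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(ss_x * ss_y)

x = [1.0, 2.0, 3.0, 4.0]
print(pearson(x, x))                     # 1.0
print(pearson(x, [8.0, 6.0, 4.0, 2.0]))  # -1.0

# Hypothetical toy data: the same three classes indexed in two different ways
feature = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
labels_a = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0]
labels_b = [1.0, 1.0, 0.0, 0.0, 2.0, 2.0]
print(pearson(feature, labels_a), pearson(feature, labels_b))
```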


## Data Preprocessing

In [24]:
# Build a (score, label, features) tuple, similar in spirit to an MLlib LabeledPoint
# All four numeric columns are kept and packed into a dense feature vector
def transformaVar(row) :
    obj = (row['score'], row['idx_score'], Vectors.dense(row['average_fouls_suffered'],
                                                        row['average_fouls_received'],
                                                        row['average_cards_received'],
                                                        row['average_shots_on_goal']))
    return obj
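
`transformaVar` packs the four numeric columns into one vector next to the label. A plain-Python version operating on a dict is sketched below (`to_labeled_tuple` is a hypothetical name, and a Python list stands in for `Vectors.dense`). In DataFrame-based pipelines the same step is more commonly done with `pyspark.ml.feature.VectorAssembler(inputCols=[...], outputCol='features')`, which avoids converting to an RDD and back.

```python
def to_labeled_tuple(row):
    """Sketch of transformaVar on a plain dict: return
    (score, label index, list of the four numeric features)."""
    features = [
        row["average_fouls_suffered"],
        row["average_fouls_received"],
        row["average_cards_received"],
        row["average_shots_on_goal"],
    ]
    return (row["score"], row["idx_score"], features)

# First row of df_time_final, written as a dict
row = {"average_fouls_suffered": 4.8, "average_fouls_received": 3.0,
       "average_cards_received": 1.4, "average_shots_on_goal": 0.3,
       "score": "vitoria", "idx_score": 2.0}
print(to_labeled_tuple(row))
```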

In [25]:
# Apply the function to every row
df_time_final_RDD = df_time_final.rdd.map(transformaVar)

In [26]:
df_time_final_RDD.take(5)

[('vitoria', 2.0, DenseVector([4.8, 3.0, 1.4, 0.3])),
 ('vitoria', 2.0, DenseVector([5.1, 3.8, 1.6, 0.2])),
 ('vitoria', 2.0, DenseVector([4.6, 3.2, 1.4, 0.2])),
 ('vitoria', 2.0, DenseVector([5.3, 3.7, 1.5, 0.2])),
 ('vitoria', 2.0, DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [27]:
# Convert RDD to DataFrame
df_spark = spSession.createDataFrame(df_time_final_RDD,['score', 'label', 'features'])

In [28]:
df_spark.cache()

DataFrame[score: string, label: double, features: vector]

In [29]:
df_spark.select('score', 'label', 'features').show(10)

+-------+-----+-----------------+
|  score|label|         features|
+-------+-----+-----------------+
|vitoria|  2.0|[4.8,3.0,1.4,0.3]|
|vitoria|  2.0|[5.1,3.8,1.6,0.2]|
|vitoria|  2.0|[4.6,3.2,1.4,0.2]|
|vitoria|  2.0|[5.3,3.7,1.5,0.2]|
|vitoria|  2.0|[5.1,3.5,1.4,0.2]|
|vitoria|  2.0|[4.9,3.0,1.4,0.2]|
|vitoria|  2.0|[4.7,3.2,1.3,0.2]|
|vitoria|  2.0|[4.6,3.1,1.5,0.2]|
|vitoria|  2.0|[5.0,3.6,1.4,0.2]|
|vitoria|  2.0|[5.4,3.9,1.7,0.4]|
+-------+-----+-----------------+
only showing top 10 rows



In [30]:
# Split into training (70%) and test (30%) sets
# No seed is set, so the split and the counts below change on each run
(train_data, test_data) = df_spark.randomSplit([0.7, 0.3])

In [31]:
train_data.count()

102

In [32]:
test_data.count()

48
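
The counts 102 and 48 are not exactly 70% and 30% of 150 because `randomSplit` assigns each row to a split independently at random, so the split sizes only approximate the weights. The plain-Python sketch below illustrates that idea (it is not Spark's actual implementation; `random_split` is a hypothetical name). Fixing the seed makes the split reproducible; in the notebook that would be `df_spark.randomSplit([0.7, 0.3], seed=42)`, with 42 an arbitrary choice.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability
    proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, b in enumerate(bounds):
            if r < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding
    return splits

train, test = random_split(range(150), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 105 and 45, but usually not exact
```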

## Machine Learning

In [34]:
# Create the decision tree classifier, limiting the tree to depth 2
dtClassifier = DecisionTreeClassifier(maxDepth = 2, labelCol = 'label', featuresCol = 'features')

In [35]:
# Train the model on the training data
model = dtClassifier.fit(train_data)
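
`DecisionTreeClassifier` uses Gini impurity by default (`impurity='gini'`): at each node it picks the split that most reduces the weighted impurity of the child nodes. The plain-Python sketch below scores every midpoint between distinct values of a single feature on hypothetical toy data. Spark instead discretizes continuous features into at most `maxBins` (default 32) candidate thresholds and searches all features, so treat this as an illustration of the criterion only; `gini` and `best_threshold` are hypothetical names.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())

def best_threshold(values, labels):
    """Try each midpoint between sorted distinct values and return the
    (threshold, weighted child impurity) pair with the lowest impurity."""
    distinct = sorted(set(values))
    best = (None, gini(labels))
    for lo, hi in zip(distinct, distinct[1:]):
        t = (lo + hi) / 2
        left = [l for v, l in zip(values, labels) if v <= t]
        right = [l for v, l in zip(values, labels) if v > t]
        w = (len(left) * gini(left) + len(right) * gini(right)) / len(labels)
        if w < best[1]:
            best = (t, w)
    return best

# Hypothetical toy data: one feature, three classes
values = [1.4, 1.5, 1.3, 4.5, 4.7, 5.8, 6.1]
labels = [2, 2, 2, 1, 1, 0, 0]
print(gini([0, 1, 2]))  # about 2/3, the maximum for three balanced classes
print(best_threshold(values, labels))
```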

In [36]:
# Number of nodes in the fitted tree (learned during training, not a hyperparameter)
model.numNodes

5

In [37]:
# Depth of the fitted tree, bounded by the maxDepth = 2 hyperparameter we set
model.depth

2
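
Spark defines depth as the number of edges on the longest root-to-leaf path (depth 0 is a single leaf), so a binary tree of depth 2 has at most 2^(2+1) - 1 = 7 nodes. Five nodes means one child of the root is already a leaf while the other is split once more. The hypothetical nested-dict tree below is one shape consistent with `numNodes = 5` and `depth = 2`; the actual splits of the fitted model can be printed with `model.toDebugString`. The helpers `num_nodes` and `depth` are hypothetical names.

```python
def num_nodes(tree):
    """Count all nodes (internal and leaf) in a nested-dict binary tree."""
    if "left" not in tree:
        return 1
    return 1 + num_nodes(tree["left"]) + num_nodes(tree["right"])

def depth(tree):
    """Edges on the longest root-to-leaf path (a lone leaf has depth 0)."""
    if "left" not in tree:
        return 0
    return 1 + max(depth(tree["left"]), depth(tree["right"]))

# Hypothetical shape: the root's left child is a leaf, the right child splits again
tree = {
    "left": {"prediction": 2.0},
    "right": {"left": {"prediction": 1.0}, "right": {"prediction": 0.0}},
}
print(num_nodes(tree), depth(tree))  # 5 2
```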

In [40]:
# Generate predictions for the test data
predictions = model.transform(test_data)

In [41]:
predictions

DataFrame[score: string, label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [44]:
predictions.select('score', 'label', 'prediction', 'probability').collect()

[Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=1.0, probability=DenseVector([0.0571, 0.9429, 0.0])),
 Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=0.0, probability=DenseVector([1.0, 0.0, 0.0])),
 Row(score='derrota', label=0.0, prediction=1.0, probability=DenseVector([0.0571, 0.9429, 0.0])),
 Row(score='vitoria', label=2.0, prediction=2.0, probability=DenseVector([0.0, 0.0, 1.0])),
 Row(score='vitoria', label=2.0, prediction=2.0, probability=DenseVector([0.0, 0.0, 1.0])),
 Row(score='vitoria', label=2.0, prediction=2.0, probability=DenseVe
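
For a decision tree, the `probability` column is, as we understand Spark's implementation, the class counts of the training rows in the leaf a test row falls into, normalized to sum to 1, and `prediction` is the index of the largest entry. This is why the two misclassified `derrota` rows above share the same vector `[0.0571, 0.9429, 0.0]`: they reach the same leaf, which is dominated by `empate` rows. The sketch below uses hypothetical leaf counts and hypothetical helper names.

```python
def leaf_probability(class_counts):
    """Normalize a leaf's per-class counts into a probability vector."""
    total = sum(class_counts)
    return [c / total for c in class_counts]

def predict_from_probability(probability):
    """Predicted label index = position of the largest probability."""
    return float(max(range(len(probability)), key=probability.__getitem__))

# Hypothetical leaf with 1 'derrota' (0), 9 'empate' (1) and 0 'vitoria' (2) rows
p = leaf_probability([1, 9, 0])
print(p)                            # [0.1, 0.9, 0.0]
print(predict_from_probability(p))  # 1.0
```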

In [53]:
# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction',
                                             labelCol = 'label',
                                             metricName = 'accuracy')

In [54]:
evaluator.evaluate(predictions)

0.8958333333333334

In [57]:
# Summarize the predictions as a confusion matrix (count per label/prediction pair)
predictions.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|    4|
|  2.0|       2.0|   15|
|  0.0|       0.0|   12|
|  1.0|       1.0|   16|
|  1.0|       0.0|    1|
+-----+----------+-----+
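
Accuracy is the diagonal of this confusion matrix divided by the total: (12 + 16 + 15) / 48 = 43 / 48, which matches the 0.8958 reported by the evaluator. The plain-Python sketch below recomputes it from the counts in the table and adds per-class precision and recall. Note that `MulticlassClassificationEvaluator` defaults to `metricName='f1'` when no metric is given; `'weightedPrecision'` and `'weightedRecall'` are among the other supported values.

```python
from collections import defaultdict

# (label, prediction) -> count, copied from the groupBy output above
counts = {(0.0, 1.0): 4, (2.0, 2.0): 15, (0.0, 0.0): 12,
          (1.0, 1.0): 16, (1.0, 0.0): 1}

total = sum(counts.values())
correct = sum(n for (label, pred), n in counts.items() if label == pred)
accuracy = correct / total
print(correct, total, accuracy)  # 43 48 0.8958333333333334

# Row totals (actual class) and column totals (predicted class)
actual = defaultdict(int)
predicted = defaultdict(int)
for (label, pred), n in counts.items():
    actual[label] += n
    predicted[pred] += n

# Precision = TP / predicted total, recall = TP / actual total
for cls in sorted(actual):
    tp = counts.get((cls, cls), 0)
    precision = tp / predicted[cls] if predicted[cls] else 0.0
    recall = tp / actual[cls]
    print(cls, round(precision, 3), round(recall, 3))
```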



# The End