In [1]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .master("local") \
    .appName("NaiveBayes") \
    .getOrCreate()

In [2]:
# load the dataset
data = spark.read.csv("GROUP2.csv.gz", header=True, sep=',',inferSchema="true")
#renaming winPlaceClass with label
data = data.withColumn("winPlaceClass", col("winPlaceClass") -1)
data = data.withColumnRenamed('winPlaceClass', 'label')
data.dtypes

[('groupId', 'string'),
 ('matchId', 'string'),
 ('matchDuration', 'int'),
 ('label', 'int'),
 ('isFirstPerson', 'boolean'),
 ('matchtype', 'int'),
 ('maxPlace', 'int'),
 ('numGroups', 'int'),
 ('hasDisconnected', 'int'),
 ('assist_SUM', 'int'),
 ('assist_MAX', 'int'),
 ('assist_AVG', 'int'),
 ('heals_SUM', 'int'),
 ('heals_MAX', 'int'),
 ('heals_AVG', 'int'),
 ('kills_SUM', 'int'),
 ('kills_MAX', 'int'),
 ('kills_AVG', 'int'),
 ('headshotKills_SUM', 'int'),
 ('headshotKills_MAX', 'int'),
 ('headshotKills_AVG', 'int'),
 ('killStreaks_MAX', 'int'),
 ('roadKills_SUM', 'int'),
 ('roadKills_MAX', 'int'),
 ('roadKills_AVG', 'int'),
 ('longestKill_MAX', 'double'),
 ('vehicleDestroys_MAX', 'int'),
 ('weaponsAcquired_SUM', 'int'),
 ('weaponsAcquired_MAX', 'int'),
 ('weaponsAcquired_AVG', 'int'),
 ('damageDealt_SUM', 'double'),
 ('damageDealt_MAX', 'double'),
 ('damageDealt_AVG', 'double'),
 ('distance_SUM', 'double'),
 ('distance_MAX', 'double'),
 ('distance_AVG', 'double'),
 ('rideDistance_SU

In [3]:
inputCols =  data.columns

toRemove = ['matchId', 'groupId', 'killPlace_MAX', 'rankPoints_MAX', 'killPoints_MAX', 'winPoints_MAX', 'label']
inputCols = list(set(inputCols) - set(toRemove))

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=inputCols,
    outputCol="features")

output = assembler.transform(data)

## Feature Selection (correlation) 

In [5]:
pearsonCorr = Correlation.corr(output, 'features', 'pearson').collect()[0][0]
pearsonCorr

DenseMatrix(51, 51, [1.0, 0.3992, -0.2968, 0.4574, 0.4153, 0.4189, 0.3961, 0.3112, ..., 0.0677, 0.9348, 0.0047, 0.0101, 0.152, 0.0438, 0.1656, 1.0], False)

In [6]:
corr_matrix = pd.DataFrame(pearsonCorr.toArray(), inputCols, inputCols)
corr_matrix

Unnamed: 0,heals_SUM,weaponsAcquired_MAX,maxPlace,DBNOs_MAX,assist_MAX,kills_MAX,longestKill_MAX,revives_AVG,kills_AVG,weaponsAcquired_AVG,...,DBNOs_AVG,teamKills_MAX,rideDistance_MAX,swimDistance_AVG,matchDuration,roadKills_MAX,distance_AVG,headshotKills_MAX,walkDistance_SUM,swimDistance_MAX
heals_SUM,1.0,0.399182,-0.296815,0.457439,0.415324,0.41885,0.396104,0.311227,0.27156,0.281917,...,0.380362,0.094602,0.34703,0.070174,0.106431,0.048749,0.460835,0.308784,0.584716,0.100414
weaponsAcquired_MAX,0.399182,1.0,-0.267301,0.33161,0.306735,0.365927,0.369079,0.146669,0.278281,0.844288,...,0.272987,0.072497,0.332247,0.080258,0.170362,0.043483,0.532899,0.267466,0.580772,0.099652
maxPlace,-0.296815,-0.267301,1.0,-0.550834,-0.340582,-0.221101,-0.198065,-0.140615,0.056869,-0.034376,...,-0.378047,-0.128474,-0.120615,-0.000373,0.040627,-0.013165,-0.127474,-0.17262,-0.417748,-0.041709
DBNOs_MAX,0.457439,0.33161,-0.550834,1.0,0.571159,0.707322,0.477852,0.282188,0.407448,0.158517,...,0.833345,0.117925,0.149221,0.008848,-0.024082,0.044246,0.233786,0.516546,0.499249,0.043349
assist_MAX,0.415324,0.306735,-0.340582,0.571159,1.0,0.523352,0.44133,0.224874,0.337005,0.178785,...,0.491447,0.055733,0.150679,0.021752,-0.015643,0.037528,0.265265,0.377743,0.495501,0.047524
kills_MAX,0.41885,0.365927,-0.221101,0.707322,0.523352,1.0,0.613261,0.227693,0.858948,0.279778,...,0.662103,0.039295,0.150838,0.037618,-0.023866,0.068633,0.323869,0.698397,0.446402,0.055177
longestKill_MAX,0.396104,0.369079,-0.198065,0.477852,0.44133,0.613261,1.0,0.1612,0.520145,0.285123,...,0.440763,0.037272,0.248146,0.058288,0.074274,0.04528,0.416216,0.490903,0.516033,0.078937
revives_AVG,0.311227,0.146669,-0.140615,0.282188,0.224874,0.227693,0.1612,1.0,0.182596,0.107754,...,0.310427,0.053589,0.085796,0.011853,0.02322,0.011997,0.145773,0.160437,0.185739,0.017093
kills_AVG,0.27156,0.278281,0.056869,0.407448,0.337005,0.858948,0.520145,0.182596,1.0,0.300259,...,0.535954,-0.008743,0.110116,0.044592,-0.009498,0.063457,0.302214,0.610181,0.25149,0.039967
weaponsAcquired_AVG,0.281917,0.844288,-0.034376,0.158517,0.178785,0.279778,0.285123,0.107754,0.300259,1.0,...,0.189896,0.041655,0.287551,0.082887,0.183654,0.037335,0.494343,0.202294,0.414687,0.084602


In [7]:
# Seleziono il triangolo superiore della correlation matrix
upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(np.bool))

# Cerco le features con correlazione > 0.9
to_drop = [column for column in upper.columns if any(upper[column] > 0.9)]

In [8]:
to_drop

['damageDealt_AVG',
 'assist_SUM',
 'DBNOs_SUM',
 'damageDealt_MAX',
 'revives_MAX',
 'numGroups',
 'kills_SUM',
 'damageDealt_SUM',
 'walkDistance_MAX',
 'heals_MAX',
 'teamKills_MAX',
 'rideDistance_MAX',
 'roadKills_MAX',
 'distance_AVG',
 'headshotKills_MAX',
 'swimDistance_MAX']

In [9]:
naiveBayesFeat = [col for col in inputCols if col not in to_drop]
naiveBayesFeat

['heals_SUM',
 'weaponsAcquired_MAX',
 'maxPlace',
 'DBNOs_MAX',
 'assist_MAX',
 'kills_MAX',
 'longestKill_MAX',
 'revives_AVG',
 'kills_AVG',
 'weaponsAcquired_AVG',
 'killStreaks_MAX',
 'weaponsAcquired_SUM',
 'walkDistance_AVG',
 'hasDisconnected',
 'assist_AVG',
 'distance_SUM',
 'teamKills_AVG',
 'roadKills_AVG',
 'revives_SUM',
 'vehicleDestroys_MAX',
 'heals_AVG',
 'teamKills_SUM',
 'matchtype',
 'roadKills_SUM',
 'rideDistance_AVG',
 'swimDistance_SUM',
 'isFirstPerson',
 'rideDistance_SUM',
 'distance_MAX',
 'headshotKills_SUM',
 'headshotKills_AVG',
 'DBNOs_AVG',
 'swimDistance_AVG',
 'matchDuration',
 'walkDistance_SUM']

In [10]:
assembler = VectorAssembler(
    inputCols=naiveBayesFeat,
    outputCol="features")

output = assembler.transform(data)

In [11]:
seed = 42
withReplacement=False
output = output.sample(withReplacement, 0.5, seed).select("features", "label")

(training,testing) = output.randomSplit([0.7,0.3])
print(F'training: {training.count()}')
print(F'testing: {testing.count()}')

training: 705702
testing: 302825


In [12]:
training.groupBy('label').count().toPandas()

Unnamed: 0,label,count
0,1,165125
1,3,173846
2,5,16566
3,4,167917
4,2,165594
5,0,16654


## Grid Search

paramGrid = ParamGridBuilder() .addGrid(nb.smoothing, [0.0, 1.0, 5.0])  \
    .addGrid(nb.modelType, ['multinomial', 'gaussian']) \
    .build()

In [13]:
nb = NaiveBayes()

paramGrid = ParamGridBuilder() .addGrid(nb.smoothing, [0.0, 0.001, 0.01, 1.0])  \
    .addGrid(nb.modelType, ['multinomial', 'gaussian']) \
    .build()

crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3) 

cvModel = crossval.fit(training)

In [14]:
# Best parameters
print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])

{Param(parent='NaiveBayes_c9293d015e27', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.0, Param(parent='NaiveBayes_c9293d015e27', name='modelType', doc='The model type which is a string (case-sensitive). Supported options: multinomial (default), bernoulli and gaussian.'): 'gaussian'}


In [15]:
model = cvModel.bestModel

In [16]:
predictions = model.transform(testing)

## Evaluation

In [17]:
from pyspark.mllib.evaluation import MulticlassMetrics

predictionAndLabels = predictions.rdd.map(lambda x: (x.prediction, float(x.label)))
metrics = MulticlassMetrics(predictionAndLabels)

In [18]:
# Summary stats
print("Recall = %s" % metrics.weightedRecall)
print("Precision = %s" % metrics.weightedPrecision)
print("F1 measure = %s" % metrics.weightedFMeasure())
print("Accuracy = %s" % metrics.accuracy)

# Individual label stats
labels = [0.0, 1.0 ,2.0, 3.0, 4.0, 5.0]
for label in labels:
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label)))
    
print('Confusion Matrix')
print(metrics.confusionMatrix().toArray())

Recall = 0.34312556757203005
Precision = 0.4219803589184543
F1 measure = 0.36268647378873375
Accuracy = 0.34312556757203005
Class 0.0 precision = 0.23067031023951134
Class 0.0 recall = 0.6210328909405655
Class 0.0 F1 Measure = 0.33639382691932024
Class 1.0 precision = 0.6369647954329211
Class 1.0 recall = 0.37718680451869174
Class 1.0 F1 Measure = 0.47380434205637245
Class 2.0 precision = 0.4075125531093086
Class 2.0 recall = 0.2989021885402649
Class 2.0 F1 Measure = 0.3448580183861083
Class 3.0 precision = 0.33896864374763885
Class 3.0 recall = 0.2874687954718392
Class 3.0 F1 Measure = 0.3111018008191452
Class 4.0 precision = 0.3603967893381796
Class 4.0 recall = 0.3290787399397074
Class 4.0 F1 Measure = 0.34402648470479386
Class 5.0 precision = 0.10515925243485127
Class 5.0 recall = 0.9026973591300663
Class 5.0 F1 Measure = 0.18837397774994474
Confusion Matrix
[[4.3050e+03 2.1660e+03 3.6700e+02 5.8000e+01 1.0000e+00 3.5000e+01]
 [1.0891e+04 2.6778e+04 2.2586e+04 1.0221e+04 4.8400e+02

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.343126


## Sklearn Evalutation

In [20]:
from sklearn.metrics import classification_report, confusion_matrix

In [21]:
y_true = predictions.select(['label']).collect()
y_pred = predictions.select(['prediction']).collect()

In [22]:
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.23      0.62      0.34      6932
           1       0.64      0.38      0.47     70994
           2       0.41      0.30      0.34     70595
           3       0.34      0.29      0.31     74909
           4       0.36      0.33      0.34     72314
           5       0.11      0.90      0.19      7081

    accuracy                           0.34    302825
   macro avg       0.35      0.47      0.33    302825
weighted avg       0.42      0.34      0.36    302825



In [23]:
print(confusion_matrix(y_true, y_pred))

[[ 4305  2166   367    58     1    35]
 [10891 26778 22586 10221   484    34]
 [ 2942 10857 21101 26688  8427   580]
 [  464  2089  7025 21534 32738 11059]
 [   53   150   695  4935 23797 42684]
 [    8     0     6    92   583  6392]]


In [25]:
spark.stop()