<a href="https://colab.research.google.com/github/capodic/Project_work_HAR/blob/dev/Project_Work_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#<center><font size=6 color="blue">**Developing a Machine Learning model for Human Activity Recognition (HAR) with Spark ML, and predicting and grouping the results with distributed data structures and the Map/Reduce programming model**</font></center>

## <font size=5 color="blue">**Project description**</font>

The goal is to classify human motor activities (HAR) from data collected by 9-axis inertial sensors (IMU: 3 axes each for accelerometer, magnetometer and gyroscope). This is done by building a machine learning model with Apache Spark's MLlib and by programming a Map/Reduce computation pipeline that can run predictions on massive amounts of data (many data sources emitting continuously over time).

#<font size=5 color="blue">**Phase 1: Developing the Machine Learning model with Apache Spark MLlib**</font>

## <font color="red">✪</font> Installing PySpark 

In [1]:
!pip install pyspark


Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
     |████████████████████████████████| 212.4MB 66kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 15.2MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=30b8f6cdc80b95c3aac41cdf8e03ff2529866fc49cb531e1770a8b1ebd194a39
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


## <font color="red">✪</font> Initializing the SparkSession and the App (the SparkContext is included as an attribute)
>More memory than the default is allocated, because memory ran out while training the ML models

In [2]:
from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate() # heap exhausted in cvModel: resources must be increased, e.g. spark.memory.offHeap.size
# NB: with master local[*] the executor runs inside the driver JVM, so spark.driver.memory is the setting that really matters here
spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "70g").config("spark.driver.memory", "50g")\
                                    .config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","8g").appName("Project Work").getOrCreate()
# spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "50g").appName('Project Work').getOrCreate()  # short version: increasing driver.memory alone should also be enough
print("SparkSession created...",spark)

SparkSession created... <pyspark.sql.session.SparkSession object at 0x7f0574b40750>


Print the Current Spark Context Settings/Configurations

In [3]:
configurations = spark.sparkContext.getConf().getAll()
for conf in configurations:
    print(conf)

('spark.driver.memory', '50g')
('spark.driver.host', '1877f2b01123')
('spark.memory.offHeap.size', '8g')
('spark.driver.port', '36397')
('spark.app.id', 'local-1626287149264')
('spark.app.startTime', '1626287147546')
('spark.executor.id', 'driver')
('spark.executor.memory', '70g')
('spark.sql.warehouse.dir', 'file:/content/spark-warehouse')
('spark.rdd.compress', 'True')
('spark.serializer.objectStreamReset', '100')
('spark.master', 'local[*]')
('spark.submit.pyFiles', '')
('spark.submit.deployMode', 'client')
('spark.memory.offHeap.enabled', 'True')
('spark.app.name', 'Project Work')
('spark.ui.showConsoleProgress', 'true')


## <font color="red">✪</font> Importing libraries 
> **Pandas** for DataFrame handling  
> **Numpy** for efficient arrays and the related functions  
> **Matplotlib** for plotting charts (low-level library)  
>  **Seaborn** for plotting charts (high-level library built on top of Matplotlib)


In [4]:
import pandas as pd
import numpy as np
import seaborn as sns

import matplotlib.pyplot as plt

%matplotlib inline
print('libraries loaded')

libraries loaded


## <font color="red">✪</font> Loading the data from GitHub
> All columns are imported: time of day, timestamp (epoch), elapsed time and the accelerations along the x, y and z axes. A **label** column, not present in the original data, is then added  
> NB: the rows are already sorted by time
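You can check the temporal ordering, and derive the sampling rate, from the differences between consecutive `epoch (ms)` values. The sketch below uses only the standard library and the first five timestamps printed in the loading output of this notebook. On the full data, the same check could be run on the whole column, for example with pandas `Series.is_monotonic_increasing` and `Series.diff()`.

```python
# First five epoch timestamps (ms) of seduto-Accelerometer.csv, as printed in the loading output
epochs_ms = [1590567416332, 1590567416337, 1590567416342, 1590567416347, 1590567416352]

# Chronological order: every timestamp must be >= the previous one
is_sorted = all(a <= b for a, b in zip(epochs_ms, epochs_ms[1:]))

# Sampling period = difference between consecutive timestamps; frequency = 1000 / period (ms)
periods_ms = [b - a for a, b in zip(epochs_ms, epochs_ms[1:])]
mean_period_ms = sum(periods_ms) / len(periods_ms)
sampling_hz = 1000 / mean_period_ms

print(is_sorted, periods_ms, sampling_hz)  # True [5, 5, 5, 5] 200.0
```

A 5 ms period corresponds to the 200 Hz sampling rate assumed in Phase 2.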

In [49]:
# label values for the activities: they are inferred from the data file name, not from the data itself
SEDUTO, INPIEDI, SDRAIATO, CAMMINA, SCENDE, SALE = 0, 1, 2, 3, 4, 5  # sitting, standing, lying, walking, going down, going up
#pathdir_read = '/content/drive/MyDrive/Colab Notebooks/data/'

pathdir_read ='https://raw.githubusercontent.com/capodic/Project_work_HAR/main/dataset/'  # GitHub path
df_labels = [SEDUTO, INPIEDI, SDRAIATO, CAMMINA, SCENDE, SALE]

# read each CSV file, with the header on the first row (0); usecols=[3,4,5] would keep only the x, y, z columns
sed = pd.read_csv(pathdir_read+"seduto-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
sed['label'] = SEDUTO

inp = pd.read_csv(pathdir_read+"inpiedi-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
inp['label'] = INPIEDI

sdr = pd.read_csv(pathdir_read+"sdraiato-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
sdr['label'] = SDRAIATO

cam = pd.read_csv(pathdir_read+"cammina-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
cam['label'] = CAMMINA

sce = pd.read_csv(pathdir_read+"scende-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
sce['label'] = SCENDE

sal = pd.read_csv(pathdir_read+"sali-Accelerometer.csv", header=0)  #, usecols=[3,4,5])
sal['label'] = SALE

print("Data from the individual CSV files loaded")

# variables used in the rest of the notebook

# list of the individual DataFrames
df_list = [sed, inp, sdr, cam, sce, sal]
# DataFrame holding all the data
df = pd.concat(df_list, ignore_index=True)
# DataFrame holding every column except the label
df_features = df.iloc[:,:-1]
# Series holding the labels
ds_labels = df.iloc[:,-1]
label_txt = ['Sitting','Standing','Lying down','Walking','Going down','Going up'] # label_txt[SEDUTO] returns 'Sitting'

df


Data from the individual CSV files loaded


| | epoch (ms) | time (01:00) | elapsed (s) | x-axis (g) | y-axis (g) | z-axis (g) | label |
|---|---|---|---|---|---|---|---|
| 0 | 1590567416332 | 2020-05-27T10:16:56.332 | 0.000 | -0.818 | -0.156 | 0.572 | 0 |
| 1 | 1590567416337 | 2020-05-27T10:16:56.337 | 0.005 | -0.826 | -0.158 | 0.577 | 0 |
| 2 | 1590567416342 | 2020-05-27T10:16:56.342 | 0.010 | -0.824 | -0.159 | 0.576 | 0 |
| 3 | 1590567416347 | 2020-05-27T10:16:56.347 | 0.015 | -0.825 | -0.160 | 0.570 | 0 |
| 4 | 1590567416352 | 2020-05-27T10:16:56.352 | 0.020 | -0.823 | -0.157 | 0.573 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 88075 | 1590568227882 | 2020-05-27T10:30:27.882 | 96.840 | -0.885 | -0.169 | 0.417 | 5 |
| 88076 | 1590568227887 | 2020-05-27T10:30:27.887 | 96.845 | -0.893 | -0.178 | 0.412 | 5 |
| 88077 | 1590568227892 | 2020-05-27T10:30:27.892 | 96.850 | -0.899 | -0.179 | 0.405 | 5 |
| 88078 | 1590568227897 | 2020-05-27T10:30:27.897 | 96.855 | -0.898 | -0.173 | 0.396 | 5 |


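To save the complete dataset to a CSV file (Google Drive must be mounted in Colab):

In [None]:
# in Colab, mount Drive first: from google.colab import drive; drive.mount('/content/drive')
# a separate variable, so that pathdir_read keeps pointing at the GitHub dataset
pathdir_write = '/content/drive/MyDrive/Colab Notebooks/data/'
df.to_csv(pathdir_write+'har_data.csv', encoding='utf-8', index=False)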

## <font color="red">✪</font> Training the model with Spark's ML library (spark.ml)

>**NB:** spark.ml is Spark's DataFrame-based machine learning API. Since Spark 2.0 it has been the primary ML API, while the older RDD-based spark.mllib has been in maintenance mode

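NOTE:
>In spark.ml (as of Spark 3.1, the version installed above), **multiclass classification** is supported natively by **logistic regression** (multinomial), **decision trees**, **random forests**, **naive Bayes** and the multilayer perceptron. Gradient-boosted trees and linear SVM only handle binary labels, although they can be wrapped in `OneVsRest`. **Linear regression** is a regression algorithm, not a classifier.  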

**We will therefore use Linear Regression (as a baseline that treats the label index as a continuous value) and Random Forest**

####[SKIP] Normalizing the Pandas DataFrame 

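Standardization, i.e. rescaling each feature to zero mean and unit standard deviation  
>We show how to do it both on the Pandas DataFrame and on the PySpark DataFrame with the spark.ml library. Here it does not seem to affect the results the way it did with sklearn, which is expected for a random forest: its splits are per-feature thresholds, so rescaling a feature does not change them.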
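For reference, both scalers apply the standardization below, for feature $j$ over $n$ rows. The two differ only in the denominator of the standard deviation. Scikit-learn's `StandardScaler` uses the population standard deviation (it divides by $n$). Spark ML's `StandardScaler` is documented to use the corrected sample standard deviation (it divides by $n-1$).

$$
z_{ij} = \frac{x_{ij} - \mu_j}{\sigma_j}, \qquad \mu_j = \frac{1}{n}\sum_{i=1}^{n} x_{ij}
$$

$$
\sigma_j^{\text{sklearn}} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(x_{ij}-\mu_j\right)^2}, \qquad
\sigma_j^{\text{Spark}} = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_{ij}-\mu_j\right)^2} = \sigma_j^{\text{sklearn}}\sqrt{\frac{n}{n-1}}
$$

The two sets of standardized values therefore differ by the factor $\sqrt{(n-1)/n}$. This is negligible for a dataset with tens of thousands of rows.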

In [None]:
X = df[['x-axis (g)', 'y-axis (g)', 'z-axis (g)']] # pandas DataFrame of the features (non-numeric columns excluded)
Y = df.iloc[:, -1] # pandas Series of the labels
# Standardize the data: zero mean and unit variance per column (this is not a scaling to [-1, 1])
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
scaler.fit(X)
media = scaler.mean_
stdDev = scaler.scale_
print("X mean: ", scaler.mean_)
print("X std dev (square root of variance): ", scaler.scale_)
#X_df = X
X = scaler.transform(X)
# X = scaler.fit_transform(X)
# # X is now a NumPy array, no longer a DataFrame!!
# NB: in the run shown below df had already been standardized, hence mean ~ 0 and std = 1
X




X mean:  [-9.29320199e-17 -2.06515600e-16  1.03257800e-16]
X std dev (square root of variance):  [1. 1. 1.]


array([[-0.4662742 ,  0.12768344, -0.36389215],
       [-0.48984725,  0.10998886, -0.34100396],
       [-0.48395399,  0.10114157, -0.3455816 ],
       ...,
       [-0.70495138, -0.07580424, -1.1283574 ],
       [-0.70200475, -0.0227205 , -1.16955612],
       [-0.70495138, -0.00502592, -1.20159958]])

In [None]:
# instead of working on X and Y, put everything back into a DataFrame shaped like the original -> scaled_features_df
# requires: pip install sklearn-pandas
from sklearn_pandas import DataFrameMapper
from sklearn.preprocessing import StandardScaler

feature_cols = ['x-axis (g)', 'y-axis (g)', 'z-axis (g)']
mapper = DataFrameMapper([(feature_cols, StandardScaler())])
scaled_features = mapper.fit_transform(df.copy())
# copy the original DataFrame (the label column stays unchanged) and overwrite only the feature columns
scaled_features_df = df.copy()
scaled_features_df[feature_cols] = scaled_features
df = scaled_features_df
df
# MLlib could also be used for the standardization: https://spark.apache.org/docs/latest/ml-features#standardscaler 

| | x-axis (g) | y-axis (g) | z-axis (g) | label |
|---|---|---|---|---|
| 0 | -0.466274 | 0.127683 | -0.363892 | 0 |
| 1 | -0.489847 | 0.109989 | -0.341004 | 0 |
| 2 | -0.483954 | 0.101142 | -0.345582 | 0 |
| 3 | -0.486901 | 0.092294 | -0.373047 | 0 |
| 4 | -0.481007 | 0.118836 | -0.359315 | 0 |
| ... | ... | ... | ... | ... |
| 88075 | -0.663699 | 0.012669 | -1.073426 | 5 |
| 88076 | -0.687272 | -0.066957 | -1.096314 | 5 |
| 88077 | -0.704951 | -0.075804 | -1.128357 | 5 |
| 88078 | -0.702005 | -0.022721 | -1.169556 | 5 |


####<font color="red">✪</font> Preparing the dataset for Spark MLlib

>To use Spark MLlib for supervised learning, the DataFrame must be turned into two columns: one for the features and one for the label. The features column is a single vector built from the values of the individual feature columns
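Conceptually, `VectorAssembler` followed by `select("features", "label")` turns each row into a pair (feature vector, label). The feature vector holds the chosen column values in the order given by `inputCols`. The pure-Python sketch below only illustrates that reshaping; the `assemble` helper is hypothetical. Spark itself stores the result as an ML `Vector` (dense or sparse) rather than a Python list.

```python
# Hypothetical pure-Python stand-in for VectorAssembler + select("features", "label")
FEATURE_COLS = ['x-axis (g)', 'y-axis (g)', 'z-axis (g)']

def assemble(rows, input_cols=FEATURE_COLS, label_col='label'):
    """Turn each row (dict column -> value) into a (features, label) pair,
    keeping only the input columns, in input_cols order."""
    return [([row[c] for c in input_cols], row[label_col]) for row in rows]

# The first two rows of the dataset (extra columns such as the epoch are dropped)
rows = [
    {'epoch (ms)': 1590567416332, 'x-axis (g)': -0.818, 'y-axis (g)': -0.156, 'z-axis (g)': 0.572, 'label': 0},
    {'epoch (ms)': 1590567416337, 'x-axis (g)': -0.826, 'y-axis (g)': -0.158, 'z-axis (g)': 0.577, 'label': 0},
]
finalized = assemble(rows)
print(finalized)  # [([-0.818, -0.156, 0.572], 0), ([-0.826, -0.158, 0.577], 0)]
```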

In [27]:
from pyspark.ml.feature import VectorAssembler
df = pd.concat([sed, inp, sdr,cam,sce,sal], ignore_index=True) # comment out this line to use the scaled df!!
ds = spark.createDataFrame(data = df)
ds.printSchema()
ds.show(truncate=False)

#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['x-axis (g)', 'y-axis (g)', 'z-axis (g)'], outputCol = 'features')

output = assembler.transform(ds)

#Input vs Output
finalized_data = output.select("features","label")
print("Assembled columns 'x-axis (g)', 'y-axis (g)', 'z-axis (g)' to vector column 'features'")
finalized_data.show(truncate=False)


root
 |-- epoch (ms): long (nullable = true)
 |-- time (01:00): string (nullable = true)
 |-- elapsed (s): double (nullable = true)
 |-- x-axis (g): double (nullable = true)
 |-- y-axis (g): double (nullable = true)
 |-- z-axis (g): double (nullable = true)
 |-- label: long (nullable = true)

+-------------+-----------------------+-----------+-------------------+--------------------+------------------+-----+
|epoch (ms)   |time (01:00)           |elapsed (s)|x-axis (g)         |y-axis (g)          |z-axis (g)        |label|
+-------------+-----------------------+-----------+-------------------+--------------------+------------------+-----+
|1590567416332|2020-05-27T10:16:56.332|0.0        |-0.818             |-0.156              |0.5720000000000001|0    |
|1590567416337|2020-05-27T10:16:56.337|0.005      |-0.826             |-0.158              |0.5770000000000001|0    |
|1590567416342|2020-05-27T10:16:56.342|0.01       |-0.8240000000000001|-0.159              |0.5760000000000001|0    

For normalization (standardization) we can use the StandardScaler of pyspark.ml [https://spark.apache.org/docs/latest/ml-features#standardscaler ]

In [16]:
from pyspark.ml.feature import StandardScaler

dataFrame = finalized_data # finalized_data is NOT scaled!!
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True) ## NB: withMean=True to get zero mean

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Standardize each feature to zero mean and unit standard deviation
scaledData = scalerModel.transform(dataFrame)
scaledData.show(truncate=False)


+-----------------------------------------------+-----+---------------------------------------------------------------+
|features                                       |label|scaledFeatures                                                 |
+-----------------------------------------------+-----+---------------------------------------------------------------+
|[-0.818,-0.156,0.5720000000000001]             |0    |[-0.4662715498012617,0.12768271190290903,-0.36389007980038346] |
|[-0.826,-0.158,0.5770000000000001]             |0    |[-0.4898444715827734,0.10998823146701235,-0.3410020285345797]  |
|[-0.8240000000000001,-0.159,0.5760000000000001]|0    |[-0.4839512411373958,0.10114099124906402,-0.34557963878774045] |
|[-0.825,-0.16,0.57]                            |0    |[-0.4868978563600844,0.09229375103111567,-0.37304530030670546] |
|[-0.823,-0.157,0.573]                          |0    |[-0.4810046259147065,0.1188354716849607,-0.35931246954722323]  |
|[-0.823,-0.16,0.573]                   
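The two scalers give almost, but not exactly, the same values for the first row. For example, scikit-learn gives -0.4662742 and Spark gives -0.4662715498. This can be checked under one assumption: the difference comes only from the population versus sample standard deviation. In that case, rescaling the scikit-learn values by $\sqrt{(n-1)/n}$, with $n = 88079$ rows (index 0 to 88078), should reproduce the Spark values. The minimal stdlib check below does this.

```python
import math

# Standardized values of the first row [-0.818, -0.156, 0.572], copied from the outputs above:
# scikit-learn StandardScaler (population std, divides by n)
z_sklearn = [-0.4662742, 0.12768344, -0.36389215]
# Spark ML StandardScaler (corrected sample std, divides by n - 1)
z_spark = [-0.4662715498012617, 0.12768271190290903, -0.36389007980038346]

n = 88079  # rows in the full dataset (index 0..88078)

# s_sample = sigma_pop * sqrt(n / (n - 1))  =>  z_spark = z_sklearn * sqrt((n - 1) / n)
factor = math.sqrt((n - 1) / n)
z_rescaled = [z * factor for z in z_sklearn]

raw_gap = max(abs(a - b) for a, b in zip(z_sklearn, z_spark))
rescaled_gap = max(abs(a - b) for a, b in zip(z_rescaled, z_spark))
print(f"factor={factor:.8f} raw_gap={raw_gap:.2e} rescaled_gap={rescaled_gap:.2e}")
```

The remaining gap after rescaling is at the level of the rounding in the printed scikit-learn values. This supports the idea that the two scalers differ only in that denominator.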

###<font color="red">✪</font>Linear Regression

In [None]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.7,0.3])


from pyspark.ml.regression import LinearRegression

print("LinearRegression imported...")
regressor = LinearRegression(featuresCol = 'features', labelCol = 'label')

#Learn to fit the model from training set
regressor = regressor.fit(train_data)

#Evaluate the model on the testing set
pred = regressor.evaluate(test_data)

#Show the predictions
pred.predictions.show()

LinearRegression imported...
+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[-1.6626067682890...|    3|3.3081534623138715|
|[-1.6449269765905...|    3|3.3923627301860604|
|[-1.6095673931934...|    3|3.3888544562258227|
|[-1.5565280180978...|    3| 3.352240774265952|
|[-1.5093819069017...|    3|3.4457743566422763|
|[-1.5005420110524...|    1|2.2257364513494777|
|[-1.4828622193538...|    3| 3.259249662501487|
|[-1.4710756915548...|    3| 3.279767623494943|
|[-1.4475026359568...|    3| 3.343390079008718|
|[-1.4091964206099...|    3|3.2771679648740144|
|[-1.3856233650119...|    3|3.1779658606784236|
|[-1.3708902052631...|    3| 3.418268781382448|
|[-1.3090109343182...|    3| 3.024620447744475|
|[-1.2824912467704...|    3|2.9796819323817205|
|[-1.2677580870216...|    3| 3.149528795520894|
|[-1.2589181911723...|    3| 3.388505704520693|
|[-1.2441850314235...|    3|3.26

In [None]:
#coefficient of the regression model
coeff = regressor.coefficients

# intercept (bias term)
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)


The coefficient of the model is : DenseVector([-0.3471, -0.1905, 0.5891])
The Intercept of the model is : 2.657281


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# named reg_evaluator to avoid shadowing Python's built-in eval()
reg_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = reg_evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = reg_evaluator.evaluate(pred.predictions, {reg_evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = reg_evaluator.evaluate(pred.predictions, {reg_evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = reg_evaluator.evaluate(pred.predictions, {reg_evaluator.metricName: "r2"})
print("r2: %.3f" %r2)

# results of a previous run:
# RMSE: 1.756
# MSE: 3.082
# MAE: 1.537
# r2: 0.000

RMSE: 1.694
MSE: 2.868
MAE: 1.443
r2: 0.070
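`RegressionEvaluator` computes the usual metrics on the residuals $y_i - \hat{y}_i$:

- MSE is the mean squared residual.
- RMSE is the square root of the MSE.
- MAE is the mean absolute residual.
- $R^2 = 1 - \mathrm{SS}_{res}/\mathrm{SS}_{tot}$.

The stdlib sketch below reproduces those definitions; the helper name is hypothetical. Purely as an illustration, it applies them to the first six rows of the prediction table above.

On the full test set, MAE = 1.443 means the prediction is on average about 1.4 class indices away from the true label. $R^2 = 0.070$ means the linear model explains only about 7% of the variance of the label index. This is why linear regression is not a good fit for this multiclass problem.

```python
import math

def regression_metrics(y_true, y_pred):
    """Hypothetical helper: RMSE, MSE, MAE and R^2 with the standard definitions."""
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(r * r for r in residuals)          # sum of squared residuals
    mse = ss_res / n
    mae = sum(abs(r) for r in residuals) / n
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)  # total sum of squares around the mean
    r2 = 1 - ss_res / ss_tot
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": r2}

# label and prediction of the first six rows of the prediction table above
y_true = [3, 3, 3, 3, 3, 1]
y_pred = [3.3081534623138715, 3.3923627301860604, 3.3888544562258227,
          3.352240774265952, 3.4457743566422763, 2.2257364513494777]
m = regression_metrics(y_true, y_pred)
print({k: round(v, 3) for k, v in m.items()})
```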


### <font color="red">✪</font> Random Forest Classifier

In [28]:
# https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
print("RandomForestClassifier libraries loaded..")


RandomForestClassifier libraries loaded..


####Training the model with various numTrees and maxDepth values (other parameters at their defaults)

In [30]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.

# it seems that labels and features must be indexed this way, otherwise an error is raised that is hard to get past
# https://stackoverflow.com/questions/33942519/cannot-run-randomforestclassifier-from-spark-ml-on-a-simple-example 
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(finalized_data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(finalized_data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = finalized_data.randomSplit([0.7, 0.3], seed = 1)

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=15, maxDepth=20, seed=1) 
# numTrees 10 (8 s, 75.6%), 30, 100 (23 s, 76.81%), 200 (77.11%, 1 min 23 s)
# numTrees 10 and maxDepth=20 (82.20%, 41 s) seed=1
# numTrees 10 and maxDepth=25 (82.09%, 35 s) seed=1
# numTrees 15 and maxDepth=20 (82.36%, 35 s) seed=1; beyond this the RAM runs out (>90%), 1 min 07 s

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(  labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % (accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
# with NORMALIZATION:
# Test Error = 0.231858
# Accuracy = 0.768142
# RandomForestClassificationModel: uid=RandomForestClassifier_a0dcb797bc18, numTrees=100, numClasses=6, numFeatures=3

# default values (impurity can be 'gini' or 'entropy')
# class pyspark.ml.classification.RandomForestClassifier(*, featuresCol='features', labelCol='label', predictionCol='prediction',
#   probabilityCol='probability', rawPredictionCol='rawPrediction', maxDepth=5, maxBins=32, minInstancesPerNode=1,
#   minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini', numTrees=20,
#   featureSubsetStrategy='auto', seed=None, subsamplingRate=1.0, leafCol='', minWeightFractionPerNode=0.0, weightCol=None, 
#   bootstrap=True)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             3|    3|[-1.22,-0.249,0.665]|
|             3|    3|[-1.218,-0.243,0....|
|             5|    3|[-1.172,-0.381,0....|
|             4|    3|[-1.171,-0.241000...|
|             5|    3|[-1.17,-0.324,0.6...|
|             5|    1|[-1.169,-0.171,0....|
|             3|    3|[-1.168,-0.228999...|
|             3|    3|[-1.13,-0.254,0.614]|
|             5|    3|[-1.125,-0.293,0....|
|             5|    3|[-1.125,-0.254,0....|
+--------------+-----+--------------------+
only showing top 10 rows

Test Error = 0.176321
Accuracy = 0.823679
RandomForestClassificationModel: uid=RandomForestClassifier_65a0a9eeb1b3, numTrees=15, numClasses=6, numFeatures=3


**Accuracy of 82.37% (0.823679) ... without normalization.**
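With `metricName="accuracy"`, `MulticlassClassificationEvaluator` reports the fraction of rows whose prediction equals the true label. The "Test Error" printed above is `1 - accuracy`. The sketch below applies that definition to the ten rows displayed above. These rows are only the first ones shown, almost all of class 3, so their accuracy says nothing about the 82.37% measured on the whole test set.

```python
# (predictedLabel, label) pairs of the 10 rows displayed above
pairs = [(3, 3), (3, 3), (5, 3), (4, 3), (5, 3), (5, 1), (3, 3), (3, 3), (5, 3), (5, 3)]

correct = sum(1 for pred, label in pairs if pred == label)
accuracy = correct / len(pairs)   # fraction of correct predictions
test_error = 1.0 - accuracy       # complement, as printed by the notebook
print(correct, accuracy)  # 4 0.4
```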

####Training the model with hyperparameter tuning 
**...let's try hyperparameter tuning with k-fold cross-validation**
>cvModel is rather unstable and needs a lot of memory. There is probably a memory leak, since it cannot be run more than once without restarting the kernel...
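`CrossValidator` with `numFolds=k` splits the training data into k folds. For every parameter combination in the grid, it trains on k-1 folds and evaluates on the remaining one. The combination with the best average metric is then refit on the whole training set.

The stdlib sketch below only illustrates that scheme. The fold helper is hypothetical, and Spark's own fold split may differ in detail. The sketch also counts the model fits implied by the grid used in the next cell, which helps explain the time and memory cost mentioned above.

```python
import random

def k_fold_indices(n_rows, num_folds, seed=1):
    """Hypothetical helper: randomly assign each row to one of num_folds folds and
    return one (train_indices, validation_indices) pair per fold."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    splits = []
    for k in range(num_folds):
        validation = [i for i in range(n_rows) if fold_of[i] == k]
        train = [i for i in range(n_rows) if fold_of[i] != k]
        splits.append((train, validation))
    return splits

# Grid used in the next cell: numTrees in [10], maxDepth in [10, 25]
param_grid = [{"numTrees": t, "maxDepth": d} for t in [10] for d in [10, 25]]
num_folds = 2

# one fit per (combination, fold), plus the final refit of the best combination
fits = len(param_grid) * num_folds + 1
print(len(param_grid), fits)  # 2 5
```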

In [None]:
# 1m45s and 11.84 GB of RAM with a grid of only 2 parameter combinations...
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(finalized_data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(finalized_data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = finalized_data.randomSplit([0.7, 0.3], seed = 1)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=1)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10]).addGrid(rf.maxDepth, [10, 25]).build()

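crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          # compare the indexed predictions with the indexed labels: the default evaluator
                          # would compare them with the original 'label' column and use F1 as the metric
                          evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"),
                          numFolds=2) 
# heap memory error: increase .config("spark.memory.offHeap.size","8g")  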
# val spark = SparkSession
#      .builder()
#      .master("local[*]")
#      .config("spark.executor.memory", "70g")
#      .config("spark.driver.memory", "50g")
#      .config("spark.memory.offHeap.enabled",true)
#      .config("spark.memory.offHeap.size","8g")   
#      .appName("sampleCodeForReference")
#      .getOrCreate()
cvModel = crossval.fit(trainingData)

# Make predictions on the test data. cvModel uses the best pipeline found.
predictions = cvModel.transform(testData)

# default values and parameters of RandomForestClassifier
# class pyspark.ml.classification.RandomForestClassifier(*, featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction', \
#     maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini', numTrees=20, featureSubsetStrategy='auto', \
#     seed=None, subsamplingRate=1.0, leafCol='', minWeightFractionPerNode=0.0, weightCol=None, bootstrap=True)

In [19]:
# Select example rows to display.
predictions.select("predictedLabel", "label", "features", "prediction", "probability").show(5)



+--------------+-----+--------------------+----------+--------------------+
|predictedLabel|label|            features|prediction|         probability|
+--------------+-----+--------------------+----------+--------------------+
|             3|    3|[-1.22,-0.249,0.665]|       2.0|[0.25275291159657...|
|             3|    3|[-1.218,-0.243,0....|       2.0|[0.28534550418916...|
|             5|    3|[-1.172,-0.381,0....|       0.0|[0.68795001295001...|
|             4|    3|[-1.171,-0.241000...|       5.0|[0.26245682632779...|
|             5|    3|[-1.17,-0.324,0.6...|       0.0|[0.62739651416121...|
+--------------+-----+--------------------+----------+--------------------+
only showing top 5 rows



In [20]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % (accuracy))

# Test Error = 0.174235
# Accuracy = 0.825765 .. it is not clear which parameters produced this!!! with maxDepth 10,100 and numTrees 5,10,25, numFolds = 3 => 8 min 42 s

# Test Error = 0.175942 .. with maxDepth 10 and numTrees 10, 25, numFolds = 2 => 1 min 42 s
# Accuracy = 0.824058

# Test Error = 0.179014
# Accuracy = 0.820986 with seed=1

Test Error = 0.176321
Accuracy = 0.823679


**Accuracy of 82.09% ... with numTrees 10 and maxDepth 25 (it increases slightly with more numTrees)**

In [22]:
print(type(predictions))
print(type(testData))


<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


#### Print the hyperparameters of the best model found 
> good luck finding this information in the docs!!! it seems to be a state secret

In [None]:
bestPipeline = cvModel.bestModel
bestLRModel = bestPipeline.stages[2]
print('numTrees - ', bestLRModel.getNumTrees)
print('maxDepth - ', bestLRModel.getOrDefault('maxDepth'))
print('maxBins - ', bestLRModel.getOrDefault('maxBins'))

numTrees -  10
maxDepth -  25
maxBins -  32


>To list the parameters that can be extracted, iterate over the `Param` keys of `bestParams` (the dictionary returned by `extractParamMap()`) and pass each `param.name` to `bestLRModel.getOrDefault`.  
`param.doc` gives a description of the parameter

In [None]:
bestParams = bestLRModel.extractParamMap()  # dict {Param: value}
# print(type(bestParams))
# for param in bestParams:
#   print(param.name, ":", param.doc)

# list(zip(cvModel.avgMetrics, paramGrid)) 

for param in bestParams:
  print(param.name, " : ", bestLRModel.getOrDefault(param.name))

bootstrap  :  True
cacheNodeIds  :  False
checkpointInterval  :  10
featureSubsetStrategy  :  auto
featuresCol  :  indexedFeatures
impurity  :  gini
labelCol  :  indexedLabel
leafCol  :  
maxBins  :  32
maxDepth  :  25
maxMemoryInMB  :  256
minInfoGain  :  0.0
minInstancesPerNode  :  1
minWeightFractionPerNode  :  0.0
numTrees  :  10
predictionCol  :  prediction
probabilityCol  :  probability
rawPredictionCol  :  rawPrediction
seed  :  1
subsamplingRate  :  1.0


###Miscellaneous backup notes (DO NOT PRESENT)

**here a map function is used to perform the grid search**  
Boosting Parallelism for ML in Python using scikit-learn, joblib & PySpark

https://www.qubole.com/tech-blog/boosting-parallelism-for-ml-in-python-using-scikit-learn-joblib-pyspark/ 
https://grzegorzgajda.gitbooks.io/spark-examples/content/classification/rf-classification.html 


```
val predictionsAndLabels = test.map {
  point => (model.predict(point.features), point.label)
}
```



Machine Learning Library (MLlib) Guide  
https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier 

Note: in spark.ml (as of Spark 3.1), `GBTClassifier` supports only binary classification, so the example below is expected to fail on the six activity classes unless the classifier is wrapped in `OneVsRest`.



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
"""
Gradient Boosted Tree Classifier Example.
"""
data = finalized_data

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only

Logistic regression
https://github.com/apache/spark/blob/master/examples/src/main/python/ml/logistic_regression_summary_example.py 

In [None]:
from pyspark.ml.classification import LogisticRegression
data = finalized_data
(trainingData, testData) = data.randomSplit([0.7, 0.3])

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model (with 6 classes, family="auto" selects the multinomial model)
lrModel = lr.fit(trainingData)

# Extract the summary from the returned LogisticRegressionModel instance
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# roc, areaUnderROC, fMeasureByThreshold and setThreshold only apply to binary
# classification (lrModel.binarySummary); with 6 classes use the multiclass metrics
print("accuracy: %s" % trainingSummary.accuracy)
print("weighted F-measure: %s" % trainingSummary.weightedFMeasure())
for label, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d F-measure: %s" % (label, f))

#<font size=5 color="blue">**Phase 2: Developing a computation model for activity classification with Apache Spark following the Map/Reduce paradigm**</font>

The goal is to design a computation model that starts from CSV files with the XYZ accelerations, sampled at a fixed rate (200 Hz) from several subjects (identified by id_persona). For each person, it returns the classification of each **second** of activity.
>Each second of activity is classified with the **plurality vote rule**: it is assigned the class predicted most often within that second
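The plurality vote can be expressed as a map step followed by a reduce step. The sketch below uses only the Python standard library on a few hand-made records. It is an illustration, not the project's implementation, and `plurality_vote_per_second` is a hypothetical name. It assumes that:

- each prediction arrives as a `(id_persona, epoch_ms, predicted_label)` tuple;
- a "second" is `epoch_ms // 1000`;
- ties are broken by picking the smallest label.

```python
from collections import Counter, defaultdict

def plurality_vote_per_second(records):
    """records: iterable of (id_persona, epoch_ms, predicted_label) tuples.
    Returns {(id_persona, second): label} with the most frequent label per key."""
    # Map: key each prediction by (person, second of activity)
    mapped = (((person, epoch_ms // 1000), label) for person, epoch_ms, label in records)
    # Shuffle + reduce: count how many times each label occurs per key
    counts = defaultdict(Counter)
    for key, label in mapped:
        counts[key][label] += 1
    # Plurality vote: highest count wins; ties broken by the smallest label (assumed rule)
    return {key: min(c, key=lambda lbl: (-c[lbl], lbl)) for key, c in counts.items()}

records = [
    (0, 1590567416332, 3), (0, 1590567416337, 3), (0, 1590567416342, 5),
    (0, 1590567417001, 4), (0, 1590567417006, 5), (0, 1590567417011, 5),
    (1, 1590567416332, 2), (1, 1590567416337, 1),
]
result = plurality_vote_per_second(records)
print(result)  # {(0, 1590567416): 3, (0, 1590567417): 5, (1, 1590567416): 1}
```

In PySpark, the same structure maps onto RDD operations:

1. `map` each prediction to `((id_persona, second, label), 1)`.
2. `reduceByKey` to count the occurrences.
3. A second `reduceByKey` on `(id_persona, second)` that keeps the label with the highest count.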

###<font color="red">✪</font>Importing the data

> **NB**: The data are generated with a technique similar to **data augmentation** [https://en.wikipedia.org/wiki/Data_augmentation]. Starting from a real dataset, an id_persona column is added. For each id_persona, the XYZ accelerations are taken from the original data of a single subject and altered by a random percentage (+/-1%). The generated data are not used to train the model, which is the main purpose of data augmentation (reducing the chance of overfitting). Instead, they are used to test the computation model on data from several artificially created subjects.

**Note**: The original data were also used to train the model, so these results are not a reliable absolute measure of the model's quality.



In [51]:
# drop the last column (label) and add id_persona (0 = original subject)
df = df.iloc[:,:-1]
df['id_persona'] = 0
df

|  | epoch (ms) | time (01:00) | elapsed (s) | x-axis (g) | y-axis (g) | z-axis (g) | id_persona |
|---|---|---|---|---|---|---|---|
| 0 | 1590567416332 | 2020-05-27T10:16:56.332 | 0.000 | -0.818 | -0.156 | 0.572 | 0 |
| 1 | 1590567416337 | 2020-05-27T10:16:56.337 | 0.005 | -0.826 | -0.158 | 0.577 | 0 |
| 2 | 1590567416342 | 2020-05-27T10:16:56.342 | 0.010 | -0.824 | -0.159 | 0.576 | 0 |
| 3 | 1590567416347 | 2020-05-27T10:16:56.347 | 0.015 | -0.825 | -0.160 | 0.570 | 0 |
| 4 | 1590567416352 | 2020-05-27T10:16:56.352 | 0.020 | -0.823 | -0.157 | 0.573 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 88075 | 1590568227882 | 2020-05-27T10:30:27.882 | 96.840 | -0.885 | -0.169 | 0.417 | 0 |
| 88076 | 1590568227887 | 2020-05-27T10:30:27.887 | 96.845 | -0.893 | -0.178 | 0.412 | 0 |
| 88077 | 1590568227892 | 2020-05-27T10:30:27.892 | 96.850 | -0.899 | -0.179 | 0.405 | 0 |
| 88078 | 1590568227897 | 2020-05-27T10:30:27.897 | 96.855 | -0.898 | -0.173 | 0.396 | 0 |


In [52]:

import random as rnd
import pandas as pd

rnd.seed(1)  # fixed seed so the augmented data are reproducible

def genera_dati_per_persona(df, num_persone):  # id_persona = 0 is the original subject
  """Append num_persone synthetic subjects (id_persona = 1..num_persone) to df.

  Each synthetic copy multiplies every x/y/z sample of the original data by a
  random factor in [1.0000, 1.0099] and rounds the result to 3 decimals.
  """
  df_orig = df.copy()
  frames = [df]
  for i in range(1, num_persone + 1):
    df_temp = df_orig.copy()
    for axis in ['x-axis (g)', 'y-axis (g)', 'z-axis (g)']:
      df_temp[axis] = [round(x * rnd.randrange(10000, 10100) / 10000, 3) for x in df_orig[axis]]
    df_temp['id_persona'] = i
    frames.append(df_temp)
  # a single concat at the end instead of one concat per iteration
  return pd.concat(frames, ignore_index=True)

df_augumented = genera_dati_per_persona(df, 1)

df_augumented

|  | epoch (ms) | time (01:00) | elapsed (s) | x-axis (g) | y-axis (g) | z-axis (g) | id_persona |
|---|---|---|---|---|---|---|---|
| 0 | 1590567416332 | 2020-05-27T10:16:56.332 | 0.000 | -0.818 | -0.156 | 0.572 | 0 |
| 1 | 1590567416337 | 2020-05-27T10:16:56.337 | 0.005 | -0.826 | -0.158 | 0.577 | 0 |
| 2 | 1590567416342 | 2020-05-27T10:16:56.342 | 0.010 | -0.824 | -0.159 | 0.576 | 0 |
| 3 | 1590567416347 | 2020-05-27T10:16:56.347 | 0.015 | -0.825 | -0.160 | 0.570 | 0 |
| 4 | 1590567416352 | 2020-05-27T10:16:56.352 | 0.020 | -0.823 | -0.157 | 0.573 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 176155 | 1590568227882 | 2020-05-27T10:30:27.882 | 96.840 | -0.893 | -0.170 | 0.418 | 1 |
| 176156 | 1590568227887 | 2020-05-27T10:30:27.887 | 96.845 | -0.895 | -0.179 | 0.415 | 1 |
| 176157 | 1590568227892 | 2020-05-27T10:30:27.892 | 96.850 | -0.907 | -0.180 | 0.405 | 1 |
| 176158 | 1590568227897 | 2020-05-27T10:30:27.897 | 96.855 | -0.903 | -0.174 | 0.400 | 1 |
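
In `genera_dati_per_persona` the factor `rnd.randrange(10000, 10100) / 10000` lies between 1.0000 and 1.0099, so every value is enlarged in magnitude by 0% to just under 1% and never reduced. The sketch below compares that expression with a symmetric ±1% variant; `scale_factor_symmetric` is hypothetical and not part of the notebook.

```python
import random

def scale_factor_original(rng):
    # Same expression as in genera_dati_per_persona: an integer in [10000, 10099] divided by 10000
    return rng.randrange(10000, 10100) / 10000

def scale_factor_symmetric(rng, pct=0.01):
    # Hypothetical symmetric variant (not in the notebook): uniform factor in [1 - pct, 1 + pct]
    return rng.uniform(1 - pct, 1 + pct)

rng = random.Random(1)  # separate generator, so the notebook's rnd state is untouched
original = [scale_factor_original(rng) for _ in range(10_000)]
symmetric = [scale_factor_symmetric(rng) for _ in range(10_000)]

print("original factors: ", min(original), "to", max(original))
print("symmetric factors:", min(symmetric), "to", max(symmetric))
```

If a perturbation on both sides of the original value was intended, the symmetric factor is one possible replacement; the tables above were produced with the original expression.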


In [None]:
# save the generated data to a csv file; already done for 10 people (about 29MB / 900k records), do not run
# pathdir_read='/content/drive/MyDrive/Colab Notebooks/data/'
# df_augumented.to_csv(pathdir_read+'har_augumented_data.csv', encoding='utf-8', index=False)

###Data checking and cleaning

Checks such as isNaN() (in pandas, `isna()`) show no invalid values; any outliers are deliberately left in the dataset.
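
A minimal sketch of such a check with pandas, on a small hypothetical frame that reuses the accelerometer column names (one NaN is inserted on purpose); on the real data the same calls would be `df_augumented.isna().sum()`.

```python
import numpy as np
import pandas as pd

# Hypothetical toy frame with the same accelerometer column names as df_augumented
sample = pd.DataFrame({
    "x-axis (g)": [-0.818, -0.826, np.nan, -0.825],
    "y-axis (g)": [-0.156, -0.158, -0.159, -0.160],
    "z-axis (g)": [0.572, 0.577, 0.576, 0.570],
})

# Number of missing values (NaN/None) per column
missing_per_column = sample.isna().sum()
# True if at least one cell of the frame is missing
has_missing = bool(sample.isna().to_numpy().any())

print(missing_per_column)
print("any missing:", has_missing)
```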

###<font color="red">✪</font> Creating the Spark DataFrame
Conversion of the pandas DataFrame into a **Spark DataFrame** (instead of a **Resilient Distributed Dataset (RDD)**; see Spark DataFrame vs Spark RDD https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)  
> Spark DataFrames (introduced in Spark 1.3 and unified with the Dataset API in Spark 2.0) are, like RDDs, distributed data structures: `spark.createDataFrame()` distributes the data itself, so there is no need to call `parallelize` (a `SparkContext` method for local collections).  
In general, the operations available on RDDs have a DataFrame equivalent: for example **select** or **selectExpr** to select columns (rows are selected with **filter**/**where**), **explode** instead of the RDD **flatMap**, or **collect()** (**collectAsList()** in the Java/Scala API). Map functions can still be used through **df.rdd.map()**, but this converts the DF into an RDD and loses the DataFrame advantages. On DataFrames, **foreach()** applies a function to every row only for its side effects (it returns nothing), so it is not a direct replacement for **map()**: per-row transformations are normally written with **withColumn()**/**select()** or with UDFs.  

**PySpark DataFrame transformations are more efficient than RDD ones (they are optimized by Spark's Catalyst query optimizer) and offer more functionality (e.g. SQL-style expressions such as select($nome_Col), easy addition of columns, etc.). In addition, the pyspark.ml machine-learning library provides Transformers that work directly on DataFrames, while pyspark.mllib works on RDDs.**
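
The difference between `map` (exactly one output per input) and `flatMap`/`explode` (zero or more outputs per input) can be sketched in plain Python without Spark; the rows below are hypothetical (id_persona, list of x-axis readings) pairs.

```python
# Hypothetical rows: (id_persona, list of x-axis readings)
rows = [(0, [-0.818, -0.826]), (1, [-0.824]), (2, [])]

# map: exactly one output element per input row
lengths = list(map(lambda row: (row[0], len(row[1])), rows))

# flatMap (RDD) / explode (DataFrame): every reading becomes its own row,
# and a row with an empty list produces no output at all
exploded = [(pid, value) for pid, values in rows for value in values]

print(lengths)   # [(0, 2), (1, 1), (2, 0)]
print(exploded)  # [(0, -0.818), (0, -0.826), (1, -0.824)]
```

As in this sketch, Spark's `explode` emits no row for an empty or null array; `explode_outer` is the variant that keeps such rows with a null value.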

In [53]:
df_augumented.info()

# timestamp and id_persona need to be added


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 176160 entries, 0 to 176159
Data columns (total 7 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   epoch (ms)    176160 non-null  int64  
 1   time (01:00)  176160 non-null  object 
 2   elapsed (s)   176160 non-null  float64
 3   x-axis (g)    176160 non-null  float64
 4   y-axis (g)    176160 non-null  float64
 5   z-axis (g)    176160 non-null  float64
 6   id_persona    176160 non-null  int64  
dtypes: float64(4), int64(2), object(1)
memory usage: 9.4+ MB


In [54]:
ds = spark.createDataFrame(data = df_augumented)
print(type(ds))
ds.printSchema()
ds.show(truncate=False)
print(ds.head(5))
print(ds.count())
print(ds.take(3))

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- epoch (ms): long (nullable = true)
 |-- time (01:00): string (nullable = true)
 |-- elapsed (s): double (nullable = true)
 |-- x-axis (g): double (nullable = true)
 |-- y-axis (g): double (nullable = true)
 |-- z-axis (g): double (nullable = true)
 |-- id_persona: long (nullable = true)

+-------------+-----------------------+-----------+-------------------+--------------------+------------------+----------+
|epoch (ms)   |time (01:00)           |elapsed (s)|x-axis (g)         |y-axis (g)          |z-axis (g)        |id_persona|
+-------------+-----------------------+-----------+-------------------+--------------------+------------------+----------+
|1590567416332|2020-05-27T10:16:56.332|0.0        |-0.818             |-0.156              |0.5720000000000001|0         |
|1590567416337|2020-05-27T10:16:56.337|0.005      |-0.826             |-0.158              |0.5770000000000001|0         |
|1590567416342|2020-05-27T10:16:56.342|0.01 

###Some map and reduce functions with RDDs; afterwards we continue with Spark DataFrames.

In [13]:
# https://stackoverflow.com/questions/29635776/can-i-convert-pandas-dataframe-to-spark-rdd/29640042
# SparkContext.parallelize creates a new RDD from a local collection; here we start from the
# Spark DataFrame instead (ds.rdd), otherwise the pandas DataFrame would first have to be
# converted into a collection
sparkRDD = ds.rdd.map(list)  # each Row becomes a plain Python list
df_rdd = sparkRDD.collect()  # brings every row back to the driver as a Python list
print(type(df_rdd))
print(df_rdd[0:2])



<class 'list'>
[[1590567416332, '2020-05-27T10:16:56.332', 0.0, -0.818, -0.156, 0.5720000000000001, 0, 0], [1590567416337, '2020-05-27T10:16:56.337', 0.005, -0.826, -0.158, 0.5770000000000001, 0, 0]]


In [None]:
# sparkRDD.map(lambda x: (x[0], x[1]))\
#     .groupByKey()\
#     .mapValues(lambda vals: len(set(vals)))\
#     .sortByKey()\
#     .collect()

In [None]:
# ds2 = ds.rdd.map(lambda x: (x,1)).printSchema()

In [None]:
# type(ds2)
# ds2.printSchema()
# ds2.show(truncate=False)

####MAP/REDUCE functions with RDDs

In [14]:
print(sparkRDD.count())
print(sparkRDD.first())
print(sparkRDD.take(3))

176160
[1590567416332, '2020-05-27T10:16:56.332', 0.0, -0.818, -0.156, 0.5720000000000001, 0, 0]
[[1590567416332, '2020-05-27T10:16:56.332', 0.0, -0.818, -0.156, 0.5720000000000001, 0, 0], [1590567416337, '2020-05-27T10:16:56.337', 0.005, -0.826, -0.158, 0.5770000000000001, 0, 0], [1590567416342, '2020-05-27T10:16:56.342', 0.01, -0.8240000000000001, -0.159, 0.5760000000000001, 0, 0]]


In [None]:
# def myfunc(x):
#     if(x==0 or x==1 or x==2):
#       return 'no_act'
#     else:
#       return 'act'
# sparkRDD2 = sparkRDD.map(lambda x: (x,myfunc(x[3])))
# print(sparkRDD2.first())
# print(sparkRDD2.count())
# print(sparkRDD.take(5))
# print(sparkRDD2.take(5))

([-0.818, -0.156, 0.5720000000000001, 0], 'no_act')
88080
[[-0.818, -0.156, 0.5720000000000001, 0], [-0.826, -0.158, 0.5770000000000001, 0], [-0.8240000000000001, -0.159, 0.5760000000000001, 0], [-0.825, -0.16, 0.57, 0], [-0.823, -0.157, 0.573, 0]]
[([-0.818, -0.156, 0.5720000000000001, 0], 'no_act'), ([-0.826, -0.158, 0.5770000000000001, 0], 'no_act'), ([-0.8240000000000001, -0.159, 0.5760000000000001, 0], 'no_act'), ([-0.825, -0.16, 0.57, 0], 'no_act'), ([-0.823, -0.157, 0.573, 0], 'no_act')]


In [None]:
# sparkRDD3 = sparkRDD2.reduceByKey(lambda x,y: x+y)

In [15]:
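# binary split of the activities into no_act (standing, lying, sitting) and act (walking, going up/down stairs)
# NB: x[6] is the label column here: these cells were run on an earlier version of ds that still had
# label (index 6) and id_persona (index 7), as the 8-element rows printed above show
def myfunc(x):
    return 'no_act' if x in (0, 1, 2) else 'act'
sparkRDD3 = sparkRDD.map(lambda x: (myfunc(x[6]), 1))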
print(sparkRDD3.first())
print(sparkRDD3.count())
print(sparkRDD3.take(5))

('no_act', 1)
176160
[('no_act', 1), ('no_act', 1), ('no_act', 1), ('no_act', 1), ('no_act', 1)]


In [17]:
sparkRDD4 = sparkRDD3.reduceByKey(lambda x,y: x+y)

for element in sparkRDD4.collect():
    print("activity (key):", element[0], "\t sum (val):", element[1])

activity (key): act 	 sum (val): 92888
activity (key): no_act 	 sum (val): 83272
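
What `map` + `reduceByKey` compute can be imitated in plain Python: emit (key, 1) pairs, group the values by key, then fold each group with the reduce function. In Spark the merge also happens locally on each partition before the shuffle (like a MapReduce combiner), which is why the function must be associative and commutative. The helper names and the labels below are hypothetical.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Plain-Python imitation of RDD.reduceByKey: group values by key, then fold them with func."""
    groups = defaultdict(list)
    for key, value in pairs:  # "shuffle": bring together all values of the same key
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

def activity_type(label):
    # Same binary split as myfunc: classes 0, 1, 2 -> 'no_act', the others -> 'act'
    return "no_act" if label in (0, 1, 2) else "act"

labels = [0, 0, 3, 5, 1, 4, 2, 3]                   # hypothetical labels
pairs = [(activity_type(x), 1) for x in labels]    # map phase: (key, 1)
counts = reduce_by_key(pairs, lambda x, y: x + y)  # reduce phase: sum per key
print(counts)  # {'no_act': 4, 'act': 4}
```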


In [18]:
sparkRDD5 = sparkRDD.map(lambda x: (x[7], 1)) # map by id_persona (index 7 in the 8-column version of ds used here)
print(sparkRDD5.first())
print(sparkRDD5.count())
print(sparkRDD5.take(5))

(0, 1)
176160
[(0, 1), (0, 1), (0, 1), (0, 1), (0, 1)]


In [19]:
sparkRDD6 = sparkRDD5.reduceByKey(lambda x,y: x+y)

for element in sparkRDD6.collect():
    print("id_persona (key):", element[0], "\t sum (val):", element[1])

id_persona (key): 0 	 sum (val): 88080
id_persona (key): 1 	 sum (val): 88080


###Prediction and classification with DataFrames

<font color="red">✪ </font>First run the code that trains the classification model (Random Forest) and the cell that creates the Spark DataFrame.

The dataset has to be made compatible with the one used to build the model, which did not contain all of these fields; we use select().

In [55]:
from pyspark.sql.functions import col
new_ds = ds
# new_ds = ds.filter(ds.label==0) # col('label') instead of new_ds.label 
new_ds.printSchema()
print(new_ds.count())

root
 |-- epoch (ms): long (nullable = true)
 |-- time (01:00): string (nullable = true)
 |-- elapsed (s): double (nullable = true)
 |-- x-axis (g): double (nullable = true)
 |-- y-axis (g): double (nullable = true)
 |-- z-axis (g): double (nullable = true)
 |-- id_persona: long (nullable = true)

176160


In [22]:
# Make predictions.
#predictions = model.transform(testData)
# testData.printSchema()

In [57]:
from pyspark.ml.feature import VectorAssembler

# Assemble the three accelerometer axes into a single vector column
assembler = VectorAssembler(inputCols=['x-axis (g)', 'y-axis (g)', 'z-axis (g)'], outputCol = 'features')
output = assembler.transform(new_ds)
# Keep the features plus the columns needed to group the predictions (time and id_persona)
finalized_data = output.select("features", "epoch (ms)", "time (01:00)", "elapsed (s)", "id_persona")
print("Assembled columns 'x-axis (g)', 'y-axis (g)', 'z-axis (g)' to vector column 'features'")
finalized_data.show(truncate=False)

#labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(finalized_data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
#featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(finalized_data)

predictions = model.transform(finalized_data)
predictions.show(truncate=False)



Assembled columns 'x-axis (g)', 'y-axis (g)', 'z-axis (g)' to vector column 'features'
+-----------------------------------------------+-------------+-----------------------+-----------+----------+
|features                                       |epoch (ms)   |time (01:00)           |elapsed (s)|id_persona|
+-----------------------------------------------+-------------+-----------------------+-----------+----------+
|[-0.818,-0.156,0.5720000000000001]             |1590567416332|2020-05-27T10:16:56.332|0.0        |0         |
|[-0.826,-0.158,0.5770000000000001]             |1590567416337|2020-05-27T10:16:56.337|0.005      |0         |
|[-0.8240000000000001,-0.159,0.5760000000000001]|1590567416342|2020-05-27T10:16:56.342|0.01       |0         |
|[-0.825,-0.16,0.57]                            |1590567416347|2020-05-27T10:16:56.347|0.015      |0         |
|[-0.823,-0.157,0.573]                          |1590567416352|2020-05-27T10:16:56.352|0.02       |0         |
|[-0.823,-0.16,0.573]    

In [68]:
predictions.show(truncate=False)

+-----------------------------------------------+-------------+-----------------------+-----------+----------+-----------------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+----------+--------------+
|features                                       |epoch (ms)   |time (01:00)           |elapsed (s)|id_persona|indexedFeatures                                |rawPrediction                                                                               |probability                                                                                   |prediction|predictedLabel|
+-----------------------------------------------+-------------+-----------------------+-----------+----------+-----------------------------------------------+--------------------------------------------------------------------------------------------+---

In [59]:
# projection = predictions.select("id_persona", "label", "predictedLabel" )
# projection.show(truncate=False)
predictions.select("id_persona", "predictedLabel" ).show(truncate=False)

+----------+--------------+
|id_persona|predictedLabel|
+----------+--------------+
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
|0         |0             |
+----------+--------------+
only showing top 20 rows



<font color="red">✪ </font>Results per person and per activity (class)

In [74]:
# print("predict as class 0 for id_person 0: ", predictions.select("predictedLabel", "id_persona").where((col('predictedLabel')==0) & (col('id_persona')==0)).count())
# print("predict as class 1 for id_person 0: ", predictions.select("predictedLabel", "id_persona").where((col('predictedLabel')==1) & (col('id_persona')==0)).count())
# print("predict as class 2:", predictions.select("predictedLabel", "features").where(col('predictedLabel')==2).count())
# print("predict as class 3:", predictions.select("predictedLabel", "features").where(col('predictedLabel')==3).count())
# print("predict as class 4:", predictions.select("predictedLabel", "features").where(col('predictedLabel')==4).count())
# print("predict as class 5:", predictions.select("predictedLabel", "features").where(col('predictedLabel')==5).count())
# print("on total of :", new_ds.count())
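num_persone = predictions.select("id_persona").distinct().count()  # number of distinct people
# one filtered count() per (person, class); assumes id_persona values are 0..num_persone-1 and classes 0..5
for j in range(num_persone):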
  print("Predicted classes for id_person {}:".format(j))
  for i in range(6):  
    print("\t# samples predicted in class {}: ".format(i), predictions.select("predictedLabel", "id_persona").where((col('predictedLabel')==i) & (col('id_persona')==j)).count())
print("on total of :", new_ds.count())

Predicted classes for id_person 0:
	# samples predicted in class 0:  13784
	# samples predicted in class 1:  13568
	# samples predicted in class 2:  14523
	# samples predicted in class 3:  15724
	# samples predicted in class 4:  11934
	# samples predicted in class 5:  18547
Predicted classes for id_person 1:
	# samples predicted in class 0:  13521
	# samples predicted in class 1:  13544
	# samples predicted in class 2:  14528
	# samples predicted in class 3:  15871
	# samples predicted in class 4:  11937
	# samples predicted in class 5:  18679
on total of : 176160
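
The loop above runs a separate filtered `count()` job for every (person, class) pair, i.e. 12 jobs for 2 people and 6 classes. A single aggregation such as `predictions.groupBy("id_persona", "predictedLabel").count()` would compute all of them in one pass. The same idea in plain Python with `collections.Counter`, on hypothetical (id_persona, predictedLabel) pairs:

```python
from collections import Counter

# Hypothetical (id_persona, predictedLabel) pairs, as they could come out of predictions
rows = [(0, 0), (0, 0), (0, 3), (1, 5), (1, 0), (1, 5)]

# One pass over the data: every (person, class) combination is counted at once
counts = Counter(rows)

# Same layout as the loop above; classes never predicted are reported as 0
for person in sorted({pid for pid, _ in rows}):
    print("Predicted classes for id_person {}:".format(person))
    for label in range(6):
        print("\t# samples predicted in class {}: ".format(label), counts[(person, label)])
```

Iterating over the distinct ids actually present also avoids assuming that id_persona runs from 0 to num_persone-1.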


In [70]:
i=0; j=1;
print("predict as class {} for id_person {}: ".format(i,j))

predict as class 0 for id_person 1: 


In [63]:
predictions.select("id_persona").distinct().count()

2

Truncate the time column to whole seconds, so that the samples can be grouped by second
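
An alternative per-second key, shown here as a plain-Python sketch, is the integer division of `epoch (ms)` by 1000: it does not depend on how the time string is formatted. In the DataFrame this could be written, for example, with `floor(col("epoch (ms)") / 1000)`. The first sample below comes from the data above; the other two are hypothetical.

```python
def second_key_from_epoch(epoch_ms):
    # Integer division drops the milliseconds: all samples of the same second share a key
    return epoch_ms // 1000

def second_key_from_iso(ts):
    # Same idea as substr(..., 19) on 'YYYY-MM-DDTHH:MM:SS.mmm': keep the first 19 characters
    return ts[:19]

a = (1590567416332, "2020-05-27T10:16:56.332")  # first sample of the dataset
b = (1590567416997, "2020-05-27T10:16:56.997")  # hypothetical sample in the same second
c = (1590567417002, "2020-05-27T10:16:57.002")  # hypothetical sample in the next second

for epoch_ms, ts in (a, b, c):
    print(second_key_from_epoch(epoch_ms), second_key_from_iso(ts))
```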

In [77]:
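from pyspark.sql.functions import *  # NB: the wildcard import also shadows Python builtins such as sum, max, round

# keep 'YYYY-MM-DDTHH:MM:SS' (first 19 characters), dropping the milliseconds; substr positions are 1-based
predictions = predictions.withColumn("time (01:00)", predictions['time (01:00)'].substr(1, 19))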
predictions.show(truncate=False)


+-----------------------------------------------+-------------+-------------------+-----------+----------+-----------------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+----------+--------------+
|features                                       |epoch (ms)   |time (01:00)       |elapsed (s)|id_persona|indexedFeatures                                |rawPrediction                                                                               |probability                                                                                   |prediction|predictedLabel|
+-----------------------------------------------+-------------+-------------------+-----------+----------+-----------------------------------------------+--------------------------------------------------------------------------------------------+---------------

###Reduce function and classification

In [94]:
# predictions.select("predictedLabel", "id_persona", "time (01:00)").where( (col('id_persona')==0)).orderBy(col('time (01:00)'))\
#   .groupBy(col('time (01:00)'),col('predictedLabel')).agg(count(lit(1)).alias("Num Of Records")).show(truncate=False)

df_sorted_per_second = predictions.select("predictedLabel", "id_persona", "time (01:00)")\
  .groupBy(col('id_persona'), col('time (01:00)'),col('predictedLabel')).agg(count(lit(1)).alias("Num Of Records")).orderBy(col('id_persona'),col('time (01:00)'))

# https://stackoverflow.com/questions/34249841/spark-dataframe-reducebykey-like-operation
# df.createOrReplaceTempView("df")   # registerTempTable is deprecated since Spark 2.0
# spark.sql("SELECT key, SUM(value) AS value FROM df GROUP BY key")

# https://sparkbyexamples.com/pyspark/pyspark-count-distinct-from-dataframe/
# df.createOrReplaceTempView("EMP")
# spark.sql("select distinct(count(*)) from EMP").show()

In [95]:
type(df_sorted_per_second)

pyspark.sql.dataframe.DataFrame

In [96]:
df_sorted_per_second.show(100, truncate=False)

+----------+-------------------+--------------+--------------+
|id_persona|time (01:00)       |predictedLabel|Num Of Records|
+----------+-------------------+--------------+--------------+
|0         |2020-05-27T10:16:56|0             |134           |
|0         |2020-05-27T10:16:57|0             |200           |
|0         |2020-05-27T10:16:58|3             |1             |
|0         |2020-05-27T10:16:58|0             |199           |
|0         |2020-05-27T10:16:59|0             |200           |
|0         |2020-05-27T10:17:00|5             |2             |
|0         |2020-05-27T10:17:00|3             |1             |
|0         |2020-05-27T10:17:00|0             |197           |
|0         |2020-05-27T10:17:01|5             |3             |
|0         |2020-05-27T10:17:01|0             |195           |
|0         |2020-05-27T10:17:01|3             |2             |
|0         |2020-05-27T10:17:02|0             |197           |
|0         |2020-05-27T10:17:02|3             |3       

**The next step is to classify each second with the class that has the largest number of occurrences within that second...**
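
A plain-Python sketch of this plurality vote, applied to rows copied from the `df_sorted_per_second` output above (id_persona, second, predictedLabel, count; labels taken as integers). Breaking ties in favour of the lowest class index is an assumption, since the rule does not specify one. In Spark the same step could be expressed, for instance, with a window `Window.partitionBy("id_persona", "time (01:00)").orderBy(col("Num Of Records").desc())` and keeping the rows where `row_number()` equals 1.

```python
# Rows from the df_sorted_per_second output above: (id_persona, second, predictedLabel, count)
rows = [
    (0, "2020-05-27T10:16:56", 0, 134),
    (0, "2020-05-27T10:16:57", 0, 200),
    (0, "2020-05-27T10:16:58", 3, 1),
    (0, "2020-05-27T10:16:58", 0, 199),
    (0, "2020-05-27T10:16:59", 0, 200),
    (0, "2020-05-27T10:17:00", 5, 2),
    (0, "2020-05-27T10:17:00", 3, 1),
    (0, "2020-05-27T10:17:00", 0, 197),
    (0, "2020-05-27T10:17:01", 5, 3),
    (0, "2020-05-27T10:17:01", 0, 195),
    (0, "2020-05-27T10:17:01", 3, 2),
]

def plurality_vote(rows):
    """Return {(id_persona, second): winning class}; ties go to the lowest class index (assumption)."""
    best = {}
    for pid, second, label, n in rows:
        key = (pid, second)
        current = best.get(key)
        # keep the label with the highest count; on equal counts prefer the smaller label
        if current is None or n > current[1] or (n == current[1] and label < current[0]):
            best[key] = (label, n)
    return {key: label for key, (label, _) in best.items()}

votes = plurality_vote(rows)
for (pid, second), label in sorted(votes.items()):
    print(pid, second, label)
```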


