![Spark](http://spark.apache.org/docs/latest/img/spark-logo-hd.png)

<img src="http://hortonworks.com/wp-content/themes/hortonworks/images/svg/ui_logo.svg" width="240"/>

## Utilisation de Spark sur le cluster

Ce notebook est un exemple d'application d'une random forest sur le jeu de données nc13.

### Utilisation Jupyter

In [None]:
1+2

Ceci est de la doc

### Configuration de Python

In [None]:
# à utiliser pour stopper le contexte Spark
sc.stop()

Configuration du PYTHON_PATH :

In [1]:
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.9-src.zip'))

Configuration du client Spark, nous utilisons le mode yarn-client :
![Cluster Overview](http://spark.apache.org/docs/latest/img/cluster-overview.png)

In [2]:
import os
import pyspark

conf = pyspark.SparkConf()
conf.setMaster('yarn-client')

sc = pyspark.SparkContext(conf=conf)

In [3]:
sc.master

u'yarn-client'

### Spark sur YARN

![HDP platform](HDP_architecture.png)

### Chargement des données

Les données ont été chargées sur HDFS dans le répertoire /user/geraud/shazam. Ici, nous allons utiliser les fichiers convertis au format Parquet (dans le répertoire /user/geraud/shazam/parquet).

In [4]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

On initialise un SQLContext, qui va nous permettre de charger les données sous forme de Data.Frame.
Un DataFrame est une représentation tabulaire des données (à la Excel): https://spark.apache.org/docs/1.6.2/sql-programming-guide.html

In [5]:
sqlContext = SQLContext(sc)

In [6]:
mfcc = sqlContext.read.parquet('/user/geraud/shazam/parquet/nc13-train.parquet')
mfcc.show(5)

+-------+--------------+-------------+---------------+-------------+-------------+--------------+-------------+-------------+--------------+---------------+---------------+-------------+--------------+-------+--------+
|idSound|       bands_1|      bands_2|        bands_3|      bands_4|      bands_5|       bands_6|      bands_7|      bands_8|       bands_9|       bands_10|       bands_11|     bands_12|      bands_13|mfccSeq|   class|
+-------+--------------+-------------+---------------+-------------+-------------+--------------+-------------+-------------+--------------+---------------+---------------+-------------+--------------+-------+--------+
|    274|-901.902099609| 163.83505249| -11.8038024902|19.7777748108|19.9015960693|-4.03476715088|18.4968910217|12.1045322418|-1.85980224609|-0.522010803223|  1.58743667603|2.35488128662|-9.13202285767|      1|dog_bark|
|    274|-884.952514648|154.302032471| -24.4797782898|35.2823219299|8.37690734863|-21.6676330566|14.3066902161| 2.4754486084

In [7]:
mfcc.describe().toPandas()

Unnamed: 0,summary,idSound,bands_1,bands_2,bands_3,bands_4,bands_5,bands_6,bands_7,bands_8,bands_9,bands_10,bands_11,bands_12,bands_13,mfccSeq
0,count,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0,3388063.0
1,mean,625.994764855317,-797.7422305973885,130.52924950007187,-8.684178805994717,9.93409146380984,-2.213174698431618,2.79919745596125,0.2200321438486301,1.4286656845608057,-0.5015752394078973,0.7480491144689811,-0.911060909739898,0.2566778770198374,-0.5979051189166785,4830.286625130642
2,stddev,378.89575690717896,133.46621725414698,53.088867782977786,40.18542865854186,26.5668552706278,20.88320375826346,18.63478751721859,16.730487031561566,14.2524646400452,13.331844923590172,12.479857078817725,11.4122629628578,11.018432632269958,10.718384230849887,4959.522103508154
3,min,1.0,-1138.42004395,-187.382949829,-245.321746826,-159.592193604,-156.033447266,-117.476898193,-116.255081177,-93.6179351807,-94.7189712524,-106.444740295,-77.8643798828,-86.0950698853,-102.984832764,1.0
4,max,1301.0,-303.506225586,327.894744873,159.329299927,163.017944336,125.203155518,111.745773315,102.688072205,105.055770874,91.7920837402,91.9693603516,91.0449066162,91.9013214111,86.9623718262,26210.0


### Opérations sur des DataFrames

In [None]:
mfcc.filter(mfcc['class'] == 'dog_bark').select('idSound', 'bands_1', 'class').show(5)

In [None]:
mfcc.groupBy('idSound').mean('bands_1').toPandas()

En mode SQL:

In [8]:
sqlContext.registerDataFrameAsTable(mfcc, 'mfcc')
sqlContext.tableNames()

[u'mfcc']

In [None]:
sqlContext.sql("SELECT idSound, AVG(bands_1), class FROM mfcc GROUP BY idSound, class").toPandas()

#### Principes

Spark permet d'effectuer 2 types d'opérations sur les DataFrames: les *transformations*  et les *actions*.

* Exemples de transformations : map, flatMap, filter...
* Exemples d'actions : collect, show...

Les transformations ne sont exécutées qu'au moment de l'appel à une action.


In [9]:
%matplotlib inline

import matplotlib
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# number of bins
n = 30

In [None]:
histo_1 = mfcc.select("bands_1").rdd.flatMap(lambda x: x).histogram(n)

In [None]:
width = histo_1[0][1] - histo_1[0][0]

In [None]:
plt.bar(histo_1[0][:n], histo_1[1], width)

In [None]:
featuresNames = [ column for column in mfcc.columns if 'bands_' in column ]
#featuresNames = [ 'bands_1', 'bands_2']
histograms = dict()
for feature in featuresNames:
    print("Computing histogram for %s" % feature)
    histograms[feature] = mfcc.select(feature).rdd.flatMap(lambda x: x).histogram(n)

In [None]:
color = 0
for feature in featuresNames:
    histo = histograms[feature]
    width = histo[0][1] - histo[0][0]
    plt.bar(histo[0][:n], histo[1], width, log = True, color = plt.cm.autumn(color))
    color = (color + 50) % 256
    plt.title('Distribution for %s' % feature)
    plt.show()

In [None]:
color = 0
for feature in featuresNames:
    histo = histograms[feature]
    width = histo[0][1] - histo[0][0]
    plt.bar(histo[0][:n], histo[1], width, log = False, color = plt.cm.autumn(color))
    color = (color + 50) % 256
    plt.title('Distribution for %s' % feature)
    plt.show()

### Ajout de features

In [None]:
# building aggregations list
aggregate = ['bands_1', 'bands_2', 'bands_3', 'bands_4', 'bands_5', 'bands_6',
               'bands_7', 'bands_8', 'bands_9', 'bands_10', 'bands_11', 'bands_12', 'bands_13']
funs = [ F.avg, F.min, F.max ]
exprs = [f(F.col(c)) for f in funs for c in aggregate]
exprs

In [None]:
mfcc_agg = mfcc.groupBy('idSound', 'class').agg(*exprs)
mfcc_agg.show(5)

In [None]:
mfcc_agg.cache()

In [10]:
predictors = [column for column in mfcc.columns if 'bands_' in column ] 
predictors

['bands_1',
 'bands_2',
 'bands_3',
 'bands_4',
 'bands_5',
 'bands_6',
 'bands_7',
 'bands_8',
 'bands_9',
 'bands_10',
 'bands_11',
 'bands_12',
 'bands_13']

### Construction du pipeline de Machine Learning

Nous allons mettre en oeuvre une succession de transformations au jeu de données:

* indexation des labels (la mllib de Spark ne fonctionne que sur des nombres)
* regroupement des features: les colones bands_* vont être regroupées dans une colone _features_
* calcul du modèle

La doc SparkML est là: https://spark.apache.org/docs/1.6.2/ml-guide.html

In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [None]:
# comme d'habitude, on met une partie des données de côté
(trainingData, testData) = mfcc_agg.randomSplit([0.8, 0.2], seed = 1234)

labelIndexer = StringIndexer(inputCol='class', outputCol='classIndex').fit(mfcc_agg)

assembler = VectorAssembler(
    inputCols=predictors,
    outputCol='features')

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

rf = RandomForestClassifier(labelCol="classIndex", featuresCol="normFeatures", seed = 1234)

pipeline = Pipeline(stages=[labelIndexer, assembler, normalizer, rf])

model = pipeline.fit(trainingData)

In [None]:
model.stages

Pendant que ça tourne, on peut vérifier l'avancement du job sur http://nn1.hdp3.bsa.broadsoftware.com:8088/cluster/apps

### Faire des prédictions

In [None]:
rawPredictions = model.transform(testData)
rawPredictions.limit(5).toPandas()

In [None]:
converter = IndexToString(inputCol="prediction", outputCol="classPrediction", labels = labelIndexer.labels)
predictions = converter.transform(rawPredictions)
predictions.select("idSound", "prediction", "classPrediction", "classIndex", "class", "features").show(5)

In [None]:
predictions.limit(1).toPandas()

### Évaluation du modèle

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="classIndex", predictionCol="prediction", metricName="precision")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

In [None]:
full_model = pipeline.fit(mfcc_agg)

In [None]:
validation = sqlContext.read.parquet('/user/geraud/shazam/parquet/nc13-validation.parquet')
validation_agg = validation.groupBy('idSound', 'class').agg(*exprs)
rawPredictions = full_model.transform(validation_agg.select('idSound', *predictors))

predictions = converter.transform(rawPredictions).select('idSound', 'classPrediction').collect()

In [None]:
predictions[1:10]

In [None]:
import pandas as pd
submissionDf = pd.DataFrame(predictions, columns = ['idSound', 'class'])
submissionDf[:10]

In [None]:
submissionDf.to_csv('my-submission.csv', index = False)

# Random Forest par mfcc

## Split par IdSound

In [12]:
mfcc.rdd.getNumPartitions()

6

In [13]:
idSounds = mfcc.select(mfcc.idSound, 'class').distinct()
idSounds.take(5)

# comme d'habitude, on met une partie des données de côté
(trainingIdSound, testIdSound) = idSounds.randomSplit([0.8, 0.2], seed = 1234)
print(trainingIdSound.count())
print(testIdSound.count())


852
190


In [14]:
trainingData = trainingIdSound.join(mfcc, mfcc.idSound == trainingIdSound.idSound, 'inner') \
    .drop(trainingIdSound.idSound) \
    .drop(trainingIdSound['class'])

trainingData.printSchema()
trainingData.persist()
trainingData.agg(F.approxCountDistinct(trainingData.idSound).alias('c')).show()

testingData = testIdSound.join(mfcc, mfcc.idSound == testIdSound.idSound) \
    .drop(testIdSound.idSound) \
    .drop(testIdSound['class'])

testingData.printSchema()
testingData.persist()
testingData.agg(F.approxCountDistinct(testingData.idSound).alias('c')).show()

root
 |-- idSound: long (nullable = true)
 |-- bands_1: double (nullable = true)
 |-- bands_2: double (nullable = true)
 |-- bands_3: double (nullable = true)
 |-- bands_4: double (nullable = true)
 |-- bands_5: double (nullable = true)
 |-- bands_6: double (nullable = true)
 |-- bands_7: double (nullable = true)
 |-- bands_8: double (nullable = true)
 |-- bands_9: double (nullable = true)
 |-- bands_10: double (nullable = true)
 |-- bands_11: double (nullable = true)
 |-- bands_12: double (nullable = true)
 |-- bands_13: double (nullable = true)
 |-- mfccSeq: long (nullable = true)
 |-- class: string (nullable = true)

+---+
|  c|
+---+
|863|
+---+

root
 |-- idSound: long (nullable = true)
 |-- bands_1: double (nullable = true)
 |-- bands_2: double (nullable = true)
 |-- bands_3: double (nullable = true)
 |-- bands_4: double (nullable = true)
 |-- bands_5: double (nullable = true)
 |-- bands_6: double (nullable = true)
 |-- bands_7: double (nullable = true)
 |-- bands_8: double (null

In [15]:
labelIndexer = StringIndexer(inputCol='class', outputCol='classIndex').fit(mfcc)

assembler = VectorAssembler(
    inputCols=predictors,
    outputCol='features')

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

rf = RandomForestClassifier(labelCol="classIndex", featuresCol="normFeatures", seed = 1234)

pipeline = Pipeline(stages=[labelIndexer, assembler, normalizer, rf])

model = pipeline.fit(trainingData)

In [16]:
# validation
rawPredictions = model.transform(testingData)
rawPredictions.limit(5).toPandas()
converter = IndexToString(inputCol="prediction", outputCol="classPrediction", labels = labelIndexer.labels)
predictions = converter.transform(rawPredictions)
predictions.select("idSound", "prediction", "classPrediction", "classIndex", "class", "features").show(5)

+-------+----------+---------------+----------+------------+--------------------+
|idSound|prediction|classPrediction|classIndex|       class|            features|
+-------+----------+---------------+----------+------------+--------------------+
|     32|       2.0|       dog_bark|       0.0|street_music|[-1136.52746582,2...|
|     32|       2.0|       dog_bark|       0.0|street_music|[-1094.26013184,4...|
|     32|       2.0|       dog_bark|       0.0|street_music|[-1068.75415039,7...|
|     32|       2.0|       dog_bark|       0.0|street_music|[-1046.7611084,96...|
|     32|       2.0|       dog_bark|       0.0|street_music|[-997.95880127,80...|
+-------+----------+---------------+----------+------------+--------------------+
only showing top 5 rows



In [18]:
def most_frequent(x, y):
    if(y[1] > x[1]):
        return y
    else:
        return x

In [19]:
groupedByIdSoundPredictions = predictions.groupBy(['idSound', 'classPrediction']) \
    .count() \
    .map(lambda row: (row[0], (row[1], row[2]))) \
    .reduceByKey(most_frequent) \
    .map(lambda row: (row[0], (row[1][0]))) \
    .collect()

In [None]:
pd.Categorical(groupedByIdSoundPredictions['class'])

In [21]:
import numpy as np
import pandas as pd

groupedByIdSoundPredictionsDf = pd.DataFrame(groupedByIdSoundPredictions, columns = ['idSound', 'class'])
groupedByIdSoundPredictionsDf['class'] = groupedByIdSoundPredictionsDf['class'].astype('category')
groupedByIdSoundPredictionsDf[:10]
groupedByIdSoundPredictionsDf.dtypes

idSound       int64
class      category
dtype: object

In [22]:
testIdSoundDf = testIdSound.toPandas()
testIdSoundDf['class'] = testIdSoundDf['class'].astype('category')
testIdSoundDf[:10]

Unnamed: 0,idSound,class
0,147,dog_bark
1,747,dog_bark
2,347,engine_idling
3,239,dog_bark
4,1066,engine_idling
5,298,children_playing
6,812,engine_idling
7,1173,gun_shot
8,1104,engine_idling
9,650,engine_idling


In [None]:
testIdSoundDf.sort('idSound')

In [None]:
groupedByIdSoundPredictionsDf.sort('idSound')

In [42]:
y_true = testIdSoundDf.sort_values('idSound')['class'].astype('category').rename('obs')
y_pred = groupedByIdSoundPredictionsDf.sort_values('idSound')['class'].astype('category').rename('pred')

In [43]:
y_true[:10]

51             drilling
67         street_music
19             car_horn
126            gun_shot
87             drilling
63             dog_bark
96         street_music
84             car_horn
68     children_playing
32        engine_idling
Name: obs, dtype: category
Categories (10, object): [air_conditionner, car_horn, children_playing, dog_bark, ..., gun_shot, jackhammer, siren, street_music]

In [44]:
y_pred[:10]

4         dog_bark
5     street_music
6     street_music
9         dog_bark
15        dog_bark
32    street_music
33    street_music
37        dog_bark
39        dog_bark
44    street_music
Name: pred, dtype: category
Categories (5, object): [air_conditionner, children_playing, dog_bark, engine_idling, street_music]

In [46]:
y_pred[y_pred == 'class']

Series([], Name: pred, dtype: category
Categories (5, object): [air_conditionner, children_playing, dog_bark, engine_idling, street_music])

In [47]:
y_true[y_true == 'class']

Series([], Name: obs, dtype: category
Categories (10, object): [air_conditionner, car_horn, children_playing, dog_bark, ..., gun_shot, jackhammer, siren, street_music])

In [56]:
pd.crosstab(y_true, y_pred, margins=True)

pred,air_conditionner,children_playing,dog_bark,engine_idling,street_music,All
obs,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
air_conditionner,1,0,0,0,2,3
car_horn,0,5,8,1,8,22
children_playing,0,1,16,0,5,22
dog_bark,1,4,20,0,23,48
drilling,0,2,5,0,8,15
engine_idling,0,3,11,0,6,20
gun_shot,0,3,5,0,5,13
jackhammer,0,1,2,0,1,4
siren,0,1,5,0,7,13
street_music,1,5,9,2,13,30


In [60]:
from sklearn.metrics import classification_report, f1_score
report = classification_report(y_true, y_pred)
print(report)
score = f1_score(y_true, y_pred)
print('F1 score is %.02f' % score)

             precision    recall  f1-score   support

air_conditionner       0.00      0.00      0.00         3
   car_horn       0.00      0.00      0.00        22
children_playing       0.20      0.23      0.21        22
   dog_bark       0.42      0.71      0.53        48
   drilling       0.00      0.00      0.00        15
engine_idling       1.00      0.15      0.26        20
   gun_shot       0.00      0.00      0.00        13
 jackhammer       0.00      0.00      0.00         4
      siren       0.00      0.00      0.00        13
street_music       0.36      0.93      0.52        30

avg / total       0.29      0.37      0.27       190

F1 score is 0.27


  sample_weight=sample_weight)


## Soumettre pour le challenge

On prédit maintenant sur le jeu de validation:

In [62]:
validation = sqlContext.read.parquet('/user/geraud/shazam/parquet/nc13-validation.parquet')

In [63]:
validation.select(validation.columns[:15]).limit(5).toPandas()

Unnamed: 0,idSound,bands_1,bands_2,bands_3,bands_4,bands_5,bands_6,bands_7,bands_8,bands_9,bands_10,bands_11,bands_12,bands_13,mfccSeq
0,1130,-1057.423462,87.011238,28.748173,-17.006512,-30.835201,-19.854256,0.493351,16.339136,18.646038,6.955988,-7.209547,-9.019821,0.059845,1
1,1130,-1043.122681,88.584984,40.072498,-21.283649,-24.326508,-20.599548,6.73312,23.518503,19.811245,5.924215,-20.479956,-9.84761,-14.712553,2
2,1130,-1083.237061,53.288303,34.889881,3.27261,-7.783813,-4.266098,-3.541752,6.613411,1.26812,6.998857,6.348866,5.566071,3.963509,3
3,1130,-1060.809326,71.071884,38.297337,3.900204,-7.461784,0.368042,-3.58218,-0.843277,-9.392647,5.319008,8.935528,6.912689,1.413979,4
4,1130,-1076.274902,72.305397,44.818089,17.522175,0.121017,-2.276543,-3.874554,1.093788,4.462513,3.175598,4.348476,2.092545,3.292015,5


In [64]:
rawPredictions = model.transform(validation.select(validation.columns[:15]))

predictions = converter.transform(rawPredictions)
predictions.select("idSound", "prediction", "classPrediction", "features").show(5)

+-------+----------+---------------+--------------------+
|idSound|prediction|classPrediction|            features|
+-------+----------+---------------+--------------------+
|   1130|       2.0|       dog_bark|[-1057.42346191,8...|
|   1130|       2.0|       dog_bark|[-1043.12268066,8...|
|   1130|       2.0|       dog_bark|[-1083.23706055,5...|
|   1130|       2.0|       dog_bark|[-1060.80932617,7...|
|   1130|       2.0|       dog_bark|[-1076.27490234,7...|
+-------+----------+---------------+--------------------+
only showing top 5 rows



In [65]:
groupedByIdSoundPredictions = predictions.groupBy(['idSound', 'classPrediction']) \
    .count() \
    .map(lambda row: (row[0], (row[1], row[2]))) \
    .reduceByKey(most_frequent) \
    .map(lambda row: (row[0], row[1][0])) \
    .collect()

In [66]:
groupedByIdSoundPredictions[:10]

[(0, u'street_music'),
 (401, u'children_playing'),
 (803, u'street_music'),
 (404, u'street_music'),
 (1204, u'street_music'),
 (1205, u'dog_bark'),
 (1005, u'children_playing'),
 (805, u'dog_bark'),
 (405, u'street_music'),
 (407, u'street_music')]

In [67]:
import pandas as pd
submissionDf = pd.DataFrame(groupedByIdSoundPredictions, columns = ['idSound', 'class'])
    

In [68]:
submissionDf[:10]

Unnamed: 0,idSound,class
0,0,street_music
1,401,children_playing
2,803,street_music
3,404,street_music
4,1204,street_music
5,1205,dog_bark
6,1005,children_playing
7,805,dog_bark
8,405,street_music
9,407,street_music


In [69]:
submissionDf.to_csv('my-submission.csv', index = False)