![Spark](http://spark.apache.org/docs/latest/img/spark-logo-hd.png)

<img src="http://hortonworks.com/wp-content/themes/hortonworks/images/svg/ui_logo.svg" width="240"/>

## Using Spark on the cluster

This notebook shows an example of applying a random forest to the nc13 dataset.

### Using Jupyter

In [None]:
1+2

This is documentation.

### Python configuration

In [None]:
# use this to stop the Spark context
sc.stop()

Configuring the Python path:

In [1]:
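import os
import sys

# SPARK_HOME must point to the Spark installation; fail early if it is not set
spark_home = os.environ['SPARK_HOME']
# make pyspark and its bundled py4j importable
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.9-src.zip'))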
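
The py4j archive name depends on the Spark release (`py4j-0.9-src.zip` here), so a hard-coded path may stop working after an upgrade. Below is a minimal sketch of one way to locate the archive with a glob pattern. The helper name `find_py4j_zip` is hypothetical, and the `python/lib` layout is assumed from the path used in the cell above.

```python
import glob
import os


def find_py4j_zip(spark_home):
    """Return the path of the py4j source archive under spark_home, or None.

    Hypothetical helper: assumes the archive lives in <spark_home>/python/lib
    and is named py4j-<version>-src.zip, as in the cell above.
    """
    pattern = os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')
    matches = sorted(glob.glob(pattern))
    # If several versions are present, take the last one in lexical order.
    return matches[-1] if matches else None
```

The result could then be passed to `sys.path.insert(0, ...)` in place of the fixed file name, after checking that it is not `None`.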

Configuring the Spark client; we use yarn-client mode:
![Cluster Overview](http://spark.apache.org/docs/latest/img/cluster-overview.png)

In [2]:
import os
import pyspark

conf = pyspark.SparkConf()
conf.setMaster('yarn-client')

sc = pyspark.SparkContext(conf=conf)

In [None]:
sc.master

### Spark on YARN

![HDP platform](HDP_architecture.png)

### Loading the data

The data has been loaded into HDFS under /user/geraud/shazam. Here we use the files converted to Parquet format (in /user/geraud/shazam/parquet).

In [4]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

We initialize a SQLContext, which lets us load the data as a DataFrame.
A DataFrame is a tabular representation of the data (like an Excel sheet): https://spark.apache.org/docs/1.6.2/sql-programming-guide.html

In [5]:
sqlContext = SQLContext(sc)

In [6]:
mfcc = sqlContext.read.parquet('/user/geraud/shazam/parquet/nc13-train.parquet')
mfcc.show(5)

+-------+--------------+-------------+---------------+-------------+-------------+--------------+-------------+-------------+--------------+---------------+---------------+-------------+--------------+-------+--------+
|idSound|       bands_1|      bands_2|        bands_3|      bands_4|      bands_5|       bands_6|      bands_7|      bands_8|       bands_9|       bands_10|       bands_11|     bands_12|      bands_13|mfccSeq|   class|
+-------+--------------+-------------+---------------+-------------+-------------+--------------+-------------+-------------+--------------+---------------+---------------+-------------+--------------+-------+--------+
|    274|-901.902099609| 163.83505249| -11.8038024902|19.7777748108|19.9015960693|-4.03476715088|18.4968910217|12.1045322418|-1.85980224609|-0.522010803223|  1.58743667603|2.35488128662|-9.13202285767|      1|dog_bark|
|    274|-884.952514648|154.302032471| -24.4797782898|35.2823219299|8.37690734863|-21.6676330566|14.3066902161| 2.4754486084

In [7]:
mfcc.describe().toPandas()

| | summary | idSound | bands_1 | bands_2 | bands_3 | bands_4 | bands_5 | bands_6 | bands_7 | bands_8 | bands_9 | bands_10 | bands_11 | bands_12 | bands_13 | mfccSeq |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 | 3388063.0 |
| 1 | mean | 625.994764855317 | -797.7422305973884 | 130.5292495000719 | -8.684178805994717 | 9.93409146380984 | -2.213174698431618 | 2.79919745596125 | 0.2200321438486301 | 1.4286656845608057 | -0.5015752394078973 | 0.7480491144689811 | -0.911060909739898 | 0.2566778770198374 | -0.5979051189166785 | 4830.286625130642 |
| 2 | stddev | 378.89575690717896 | 133.466217254147 | 53.08886778297777 | 40.18542865854186 | 26.5668552706278 | 20.88320375826346 | 18.63478751721859 | 16.730487031561562 | 14.2524646400452 | 13.331844923590172 | 12.479857078817725 | 11.4122629628578 | 11.01843263226996 | 10.718384230849887 | 4959.522103508154 |
| 3 | min | 1.0 | -1138.42004395 | -187.382949829 | -245.321746826 | -159.592193604 | -156.033447266 | -117.476898193 | -116.255081177 | -93.6179351807 | -94.7189712524 | -106.444740295 | -77.8643798828 | -86.0950698853 | -102.984832764 | 1.0 |
| 4 | max | 1301.0 | -303.506225586 | 327.894744873 | 159.329299927 | 163.017944336 | 125.203155518 | 111.745773315 | 102.688072205 | 105.055770874 | 91.7920837402 | 91.9693603516 | 91.0449066162 | 91.9013214111 | 86.9623718262 | 26210.0 |


### DataFrame operations

In [8]:
mfcc.filter(mfcc['class'] == 'dog_bark').select('idSound', 'bands_1', 'class').show(5)

+-------+--------------+--------+
|idSound|       bands_1|   class|
+-------+--------------+--------+
|    274|-901.902099609|dog_bark|
|    274|-884.952514648|dog_bark|
|    274|-910.679870605|dog_bark|
|    274|-930.696655273|dog_bark|
|    274|-932.699462891|dog_bark|
+-------+--------------+--------+
only showing top 5 rows



In [9]:
mfcc.groupBy('idSound').mean('bands_1').toPandas()

| | idSound | avg(bands_1) |
|---|---|---|
| 0 | 1231 | -681.757932 |
| 1 | 431 | -746.012543 |
| 2 | 831 | -815.922051 |
| 3 | 631 | -790.621137 |
| 4 | 31 | -765.044912 |
| 5 | 1232 | -970.228533 |
| 6 | 632 | -588.378465 |
| 7 | 232 | -676.033586 |
| 8 | 832 | -879.992056 |
| 9 | 1032 | -742.044601 |


In SQL mode:

In [10]:
sqlContext.registerDataFrameAsTable(mfcc, 'mfcc')
sqlContext.tableNames()

[u'mfcc']

In [11]:
sqlContext.sql("SELECT idSound, AVG(bands_1), class FROM mfcc GROUP BY idSound, class").toPandas()

| | idSound | _c1 | class |
|---|---|---|---|
| 0 | 147 | -918.925428 | dog_bark |
| 1 | 747 | -852.186924 | dog_bark |
| 2 | 520 | -940.724946 | dog_bark |
| 3 | 347 | -785.126516 | engine_idling |
| 4 | 1134 | -760.566206 | air_conditionner |
| 5 | 796 | -790.469515 | drilling |
| 6 | 1293 | -862.802072 | dog_bark |
| 7 | 493 | -704.569930 | dog_bark |
| 8 | 135 | -933.997742 | gun_shot |
| 9 | 534 | -670.566744 | car_horn |


#### Principles

Spark supports two kinds of operations on DataFrames: *transformations* and *actions*.

* Examples of transformations: map, flatMap, filter...
* Examples of actions: collect, show...

Transformations are lazy: they are only executed when an action is called.
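
To make the idea of lazy evaluation concrete, here is a small analogy in plain Python, using generators rather than Spark itself. The `traced` helper and the toy values are made up for illustration. Building the generator chain plays the role of the transformations, and `list(...)` plays the role of the action.

```python
def traced(values, log):
    """Yield each value, recording in `log` when it is actually consumed."""
    for v in values:
        log.append(v)
        yield v


log = []
# "Transformations": building the chain does not read any data yet.
pipeline = (x * 2 for x in traced([1, 2, 3, 4], log) if x % 2 == 0)
consumed_before_action = list(log)  # still empty at this point

# "Action": materialising the result forces the whole chain to run.
result = list(pipeline)
```

Nothing is read from the input until `list(pipeline)` is called. In the same way, a `filter` or `select` on a DataFrame only launches a job once `show` or `collect` is invoked.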


### Building the Machine Learning pipeline

We apply a sequence of transformations to the dataset:

* label indexing (Spark's ML library only works with numeric values)
* feature assembly: the bands_* columns are gathered into a single _features_ column
* model fitting

The Spark ML documentation is here: https://spark.apache.org/docs/1.6.2/ml-guide.html
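
Before wiring up the real stages, it may help to see what the first two steps do to a row. The following is a plain-Python sketch, not the Spark implementation. The function names `fit_label_index` and `assemble` are hypothetical, the toy rows are made up, and the alphabetical tie-break is an assumption of this sketch. `StringIndexer` orders labels by frequency, so the most frequent label gets index 0.

```python
from collections import Counter


def fit_label_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; ties are broken alphabetically here
    # (an assumption of this sketch, chosen for determinism).
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


def assemble(row, input_cols):
    """Gather the selected columns of a row (a dict) into one feature list."""
    return [row[c] for c in input_cols]


# Toy rows with made-up values and only two of the thirteen bands.
rows = [
    {'bands_1': -901.9, 'bands_2': 163.8, 'class': 'dog_bark'},
    {'bands_1': -884.9, 'bands_2': 154.3, 'class': 'dog_bark'},
    {'bands_1': -790.4, 'bands_2': 120.0, 'class': 'drilling'},
]
label_index = fit_label_index(r['class'] for r in rows)
prepared = [
    {'classIndex': label_index[r['class']],
     'features': assemble(r, ['bands_1', 'bands_2'])}
    for r in rows
]
```

Each prepared row now carries a numeric `classIndex` and a single `features` vector, which is the shape the classifier expects.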

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [None]:
# as usual, we set part of the data aside for testing
(trainingData, testData) = mfcc.randomSplit([0.7, 0.3])

labelIndexer = StringIndexer(inputCol='class', outputCol='classIndex').fit(mfcc)

assembler = VectorAssembler(
    inputCols=['bands_1', 'bands_2', 'bands_3', 'bands_4', 'bands_5', 'bands_6',
               'bands_7', 'bands_8', 'bands_9', 'bands_10', 'bands_11', 'bands_12', 'bands_13'],
    outputCol='features')

rf = RandomForestClassifier(labelCol="classIndex", featuresCol="features")

pipeline = Pipeline(stages=[labelIndexer, assembler, rf])

model = pipeline.fit(trainingData)

In [None]:
model.stages

While it runs, you can check the job's progress at http://nn1.hdp3.bsa.broadsoftware.com:8088/cluster/apps

### Making predictions

In [None]:
rawPredictions = model.transform(testData)
rawPredictions.limit(5).toPandas()

In [None]:
converter = IndexToString(inputCol="prediction", outputCol="classPrediction", labels = labelIndexer.labels)
predictions = converter.transform(rawPredictions)
predictions.select("idSound", "prediction", "classPrediction", "classIndex", "class", "features").show(5)

In [None]:
predictions.limit(1).toPandas()

### Evaluating the model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="classIndex", predictionCol="prediction", metricName="precision")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
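
As far as we understand the Spark 1.6 evaluator, the `precision` metric used above is the overall fraction of correctly classified rows, so `1.0 - accuracy` is the misclassification rate. Here is a minimal plain-Python sketch of that computation. The function name `error_rate` and the toy pairs are made up for illustration.

```python
def error_rate(pairs):
    """Return 1 - accuracy for an iterable of (label, prediction) pairs."""
    pairs = list(pairs)
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return 1.0 - float(correct) / len(pairs)


# Toy (classIndex, prediction) pairs, made up for illustration.
toy = [(0.0, 0.0), (1.0, 0.0), (2.0, 2.0), (1.0, 1.0)]
error = error_rate(toy)
```

Three of the four toy rows are classified correctly, so the error is 0.25.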

## Submitting to the challenge

In [None]:
def most_frequent(x, y):
    # x and y are (class, count) pairs; keep the one with the larger count
    if y[1] > x[1]:
        return y
    else:
        return x

We need to group the MFCC frames by idSound, so here is a bit of debugging on the test set:

In [None]:
predictions[predictions.idSound == 4].groupBy(['idSound', 'classPrediction']) \
           .count() \
           .collect()

In [None]:
predictions[predictions.idSound == 4].groupBy(['idSound', 'classPrediction']) \
    .count() \
    .map(lambda row: (row[0], (row[1], row[2]))) \
    .reduceByKey(most_frequent) \
    .collect()

    
# .agg(lambda x:x.value_counts().index[0]) 

In [None]:
groupedByIdSoundPredictions = predictions.groupBy(['idSound', 'classPrediction']) \
    .count() \
    .map(lambda row: (row[0], (row[1], row[2]))) \
    .reduceByKey(most_frequent) \
    .collect()

In [None]:
groupedByIdSoundPredictions[:10]
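
The count / map / reduceByKey chain above amounts to a majority vote per sound. Here is a plain-Python sketch of the same logic that runs without a cluster. The `majority_vote` helper and the toy frame predictions are made up for illustration, and the last step also drops the count to keep only the class name.

```python
from collections import Counter
from functools import reduce


def most_frequent(x, y):
    """Keep whichever (class, count) pair has the larger count."""
    return y if y[1] > x[1] else x


def majority_vote(frame_predictions):
    """Map each idSound to its most frequently predicted class.

    `frame_predictions` is an iterable of (idSound, classPrediction) pairs,
    one per MFCC frame.
    """
    # Counterpart of groupBy(['idSound', 'classPrediction']).count()
    counts = Counter(frame_predictions)
    # Counterpart of the map step: key by idSound, value is (class, count)
    by_sound = {}
    for (id_sound, cls), n in counts.items():
        by_sound.setdefault(id_sound, []).append((cls, n))
    # Counterpart of reduceByKey(most_frequent), keeping only the class name
    return {k: reduce(most_frequent, v)[0] for k, v in by_sound.items()}


# Toy frame-level predictions, made up for illustration.
frames = [(4, 'dog_bark'), (4, 'dog_bark'), (4, 'car_horn'), (7, 'drilling')]
votes = majority_vote(frames)
```

Note that `most_frequent` keeps its first argument on a tie, so when two classes have the same count for a sound, the winner depends on the order in which the pairs are reduced.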

We now predict on the validation set:

In [None]:
validation = sqlContext.read.parquet('/user/geraud/shazam/parquet/nc13-validation.parquet')

In [None]:
validation.select(validation.columns[:15]).limit(5).toPandas()

In [None]:
rawPredictions = model.transform(validation.select(validation.columns[:15]))

predictions = converter.transform(rawPredictions)
predictions.select("idSound", "prediction", "classPrediction", "features").show(5)

In [None]:
groupedByIdSoundPredictions = predictions.groupBy(['idSound', 'classPrediction']) \
    .count() \
    .map(lambda row: (row[0], (row[1], row[2]))) \
    .reduceByKey(most_frequent) \
    .map(lambda row: (row[0], row[1][0])) \
    .collect()

In [None]:
groupedByIdSoundPredictions[:10]

In [None]:
import pandas as pd
submissionDf = pd.DataFrame(groupedByIdSoundPredictions, columns = ['idSound', 'class'])
    

In [None]:
submissionDf[:10]

In [None]:
submissionDf.to_csv('my-submission.csv', index = False)