In [1]:
import findspark

In [2]:
findspark.init('/home/huy/spark-2.2.3-bin-hadoop2.7')

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# définir un SparkContext en local. Permet de travailler avec les RDD
sc = SparkContext.getOrCreate()

# Construire une session Spark
# SparkSession est une couche supérieure à SparkContext
spark = SparkSession.builder.appName('Spark Context').getOrCreate()

spark

In [4]:
df_raw = spark.read.csv("YearPredictionMSD.txt")

# toutes les colonnes sont du type string
df_raw.sample(False, .00001, seed = 222).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c81,_c82,_c83,_c84,_c85,_c86,_c87,_c88,_c89,_c90
0,2007,49.83538,2.61013,17.00197,-9.50786,-23.94269,-14.72739,4.47792,4.06217,4.78611,...,10.59593,81.25012,33.10853,-7.46066,7.6057,91.36251,-66.34214,9.6222,119.81564,2.81304
1,2009,45.78269,34.74613,-8.52873,11.44336,-18.00745,-29.79324,11.83189,-5.26578,0.34463,...,8.23981,-120.6769,-56.06567,17.67264,8.23461,-46.53563,39.4015,9.44279,-24.93795,-8.67221
2,2007,43.61806,-12.13404,-21.41169,0.17262,23.94724,3.72902,13.61887,-3.44822,-2.09872,...,20.43382,-74.56965,59.65242,131.46903,6.0751,34.06221,-162.45943,2.73542,66.37659,-25.99604
3,2006,46.79964,54.54515,11.05619,2.00775,11.8697,-10.15977,6.25193,5.34281,1.45345,...,7.2679,-203.19105,84.5101,-73.76258,35.57993,59.14017,3.72889,-8.86871,45.4788,-11.222
4,2006,52.00132,35.66485,34.37876,1.69847,-34.0965,-19.7641,13.33325,5.54795,6.81785,...,-17.1087,-121.60406,-31.96296,-40.16986,-5.51689,15.41615,-78.71597,2.63836,-8.23705,0.84104


https://spark.apache.org/docs/latest/ml-tuning

Tuning is using data to find the best model or parameters for a given task.

Tuning may be done for 
- individual Estimators such as LogisticRegression, 
- or for entire Pipelines which include multiple algorithms, featurization, and other steps.

Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.

MLlib supports model selection using tools such as CrossValidator and TrainValidationSplit. These tools require the following items:

- Estimator: algorithm or Pipeline to tune
- Set of ParamMaps: parameters to choose from, sometimes called a “parameter grid” to search over
- Evaluator: metric to measure how well a fitted Model does on held-out test data

At a high level, these model selection tools work as follows:

1. They split the input data into separate training and test datasets.
2. For each (training, test) pair, they iterate through the set of ParamMaps:
3. For each ParamMap, they fit the Estimator using those parameters, get the fitted Model, and evaluate the Model’s performance using the Evaluator.
4. They select the Model produced by the best-performing set of parameters.

3

The Evaluator can be
a RegressionEvaluator for regression problems,
a BinaryClassificationEvaluator for binary data,
a MulticlassClassificationEvaluator for multiclass problems,
a MultilabelClassificationEvaluator for multi-label classifications,
or a RankingEvaluator for ranking problems.

The default metric used to choose the best ParamMap can be overridden by the setMetricName method in each of these evaluators.

To help construct the **parameter grid**, users can use the ParamGridBuilder utility. 

Le model tuning permet de régler les paramètres d'un modèle pour éviter le overfitting.

Méthode la plus utilisée cross-validation

1 Choix du modèle à optimiser (model selection)

2 Création d'une grille de paramètres (parameter grid)

3 Choix d'une métrique d'évaluation (evaluator metric)

4 Mise en place d'un crossValidator


Objectif: régler les paramètres du modèle pour améliorer sa robustesse.

Robutesse: être efficace face à une nouvelle base de données. Le contraire de overfitting.

### Mettre le jeu de données en format svmlib

In [5]:
from pyspark.sql.functions import col

# pour ttes les colonnes de 1 à 13, cast en "double"
exprs = [col(c).cast("double") for c in df_raw.columns[1:13]]

# la première colonne est l'année et est en int
df_casted = df_raw.select(df_raw._c0.cast("int"), *exprs)

df = df_casted.sample(False, .1, seed = 222)

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)



In [6]:
from pyspark.ml.linalg import DenseVector

# créer un rdd à partir de la df en séparant la colonne à prédire des autres variables 
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# mettre le rdd sous formt df en nommant les deux colonnes
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

df_ml.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 2005|[42.15927,-36.860...|
| 1997|[40.53094,-23.737...|
| 2000|[50.76908,29.8401...|
| 2003|[46.3173,13.03044...|
| 2009|[49.18434,51.1999...|
| 2009|[47.48739,43.6001...|
| 2009|[49.73491,106.182...|
| 2007|[42.26736,19.4688...|
| 2007|[45.13814,35.5714...|
| 2007|[44.85672,25.3150...|
| 2007|[45.14394,28.8183...|
| 2004|[48.58166,39.6431...|
| 2009|[41.53981,12.295,...|
| 2009|[46.54569,66.5667...|
| 2009|[43.76116,-11.945...|
| 2009|[43.667,-20.98101...|
| 2009|[40.00607,69.8926...|
| 1991|[31.1901,-98.5275...|
| 1991|[32.60431,-68.393...|
| 1991|[28.10519,-105.28...|
+-----+--------------------+
only showing top 20 rows



### Séparer la BD en deux: train et test

In [7]:
(train, test) = df_ml.randomSplit([.8, .2], seed = 222)

## 1 - Instancier un modèle de régression linéaire

In [8]:
from pyspark.ml.regression import LinearRegression

# créer la fonction avec les paramètres adaptés
lr = LinearRegression(labelCol='label', featuresCol='features')

## 2 - Création d'une grille de paramètres.

Donner à l'algo une grille de paramètres à tester

With 3 values for lr.regParam and 3 values for lr.elasticNetParam,
this grid will have 3 x 3 = 6 parameter settings for CrossValidator to choose from.

In [9]:
from pyspark.ml.tuning import ParamGridBuilder

param_grid = ParamGridBuilder().\
        addGrid(lr.regParam, [0, 0.5, 1]).\
        addGrid(lr.elasticNetParam, [0, 0.5, 1]).\
        build()

## 3 - Choix d'une métrique d'évaluation

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction',
                               labelCol='label',
                               metricName='r2')

## 4 - Mise en place d'u cross validator

Réglage des paramètres par validation croisée (k-fold cross validation)

CrossValidator begins by splitting the dataset into a set of folds which are used as separate training and test datasets. E.g., with k=3 folds, CrossValidator will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular ParamMap, CrossValidator computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs.

After identifying the best ParamMap, CrossValidator finally re-fits the Estimator using the best ParamMap and the entire dataset.

Note that cross-validation over a grid of parameters is expensive. E.g., in our case, the parameter grid has 3 values for lr.elasticNetParam and 3 values for lr.regParam, and CrossValidator uses 3 folds. This multiplies out to (3×3)×3=27 different models being trained.

In realistic settings, it can be common to try many more parameters and use more folds (k=3 and k=10 are common). In other words, using CrossValidator can be very expensive. However, it is also a well-established method for choosing parameters which is more statistically sound than heuristic hand-tuning.

In [11]:
from pyspark.ml.tuning import CrossValidator

# estimator could be a pipeline
cv = CrossValidator(estimator=lr,
                   estimatorParamMaps=param_grid,
                   evaluator=evaluator,
                   numFolds=3)



 Run cross-validation, and choose the best set of parameters.

In [12]:
cv_model = cv.fit(train)

Make predictions on test documents. cvModel uses the best model found (lrModel).

In [13]:
pred_train = cv_model.transform(train)

In [14]:
pred_test = cv_model.transform(test)

In [15]:
pred_test.show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
| 1929|[30.44087,-83.073...|1988.6880560508805|
| 1929|[35.07089,-67.509...|1993.5634458070965|
| 1929|[38.93054,-58.642...| 1986.886897722553|
| 1947|[36.61167,-144.91...|1991.6071515232034|
| 1949|[34.47809,-39.672...|1990.0110155312573|
| 1952|[43.31663,-11.926...|1992.6652285342857|
| 1953|[35.44492,-82.151...|1990.6781444410221|
| 1955|[44.76449,58.0813...|1998.3155282694706|
| 1956|[34.18681,-72.620...| 1996.954045240956|
| 1956|[34.6146,-142.972...|1999.9968724985056|
| 1956|[37.81231,-213.47...|2000.6635685699293|
| 1956|[37.86685,-69.030...|1999.8380995453879|
| 1956|[41.0475,-73.6027...|1992.2928622033496|
| 1956|[42.48217,23.8266...|1989.9725564838072|
| 1956|[46.16219,-35.359...|1997.2888093918527|
| 1957|[27.3026,-187.024...|1997.6059132793173|
| 1957|[27.6883,-29.8628...|1993.5598168315723|
| 1957|[34.48545,-81.813...|1992.0157368

In [16]:
# collect renvoie une liste
selected = pred_test.select("label", "prediction")
for row in selected.collect():
    print(row)

Row(label=1929, prediction=1988.6880560508805)
Row(label=1929, prediction=1993.5634458070965)
Row(label=1929, prediction=1986.886897722553)
Row(label=1947, prediction=1991.6071515232034)
Row(label=1949, prediction=1990.0110155312573)
Row(label=1952, prediction=1992.6652285342857)
Row(label=1953, prediction=1990.6781444410221)
Row(label=1955, prediction=1998.3155282694706)
Row(label=1956, prediction=1996.954045240956)
Row(label=1956, prediction=1999.9968724985056)
Row(label=1956, prediction=2000.6635685699293)
Row(label=1956, prediction=1999.8380995453879)
Row(label=1956, prediction=1992.2928622033496)
Row(label=1956, prediction=1989.9725564838072)
Row(label=1956, prediction=1997.2888093918527)
Row(label=1957, prediction=1997.6059132793173)
Row(label=1957, prediction=1993.5598168315723)
Row(label=1957, prediction=1992.0157368853127)
Row(label=1959, prediction=1999.1553901336272)
Row(label=1959, prediction=1993.9553491698623)
Row(label=1959, prediction=1995.5810786355073)
Row(label=1959,

Row(label=2007, prediction=1996.3339865853247)
Row(label=2007, prediction=1992.5623212034657)
Row(label=2007, prediction=2000.3972654408778)
Row(label=2007, prediction=1997.111639773166)
Row(label=2007, prediction=1998.324796832703)
Row(label=2007, prediction=1997.5262453943512)
Row(label=2007, prediction=1994.2822939753466)
Row(label=2007, prediction=1992.4863530397981)
Row(label=2007, prediction=1998.4222677415758)
Row(label=2007, prediction=2000.4316316576064)
Row(label=2007, prediction=1993.706089858345)
Row(label=2007, prediction=1996.8929311469553)
Row(label=2007, prediction=1996.6023523803196)
Row(label=2007, prediction=2005.3352524384268)
Row(label=2007, prediction=1995.5526722324887)
Row(label=2007, prediction=1998.248656651265)
Row(label=2007, prediction=2003.3976567651637)
Row(label=2007, prediction=1996.3087245692786)
Row(label=2007, prediction=2000.0861120705235)
Row(label=2007, prediction=2002.4154924272607)
Row(label=2007, prediction=1999.6042021208893)
Row(label=2007, p

Row(label=2003, prediction=1991.6586650894567)
Row(label=2003, prediction=1992.3222911722153)
Row(label=2003, prediction=1995.2012244577604)
Row(label=2003, prediction=1989.7441245107675)
Row(label=2003, prediction=1987.6479513022923)
Row(label=2003, prediction=2002.1042194589245)
Row(label=2003, prediction=1990.3692740643583)
Row(label=2003, prediction=1995.6837693165617)
Row(label=2003, prediction=1993.0206412648354)
Row(label=2003, prediction=1999.8913567431402)
Row(label=2003, prediction=1997.1858916889955)
Row(label=2003, prediction=1995.4648887409412)
Row(label=2003, prediction=1992.3554778807927)
Row(label=2003, prediction=2002.099027113338)
Row(label=2003, prediction=1992.4959509855807)
Row(label=2003, prediction=1996.591052361546)
Row(label=2003, prediction=1994.120452371329)
Row(label=2003, prediction=1993.4763743888543)
Row(label=2003, prediction=1998.7645978923392)
Row(label=2003, prediction=1998.0196811112737)
Row(label=2003, prediction=1994.600208730508)
Row(label=2003, p

Examiner les métriques

In [19]:
evaluator.setMetricName('r2').evaluate(pred_test)

0.1564198653389689

In [20]:
evaluator.setMetricName('rmse').evaluate(pred_test)

10.206660175163833

RMSE measures the differences between predicted values by the model and the actual values. However, RMSE alone is meaningless until we compare with the actual “MV” value, such as mean, min and max

In [21]:
train.describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|             41247|
|   mean|1998.3525832181735|
| stddev|10.986808864337556|
|    min|              1922|
|    max|              2010|
+-------+------------------+



In [22]:
test.describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|             10311|
|   mean|1998.3273203375036|
| stddev|11.113261770668421|
|    min|              1926|
|    max|              2010|
+-------+------------------+



View the optimal model information

In [23]:
bestModel = cv_model.bestModel.coefficients

In [24]:
bestModel

DenseVector([0.7608, -0.0558, -0.0747, 0.1319, 0.0133, -0.2082, -0.0612, -0.0458, -0.1376, 0.0989, -0.4352, 0.0504])

In [25]:
spark.stop()