In [None]:
#---------------------------------------------------------------------- Module 1
#---------------------------------------------------------------------- Introduction à PySpark


#------------------------------------------------------ Import de SparkContext du module pyspark
from pyspark import SparkContext

# Définiton d'un SparkContext en local
sc = SparkContext('local')
sc

#------------------------------------------------------ time 
# Importer la bibliothèque time et calcul du temps au début de l'exécution (t0)
from time import time
t0 = time()

### Insérez votre code d'ici 
raw_rdd = sc.textFile('2008_raw.csv')

### Ne modifier pas le code ci-dessous
# Calcul du temps de la lecture du fichier
t1 = time() - t0
print("Réalisé en {} secondes".format(round(t1,3)))



#------------------------------------------------------ take (prend ) 
# Calcul du temps au début de l'exécution (t0)
t0 = time()

list_a = []

### Insérez votre code d'ici 
for i in raw_rdd.take(5):
    list_a.append(i)
    print (i)
    
### Ne modifier pas le code ci-dessous
# Calcul du temps de l'affichage des 5 éléments
t1 = time() - t0
print("Réalisé en {} secondes".format(round(t1,3)))



#------------------------------------------------------ count ( compte ) 
# Calcul du temps au début de l'exécution (t0)
t0 = time()

## Insérez le code ici
count = raw_rdd.count()

### Ne modifier pas le code ci-dessous
# Calcul du temps de l'affichage du nombre de lignes du RDD
t1 = time() - t0
print("Nombre de lignes :", count)
print("Réalisé en {} secondes".format(round(t1,3)))



#------------------------------------------------------ Map 
airplane_rdd = raw_rdd.map(lambda line : line.split(','))

print (airplane_rdd.take (1))
airplane_rdd.take (1)



#------------------------------------------------------ reduceByKey 
# Création d'un nouveau rdd en résumant les lignes par l'aéroport de départ
hist_rdd = airplane_rdd.map(lambda x: (x[7], 1)).reduceByKey(lambda x,y: x+y)

# Affichage d'un 5 premières lignes 
hist_rdd.take(20)



#------------------------------------------------------ collect pour forcer l'évaluation 
hist  = hist_rdd.collect()
hist 



#------------------------------------------------------ Trier et filtrer un RDD
sorted(hist, key= lambda x: x[1], reverse= 1)



# Calcul et affichage du nombre de vols annulés par ville d'origine
airplane_rdd \
    .filter(lambda x: x[10] == "1") \
    .map(lambda x: (x[8], 1)) \
    .reduceByKey(lambda x,y: x+y) \
    .collect()

#------------------------------------------------------ Fermeture du SparkContext
sc.stop()

In [1]:
---------------------------------------------------------------------- Module 2
# ---------------------------------------------------------------------- Data Processing 


#--------------------------------------------- Init
# Import de SparkContext du module pyspark
from pyspark import SparkContext

# Défintion d'un SparkContext
sc = SparkContext.getOrCreate()


#--------------------------------------------- Importation de la base de données
# Chargement du fichier "miserables_full.txt" et affichage des 10 premières lignes
miserables = sc.textFile("miserables_full.txt")
miserables.take(10)


#--------------------------------------------- Mise en forme de la base
miserables_clean = rdd.map(lambda x : x.lower().replace(',',' ').replace('.',' '))


#--------------------------------------------- Separe les mots
# Création d'un rdd séparant les mots
miserables_flat = miserables_clean.flatMap(lambda line: line.split(" "))
miserables_flat.take(10)



#--------------------------------------------- Map/Reduce
# Insérez votre code ici 
mots = miserables_flat.map(lambda x : (x,1) )\
                      .reduceByKey (lambda x,y : x +y )


#--------------------------------------------- Compter occurence

#Pour compter le nombre d'occurrences d'un élément, une très bonne technique consiste à :

#- utiliser la méthode map pour créer un couple clé/valeur où chaque mot est une clé, 
            #chaque valeur vaut 1
#- utiliser reduceByKey pour additionner les valeurs pour chaque mot

### Première méthode de tri
# tri en utilisant la fonction 'sorted' des rdd
mots_sorted  = sorted(mots.collect(),
                     key= lambda x: x[1],
                     reverse= 0)

### Deuxième méthode de tri
# tri en utilisant la fonction 'sortBy' des rdd puis convertir en liste en utilisant collect
mots_sorted_2 = mots.sortBy(lambda couple: couple[1], ascending = True) \
                    .collect()

#--------------------------------------------- Succession de méthodes
#Création directe d'une liste contenant les mots
mots_sorted_3 = sc.textFile("miserables_full.txt") \
                  .map(lambda x : x.lower().replace(',', ' ').replace('.', ' ').replace('-', ' ').replace('’', ' ')) \
                  .flatMap(lambda line: line.split(" ")) \
                  .map(lambda x : (x,1)) \
                  .reduceByKey(lambda x,y : x + y) \
                  .sortBy(lambda couple: couple[1], ascending = True) \
                  .collect()
                
mots_sorted_3

#---------------------------------------------# Fermeture du SparkContext
sc.stop()


ModuleNotFoundError: No module named 'pyspark'

In [3]:
#---------------------------------------------------------------------- Module 3
# ---------------------------------------------------------------------- Les DataFrames


#--------------------------------------------- Spark SQL

# Import de Spark Session et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext
SparkContext.getOrCreate() 

# Définition d'une SparkSession
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Introduction au DataFrame") \
    .getOrCreate()
    
spark

#--------------------------------------------- 
# Création d'un raccourci vers le SparkContext déjà créé
sc = SparkContext.getOrCreate()
sc


#--------------------------------------------- Créer un Spark DataFrame
# Import de Row du package pyspark.sql
from pyspark.sql import Row

# Chargement du fichier '2008_raw.csv'
rdd = sc.textFile('2008_raw.csv').map(lambda line: line.split(","))

# Création d'un nouveau rdd en sélectionnant les variables explicatives
rdd_row = rdd.map(lambda line: Row(annee = line[0],
                                   mois = line[1],
                                   jours = line[2],
                                   flightNum = line[5]))

# Créer d'un data frame à partir d'un rdd
df = spark.createDataFrame(rdd_row)


#--------------------------------------------- afficher les 5 premiere lignes
# Affichage des 5 premières lignes
df.show(5)


#--------------------------------------------- Créer Un DataFrame à partir d'un CSV
# Lecture du fichier '2008.csv'
raw_df = spark.read.csv('2008.csv', header=True)

#--------------------------------------------- 
# Affichage du schéma des variables
raw_df.printSchema()


#--------------------------------------------- Explorer et manipuler un DataFrame
# Création d'un data frame ne contenant que les variables explicatives
flights1 = raw_df.select('annee', 'mois', 'jours', 'flightNum', 'origin', 'dest', 'distance', 'canceled', 'cancellationCode', 'carrierDelay')

# Affichage de 20 premières lignes
flights1.show() # 'show' affiche 20 lignes par défaut


#--------------------------------------------- changer le type des var
# Création d'un data frame en spécifiant le type des colonnes
flights = raw_df.select(raw_df.annee.cast("int"),   #la col annee passe en int
                        raw_df.mois.cast("int"),
                        raw_df.jours.cast("int"),
                        raw_df.flightNum.cast("int"),
                        raw_df.origin.cast("string"),
                        raw_df.dest.cast("string"),
                        raw_df.distance.cast("int"),
                        raw_df.canceled.cast("boolean"),
                        raw_df.cancellationCode.cast("string"),
                        raw_df.carrierDelay.cast("int"))



#--------------------------------------------- Compter (prend an compte les doubllons)
flights.select('flightNum').distinct().count() #distinct prend en compte les doublons


#--------------------------------------------- describe 
# Affichage d'un résumé en utilisant l'option truncate de la méthode show
flights.describe().show(truncate = 8)

### Deuxième méthode
# Affichage d'un résumé en utilisant la méthode toPandas
flights.describe().toPandas()

#--------------------------------------------- Groupby
# Affichage du résumé de la variable catégorielle 'cancellationCode'
flights.groupBy('cancellationCode').count().show()

# Affichage du résumé de la variable catégorielle 'cancellationCode' et 'canceled'
flights.groupBy('cancellationCode', 'canceled').count().show()


#--------------------------------------------- filtrer sur conditions

# Affichage des 20 premièrs vols annulés pour la raison "C"
flights.filter(flights.cancellationCode == 'C').show()


#--------------------------------------------- Filtrer et grouper les données
# Calcul du nombre vols annulés par mois
flights.filter(flights.canceled == True).groupBy('mois').count().show()


# On remarque que le mois de Décembre décompte beaucoup plus d'annulations que les autres mois,
# cela peut être lié à une planification anticipée plus grande des vacances de Noël.



#--------------------------------------------- Création et aggrégation de variables
# Création d'une nouvelle variable 'isLongFlight' et affichage des 10 premières lignes
flights.withColumn('isLongFlight', flights.distance > 1000 ).show(10)


#--------------------------------------------- Gestion des valeurs manquantes
# Remplacement des valeurs manquantes par des 0 et affichage des 6 premières lignes
flights = flights.fillna(0, 'carrierDelay').show(6)



#--------------------------------------------- Remplacer des val

flights.replace ( ['A','B','C' ],['1', '2' , '3'], 'cancellationCode' ).show()


#---------------------------------------------  trier
# Ordonner le data frame par numéro de vol décroissant
flights = flights.orderBy(flights.flightNum.desc()).show()



#--------------------------------------------- Requêtes SQL 
# Création d'une vue SQL
flights.createOrReplaceTempView("flightsView")

# Création d'un data frame ne contenant que la variable "flightsView"
sqlDF = spark.sql("SELECT carrierDelay FROM flightsView")

# Affichage des 10 premières lignes
sqlDF.show(10)


#--------------------------------------------- Sample & Astuce d'Affichage
# Affichage d'un dizaine de lignes de la base de données
flights.sample(False, .0001, seed = 222).toPandas()

#withRemplacement : un booléen à spécifier False si l'on ne veut pas écraser le DataFrame
#fraction : la fraction des données à conserver
#seed : un entier quelconque qui permet de reproduire les résultats

#toPandas = show
#--------------------------------------------- Fermer la session spark 

# Fermeture de la session Spark
spark.stop() 


ModuleNotFoundError: No module named 'pyspark'

In [1]:
#---------------------------------------------------------------------- Module 4
# ---------------------------------------------------------------------- Régression avec PySpark


#--------------------------------------------- Introduction à Spark ML
# Import de SparkSession et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext en local
sc = SparkContext.getOrCreate()

# Construction d'une Session Spark
spark = SparkSession \
    .builder \
    .appName("Introduction à Spark ML") \
    .getOrCreate()
    
spark

#--------------------------------------------- Importation de la base de données
# Chargement du fichier " YearPredictionMSD.txt" dans un data frame
df_raw = spark.read.csv('YearPredictionMSD.txt')

# Première méthode d'affichage 
df_raw.show(2, truncate = 4)
# En modifiant les valeurs de 'truncate', cette méthode ne permet pas de bien visualiser les données
# en vertu du nombre de variables

# Deuxième méthode d'affichage
df_raw.sample(False, .00001, seed = 222).toPandas()
# En utilisant toPandas permet de mieux visualiser les données


#--------------------------------------------- 
# Import de col du package pyspark.sql.functions
from pyspark.sql.functions import col

# Convertir des colonnes relatives au timbre en double et l'année en int
exprs = [col(c).cast("double") for c in df_raw.columns[1:91]]
df = df_raw.select(df_raw._c0.cast('int'), *exprs)

# affichage du schéma des variables "df"
df.printSchema()


#---------------------------------------------
# Affichage d'un résumé descriptif des données
df.describe().toPandas()


#--------------------------------------------- Mise en forme de la base en format svmlib
# Import de DenseVector du package pyspark.ml.linalg
from pyspark.ml.linalg import DenseVector

# Création d'un rdd en séparant la variable à expliquer des features
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Création d'un data frame composé de deux variables : label et features
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

# Affichage des 10 premières lignes du data frame
df_ml.show(10)


#--------------------------------------------- 
# Décomposition des données en deux ensembles d'entraînement et de test
# par défaut l'échantillon est aléatoirement réparti
train, test = df_ml.randomSplit([.8, .2], seed= 1234)



#--------------------------------------------- Régession Linéaire
# Import de LinearRegression du package pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Création d'une fonction de régression linéaire
lr = LinearRegression(labelCol='label', featuresCol= 'features')

# Apprentissage des données d'entraînement : "train"
linearModel = lr.fit(train)


#--------------------------------------------- 
# Calcul des prédictions des données de test
predicted = linearModel.transform(test)

# Affichage des prédictions
predicted.show()


#--------------------------------------------- Evaluation du modèle
# Calcul et affichage du RMSE
print("RMSE:", linearModel.summary.rootMeanSquaredError)

# Calcul et affichage du R2
print("R2:  ", linearModel.summary.r2)


#--------------------------------------------- 
from pprint import pprint

# Affichage des Coefficients du modèle linéaire
pprint(linearModel.coefficients)

# Fermeture de la session Spark 
spark.stop()


ModuleNotFoundError: No module named 'pyspark'

Allez plus loin — Autres algorthimes de régression

Maintenant que vous avez appris à programmer une régression linéaire en utilisant Spark ML, vous n'êtes qu'à quelques pas de maîtriser tout algorithme de régression distribué sous Spark. Pour vous aider à retenir l'essentiel, en voici un aperçu :
1. Transformer la base de données en format svmlib:   • Sélectionner les variables numériques à utiliser pour la régression
  • Placer la variable à expliquer en première position
  • Mapper un couple (label, vecteur de features) dans un RDD
  • Convertisser ce RDD en DataFrame et nommer les variables 'label' et 'features'
2. Séparez la base de données en deux échantillon train et test
3. Appliquez un modèle de classification

4. Evaluez le modèle

Spark est en constante amélioration et possède aujourd'hui quelques régresseurs notables. Ils sont utilisables de la même façon en important ces fonction depuis pyspark.ml.regression. Vous êtes invité à consulter la documentation pour observer les différents paramètres à prendre en compte pour optimiser ces algorithmes :

  • LinearRegression() pour effectuer une régression linéaire lorsque le label est présupposé suivre une loi normale
  
  • GeneralizedLinearRegression() pour effectuer une régression linéaire généralisée lorsque le label est présupposé suivre une autre loi que l'on spécifie dans le paramètre family (gaussian, binomial, poisson, gamma)
  
  • AFTSurvivalRegression() pour effectuer une analyse de survie

Il est également possible d'utiliser les algorithmes, qui gèrent également les variables catégorielles, détaillés dans l'exercice suivant :

  • DecisionTreeRegressor() pour un arbre de décision
  
  • RandomForestRegressor() pour une forêt aléatoire d'arbres de décision
  
  • GBTRegressor() pour une forêt d'arbres gradient-boosted

In [None]:
#---------------------------------------------------------------------- Module 5
# ------------------------------------------------- Utilisation des ML Pipelines 

# Les ML Pipelines permettent de faire enchaîner une succession d'estimateurs 
# ou de transformateurs # de façon à définir un processus de Machine Learning.

#--------------------------------------------- 
# Import de SparkSession et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Définition d'un SparkContext en local
sc = SparkContext.getOrCreate()

# Construction d'une Session Spark
spark = SparkSession \
    .builder \
    .appName("Pipelines Spark ML") \
    .getOrCreate()
    
spark


#--------------------------------------------- Les variables catégorielles
# Chargement du fichier 'HR_comma_sep.csv'
hr = spark.read.csv('HR_comma_sep.csv', header = True)

# Affichage d'un extrait du data frame
hr.sample(False, 0.001 , seed = 222).toPandas()
#--------------------------------------------- 
# Ordonner les variables de telle sorte d'avoir le label en première colonne
hr = hr.select( 'left',
               'satisfaction_level',
               'last_evaluation',
               'number_project',
               'average_montly_hours',
               'time_spend_company',
               'Work_accident',
               'promotion_last_5years',
               'sales',
               'salary')

# Affichage d'un description des variables
hr.describe().toPandas()

#Dans l exo precedent, l application de l algo ne c est pas faite sur les var 
#categorielles (car cela cree une value error car non convertible)

#--------------------------------------------- 
# Import de StringIndexer du package pyspark.ml.feature
from pyspark.ml.feature import StringIndexer

# Création d'un indexeur transformant une variable sales en indexedSales
salesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales').fit(hr)

# Création d'un DataFrame hrSalesIndexed indexant la variable sales
hrSalesIndexed = salesIndexer.transform(hr)

# Affichage d'un extrait du data frame hrSalesIndexed 
hrSalesIndexed.sample(False, 0.001 , seed = 222).toPandas()


#--------------------------------------------- 
# Import de IndexToString du package pyspark.ml.feature
from pyspark.ml.feature import IndexToString

# Création d'une nouvelle colonne salesReconstructed
SalesReconstructor = IndexToString(inputCol='indexedSales',
                                   outputCol='salesReconstructed',
                                   labels = salesIndexer.labels)

# Appliquer le transformateur SalesReconstructor
hrSalesReconstructed = SalesReconstructor.transform(hrSalesIndexed)

# Affichage d'un extrait de la base de données
hrSalesReconstructed.sample(False, 0.001 , seed = 222).toPandas()
# On voit apparaître une nouvelle colonne 'salesReconstructed' égale à la colonne 'sales'


#--------------------------------------------- Les Pipelines
# Import de Pipeline du package pyspark.ml
from pyspark.ml import Pipeline

# Création des indexeurs
SalesIndexer = StringIndexer(inputCol='sales', outputCol='indexedSales')
SalaryIndexer = StringIndexer(inputCol='salary', outputCol='indexedSalary')

# Création d'un pipeline
indexer = Pipeline(stages =  [SalaryIndexer, SalesIndexer])

# Indexer les variables de "hr"
hrIndexed = indexer.fit(hr).transform(hr)

# Affichage d'un extrait
hrIndexed.sample(False, 0.001 , seed = 222).toPandas()


#--------------------------------------------- Mise en forme de la base en format svmlib
# Import de DenseVector du package pyspark.ml.linalg
from pyspark.ml.linalg import DenseVector

# Création d'une base de données excluant les variables non indexées
hrNumeric = hrIndexed.select('left',
                             'satisfaction_level',
                             'last_evaluation',
                             'number_project',
                             'average_montly_hours',
                             'time_spend_company',
                             'Work_accident',
                             'promotion_last_5years',
                             'indexedSales',
                             'indexedSalary')

# Création d'une variable DenseVector contenant les features en passant par la structure RDD
hrRdd = hrNumeric.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# transformation en DataFrame et nommer les variables pour obtenir une base de la forme libsvm
hrLibsvm = spark.createDataFrame(hrRdd, ['label', 'features'])

# Affichage d'un extrait
hrLibsvm.sample(False, .001, seed = 222).toPandas()


#--------------------------------------------- Application d'un classifieur Spark ML
hrNumeric.describe().toPandas()

# Nous avons 2 variables catégorielles: 'sales' et 'salary'
# On peut obtenir le nombre de modalités en regardant 'min' et 'max' des variables indexées:
# 'sales' possède le plus de modalités et en possède 10 (10 entiers entre 0.0 et 9.0)


#--------------------------------------------- 
# Import de VectorIndexer du package pyspark.ml.feature
from pyspark.ml.feature import VectorIndexer

# Création d'un transformateur indexant les features
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories = 10).fit(hrLibsvm)


#--------------------------------------------- 
# Import du classificateur RandomForestClassifier du package pyspark.ml.classification
from pyspark.ml.classification import RandomForestClassifier

# Création des transformateurs
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(hrLibsvm)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories = 10).fit(hrLibsvm)

# Création d'un classificateur 
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", predictionCol='prediction', seed = 222)

# Création d'un transformateur permettant de rétablir les labels des prédictions
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Création d'un Pipeline 
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Décomposition des données en deux ensemble : données d'entraînement  et de test
(train, test) = hrLibsvm.randomSplit([0.7, 0.3], seed = 222)

# Apprentissage du modèle en utilisant les données d'entraînement
model = pipeline.fit(train)


#--------------------------------------------- 
#Calcul des prédictions 
predictions = model.transform(test)

# Affichage d'un extrait des prédictions 
predictions.sample(False, 0.001 , seed = 222).toPandas()


#--------------------------------------------- Evaluation du modèle
# Import d'un évaluateur MulticlassClassificationEvaluator du package pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Création d'un évaluateur 
evaluator = MulticlassClassificationEvaluator(metricName='accuracy',
                                              labelCol= 'indexedLabel',
                                              predictionCol= 'prediction')
# Calcul et affichage de la précision du modèle 
accuracy = evaluator.evaluate(predictions)
print(accuracy)

La métrique accuracy correspond au nombre de prédictions correctes divisées par le nombre de prédictions effectuées. Elle est donc entre 0 et 1 ; une accuracy de 0 correspond à des prédictions toutes fausses et une accuracy de 1 correspond à l'absence d'erreurs dans la prédiction.

Allez plus loin — Autres algorithmes de classification

Vous avez maintenant en main tous les outils pour effectuer tout type de classification en Spark ML. Pour résumer de façon concise, une classification s'effectue de la façon suivante :

1. Transformer toutes les variables en numérique
2. Transformer la base en format svmlib
3. Créer un pipeline contenant :
  • La transformation de la variable label en catégorie
  
  • La transformation des features catégoriels
  
  • Un modèle de classification
  
  • Un transformateur inverse de l'indexation pour les prédictions créées
4. Evaluer le modèle

Spark est en constante amélioration est possède aujourd'hui quelques classifieurs notables que vous pouvez utiliser de la même façon en important ces fonction depuis pyspark.ml.classification. Vous êtes invités à consulter la documentation pour observer les différents paramètres à prendre en compte pour optimiser ces algorithmes :

  • LogisticRegression() pour effectuer une régression linéaire https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression
  
  • DecisionTreeClassifier() pour un arbre de régression simple
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier

  • RandomForestClassifier() pour une forêt aléatoire
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier

  • GBTClassifier() pour une forêt d'arbres gradient-boosted
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.GBTClassifier

  • LinearSVC() pour un SVM de régression à noyau linéaire
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LinearSVC

  • NaiveBayes() pour une classification naïve bayesienne https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes

Pour une liste complète des algorithmes de Spark ML, vous pouvez également consutler la documentation générale de pyspark.ml ; la liste des algorithmes disponibles ne cesse de grandir au fur et à mesure du développement de Spark.

In [None]:
#---------------------------------------------------------------------- Module 6
# ---------------------------------------------------------------- Model Tuning  

# Il existe plusieurs méthodes de tuning qui permettent de régler les 
# paramètres d'un modèle, de façon à éviter le sur-apprentissage 
# (overfitting en anglais). 
#La plus utilisée est la validation croisée (cross validation en anglais).

#--------------------------------------------- 
# Import de SparkSession et de SParkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

#Création d'un SparkContext
sc = SparkContext.getOrCreate()

# Création d'une session Spark
spark = SparkSession \
    .builder \
    .appName("ML Tuning") \
    .getOrCreate()
        
spark


#--------------------------------------------- Importation de la base de données
# Chargement de la base de données brute
df_full = spark.read.csv('YearPredictionMSD.txt', header=False)

# On infère les bons types des colonnes
from pyspark.sql.functions import col
exprs = [col(c).cast("double") for c in df_full.columns[1:13]]

df_casted = df_full.select(df_full._c0.cast('int'),
                           *exprs)

# Enfin, pour un soucis de rapidité des calculs,
# on ne traitera qu'un extrait de la base de données dans cet exercice
df = df_casted.sample(False, .1, seed = 222)

df.sample(False, .001, seed = 222).toPandas()

#--------------------------------------------- 

from pyspark.ml.linalg import DenseVector

# Conversion de la base de données au format svmlib
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])

df_ml.show()

#--------------------------------------------- 

# Décomposition des données en deux ensembles d'entraînement et de test
# par défaut l'échantillon est aléatoirement réparti
train, test = df_ml.randomSplit([0.8, 0.2], seed=222)

#--------------------------------------------- 

# Import de LinearRegression du package pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Création d'un estimateur : Régression Linéaire
lr = LinearRegression(featuresCol = 'features', labelCol = 'label')

#--------------------------------------------- Création d'une grille de paramètres

# Import de ParamGridBuilder du package pyspark.ml.tuning
from pyspark.ml.tuning import ParamGridBuilder

# Création d'une grille de paramètres
param_grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0, 0.5, 1]).\
    addGrid(lr.elasticNetParam, [0, 0.5, 1]).\
    build()

#--------------------------------------------- Choix d'une métrique d'évaluation

# Import de RegressionEvaluator du package pyspark.ml.evaluation
from pyspark.ml.evaluation import RegressionEvaluator

# Création d'un évaluateur ayant pour métrique d'évaluation r2
ev = RegressionEvaluator(predictionCol='prediction',
                                labelCol='label',
                                metricName='r2')

#--------------------------------------------- Réglage des paramètres par validation croisée

#Import de CrossValidator du package pyspark.ml.tuning
from pyspark.ml.tuning import CrossValidator

# Création d'un cross validator 3-fold
cv = CrossValidator(estimator = lr,
                    estimatorParamMaps = param_grid,
                    evaluator=ev,
                    numFolds=3)

#--------------------------------------------- Application du modèle

# Import de la bibliothèque time et calcul du temps au début de l'exécution (t0)
from time import time
t0 = time()

lr = LinearRegression(featuresCol = 'features', labelCol = 'label')
ev = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')
cv = CrossValidator(estimator = lr, estimatorParamMaps = param_grid, evaluator = ev, numFolds = 3)

cv_model = cv.fit(train)

tt = time() - t0
print("Réalisé en {} secondes".format(round(tt,3)))

#--------------------------------------------- 

# Calcul des prédication des données d'entraînement
pred_train = cv_model.transform(train)

# Calcul des prédication des données de test
pred_test  = cv_model.transform(test)

#--------------------------------------------- RMSE

ev.setMetricName('rmse').evaluate(pred_test)

#--------------------------------------------- Exploitation des résultats

# Affichage des coefficients du modèle
cv_model.bestModel.coefficients


Examen ¶

Traitement de données massives avec PySpark 

L'exercice est composé de plusieurs questions, faites-les dans l'ordre et faites attention à respecter le nom des variables. N'hésitez pas à contacter l'équipe DataScientest si vous rencontrez des problèmes sur help@datascientest.com

Jeu de données
Le jeu de données de cet exercice contient des transactions effectuées en septembre 2013 par des titulaires de cartes bancaires européennes et compte 492 transactions frauduleuses parmi 284 807 transactions. L'ensemble de données est très déséquilibré, les classes positives (fraudes) représentent 0,172% de toutes les transactions. Les variables explicatives sont anonymisées et représentées par les trente premières colonnes. La variable cible est stockée dans la dernière colonne.

L'objet de cet exercice consiste à implémenter et entraîner un modèle de random forest avec PySpark pour faire de la détection de transaction frauduleuses.

Principe des forêts aléatoires
Les forêts aléatoires sont composées d'un ensemble d'arbres décisionnels. Ces arbres se distinguent les uns des autres par le sous-échantillon de données sur lequel ils sont entraînés. Ces sous-échantillons sont tirés au hasard dans le jeu de données initial.

Le principe de fonctionnement des forêts aléatoire est simple : de nombreux petits arbres de classification sont produits sur une fraction aléatoire de données. Random Forest fait voter les arbres de classification afin de déduire l'ordre et l'importance des variables explicatives.

Construire une session Spark nommée spark, sans utiliser findspark
   Rappelez-vous qu'avant de regarder la solution, vous avez toujours accès à l'aide officielle de Python en tapant help(nom_fonction) dans la console.

Construire une session Spark nommée spark, sans utiliser findspark

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Construction d'une Session Spark
spark = SparkSession \
    .builder \
    .appName("Introduction à Spark ML") \
    .getOrCreate()
    
spark


Importer le fichier creditcard.csv en tant que DataFrame appelé df_raw

Afficher un extrait du DataFrame df_raw

In [None]:
# Insérez votre code ici

df_raw = spark.read.csv('creditcard.csv')
df_raw.sample(False, 0.00001, seed = 1234 ).toPandas()

Créer un DataFrame df à partir de df_raw en changeant les colonnes des variables cibles en double et la variable cible, Class, en int

Afficher le schéma des variables de df

   Une bonne pratique est de mettre la variable à prédire ou la variable cible dans la première colonne

In [None]:
from pyspark.sql.functions import col

exprs = [col(c).cast("double") for c in df_raw.columns[:30]]
df = df_raw.select(df_raw._c30.cast('int'), *exprs)

df.printSchema()


Supprimer les lignes contenant des valeurs manquantes du DataFrame df

Créer un rdd rdd_ml séparant la variable à expliquer des features (à mettre sous forme DenseVector)

Créer un DataFrame df_ml contenant notre base de données sous deux variables : 'labels' et 'features'

In [None]:
# Insérez votre code ici

df = df.dropna()


from pyspark.ml.linalg import DenseVector

rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])


Créer deux DataFrames appelés train et test contenant chacun respectivement 80% et 20% des données

Créer un classificateur Random Forest appelé clf

Apprendre le modèle des forêts aléatoires sur l'ensemble d'entraînement

   L'apprentissage des données massives prend généralement un temps d'exécution long. La cellule ci-dessous doit s'exécuter en trois minutes dans le pire des cas.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

train, test = df_ml.randomSplit([.8, .2], seed= 1234)

rf = RandomForestClassifier(labelCol="label", featuresCol="features", predictionCol='prediction', seed = 222)

model = rf.fit(train)

predictions = model.transform(test)

Calculer la précision, accuracy, du modèle entraîné

In [None]:
# Insérez votre code ici

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Création d'un évaluateur 
evaluator = MulticlassClassificationEvaluator(metricName='accuracy',
                                              labelCol= 'label',
                                              predictionCol= 'prediction')
# Calcul et affichage de la précision du modèle 
accuracy = evaluator.evaluate(predictions)
print(accuracy)