# Etude de cas CDiscount à l'aide de l'environnement pyspark

Ce notebook reprend le cas CDIscount mais cette fois ci à l'aide de l'environnement pyspark.

## 0. Importation des premiers modules.

Nous nous baserons ici sur pyspark qui devrait déjà être installé et en particulier nous utiliserons les modules ci-dessous pour cette étude de cas.


In [None]:
##Nettoyage des données
import nltk
import re
##Liste
from numpy import array
##Temps
import time

from pyspark import SparkContext
##Row and Vector
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.ml.linalg import Vectors
##Hachage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier 
##Pour la création des DataFrames


### Spark context initialisation

In [None]:
# TO DO
import findspark

findspark.init()
sc = SparkContext("local[*]","CDiscount")

## 1. Impoi

Il s'agit ici d'importer vos données.
Il faudra donc:
 + Créer la base distribuée à partir du fichier csv.
 + Créer le dataframe spark SQL correspondant.
 + Completer les valeurs manquantes par la chaîne de caractères nulle.
 + Afficher les 10 premières lignes du dataframe obtenu.

In [None]:
# Création de la base distribuée avec SQL context
# TO COMPLETE
spark = SparkSession.builder.getOrCreate()

In [None]:
df = spark.read.csv('./Data/Categorie_reduit.csv'
                   , sep = ';')

In [None]:
# Vérification du dataframe obtenu : son schema 
df = df.selectExpr('_c0 as Categorie1'
              ,'_c1 as Categorie2'
              ,'_c2 as Categorie3'
              ,'_c3 as Description'
              ,'_c4 as Libelle'
              ,'_c5 as Marque')


In [None]:
#Filling na
df = df.na.fill("null")

Combien avez-vous de produits dans le fichier ?

Le fichier a beaucoup de lignes. Comme avec python, dans un premier temps, nous allons travailler sur une sous-partie des données. Nous allons pour cela utiliser la fonction `randomSplit` de pyspark SQL.

In [None]:
# TO DO
taux_donnees= .80 # TO COMPLETE
datakeep,data_drop = df.randomSplit([taux_donnees, 1 - taux_donnees])  # TO COMPLETE

## 2. Séparation des données en un ensemble d'apprentissage et un ensemble de validation.

Nous allons ici séparer l'ensemble des données disponibles en 2 sous-ensembles, un ensemble pour apprendre le modèle de prédiction, i.e. l'ensemble d'apprentissage et un ensemble de validation. On utilisera aussi la fonction randomSplit de pyspark SQL.

In [None]:
# TO DO
tauxsep= .80 # TO COMPLETE
(trainDF, testDF) = datakeep.randomSplit([tauxsep, 1-tauxsep]) # TO COMPLETE

## 3. Nettoyage des données

Nous devons traiter des données textuelles et il nous faudra donc construire une représentation numérique de ces données. Pour cela, il est d'abord nécessaire de nettoyer ces données. 

Dans cette étude de cas, nous représenterons les données à partir de la liste des mots les constituant et il faudra donc :
 + Construire un dictionnaire de mots. Ce dictionnaire de mot sera l'espace de représentation de vos données. Pour cela, il  vous faut : 
  + Découper le texte en mots 
   + Nettoyer le texte, le simplifier : suppression des ponctuations, des termes numériques, des caractéres mal codés, passage de tous les mots en minuscules.
   + Supprimer les mots non porteurs de sens (ou stop words) à l'aide de la liste `lucene_stopwords.txt`.
   + Lemmatiser ou Raciniser (transformer un mot en sa forme canonique) afin de reduire la taille du dictionnaire et donc l'espace de représentation des données.
   
Vous pourrez pour cela utiliser la bibiothèque MLIB. La documentation est [ici](https://spark.apache.org/docs/2.2.0/ml-features.html).

Pour la racinisation, il faudra faire appel, par contre, à la bibliothèque nltk.

In [None]:
from nltk.corpus import stopwords
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf,col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

# liste des mots à supprimer
# TO COMPLETE
start_time = time.time()

stopwords = list(stopwords.words('French'))

# Tokenisation : remplacer un texte par la liste de ses mots
tokenizer = RegexTokenizer(inputCol="Description", outputCol="Descrip_tokenized")
trainDF = tokenizer.transform(trainDF)
testDF = tokenizer.transform(testDF)


# TO COMPLETE
# Filtrage Stop Words
remover = StopWordsRemover(inputCol = 'Descrip_tokenized', outputCol = 'Descrip_clen')
trainDF = remover.transform(trainDF)
testDF = remover.transform(testDF)

# TO COMPLETE

from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import  ArrayType, StringType

def stemmer(words, language = 'french'):
    stemmer = SnowballStemmer(language)
    return [stemmer.stem(word) for word in words]
udf_stemmer = udf(stemmer, ArrayType(StringType()))

trainDF = trainDF.withColumn('Description_finale', udf_stemmer('Descrip_clen'))
testDF = testDF.withColumn('Description_finale', udf_stemmer('Descrip_clen'))

# TO COMPLETE
trainDF = trainDF.drop('Descrip_tokenized', 'Descrip_clen')
testDF = trainDF.drop('Descrip_tokenized', 'Descrip_clen')


end_time = time.time()

print("elapsed_time",end_time-start_time)

## 4.  Représentation des données.

Pour représenter nos données (i.e. la description textuelle des produits), plusieurs principes seront utilisés et comparés :


 + L'approche de représentation d'un document textuel par un sac de mots et une pondération [tf-idf](https://fr.wikipedia.org/wiki/TF-IDF) vue dans les premiers cours disponible dans MLib : doucmentation [ici](https://spark.apache.org/docs/2.2.0/ml-features.html#tf-idf) et qui intégre l'approche de hachage comme expliquée [ici](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF) 
 + Une représentation de type word2vec, toujours avec Mlib: [ici](https://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html#word2vec)
 

#### TF-IDF et hachage

In [None]:
# representation TF-IDF
# TO COMPLETE
hashingTF = HashingTF(inputCol="Description_finale", outputCol="rawFeatures")
trainDF = hashingTF.transform(trainDF)
testDF = hashingTF.transform(testDF)

In [None]:
start_time = time.time()
idf = IDF(inputCol = 'rawFeatures', outputCol = 'features')
idf_model = idf.fit(trainDF)
trainDF_IDF = idf_model.transform(trainDF)
testDF_IDF = idf_model.transform(testDF)
end_time = time.time()

print('elapsed time: ', end_time - start_time )

In [None]:
trainDF_IDF.columns

In [None]:
# Application à votre jeu de données et mesure du temps
# TO COMPLETE
trainDF_IDF = trainDF_IDF.selectExpr(
    'Categorie1',
 'Categorie2',
 'Categorie3',
 'features as features'
)

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Categorie1", outputCol="label")
indexer = indexer.fit(trainDF_IDF)
trainDF_IDF = indexer.transform(trainDF_IDF)
testDF_IDF = indexer.transform(testDF_IDF)