# Catégorisation des Produits Cdiscount avec [SparkML](https://spark.apache.org/docs/latest/ml-guide.html) de <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/></a> -- Bruno ABILOU

In [1]:
# Report the interpreter version so results can be tied to a Python build.
import sys

python_version = sys.version
print(python_version)

3.6.3 |Anaconda custom (64-bit)| (default, Oct 15 2017, 03:27:45) [MSC v.1900 64 bit (AMD64)]


In [2]:
# Obtain the active SparkContext, reusing the shell's context if one exists.
# SparkContext is imported here because the generic import cell comes later
# in the notebook; without it this cell raises NameError outside a PySpark shell.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
print(sc)

<SparkContext master=local[*] appName=PySparkShell>


In [3]:
# Importation des packages génériques et ceux 
# des librairie ML et MLlib
##Nettoyage
import nltk
import re
##Liste
from numpy import array
##Temps
import time
##Row and Vector
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
##Hashage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier 
##Pour la création des DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline
import pandas as pd 
from pyspark import SparkContext 
from sklearn.cross_validation import train_test_split



## Importation des données

In [4]:
# Création de la base distribuée
sqlc=SQLContext(sc) 
data= pd.read_csv("train.csv",sep=",",encoding='latin-1')
data_all=sqlc.createDataFrame(data) 

# Cette ligne permet de visualiser les 5 premières lignes de la DataFrame 
data_all.limit(5).show()

+-----------+--------------------+--------------------+--------------------+
|category_id|            category|               title|         description|
+-----------+--------------------+--------------------+--------------------+
|          0|TELEPHONIE - GPS ...|Coque  Samsung AC...|Coque  Samsung AC...|
|          0|TELEPHONIE - GPS ...|Coque rigide Viol...|Coque rigide Viol...|
|          0|TELEPHONIE - GPS ...|Coque rigide Rose...|Coque rigide Rose...|
|          0|TELEPHONIE - GPS ...|Coque souple Gris...|Coque souple Gris...|
|          0|TELEPHONIE - GPS ...|Coque HTC One S 4...|Coque HTC One S 4...|
+-----------+--------------------+--------------------+--------------------+



## Split des données

In [5]:
# Division de la base en apprentissage et validation
# Split the Spark DataFrame built above into training (~80%) and validation
# (~20%) sets. The Spark ML pipeline must be fitted on Spark DataFrames, so the
# split is done with `randomSplit` on `data_all`. A pandas/sklearn split would
# hand pandas frames to `pipeline.fit` and would also replace the Spark
# `data_all` with a pandas object.
# randomSplit assigns rows independently, so the proportions are approximate;
# the fixed seed makes the split reproducible between runs.
dataTrain, DataTest = data_all.randomSplit([0.8, 0.2], seed=42)

### Création d'un Transformer pour l'étape de stemming.

In [6]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

class MyNltkStemmer(Transformer, HasInputCol, HasOutputCol):
    """Spark ML Transformer that stems an array-of-strings column with NLTK.

    Each token of ``inputCol`` is passed through NLTK's French Snowball
    stemmer and the stemmed array is written to ``outputCol``. A null input
    array yields a null output instead of failing inside the UDF.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyNltkStemmer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set the input/output column names and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` holding the stemmed tokens."""
        # Built once per transform call; it is captured by the UDF closure below.
        stemmer = nltk.stem.SnowballStemmer('french')

        def stem_tokens(tokens):
            # Null-safe: a null array gives null rather than a TypeError on the executor.
            if tokens is None:
                return None
            return [stemmer.stem(token) for token in tokens]

        stem_udf = udf(stem_tokens, ArrayType(StringType()))
        return dataset.withColumn(self.getOutputCol(),
                                  stem_udf(dataset[self.getInputCol()]))

### Définition des différentes étapes

In [10]:
import nltk
nltk.download('stopwords')
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import StringIndexer

# French stopwords to remove from the token lists.
STOPWORDS = set(nltk.corpus.stopwords.words('french'))

# Tokenizer: turn each long description into a list of words of at least 3
# characters (text is lowercased first, RegexTokenizer's default).
# `\p{L}` (Java regex: any Unicode letter) keeps accented French letters inside
# words. An ASCII-only class such as `[^a-z_]` would split "décoration" into
# "d" / "coration" and prevent accented stopwords from ever matching.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="tokenizedDescr",
                                pattern=r"[^\p{L}_]", minTokenLength=3, gaps=True)

# Drop stopwords from the token lists.
remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="stopTokenizedDescr",
                           stopWords=list(STOPWORDS))

# Stem the remaining tokens with the custom NLTK transformer.
stemmer = MyNltkStemmer(inputCol="stopTokenizedDescr", outputCol="cleanDescr")

# Map the category id to a label index for the classifier. The column in
# train.csv is named `category_id`.
# NOTE(review): with the default handleInvalid="error", a category that appears
# only in the validation split makes `model.transform` fail; consider
# handleInvalid="keep" (Spark >= 2.2).
indexer = StringIndexer(inputCol="category_id", outputCol="categoryIndex")

# Hashing trick: term frequencies in a fixed 10000-dimensional space.
hashing_tf = HashingTF(inputCol="cleanDescr", outputCol='tf', numFeatures=10000)

# Inverse Document Frequency weighting of the hashed term frequencies.
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="tfidf")

# Multinomial logistic regression on the TF-IDF features.
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
                        family="multinomial", elasticNetParam=0.0, featuresCol="tfidf",
                        labelCol="categoryIndex")

# Chain every step into a single pipeline.
pipeline = Pipeline(stages=[regexTokenizer, remover, stemmer, indexer, hashing_tf, idf, lr])


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\ABILOU\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.


## Estimation du pipeline et de l'erreur sur l'échantillon test

In [None]:
# Fit the whole pipeline on the training split, then score the validation split.
from pyspark.sql.functions import col

model = pipeline.fit(dataTrain)
# Cached because two actions (the counts below) read the same predictions.
predictionsDF = model.transform(DataTest).cache()

# Count on the cluster instead of collecting every (label, prediction) pair
# to the driver.
n_test = predictionsDF.count()
nb_good_prediction = predictionsDF.filter(col("categoryIndex") == col("prediction")).count()

# Misclassification rate on the validation split; NaN if the split is empty.
testErr = 1 - nb_good_prediction / n_test if n_test else float("nan")
print("Test error: %.4f" % testErr)