# <center> Projet TC6: Criteo Display Advertising Challenge </center>
 ## <center>Rapport</center>
<center>Trung VUTHANH,   Malik KAZI,    Nour NOUREDINE</center>

###### <center>Abstract</center>

Dans le cadre du projet de Science des données pour le Big Data nous avons choisit d'utiliser les données du défi Kaggle Criteo. Ce Notebook résume notre démarche en présentant le meilleur modèle que nous avons réussit à entrainer sur ce problème de classification binaire supervisé. L'exploration de ce projet nous à permis d'en apprendre plus sur le feature hash, technique que nous avons mis en place et qui à permis de réduie grandement le temps d'apprentissage et de prédiction de notre modèle en résuisant la taille de la représentation des données.

In [34]:
# Décommenter la ligne suivante si findspark n'est pas installé :
#!pip install findspark
import findspark
findspark.init()
import pyspark
import re
from pyspark.context import SparkContext
from pyspark import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import udf, desc, rand, avg, when, isnan, trim
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.ml.feature import FeatureHasher
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
sc = SparkContext("local").getOrCreate()
sqlContext = SQLContext(sc)

In [3]:
sc.version

'2.3.0'

# Données et pre-traitement de données

Les données sont téléchargeables [ici](http://labs.criteo.com/2014/02/download-kaggle-display-advertising-challenge-dataset/?fbclid=IwAR0y5NGoecgyTheFdqT6vBjQVVIz47FCQD48aeOIfyOy-adcZcDj8iRgLJU).

Elles contienent un ensemble de publicités représentées par 39 features (13 numériques en représentation "count" et 26 string hachés) ainsi qu'un label indiquant si la publicité à enregistré un clic ou pas. Nous sommes donc dans le cadre d'un problème de classification binaire supervisée. Les données sont scindées en deux fichiers texte d'entrainement et de tests (les informations ci-dessous sont tirées de la description du défi Kaggle) :

- train.txt : L'ensemble d'entraînement consiste en une partie du trafic de Criteo sur une période de 7 jours. Chaque ligne correspond à une annonce d'affichage desservie par Criteo. Les exemples positifs (cliqués) et négatifs (sans clic) ont tous deux été sous-échantillonnés à des taux différents afin de réduire la taille du DataSet. Les exemples sont classés chronologiquement.

- test.txt: Le jeu de test est calculé de la même manière que l'ensemble de train, mais pour les événements le jour suivant de l'entraînement.

Dans notre traitement du problème nous n'avons pas utilisé l'ensemble de test fournis car nous n'avons pas réalisé de soumission au défi. De plus, nos premières approches ayant été longues à traiter nous avons préféré valider nos modèles sur un sous ensemble isolé et stratifié des données d'entrainement.

Pour des raisons de clareté nous présentons ici que notre modèle final. Le second notebook fournis présente certaines de nos expérimentations sur d'autre modèles au cas ou vous souhaiteriez les consulter.

In [16]:
# Indiquer le chemin vers le fichier train.txt dans la variable path ci 
# training_file_path ci dessous :
training_file_path = '../criteo/dac/train.txt'
new_file_path = '../criteo/dac/train_partition.txt'

In [17]:
# Création d'échantillons de données, décommenter une des ligne et relancer la cellule pour choisir le nombre
# d'exemples à prendre en compte
!head -200000 $training_file_path > $new_file_path
#!head -1000000 $training_file_path > $new_file_path

In [18]:
#data_train = sc.textFile('dac/train.txt')
data_train = sc.textFile(new_file_path)
# On split les tabulations 
data_train = data_train.map(lambda k : k.split('\t'))

In [19]:
# Création de liste contenant les noms des colonnes 
header = ['label']+['num_'+str(i) for i in range(13)]+['cat_'+str(i) for i in range(26)]
# Création d'une liste contenant les noms de colonnes des features (sans le label)
feature_names = header[1:]
# Liste contenant les noms de colonnes contenant des features numériques
numerical_feature_names = ['label']+['num_'+str(i) for i in range(13)]

Nous avons testé plusieurs manières de gérer les données manquantes, comme les remplacer par la valeur moyenne dans le cas des features numériques (voir second notebook) mais nous avons finalement otpé pour le retrait des exemples incomplets. Il reste cependant des Null (ce qui ne gène pas l'apprentissage). Nous avons tenté de les retirer mais cela avait eu un effet nefaste sur l'apprentissage ce qui est intriguant. Peut être que les modèles de la bibliothèque MLlib gèrent intelligeament les valeurs Null.

In [20]:
df = sqlContext.createDataFrame(data_train, header)
# On ignore les examples comportants des données non renseignées
# On convertit les Null en NaN puis on drop tout 
def to_null(c):
    ''' Fonction récupérée sur Stackoverflow 
    '''
    return when(~(col(c).isNull() | isnan(col(c)) | (trim(col(c)) == "")), col(c))
# Nous avons commenté la ligne suivante car les performances étaient réduites en retirant les exemples 
# contenant des Null
#df = df.select([to_null(c).alias(c) for c in df.columns])
df = df.na.drop()
# Converstion des features numériques en entiers 
for colname in numerical_feature_names:
    df = df.withColumn(colname, col(colname).cast(IntegerType()))
# Création d'un ensemble de test 
X_train, X_test = df.randomSplit([0.7, 0.3])

In [21]:
# Vérifiaction que la dataFrame est correcte 
df.show(2)
df.printSchema()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+------+--------+--------+--------+--------+
|label|num_0|num_1|num_2|num_3|num_4|num_5|num_6|num_7|num_8|num_9|num_10|num_11|num_12|   cat_0|   cat_1|   cat_2|   cat_3|   cat_4|   cat_5|   cat_6|   cat_7|   cat_8|   cat_9|  cat_10|  cat_11|  cat_12|  cat_13|  cat_14|  cat_15|  cat_16|  cat_17|  cat_18|  cat_19|  cat_20|cat_21|  cat_22|  cat_23|  cat_24|  cat_25|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+------+--------+--------+--------+--------+
|    0|    1|    1|    5|    0| 1382|

La répartition des classes dans une échantillon est présentée à la figure ci-dessous. Cette dernière est biasée vers les exemples négatifs. Une fois nos modèles entrainés, il faudra donc bien veiller a vérifier que le modèle ne predit pas uniquement la classe majoritaire ce qui donnerai à tord un bon score de précision! (Le graph ci desous est tiré du second notebook)

![](class_repartition.png)

## Modèle de regression linéaire

Dans la section qui suit, nous créons un pipeline afin de préparer les données et d'entrainer un modèle de regression linéaire.

1. En premier lieu nous appliquons la fonction FeatureHasher à nos feature. Cette méthode permet de changer les features en une représentation unique pour toutes les features qui est dans notre cas un vecteur creux à valeurs binaires. Cette représentation est calculée par une fonction de hachage appliquée à chaque features, dont le hash représentera l'indice de cette feature dans la matrice originale. A cette étape il faut bien évidement faire attention à ne pas hasher les labels mais uniquement les features. Hasher les labels aboutierait a un "data leakage" se manifestant par un score suspicieusement élevé lors de la validation.

2. Dans une seconde étape nous entrainons un modèle de regression linéaire 

In [22]:
# 'Hash trick' pour réduire les dimenssions des données
hasher = FeatureHasher(inputCols=feature_names, outputCol="features")
# Modèle de regression linéaire
lr = LogisticRegression(maxIter=4, regParam=0.1, labelCol='label')
# Instaciation d'une pipeline 
pipeline = Pipeline(stages=[hasher, lr])

Le hash trick permet de réduire la représentation des données et accélère ainsi la phase d'apprentissage

In [23]:
# On entraine le modèle sur les données d'entrainement 
%time model = pipeline.fit(X_train)

CPU times: user 32.1 ms, sys: 9.22 ms, total: 41.3 ms
Wall time: 16.4 s


In [24]:
pred = model.transform(X_test)

In [None]:
# Dans un premier temps nous avons tenté de calculer un score de précision de la manière suivante mais le temps 
# de calcul était trop lent. Nous avons laissé la trace de notre recherche et avons finalement 
#pred.registerTempTable('predictions')
#Predictions.columns 
#df2 = sqlContext.sql("select prediction, label from predictions")
#success = sqlContext.sql("select prediction, label from predictions where prediction=label").count()
#num_pred = pred.count()

In [39]:
# Fonctions utiles pour calculer la précision et le log loss remerciement à notre camarade Robin Duraz. 

from sklearn.metrics import log_loss

def logloss(y_true, y_proba):
    y_true = [y.__getitem__("label") for y in y_true]
    y_proba = [y.__getitem__("probability") for y in y_proba]
    y_proba = [[i for i in j] for j in y_proba]
    return log_loss(y_true, y_proba)

def accuracy(y_true, y_pred):
    y_true = [y.__getitem__("label") for y in y_true]
    y_pred = [y.__getitem__("prediction") for y in y_pred]
    egal = [y_true[i] == y_pred[i] for i in range(len(y_true))]
    return sum(egal)/len(y_true)

In [25]:
# Les opérations de collect sont couteuses 
y_true = X_test.select("label").collect()
y_proba = pred.select("probability").collect()
y_pred = pred.select("prediction").collect()

In [26]:
l = logloss(y_true, y_proba)
acc = accuracy(y_true, y_pred)
print("Log loss = {:2f}, Accuracy = {:2%}".format(l, acc))

Log loss = 0.516653, Accuracy = 76.027545%


Comme nous l'avons remarqué au début, les données sont biaisées vers les exemples négatifs. Vérifions donc que notre classifieur à bien appris et qu'il n'a pas prédit que la classe majoritaire :

In [38]:
pred.select(['prediction']).groupBy('prediction').count().show()
pred.select(['label']).groupBy('label').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|51126|
|       1.0| 9139|
+----------+-----+

+-----+-----+
|label|count|
+-----+-----+
|    1|14674|
|    0|45591|
+-----+-----+



Notre modèle n'a pas prédit que des 0.

Dans la cellule suivante nous lançon une cross validation sur X_train. Quand nous avons testé nous avons obtenu les même résultats que précédement. Cela est surement du au fait que le modèle renvoie les prédictions du run ayant obtenu les meilleurs résultats parmis les k folds. Nous n'avons pas trouvé comment obtenir la moyenne des résultats des runs. D'autre part, si nous avions souhaité "tuner" nos paramètres nous aurions pu ajouter différentes valeurs à tester au ParamGridBuilder()

In [36]:
crossval = CrossValidator(estimator=pipeline,
                          evaluator=BinaryClassificationEvaluator(labelCol='label'),
                          estimatorParamMaps = ParamGridBuilder().build(),
                          numFolds=5)
cvModel = crossval.fit(X_train)

In [37]:
pred = cvModel.transform(X_test)
y_proba = pred.select("probability").collect()
y_pred = pred.select("prediction").collect()
l = logloss(y_true, y_proba)
acc = accuracy(y_true, y_pred)
print("Log loss = {:2f}, Accuracy = {:2%}".format(l, acc))

Log loss = 0.516653, Accuracy = 76.027545%


# Conclusion

Dans ce projet nous avons exploré plusieurs méthodes de traitement des données en manipulant des dataFrames. Après diverse expériences exploratoires consignées dans le second notebook, nous avons pris en main la bibliothèque MLlib afin d'entrainer un modèle de regression linéaire. Nous avons constaté une réduction du temps d'entrainement et de prédiction en ajoutant du "feature hashing" à notre pipeline. La suite logique de ce travail serait de mettre en placce une pipeline plus complexe, ajoutant par exemple une représentation TF-IDF sur les features numériques car elles sont en représentation "count" (même si cela allourdirait la représentation précédente). D'autre part il serait intéressant de lancer notre code avec un spark submit sur l'ensemble des données et de lui ajouter une fonction d'écriture d'un fichier résultats afin de pouvoir faire une soumission sur Kaggle.
