## Mise en place du context

In [82]:
from findspark import init
init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import LongType
from pyspark.ml.feature import VectorAssembler

Le contexte est récupéré si existant et dans l'autre cas il sera créé.

In [2]:
#sc = pyspark.SparkContext.getOrCreate() # ça marche pas pour moi avec sparkContext
spark = SparkSession.builder.getOrCreate() # Du coup je fais une sparkSession (ça doit revenir presque au même je pense)

## Chargement des données

In [3]:
import os.path as path
from os import makedirs

In [4]:
data_dir = "res"

In [5]:
if not path.isfile("dac.tar.gz"):
    !wget https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz

In [6]:
if not path.isdir(data_dir):
    makedirs(data_dir)

Le fichier [`dac.tar.gz`](https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz) doit se situer dans le même répertoire.

In [7]:
if not path.isfile(data_dir + "/train.txt") or not path.isfile(data_dir + "/test.txt"):
    !tar -xvzf dac.tar.gz -C $data_dir

In [8]:
df = spark.read.csv(data_dir + "/train.txt", header=False, sep="\t")

## Preparation des données

In [9]:
df2 = df.dropna()

In [10]:
print(df.count())
print(df2.count())

45840617
756554


In [10]:
df2.schema

StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,StringType,true),StructField(_c5,StringType,true),StructField(_c6,StringType,true),StructField(_c7,StringType,true),StructField(_c8,StringType,true),StructField(_c9,StringType,true),StructField(_c10,StringType,true),StructField(_c11,StringType,true),StructField(_c12,StringType,true),StructField(_c13,StringType,true),StructField(_c14,StringType,true),StructField(_c15,StringType,true),StructField(_c16,StringType,true),StructField(_c17,StringType,true),StructField(_c18,StringType,true),StructField(_c19,StringType,true),StructField(_c20,StringType,true),StructField(_c21,StringType,true),StructField(_c22,StringType,true),StructField(_c23,StringType,true),StructField(_c24,StringType,true),StructField(_c25,StringType,true),StructField(_c26,StringType,true),StructField(_c27,StringType,true),StructField(_c28,StringType,true),StructFi

Renommage des colonnes par : label, f1, f2, f3,..., f39 + Conversion des colonnes 0 à 13 inclu en float

In [11]:
def mk_newNameCol():
    res = ['label']
    for i in range(1, 14):
        res.append('I' + str(i))
    for i in range(14, 14+27):
        res.append('C' + str(i - 13))
    return res

new_name = mk_newNameCol()
print(new_name)

df3 = df2.select(*[col(c).alias(new_c) for c, new_c in zip(df2.columns, new_name)])
df4 = df3.select(*(col(c).cast("float") if i < 14 else col(c) for i, c in enumerate(df3.columns)))
df4.head(1)

['label', 'I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26', 'C27']


[Row(label=1.0, I1=0.0, I2=127.0, I3=1.0, I4=3.0, I5=1683.0, I6=19.0, I7=26.0, I8=17.0, I9=475.0, I10=0.0, I11=9.0, I12=0.0, I13=3.0, C1='05db9164', C2='8947f767', C3='11c9d79e', C4='52a787c8', C5='4cf72387', C6='fbad5c96', C7='18671b18', C8='0b153874', C9='a73ee510', C10='ceb10289', C11='77212bd7', C12='79507c6b', C13='7203f04e', C14='07d13a8f', C15='2c14c412', C16='49013ffe', C17='8efede7f', C18='bd17c3da', C19='f6a3e43b', C20='a458ea53', C21='35cd95c9', C22='ad3062eb', C23='c7dc6720', C24='3fdb382b', C25='010f6491', C26='49d68486')]

Selectionner une partie des données

In [12]:
df5 = df4.limit(10000)

In [13]:
df5.count()

10000

Convertir les feature C1 à C26 en `int`

In [16]:
df6 = df5.select(*(col(c) for c in df5.columns)) # copy
fn_udf = udf(lambda x: int("0x" + x, 16))
for c in df6.columns[14:]:
    df6 = df6.withColumn(c, fn_udf(c))

In [17]:
df6.schema
df6.head(1)

[Row(label=1.0, I1=0.0, I2=127.0, I3=1.0, I4=3.0, I5=1683.0, I6=19.0, I7=26.0, I8=17.0, I9=475.0, I10=0.0, I11=9.0, I12=0.0, I13=3.0, C1='98275684', C2='2303194983', C3='298440606', C4='1386710984', C5='1291264903', C6='4222442646', C7='409410328', C8='185940084', C9='2805916944', C10='3467707017', C11='1998662615', C12='2035317867', C13='1912860750', C14='131152527', C15='739558418', C16='1224818686', C17='2399067775', C18='3172451290', C19='4137935931', C20='2757290579', C21='902665673', C22='2905629419', C23='3353110304', C24='1071331371', C25='17786001', C26='1238795398')]

In [22]:
df7 = df6.select(*(col(c) for c in df6.columns)) # copy
for c in df7.columns[14:]:
    df7 = df7.withColumn(c, df7[c].cast(LongType()))

In [23]:
df7.head(1)

[Row(label=1.0, I1=0.0, I2=127.0, I3=1.0, I4=3.0, I5=1683.0, I6=19.0, I7=26.0, I8=17.0, I9=475.0, I10=0.0, I11=9.0, I12=0.0, I13=3.0, C1=98275684, C2=2303194983, C3=298440606, C4=1386710984, C5=1291264903, C6=4222442646, C7=409410328, C8=185940084, C9=2805916944, C10=3467707017, C11=1998662615, C12=2035317867, C13=1912860750, C14=131152527, C15=739558418, C16=1224818686, C17=2399067775, C18=3172451290, C19=4137935931, C20=2757290579, C21=902665673, C22=2905629419, C23=3353110304, C24=1071331371, C25=17786001, C26=1238795398)]

In [85]:
assembler = VectorAssembler(
    inputCols=df7.columns[1:],
    outputCol="features")

In [88]:
data = assembler.transform(df7).select(col("label"), col("features"))

In [89]:
data.schema

StructType(List(StructField(label,FloatType,true),StructField(features,VectorUDT,true)))

## Apprentissage

In [90]:
seed = 54548421

In [91]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [92]:
tmp = data.randomSplit([0.6, 0.4], seed)
train = tmp[0]
test = tmp[1]

In [93]:
trainer = MultilayerPerceptronClassifier(maxIter=10, layers=[39, 2], blockSize=128, seed=seed)

In [94]:
model = trainer.fit(train)

In [96]:
res = model.transform(test)
pred_lbl = res.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(pred_lbl)))

Test set accuracy = 0.5150086827090052


## Arrêt du context

In [97]:
sc.stop()