In [59]:
sc

<pyspark.context.SparkContext at 0x7f71180a3e50>

In [60]:
spark.version

u'2.1.0'

In [61]:
# -*- coding: utf-8 -*-
import numpy as np
import datetime as tm
import json
import pandas as pd
from pyspark.storagelevel import StorageLevel
from pyspark.sql import *
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext, HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import cPickle
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')
from tr_utils import *

#Téléchargement des données de 750gr sur une semaine
adv = 656228
dates = get_period('2017-06-07', '2017-06-14')
path = 'gs://tr-parquet/its_panel/'
dfs = []
for date in dates:
    dfs += [sqlContext.read.load(path+date+'/ad='+str(adv))]
    
data = reduce(lambda x, y: x.unionAll(y), dfs)

In [None]:
#On récupère les données sous forme d'un DataFrame
dl = data.filter((F.col('ty')=='datalayer')&(F.col('id2')!='')&(F.col('id2')!=-1)&(F.col('id2')!=0))
#Creation d'un schema puis application a l'objet dataLayer
test = spark.read.json(dl.limit(1000).rdd.map(lambda x: x.dl))
schema = test.schema
with open('schema_dl_jv.pickle', 'w') as f:
    cPickle.dump(schema, f)
    
dl_parsed = dl.withColumn('dl2', F.from_json('dl', schema))

dl_parsed.printSchema()
#Les données les plus importantes sont celles situées dans le dataLayer
#On sélectionne les identifiants des users, les recettes consultées par le users ainsi que les ingrédients associés aux recettes
#Les ingrédients ne sont pas donnés comme des mots clés, mais avec des quantités associées (sous forme de phrase)
df=dl_parsed.select('dl2.ses.uuid2','dl2.dataLayer.sectionData.averageScore','dl2.dataLayer.sectionData.title'
                   ,'dl2.dataLayer.sectionData.ingredients')

df=df.filter(df.title != 'null')

In [63]:
#Nous envelons les accents de nos noms de recettes et ingrédients, car cela peut nous poser des problèmes..
import unicodedata
def strip_accents(s):
    if not s : 

        return s
    return s.encode('ascii', 'ignore').decode('ascii')

strip_accents_udf = F.udf(strip_accents,StringType())

In [64]:
#On regroupe dans une même ligne toutes les données relatives à un user: recettes et ingrédients associés à ces recettes
from pyspark.sql.functions import collect_list,concat_ws
df2=df.select('uuid2','averageScore',strip_accents_udf('title').alias('title'),strip_accents_udf('ingredients').alias('ingredients')).dropDuplicates().cache()
df3= df2.select('uuid2','averageScore',concat_ws('---', df2.title, df2.ingredients).alias('concat'))

grouped = df3.groupBy(['uuid2']).agg(collect_list('concat').alias('concat2'))
grouped.show(truncate=False)

In [65]:
#On sélectionne les 100 recettes les plus consultées
toprecettes= df2.groupBy('title').count().orderBy(F.desc('count')).head(100)

toprecettes=sc.parallelize(toprecettes).map(lambda x: x.title)

toprecettes.collect()

[u'Clafoutis aux cerises',
 u'Confiture de cerises facile',
 u'Gteau au yaourt',
 u'Mojito : Le vrai',
 u'Clafoutis moelleux aux cerises',
 u'Confiture de cerises',
 u'Salade pimontaise maison',
 u'Salade de riz',
 u'Tarte aux abricots',
 u'Cookies aux ppites de chocolat',
 u'Salades de ptes',
 u'Salade de ptes au saumon',
 u'Tiramisu aux fraises',
 u'Tarte au citron meringue',
 u'Tian de lgumes',
 u'Fondant au chocolat',
 u'Gratin de ptes au jambon, gratin au fromage rp Gusto Intenso Giovanni Ferrari',
 u'Tiramisu',
 u'Quiche sans pte conomique',
 u'Taboul',
 u'Flan de courgettes express',
 u'Flan ptissier maison',
 u'Tarte aux fraises',
 u'Tarte aux lgumes du soleil',
 u'Mini-moelleux  la cerise',
 u'Tarte aux pommes',
 u'Pavs de courgettes',
 u'Beignets de courgettes faciles',
 u'Fraisier',
 u'Gele de groseilles',
 u'Tians aux lgumes du Soleil',
 u'Tarte  la tomate et au chvre',
 u'Tarte au thon et aux tomates conomique',
 u"Confiture d'abricots",
 u'Gratin de ptes au jambon',
 u'Gr

In [None]:
#Nous séparons notre base deux: la base train et la base test.
#Sur la base train, nous modéliserons nos 100 régressions logistiques, et évaluerons la pertinence du modèle sur la base test
#Sur la base test, nous ne gardons que 2/3 des recettes, qui nous servent à obtenir les recommendations, et testons 
#nos recommmendations sur le tiers restant.


def lr(u):
    l=[]
    for i in u.concat2:
        l.append(i.split('---')[0])
    return l

def ir(u):
    l=[]
    try:
        for i in u.concat2:
            l.append(i.split('---')[1])
        return l
    except:
        return l

rdd=grouped.rdd.map(lambda x: Row(uuid2=x[0], listerecettes2=lr(x),ingredientsrecettes2=ir(x)))
grouped = sqlContext.createDataFrame(rdd)

def countrecettes(u):
    p=set(u)
    return float(len(list(p)))

countrecettes_=F.udf(countrecettes,FloatType())


def enleverdoublons(u):
    p=set(u)
    return list(p)

enleverdoublons_=F.udf(enleverdoublons,ArrayType(StringType()))


grouped=grouped.select('uuid2',enleverdoublons_('listerecettes2').alias('listerecettes3'),enleverdoublons_('ingredientsrecettes2').alias('ingredientsrecettes3'),'listerecettes2')

In [None]:
#On s'aide de la base Redis pour pouvoir indexer notre DataFrame

! sudo sh /home/Tradelab/install_redis.sh

! redis-cli CONFIG SET protected-mode no

! ifconfig | grep addr:10
    

In [12]:
import redis
connection = "10.240.0.23"
POOL = redis.ConnectionPool(host=connection, port=6379, db=0)
my_server = redis.Redis(connection_pool=POOL)

def addindex (X, server):
    locindex = int(server.get("index"))
    server.incr("index")
    return X[:3] + (locindex, )

def addindex_mapP (list_of_tuples):
    POOL = redis.ConnectionPool(host="10.240.0.23", port=6379, db=0)
    server = redis.Redis(connection_pool=POOL)
    newiterator = []
    for t in list_of_tuples:
        newiterator.append(addindex(t, server))
    POOL.disconnect()
    return iter(newiterator)

index = my_server.set("index", 1)
gg = grouped.rdd.mapPartitions(addindex_mapP).cache()
gg.count()

485238

In [None]:
gg=gg.toDF(['uuid2','listerecettes','ingredientsrecettes2','index'])

grouped2=gg.filter(gg.index<=(2*gg.count())/3)

grouped2.count()

In [None]:
grouped3=gg.filter(gg.index>(2*gg.count())/3)


def countrecettes(u):
    p=set(u)
    return float(len(list(p)))

countrecettes_=F.udf(countrecettes,FloatType())

countgrouped=grouped3.select('uuid2','index','listerecettes','ingredientsrecettes2',countrecettes_('listerecettes').alias('count'))
#Pour notre base test, nous ne sélectionnons que les individus qui ont visionné plus de 3 recettes, 
#afin d'obtenir les recommendations à l'aide de 2 recettes au minimum, et de vérifier la pertinence des recommendations à l'aide 
#d'une recette au minimum
grouped3=countgrouped.where("count >= 3.0")
grouped3=grouped3.select('uuid2','index','listerecettes','ingredientsrecettes2')

grouped3.show()

def trainrecettes(u):
    return u[0:int((2*len(u))/3)]

trainrecettes_=F.udf(trainrecettes,ArrayType(StringType()))

#Base d'apprentissage
dftrain=grouped3.select('uuid2','index',trainrecettes_('ingredientsrecettes2').alias('ingredientsrecettes'),
                        trainrecettes_('listerecettes').alias('listerecettes2'))

def testrecettes(u):
    return u[int((2*len(u))/3):len(u)]

testrecettes_=F.udf(testrecettes,ArrayType(StringType()))

#Base de test
resultattest=grouped3.select('uuid2','index',testrecettes_('listerecettes').alias('recettestest'))
resultattest.cache().show()


In [None]:
from pyspark.sql.functions import *
grouped2=grouped2.select('uuid2','index',col("ingredientsrecettes2").alias("ingredientsrecettes"),col("listerecettes").alias("listerecettes2"))

In [None]:
grouped=grouped2.union(dftrain)


print((2*gg.count())/3)

limit=(2*gg.count())/3

In [17]:
grouped.cache().count()

343225

In [18]:
resultattest.cache().count()

19691

In [None]:
#On veut construire une liste contenant tous les mots de la liste d'ingrédients, en se débarassant dans un premier temps
#des quantités, puis des stopwords à l'aide de la commande existante du package NLTK.
#On se débarasse également des lettres seules exclues, provenant d'abréviations comme c a s pour 'cuillière à soupe',
#g pour grammes, mg, kg etc.

import re
from nltk.corpus import stopwords
from pyspark.ml.feature import StopWordsRemover

list_nb = '0 1 2 3 4 5 6 7 8 9'.split()
list_lt='a b c d e f g h i j k l m n o p q r s t u v w kg mg gr cl ml dl cuil'.split()
stopwords=StopWordsRemover.loadDefaultStopWords('french')

def transform (l):
    return [mot for recette in l for mot in re.split(' |;',''.join([i if i not in list_nb else '' for i in recette ])) if mot!='' if mot not in stopwords and mot not in list_lt] 

transform_df  = F.udf(transform,ArrayType(StringType()))

#Nouveau dataFrame auquel on a appliqué la fonction transform
grouped2=grouped.select('uuid2','index','listerecettes2',transform_df('ingredientsrecettes').alias('ingredientsrecettes2'))

In [None]:
#Nous obtenons pour chaque individu un vecteur de la taille du corpus des ingrédients des recettes, dont chaque élément 
#représente la fréquence de l'ingrédient dans les recettes consultées par l'individu

from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="ingredientsrecettes2", outputCol="CountVectorizer")

model = cv.fit(grouped2)

result1 = model.transform(grouped2)

result1.show(truncate=False)

vocabulary=model.vocabulary

from pyspark.mllib.linalg import SparseVector

def bigvect1(x):
    return ([int(i) for i in x])


bigvect1_  = F.udf(bigvect1,ArrayType(IntegerType()))

resultat1=result1.select('uuid2','index','listerecettes2','ingredientsrecettes2',bigvect1_('CountVectorizer').alias('CountVectorizer2'))

resultat1.count()


#resultat1.write.save('gs://tr-work/ma/joa/sample.parquet')


resultattrain=resultat1.filter(resultat1.index<=limit)

resultat2=resultat1.filter(resultat1.index>limit)

resultat2.count()

In [21]:
#Création de la variable explicative de la régression logistique 

def containrecette(x, recette):
    if u'{}'.format(recette) in x:
        return 1.0
    else :
        return 0.0

def containrecette_udf (recette):
    return F.udf(lambda x: containrecette(x, recette), FloatType())

In [22]:
containrecette_= F.udf(containrecette,FloatType())

In [23]:
#Fonction qui, pour chaque recette, modélise un logit dont la variable explicative est la consultation de la recette et
#les variables explicatives sont les indicatrices des fréquences des ingrédients du corpus de recettes

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

def train(x) : 
    recette=x
    print(recette)
    # On fait le logit
    dftrain=resultat1.select('uuid2','index','listerecettes2','ingredientsrecettes2','CountVectorizer2',
                             containrecette_udf(recette)('listerecettes2').alias('containrecette'))
    pers1=dftrain.filter(dftrain.containrecette==1.0)
    pers2= dftrain.filter((dftrain.containrecette==0.0) &  (dftrain.index<=10000))
    dftrain=pers1.union(pers2)
    rdd=dftrain.select('CountVectorizer2','containrecette').rdd.map(lambda x : LabeledPoint(x.containrecette,x.CountVectorizer2))
    # Build the model
    model = LogisticRegressionWithLBFGS.train(rdd)
    return  model
    

In [24]:
#Evaluation des 100 modèles logit

dic={}

for i in toprecettes.collect():
    dic[i]=train(i)

Clafoutis aux cerises
Confiture de cerises facile
Gteau au yaourt
Mojito : Le vrai
Clafoutis moelleux aux cerises
Confiture de cerises
Salade pimontaise maison
Salade de riz
Tarte aux abricots
Cookies aux ppites de chocolat
Salades de ptes
Salade de ptes au saumon
Tiramisu aux fraises
Tarte au citron meringue
Tian de lgumes
Fondant au chocolat
Gratin de ptes au jambon, gratin au fromage rp Gusto Intenso Giovanni Ferrari
Tiramisu
Quiche sans pte conomique
Taboul
Flan de courgettes express
Flan ptissier maison
Tarte aux fraises
Tarte aux lgumes du soleil
Mini-moelleux  la cerise
Tarte aux pommes
Pavs de courgettes
Beignets de courgettes faciles
Fraisier
Gele de groseilles
Tians aux lgumes du Soleil
Tarte  la tomate et au chvre
Tarte au thon et aux tomates conomique
Confiture d'abricots
Gratin de ptes au jambon
Gratin de courgettes aux lardons
Lasagne bolognaise
Charlotte aux fraises
Salade fracheur gourmande
Pain perdu
Salade russe maison
Gratin de ptes aux lardons &  la mozzarella
Courg

In [26]:
#Prédiction des consultations ou non des 100 recettes pour chaque individu de la base test
def predictlogit(x):
    return [(i, float(dic[i].predict(x.CountVectorizer2))) for i in dic.keys()] 

In [27]:
resultat2.rdd.map(lambda x: (x.uuid2,predictlogit(x))).take(2)

[(u'8651735390227840478',
  [(u'Gratin de courgettes et pommes de terre facile', 0.0),
   (u'Mousse aux groseilles', 0.0),
   (u'Salade de ptes au Caprice des Dieux', 0.0),
   (u'Gratin de ptes au jambon, gratin au fromage rp Gusto Intenso Giovanni Ferrari',
    0.0),
   (u'Feuillets au fromage ultra simples', 0.0),
   (u'Tarte aux pommes', 0.0),
   (u'Briouates  la viande hache', 0.0),
   (u'Salade nioise', 0.0),
   (u'Tarte aux abricots', 0.0),
   (u'Cuisses de poulet et pomme de terre au four', 0.0),
   (u'Gteau aux pommes facile  raliser', 0.0),
   (u'Mini-moelleux  la cerise', 0.0),
   (u'Gratin de ptes maison', 0.0),
   (u'Gratin de ptes aux lardons &  la mozzarella', 0.0),
   (u'Tarte  la tomate et au chvre', 0.0),
   (u'La tarte fine aux abricots', 0.0),
   (u'Tarte aux fraises et  la crme ptissire', 0.0),
   (u'Smoothie aux fraises', 0.0),
   (u'Tian de pommes de terre, tomates et oignons rouges  la mozzarella', 0.0),
   (u'Gteau au chocolat', 0.0),
   (u"Madeleines  l'ancienn

In [28]:
#Ici, à la place de 0 ou 1 (consultation ou non de la recette), nous obtenons les probabilités de consultation des 100 recettes
def predictlogitbis(x):
    l=[]
    for i in dic.keys():
        dic[i].clearThreshold()
        l.append((i,float(dic[i].predict(x.CountVectorizer2))))
    return l

In [29]:
resultat2.rdd.map(lambda x: (x.uuid2,predictlogitbis(x))).take(10)

[(u'8651735390227840478',
  [(u'Gratin de courgettes et pommes de terre facile', 8.411315816700786e-137),
   (u'Mousse aux groseilles', 1.0517845966456068e-19),
   (u'Salade de ptes au Caprice des Dieux', 4.237338847e-314),
   (u'Gratin de ptes au jambon, gratin au fromage rp Gusto Intenso Giovanni Ferrari',
    3.2173615134919843e-103),
   (u'Feuillets au fromage ultra simples', 2.667022494738053e-29),
   (u'Tarte aux pommes', 2.2890384457333244e-31),
   (u'Briouates  la viande hache', 7.462931203687508e-129),
   (u'Salade nioise', 2.6210536551345374e-99),
   (u'Tarte aux abricots', 3.0084606150109235e-71),
   (u'Cuisses de poulet et pomme de terre au four', 1.3658795617292013e-73),
   (u'Gteau aux pommes facile  raliser', 1.0983177647019014e-35),
   (u'Mini-moelleux  la cerise', 2.853352615616646e-122),
   (u'Gratin de ptes maison', 2.638899223470993e-241),
   (u'Gratin de ptes aux lardons &  la mozzarella', 1.637485054302946e-30),
   (u'Tarte  la tomate et au chvre', 1.4887829916749

In [30]:
resultat4=resultat2.rdd.map(lambda x: (x.uuid2,predictlogitbis(x))).toDF()

In [31]:
resultat4 = resultat4.select(col("_1").alias("uuid2"), col("_2").alias("predictrecettesbis"))

In [32]:
resultat3=resultat2.rdd.map(lambda x: (x.uuid2,predictlogit(x))).toDF()

In [34]:

resultat3 = resultat3.select(col("_1").alias("uuid2"), col("_2").alias("predictrecettes"))

In [35]:
resultat5=resultat4.join(resultat3.select('uuid2','predictrecettes'),'uuid2')

In [36]:
#Nous fusionnons nos 1/3 de recettes restant dans notre base test avec notre Dataframe contenant nos prédictions pour notre base 
#test, afin de les comparer et obtenir notre taux de performance
test=resultat5.join(resultattest.select('uuid2','recettestest'),'uuid2')

In [38]:
test.cache().count()

19691

In [39]:
#Fonction qui évalue la pertinence des recommendations (si une recette prédite appartient aux 1/3 des recettes restant
#dans notre base test
def resultpredict(x,y):
    for i in x:
        if i[1]==1.0:
            if i[0] in y:
                return 1.0
                break
    return 0.0

In [40]:
resultpredict_=F.udf(resultpredict,FloatType())

In [41]:
#Fonction qui calcule le nombre de recommendations
def countreco(x):
    nb=0.0
    for i in x:
        if i[1]==1.0:
            nb+=1.0
    return nb

In [42]:
countreco_= F.udf(countreco,FloatType())

In [43]:
results = test.select('uuid2','predictrecettes','predictrecettesbis','recettestest',
                      resultpredict_('predictrecettes','recettestest').alias('resultpredict'),
                      countreco_('predictrecettes').alias('countreco'))

In [44]:
l=[i.countreco for i in results.select('countreco').collect()]

In [None]:
nb=0
for i in l:
    print(i)
    nb+=i
    
    

In [46]:
ll=results.filter(results.countreco > 0.0)

In [47]:
#Nombre de recettes recommendées en moyenne 
countrecettesreco=float(nb)/float(len(l))

In [48]:
countrecettesreco

7.56604540145244

In [49]:
#Nombre d'individus qui n'ont pas recu de recommendations

In [50]:
print(len(l)-ll.count())

7176


In [51]:
lll=[i.countreco for i in ll.select('countreco').collect()]

In [None]:
nb2=0
for i in lll:
    print(i)
    nb2+=i
    
    

In [53]:
#Nombre de recettes recommendées en moyenne pour les individus qui ont reçu des recommendations
countrecettesrecopos= float(nb2)/float(len(lll))

In [54]:
countrecettesrecopos 

11.904354774270875

In [55]:
#Notre score de performance T
txprediction=float(results.filter(results.resultpredict==1.0).count())/float(results.count())

In [56]:
txprediction

0.12838352546848814

In [57]:
#Nous avons un problème :des individus ne reçoivent pas de recommendations et certains en reçoivent plus ou moins 5
#On résout ce problème en regardant les probabilités de consultation de recettes pour les 100 recettes et 
#et en recommendant les 5 recettes aux plus hautes probabilités

In [58]:
def otherreco(x):
    l1=[]
    l2=[]
    for i in x:
        l1.append(i[1])
    l1=sorted(l1,reverse=True)[:5]
    for i in x:
        if i[1] in l1:
            l2.append(i[0])
    return l2

In [59]:
otherreco_=F.udf(otherreco,ArrayType(StringType()))

In [60]:
results2 = results.select('uuid2','recettestest','predictrecettesbis','countreco',
                      otherreco_('predictrecettesbis').alias('otherreco'))

In [63]:
results2.count()

19691

In [64]:
def resultpredict2(x,y):
    for i in x:
        if i in y:
            return 1.0
            break
    return 0.0

In [65]:
resultpredict2_=F.udf(resultpredict2,FloatType())

In [66]:
results3= results2.select('uuid2','recettestest', 'otherreco',
                      resultpredict2_('otherreco','recettestest').alias('resultpredict2'))

In [68]:
#Notre score T de la deuxième méthode, il est meilleur comme prévu
txprediction2=float(results3.filter(results3.resultpredict2==1.0).count())/float(results3.count())

In [69]:
txprediction2

0.1484434513229394