To do:
* Rewrite the description of the algorithm cleanly
* Work on the output of the algorithm

# 1. Description of the algorithm (in natural language)

The approach of the article is "Having computed $q^∗\left(z;u,s;\hat{\theta}\right)$, we output three (key, value) pairs in the Mapper: $(u, q^∗)$, $(s, q^∗)$, and $(z, q^∗)$."

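In the reduceByKey part, however, the sum runs over every variable that is not in the key. With $z$ as the only key, we sum over the couple $(u,s)$ and cannot sum over a single variable. So we can easily compute $N(z) = \sum_s\sum_u q^*(z;u,s)$, but not $N(z,s) = \sum_u q^*(z;u,s)$ or $\hat{p}(z|u)=\frac{\sum_s q^*(z;u,s)}{\sum_z\sum_s q^*(z;u,s)}$. These need a couple of variables as key, $(z,s)$ and $(z,u)$ respectively, which is what the mappers below emit.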
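To make the key issue concrete, here is a pure-Python sketch (no Spark) that mimics `reduceByKey` with a dictionary. The helper `reduce_by_key` and the toy `q_star` values are hypothetical, chosen only for illustration: keying on `z` alone sums over the couple `(u, s)` and yields N(z), whereas keying on the couple `(s, z)` sums over `u` only and yields N(z,s).

```python
from collections import defaultdict

def reduce_by_key(pairs):
    """Sum the values of (key, value) pairs per key, like RDD.reduceByKey(lambda x, y: x + y)."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

# Hypothetical toy values of q*(z;u,s), keyed by (u, s, z)
q_star = {
    ("u1", "s1", 0): 0.25, ("u1", "s1", 1): 0.75,
    ("u2", "s1", 0): 0.5,  ("u2", "s1", 1): 0.5,
    ("u2", "s2", 0): 1.0,  ("u2", "s2", 1): 0.0,
}

# Key on z only: the sum runs over both u and s -> N(z)
N_z = reduce_by_key((z, q) for (u, s, z), q in q_star.items())

# Key on the couple (s, z): the sum runs over u only -> N(z,s)
N_sz = reduce_by_key(((s, z), q) for (u, s, z), q in q_star.items())

print(N_z)   # {0: 1.75, 1: 1.25}
print(N_sz)  # {('s1', 0): 0.75, ('s1', 1): 1.25, ('s2', 0): 1.0, ('s2', 1): 0.0}
```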


* Initialize:
  * document = first two columns of the data (user, movie), with the header line removed --> user-movie co-occurrences (u,s)
  * map ([u,s,z], random): a random initial value of q* for every z


* ([s,z], q*) --> mapper1
* ([u,z], q*) --> mapper2

* reduceByKey N(z,s): ([z,s], sum over u of q* from mapper1)
* map N(z,s)/N(z): ([z,s], N(z,s)/sum_s N(z,s))
* reduceByKey p~(z|u): ([z,u], sum over s of q* from mapper2), i.e. the un-normalized p(z|u)
* map p(z|u): ([z,u], p~(z|u)/sum_z p~(z|u))

* map q~(z;u,s): ([z,u,s], N(z,s)/N(z) * p(z|u))
* map q*(z;u,s): ([z,u,s], q~(z;u,s)/sum_z q~(z;u,s))
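The outline above can be sketched in plain Python, without Spark, with dictionaries standing in for the RDDs and for-loops standing in for the map/reduceByKey steps. This is a minimal single-machine sketch for checking the logic on toy data, assuming the whole click log fits in memory; the function name `em_iteration` and the toy data are hypothetical.

```python
import random
from collections import defaultdict

def em_iteration(q):
    """One EM pass; q maps (u, s, z) -> q*(z;u,s) and the updated q is returned."""
    # M-step, mapper1/mapper2 + reduceByKey: N(z,s) = sum_u q*, p~(z|u) = sum_s q*
    N_sz = defaultdict(float)
    p_tilde = defaultdict(float)
    for (u, s, z), value in q.items():
        N_sz[(s, z)] += value
        p_tilde[(u, z)] += value

    # N(z) = sum_s N(z,s), then N(z,s)/N(z)
    N_z = defaultdict(float)
    for (s, z), value in N_sz.items():
        N_z[z] += value
    p_s_given_z = {(s, z): value / N_z[z] for (s, z), value in N_sz.items()}

    # p(z|u) = p~(z|u) / sum_z p~(z|u)
    norm_u = defaultdict(float)
    for (u, z), value in p_tilde.items():
        norm_u[u] += value
    p_z_given_u = {(u, z): value / norm_u[u] for (u, z), value in p_tilde.items()}

    # E-step: q~(z;u,s) = N(z,s)/N(z) * p(z|u), then normalize over z for each (u,s)
    q_tilde = {(u, s, z): p_s_given_z[(s, z)] * p_z_given_u[(u, z)] for (u, s, z) in q}
    norm_us = defaultdict(float)
    for (u, s, z), value in q_tilde.items():
        norm_us[(u, s)] += value
    return {key: value / norm_us[key[:2]] for key, value in q_tilde.items()}

# Hypothetical toy click log: (user, item) couples, with 3 latent classes
pairs = [("u1", "s1"), ("u1", "s2"), ("u2", "s1"), ("u3", "s3")]
nb_z = 3
rng = random.Random(0)
q = {(u, s, z): rng.random() for (u, s) in pairs for z in range(nb_z)}

for _ in range(10):
    q = em_iteration(q)

for (u, s) in pairs:
    print(u, s, [round(q[(u, s, z)], 3) for z in range(nb_z)])
```

Each printed row is a distribution over the latent classes, so its entries sum to 1.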

# 2. The algorithm

In [1]:
import pandas as pd
import pyspark
from numpy import random 

In [2]:
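# Run once to build a small test file from the first 1000 ratings (note: to_csv also writes the header line)
#pd.read_csv("ratings.csv").head(1000).to_csv("ratings_short.csv",sep=',',index=False)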

In [3]:
sc = pyspark.SparkContext()

## 2.1. Running the 1st iteration of the algorithm step-by-step

In [4]:
# number of 'clusters' (latent classes z), shared with the workers as a broadcast variable (useful, but not necessary, for the cartesian product)
nb_z = sc.broadcast(3)

In [5]:
# raw string: the backslashes before the spaces are kept as-is without an invalid-escape warning
document = sc.textFile(r"/Users/sachaizadi/Desktop/Projet\ DataBase\ Management/Notebooks/ratings_short.csv")

In [6]:
# A parser for extracting the interesting information from the dataset:
# keeps only the first two columns (user, movie) of each line.

def parseLine(line):
    ## For now the header line is removed from the file by hand
    ## TODO: drop the header line in code
    line = line.split(',')
    line = line[0]+','+line[1]
    return(line)
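
Instead of removing the header line by hand, it can be dropped in code. A common Spark idiom is `header = document.first()` followed by `document.filter(lambda line: line != header)`; the pure-Python sketch below applies the same logic to a list of lines. The function name `parse_ratings` and the sample lines are hypothetical, and the header `userId,movieId,rating,timestamp` is an assumption about the MovieLens-style `ratings.csv` used here.

```python
def parse_ratings(lines):
    """Drop the header line, then keep 'user,movie' from each remaining line."""
    lines = iter(lines)
    header = next(lines, None)  # first line, assumed to be the CSV header
    pairs = []
    for line in lines:
        if line == header or not line.strip():
            continue  # skip repeated headers and blank lines
        fields = line.split(',')
        pairs.append(fields[0] + ',' + fields[1])
    return pairs

# Hypothetical sample in the assumed format
sample = [
    "userId,movieId,rating,timestamp",
    "1,31,2.5,0",
    "2,10,4.0,0",
]
print(parse_ratings(sample))  # ['1,31', '2,10']
```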

In [7]:
# for a click-log couple (u,s), create a tuple (u,s,z) for z in Z

def cartesianProd(us):
    to_return = []
    for z in range(nb_z.value):
        to_return += [us+','+str(z)]
    return(to_return)
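
The keys built above are comma-joined strings, so every later step has to `split(',')` them again. A possible alternative, sketched here as an assumption rather than what this notebook does, is to use Python tuples as keys: PySpark's `reduceByKey` and `join` accept any hashable key, tuples included. The helper name `cartesian_prod_tuples` is hypothetical.

```python
def cartesian_prod_tuples(u, s, nb_z):
    """For a click-log couple (u, s), return the keys (u, s, z) for every z in Z."""
    return [(u, s, z) for z in range(nb_z)]

keys = cartesian_prod_tuples("1", "31", 3)
print(keys)  # [('1', '31', 0), ('1', '31', 1), ('1', '31', 2)]

# With tuple keys, re-keying is plain unpacking instead of string surgery,
# e.g. the (s, z) key used for N(s,z):
u, s, z = keys[0]
print((s, z))  # ('31', 0)
```

In Spark this would plug in as something like `document.map(parseLine).map(lambda us: us.split(',')).flatMap(lambda us: cartesian_prod_tuples(us[0], us[1], nb_z.value))`, after which every `x[0].split(',')[i]` becomes `x[0][i]`.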

In [8]:
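# creation of the tuples (u,s,z ; q*)
# cache() so that, as long as the cached partitions stay in memory, the random initial values are
# drawn once: an uncached RDD is recomputed by every action, which would redraw random.rand()
q0 = document.map(parseLine).flatMap(cartesianProd).map(lambda usz : (usz,random.rand())).cache()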
q0.collect()[:3]

[('1,31,0', 0.8306421946730473),
 ('1,31,1', 0.9679607861144792),
 ('1,31,2', 0.39185659360881464)]
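
Unless `q0` stays cached, Spark may recompute it, and `random.rand()` is then redrawn, so different steps can see different initial values of q*. One way around this, sketched below as an assumption, is to derive each initial value from its key with a seeded generator, so that recomputation always gives the same number. The function `init_value` and the seed `42` are hypothetical.

```python
import random

def init_value(key, seed=42):
    """Reproducible pseudo-random initial value of q* for a given key.

    A random.Random seeded with a string gives the same sequence in every
    process, whereas hash() of a str is salted per process, so it is avoided.
    """
    return random.Random(f"{seed}:{key}").random()

keys = ["1,31,0", "1,31,1", "1,31,2"]
first_draw = [init_value(k) for k in keys]
second_draw = [init_value(k) for k in keys]
print(first_draw == second_draw)  # True: recomputing gives the same values
```

In the notebook this would replace `random.rand()` as `.map(lambda usz : (usz, init_value(usz)))`; creating one small generator per key is slower than a shared one, but the result no longer depends on recomputation or on the random state of each worker.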

In [9]:
##### ********* M-step (computation of the N(s,z) & N(z) functions) ********

# return (s,z, N(s,z))
Nsz = q0.map(lambda q : (q[0].split(',')[1]+','+q[0].split(',')[2],q[1])).reduceByKey(lambda x,y : x+y)
print('(s,z, N(s,z)): \n', Nsz.collect()[:1],'\n')

# return (z, N(z)=∑N(s,z))
Nz = Nsz.map(lambda N : (N[0].split(',')[1], N[1])).reduceByKey(lambda x,y : x+y)
print('(z, N(z)): \n', Nz.collect()[:1],'\n')

# return (s,z, N(s,z)/N(z))
Nsz = Nsz.map(lambda x : (x[0].split(',')[1], (x[0].split(',')[0],x[1]))) #('0', ('31', 0.6602937910124607))
tmpN = Nsz.join(Nz) #('0', (('31', 0.6602937910124607), 501.224237413403))
Nsz_normalized = tmpN.map(lambda x : (x[1][0][0]+','+x[0], x[1][0][1]/x[1][1]))
print('(s,z, N(s,z)/N(z)): \n', Nsz_normalized.collect()[:1],'\n')

(s,z, N(s,z)): 
 [('31,0', 0.43382269053287104)] 

(z, N(z)): 
 [('0', 516.2809032327694)] 

(s,z, N(s,z)/N(z)): 
 [('31,0', 0.0008402842092675246)] 
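
In formulas, this cell computes the first half of the M-step. The ratio $N(z,s)/N(z)$ plays the role of $\hat{p}(s|z)$: dividing by $N(z)$ turns $N(z,s)$ into a distribution over items, so that $\sum_s \hat{p}(s|z) = 1$ for every $z$.

$$N(z,s) = \sum_u q^*(z;u,s), \qquad N(z) = \sum_s N(z,s) = \sum_s \sum_u q^*(z;u,s), \qquad \hat{p}(s|z) = \frac{N(z,s)}{N(z)}$$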



In [10]:
##### ********* M-step (computation of the p(z|u) function) ********

# p~(z|u) is the un-normalized value of p(z|u), i.e. ∑p~(z|u) ≠ 1

# return (u,z, p~(z|u))
Puz = q0.map(lambda q : (q[0].split(',')[0]+','+q[0].split(',')[2],q[1])).reduceByKey(lambda x,y : x+y)
print('(u,z, p~(z|u)): \n', Puz.collect()[:1],'\n')

# return (u, ∑p~(z|u))
Pu = Puz.map(lambda p : (p[0].split(',')[0], p[1])).reduceByKey(lambda x,y : x+y)
print('(u, ∑p~(z|u)): \n', Pu.collect()[:1],'\n')

# return (u, z, p(z|u)=p~(z|u)/∑p~(z|u))
Puz = Puz.map(lambda x : (x[0].split(',')[0], (x[0].split(',')[1],x[1]))) #('1', ('1', 9.849132242962598))
tmpP = Puz.join(Pu) #('4', (('0', 104.52747209196086), 314.82837141455155))
Puz = tmpP.map(lambda x : (x[0]+','+x[1][0][0], x[1][0][1]/x[1][1]))
print('(u,z, p(z|u)): \n', Puz.collect()[:1],'\n')

(u,z, p~(z|u)): 
 [('1,1', 10.532771928584513)] 

(u, ∑p~(z|u)): 
 [('1', 33.75011761937426)] 

(u,z, p(z|u)): 
 [('4,0', 0.3330270768350648)] 
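
Similarly, this cell computes the second half of the M-step: the un-normalized $\tilde{p}(z|u)$ is divided by its sum over $z$, so that $\sum_z \hat{p}(z|u) = 1$ for every user $u$.

$$\tilde{p}(z|u) = \sum_s q^*(z;u,s), \qquad \hat{p}(z|u) = \frac{\tilde{p}(z|u)}{\sum_{z'} \tilde{p}(z'|u)} = \frac{\sum_s q^*(z;u,s)}{\sum_{z'}\sum_s q^*(z';u,s)}$$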



In [11]:
##### ********* E-step (computation of the q*(u,s;z) function) ********

# 1st pre-step: join q0(u,s;z) & p(z|u) on u & z - and forget old value of q*
tmpQ0 = q0.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[2] , x[0].split(',')[1])) #('u,z', 's'): the old value of q* is dropped
tmpQ0 = tmpQ0.join(Puz) #('9,0', ('1', 0.3019536803179005))

# 2nd pre-step: join q0(u,s;z) & N(s,z)/N(z) on s & z
tmpQ0 = tmpQ0.map(lambda x : (x[1][0]+','+x[0].split(',')[1] ,\
                             (x[0].split(',')[0],x[1][1]))) #('1,0', ('9', 0.3019536803179005))
tmpQ0 = tmpQ0.join(Nsz_normalized) #('534,0', (('9', 0.3019536803179005), 7.06974580116717e-05))

# 1st step for computing q*(u,s;z) --> return ((u,s;z), N(s,z)/N(z)*p(z|u))
# q*~ = N(s,z)/N(z)*p(z|u) is the unnormalized version of q*, i.e. ∑_z q*~ ≠ 1
tmpQ0 = tmpQ0.map(lambda x : (x[1][0][0]+','+x[0],\
                             x[1][0][1]*x[1][1]))
print('(u,s;z, N(s,z)/N(z)*p(z|u)): \n', tmpQ0.collect()[:1],'\n')

# 2nd step for computing q*(u,s;z) --> return ((u,s), ∑_z N(s,z)/N(z)*p(z|u))
sumTmpQ0 = tmpQ0.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[1],x[1])).reduceByKey(lambda x,y : x+y)
print('(u,s, ∑N(s,z)/N(z)*p(z|u)): \n', sumTmpQ0.collect()[:1],'\n')

# 3rd step for computing q*(u,s;z) --> return ((u,s,z), N(s,z)/N(z)*p(z|u) / ∑_z N(s,z)/N(z)*p(z|u))
tmpQ0 = tmpQ0.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[1],\
                             (x[0].split(',')[2],x[1])))
tmpQ0 = tmpQ0.join(sumTmpQ0)
q1 = tmpQ0.map(lambda x : (x[0]+','+x[1][0][0], x[1][0][1]/x[1][1]))
print('(u,s;z, q*(u,s;z)): \n', q1.collect()[:1],'\n')

(u,s;z, N(s,z)/N(z)*p(z|u)): 
 [('9,534,0', 0.00108285749580116)] 

(u,s, ∑N(s,z)/N(z)*p(z|u)): 
 [('12,608', 0.0019184037277609474)] 

(u,s;z, q*(u,s;z)): 
 [('10,2827,0', 0.29526173271002115)] 
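
Since each $q^*(z;u,s)$ is a distribution over $z$, a quick sanity check after the E-step is that its values sum to 1 for every couple $(u,s)$. The sketch below, whose helper name `check_q_normalized` is hypothetical, takes the list returned by `q1.collect()` (only sensible on small data) and returns the couples that violate this; on a correct run it should return an empty dict.

```python
from collections import defaultdict

def check_q_normalized(pairs, tol=1e-9):
    """Return the (u,s) couples whose q*(z;u,s) values do not sum to 1 over z.

    `pairs` is a list of ('u,s,z', value) tuples, e.g. the output of q1.collect().
    """
    sums = defaultdict(float)
    for usz, value in pairs:
        u, s, _z = usz.split(',')
        sums[(u, s)] += value
    return {us: total for us, total in sums.items() if abs(total - 1.0) > tol}

# Hypothetical toy values in the same format as q1.collect()
toy = [("1,31,0", 0.25), ("1,31,1", 0.25), ("1,31,2", 0.5),
       ("4,7,0", 0.5), ("4,7,1", 0.75), ("4,7,2", 0.25)]
print(check_q_normalized(toy))  # {('4', '7'): 1.5}
```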



## 2.2. Looping through the algorithm

In [12]:
# A parser for extracting the interesting information from the dataset:
# keeps only the first two columns (user, movie) of each line.

def parseLine(line):
    ## For now the header line is removed from the file by hand
    ## TODO: drop the header line in code
    line = line.split(',')
    line = line[0]+','+line[1]
    return(line)

# for a click-log couple (u,s), create a tuple (u,s,z) for z in Z

def cartesianProd(us):
    to_return = []
    for z in range(nb_z.value):
        to_return += [us+','+str(z)]
    return(to_return)

In [13]:
nb_z = sc.broadcast(3)
nb_iter = 10
document = sc.textFile(r"/Users/sachaizadi/Desktop/Projet\ DataBase\ Management/Notebooks/ratings_short.csv")

In [14]:
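###### Initialization
# creation of the tuples (u,s,z ; q*), cached so that the random initial values are not
# redrawn each time q is recomputed during the first iteration
q = document.map(parseLine).flatMap(cartesianProd).map(lambda usz : (usz,random.rand())).cache()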

for k in range(nb_iter) :
    
    # *************************** M-step **************************
    # ********* computation of the N(s,z) & N(z) functions *********

    # return (s,z, N(s,z))
    Nsz = q.map(lambda Q : (Q[0].split(',')[1]+','+Q[0].split(',')[2],Q[1])).reduceByKey(lambda x,y : x+y)

    # return (z, N(z)=∑N(s,z))
    Nz = Nsz.map(lambda N : (N[0].split(',')[1], N[1])).reduceByKey(lambda x,y : x+y)

    # return (s,z, N(s,z)/N(z))
    Nsz = Nsz.map(lambda x : (x[0].split(',')[1], (x[0].split(',')[0],x[1])))
    tmpN = Nsz.join(Nz)
    Nsz_normalized = tmpN.map(lambda x : (x[1][0][0]+','+x[0], x[1][0][1]/x[1][1]))
    
    
    
    # ********* computation of the p(z|u) function *********
    # p~(z|u) is the un-normalized value of p(z|u), i.e. ∑p~(z|u) ≠ 1

    # return (u,z, p~(z|u))
    Puz = q.map(lambda Q : (Q[0].split(',')[0]+','+Q[0].split(',')[2],Q[1])).reduceByKey(lambda x,y : x+y)

    # return (u, ∑p~(z|u))
    Pu = Puz.map(lambda p : (p[0].split(',')[0], p[1])).reduceByKey(lambda x,y : x+y)

    # return (u, z, p(z|u)=p~(z|u)/∑p~(z|u))
    Puz = Puz.map(lambda x : (x[0].split(',')[0], (x[0].split(',')[1],x[1])))
    tmpP = Puz.join(Pu)
    Puz = tmpP.map(lambda x : (x[0]+','+x[1][0][0], x[1][0][1]/x[1][1]))
    
   
    
    
    
    # *************************** E-step **************************
    # ********* computation of the q*(u,s;z) function *********

    # 1st pre-step: join q(u,s;z) & p(z|u) on u & z - and forget old value of q*
    tmpQ = q.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[2] , x[0].split(',')[1]))
    tmpQ = tmpQ.join(Puz)

    # 2nd pre-step: join q(u,s;z) & N(s,z)/N(z) on s & z
    tmpQ = tmpQ.map(lambda x : (x[1][0]+','+x[0].split(',')[1] ,\
                                 (x[0].split(',')[0],x[1][1])))
    tmpQ = tmpQ.join(Nsz_normalized)




    # 1st step for computing q*(u,s;z) --> return ((u,s;z), N(s,z)/N(z)*p(z|u))
    # q*~ = N(s,z)/N(z)*p(z|u) is the unnormalized version of q*, i.e. ∑_z q*~ ≠ 1
    tmpQ = tmpQ.map(lambda x : (x[1][0][0]+','+x[0],\
                                 x[1][0][1]*x[1][1]))

    # 2nd step for computing q*(u,s;z) --> return ((u,s), ∑_z N(s,z)/N(z)*p(z|u))
    sumTmpQ = tmpQ.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[1],x[1])).reduceByKey(lambda x,y : x+y)

    # 3rd step for computing q*(u,s;z) --> return ((u,s,z), N(s,z)/N(z)*p(z|u) / ∑_z N(s,z)/N(z)*p(z|u))
    tmpQ = tmpQ.map(lambda x : (x[0].split(',')[0]+','+x[0].split(',')[1],\
                                 (x[0].split(',')[2],x[1])))
    tmpQ = tmpQ.join(sumTmpQ)
    q = tmpQ.map(lambda x : (x[0]+','+x[1][0][0], x[1][0][1]/x[1][1]))
    
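    # Materialize q at the end of each iteration to cut the RDD lineage: otherwise each new q
    # depends on all previous iterations, and the ever-growing lineage makes Spark's recursive
    # handling of the DAG slow and eventually crash (e.g. with a stack overflow). Collecting to
    # the driver works for small data; RDD.checkpoint() and RDD.localCheckpoint() are Spark's
    # built-in alternatives that avoid pulling q to the driver.
    q = sc.parallelize(q.collect())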
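
The loop does not output any diagnostic yet. One option, sketched below under the assumption that each observed couple $(u,s)$ appears once in the data, is to track the log-likelihood $\sum_{(u,s)} \log \sum_z \hat{p}(s|z)\,\hat{p}(z|u)$ across iterations. Its inner sum is exactly the value stored in `sumTmpQ` for each couple, i.e. $\sum_z N(s,z)/N(z) \cdot p(z|u)$. In Spark it can be computed without collecting as `sumTmpQ.map(lambda x: math.log(x[1])).sum()` (with `import math`). EM should never decrease this value, so a decreasing history hints at a bug. The helpers `log_likelihood` and `is_non_decreasing` are hypothetical.

```python
import math

def log_likelihood(sum_q_tilde):
    """Sum of log(sum_z p(s|z) p(z|u)) over the observed (u,s) couples.

    `sum_q_tilde` is a list of ('u,s', value) pairs, e.g. sumTmpQ.collect(),
    whose values are sum_z N(s,z)/N(z) * p(z|u) for each couple.
    """
    return sum(math.log(value) for _us, value in sum_q_tilde)

def is_non_decreasing(history, tol=1e-9):
    """True if each log-likelihood value is >= the previous one (up to tol)."""
    return all(b >= a - tol for a, b in zip(history, history[1:]))

# Hypothetical use inside the loop, right after sumTmpQ is computed:
#     history.append(log_likelihood(sumTmpQ.collect()))
# and after the loop: assert is_non_decreasing(history)

toy = [("1,31", 0.5), ("4,7", 0.25)]
print(log_likelihood(toy))  # log(0.5) + log(0.25)
```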