### Pyspark initialisation

In [1]:
import findspark
pyspark_path = '/Users/charlotte.caucheteux/Desktop/spark-2.2.0-bin-hadoop2.7'
findspark.init(pyspark_path)

In [2]:
from pyspark import SparkContext
from time import time
import numpy as np
from operator import add

In [3]:
sc = SparkContext()

### Loading / preprocessing of the data 

Each line in the ratings dataset (ratings.csv) is formatted as: **userId,movieId,rating,timestamp**

In [4]:
small_ratings_raw_data = sc.textFile('../datasets/ml-latest-small/ratings.csv')
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [5]:
def parser(x):
    x = x.split(",")
    return x

In [6]:
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header).map(parser).map(lambda tokens: (tokens[0],tokens[1])).cache()
data.take(5)

[('1', '31'), ('1', '1029'), ('1', '1061'), ('1', '1129'), ('1', '1172')]

### EM Algorithm 

The EM algorithm for computing PLSI parameters can be parallelized by writing the equations as: 

$$q^*(z; u, y ; \hat \theta )= p(z| u,y ; \hat \theta )= \frac{\hat p (z|u) * \frac{N(z,y)}{N(z)}}{\sum_{z}\hat p (z|u) * \frac{N(z,y)}{N(z)}}$$
$$ N(z,y) = \sum_{u}q^*(z; u, y ; \hat \theta )$$
$$ N(z) \sum_{y}\sum_{u} q^*(z; u, y ; \hat \theta )$$
$$ \hat p(z|u) = \frac{\sum_{s} q^*(z; u, y ; \hat \theta}{\sum_{z}\sum_{y} q^*(z; u, y; \hat \theta)}$$

#### GroupByKey() Version

In [7]:
nu = data.map(lambda x: x[0]).distinct().count()
ns = data.map(lambda x: x[1]).distinct().count()
print('Number of users:',nu)
print('Number of movies:',ns)

Number of users: 671
Number of movies: 9066


In [8]:
nz = 5
print('Number of communities:',nz)
z_ids = sc.parallelize(list(range(1,nz+1)))

Number of communities: 5


In [9]:
#Specify a complete data model usz: (u, s, z)
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
print("(u,s,z):",usz.take(1))

(u,s,z): [('1', '31', 1)]


In [10]:
#Initialization of q
np.random.seed(123)

#q: (u,s,z) => ((u,s,z), q)
q = usz.map(lambda x :(x,np.random.rand()))
print('(u,s,z),q)):', q.take(1))

(u,s,z),q)): [(('1', '31', 1), 0.7991169055789756)]


In [11]:
###Maximisation (1)

#Nzs: (z,(s,N(z,s)))
Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z, (s, N(s,z))', Nzs.take(1))

#Nzs_norm: (z,(s,N(z,s))) => (z,(s,N(z,s)/N(z)))
Nzs_norm= Nzs.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([i[1] for i in x[1]]))) for i in x[1]])
print('(z, (s, N(s,z)/N(z))', Nzs_norm.take(1))

###Sanity Check: Probabilities add up to 1
Nzs_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5)

(z, (s, N(s,z)) [(1, ('1172', 22.3773845802829))]
(z, (s, N(s,z)/N(z)) [(1, ('1172', 0.00044871722964156944))]


[(1, 0.9999999999999989),
 (2, 0.9999999999999983),
 (3, 0.9999999999999989),
 (4, 1.0000000000000002),
 (5, 1.0000000000000047)]

In [42]:
###Maximsisation (2)

#Pzu: (u,s,z),q)) => (z,(u,q))
Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z,(u,q))', Pzu.take(1))

#Pzu_norm: (z,(u,q)) => (z, (u, p(z|u)))
Pzu_norm = Pzu.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([i[1] for i in x[1]]))) for i in x[1]])
print('z, (u, p(z|u))', Pzu_norm.take(1))

###Sanity Check: Probabilities add up to 1
Pzu_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5)

(z,(u,q)) [('3', (1, 27.97362685528661))]
z, (u, p(z|u)) [(2, ('4', 0.2031906738034227))]


[('4', 1.0), ('10', 1.0), ('12', 1.0), ('16', 1.0), ('20', 1.0)]

We can now compute the new $q^{*}(z; u, s)$ given the new parameters. 

In [43]:
###Expectation

#(z, ((s,q*), (u,q*))) ==> ((u,s), (z, Pzu * N(z,s)/N(z)))
temp_q = Pzu_norm.join(Nzs_norm).map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1])))
print('(u,s),(z,Pzu * N(z,s)/N(z))):', temp_q.take(1))

#((u,s,z),q*)
q = temp_q.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])
print('(u,s,z),q)):', q.take(1))

(u,s),(z,Pzu * N(z,s)/N(z))): [(('4', '1172'), (1, 0.0001036446083791483))]
(u,s,z),q)): [(('4', '3101', 1), 0.19345679835867843)]


In [None]:
#final result 
temp_q.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

#### Full GroupyBy() EM Algorithm  

In [19]:
import time

nz = 5
z_ids = sc.parallelize(list(range(1,nz+1)))
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
max_iter = 20


#Initialize
q = usz.map(lambda x :(x,np.random.rand()))

sc.setCheckpointDir('/Users/alexmomeni/Documents/Alex/Polytechnique/MAP543/Project')

for i in range (max_iter):
    #Cache in memory
    q.persist()
    print(q.getNumPartitions())
    
    #print("Maximisation")
    
    #print("Calculating Nzs_norm")
    Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Nzs_norm= Nzs.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([i[1] for i in x[1]]))) for i in x[1]])
    
    #print("Calculating Pzu_norm")
    Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][1],(x[0][0],x[1])))
    Pzu_norm = Pzu.groupByKey().flatMap(lambda x: [(i[0],(x[0],i[1]/sum([j[1] for j in x[1]]))) for i in x[1]])
    
    
    #print("Expectation")
    temp_q = Pzu_norm.join(Nzs_norm).map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1])))
    q = temp_q.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])
    
    #Coalesce
    q = q.coalesce(8)
    
    #Checkpoint
    if i % 2 == 0:
        q.checkpoint()

8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8


In [20]:
s_time = time.time()
q.collect()
e_time = time.time()
print('Elapsed time : ' + str(e_time-s_time))

Elapsed time : 16.390833377838135


In [None]:
#final result 
temp_p.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

#### Join Version 

In [53]:
#Initialization of q
np.random.seed(123)

#q: (u,s,z) => ((u,s,z), q)
q = usz.map(lambda x :(x,np.random.rand()))
print('(u,s,z),q)):', q.take(1))

(u,s,z),q)): [(('1', '31', 1), 0.18800826990919994)]


In [54]:
###Maximisation (1)

Nzs = q.map(lambda x : ((x[0][1],x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
print('(s,z, N(z,s)):', Nzs.take(1))

Nz = Nzs.map(lambda x : (x[0][1], x[1])).reduceByKey(lambda x,y : x+y)
print('(z, N(z)):', Nz.take(1))

Nzs_norm = Nzs.map(lambda x : (x[0][1], (x[0][0],x[1]))).join(Nz).\
            map(lambda x : (x[1][0][0],x[0], x[1][0][1]/x[1][1])) 
print('s,z, N(s,z)/N(z)', Nzs_norm.take(1))

(s,z, N(z,s)): [(('1172', 1), 0.33363518032561046)]
(z, N(z)): [(1, 503.52692891363654)]
s,z, N(s,z)/N(z) [('1172', 1, 0.0006625964991493723)]


In [51]:
###Maximisation (2)

Pzu_nominator = q.map(lambda x : ((x[0][0], x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
print('(u,z, Pzunom:', Pzu_nominator.take(1))

Pzu_denominator = Pzu_nominator.map(lambda x: (x[0][0], x[1])).reduceByKey(lambda x,y : x+y)
print('(z,(u,q))', Pzu_denominator.take(1))

Pzu_norm = Pzu_nominator.map(lambda x : (x[0][0], (x[0][1],x[1]))).join(Pzu_denominator).\
map(lambda x : (x[0],x[1][0][0], x[1][0][1]/x[1][1])) 
print('(u,z, p(z|u))', Pzu_norm.take(1))

(u,z, Pzunom: [(('3', 1), 24.882038207376006)]
(z,(u,q)) [('4', 528.9769289105363)]
(u,z, p(z|u)) [('4', 1, 0.21069253771255209)]


In [52]:
###Expectation

q_nom = Pzu_norm.map(lambda x: (x[1], (x[0],x[2]))).\
join(Nzs_norm.map(lambda x: (x[1],(x[0],x[2])))).\
map(lambda x: ((x[0],x[1][1][0],x[1][0][0]),x[1][0][1] * x[1][1][1]))
print('(s,z,u, p(z|u)*N(z,s)/N(z)', q_nom.take(1))

q_denom = q_nom.map(lambda x: ((x[0][1],x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
print('(s,u, Sum p(z|u)*N(z,s)/N(z)', q_denom.take(1))

q = q_nom.map(lambda x : ((x[0][1], x[0][2]),(x[0][0],x[1]))).join(q_denom).\
    map(lambda x : (x[0][1],x[0][0], x[1][0][0], x[1][0][1]/x[1][1]))
    
q = q.map(lambda x: ((x[0],x[1],x[2]),x[3]))
print('(u,s,z),q)):', q.take(1))

(s,z,u, p(z|u)*N(z,s)/N(z) [((1, '1172', '4'), 3.728629865139878e-05)]
(s,u, Sum p(z|u)*N(z,s)/N(z) [(('1293', '4'), 0.000800382978345479)]
(u,s,z),q)): [(('4', '2003', 1), 0.15970077522791679)]


#### Full Join EM Algorithm  

In [25]:
import time

nz = 5
z_ids = sc.parallelize(list(range(1,nz+1)))
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
max_iter = 10


#Initialize
q = usz.map(lambda x :(x,np.random.rand()))
n = q.getNumPartitions()

sc.setCheckpointDir('/Users/alexmomeni/Documents/Alex/Polytechnique/MAP543/Project')

for i in range (max_iter):
    #Cache in memory
    q.persist()
    print(q.getNumPartitions())
    
    #print("Maximisation")
    
    #print("Calculating Nzs_norm")
    Nzs = q.map(lambda x : ((x[0][1],x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
    Nz = Nzs.map(lambda x : (x[0][1], x[1])).reduceByKey(lambda x,y : x+y)
    Nzs_norm = Nzs.map(lambda x : (x[0][1], (x[0][0],x[1]))).join(Nz).\
            map(lambda x : (x[1][0][0],x[0], x[1][0][1]/x[1][1])) 
    
    #print("Calculating Pzu_norm")
    Pzu_nominator = q.map(lambda x : ((x[0][0], x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
    Pzu_denominator = Pzu_nominator.map(lambda x: (x[0][0], x[1])).reduceByKey(lambda x,y : x+y)
    Pzu_norm = Pzu_nominator.map(lambda x : (x[0][0], (x[0][1],x[1]))).join(Pzu_denominator).\
    map(lambda x : (x[0],x[1][0][0], x[1][0][1]/x[1][1])) 
    
    #print("Expectation")
    q_nom = Pzu_norm.map(lambda x: (x[1], (x[0],x[2]))).\
    join(Nzs_norm.map(lambda x: (x[1],(x[0],x[2])))).\
    map(lambda x: ((x[0],x[1][1][0],x[1][0][0]),x[1][0][1] * x[1][1][1]))
    
    q_denom = q_nom.map(lambda x: ((x[0][1],x[0][2]),x[1])).reduceByKey(lambda x,y : x+y)
    
    q = q_nom.map(lambda x : ((x[0][1], x[0][2]),(x[0][0],x[1]))).join(q_denom).\
    map(lambda x : (x[0][1],x[0][0], x[1][0][0], x[1][0][1]/x[1][1]))
    
    q = q.map(lambda x: ((x[0],x[1],x[2]),x[3]))
    
    #Coalesce
    q = q.coalesce(n*2)
    
    #Checkpoint
    #if i % 2 == 0:
     #   q.checkpoint()

8
16
16
16
16
16
16
16
16
16


In [56]:
s_time = time.time()
q.collect()
e_time = time.time()
print('Elapsed time : ' + str(e_time-s_time))

Elapsed time : 24.65009880065918


#### Broadcast Version

In [12]:
#norm : (z,q) => (z,sum_q)
norm = q.map(lambda x:(x[0][2],x[1])).reduceByKey(lambda x,y:x+y)
smallDict = dict((x[0], x[1]) for x in norm.collect())
bc=sc.broadcast(smallDict)
print(bc.value)

{1: 50046.04036815751, 2: 49986.32360003928, 3: 50030.54070440937, 4: 50026.263838033316, 5: 50018.513985679965}


In [13]:
Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z, (s, N(s,z))', Nzs.take(1))

(z, (s, N(s,z)) [(1, ('1172', 22.460923561318676))]


In [14]:
#Pzu: ((u,s,z),q) => ((z,u),q) => ((z,u),sum_q) => ((z,(u,q)))
Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z,(u,q))', Pzu.take(1))

(z,(u,q)) [(1, ('3', 21.883575999305602))]


In [15]:
#Pzu: (z,(u,q))
#Pzu_norm: ((z,u),q)) => (z, (u, p(z|u)))
Pzu_norm = Pzu.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))
Pzu_norm.take(5)

[(1, ('3', 0.0004372688795821164)),
 (1, ('6', 0.0005255711779934553)),
 (1, ('15', 0.0168190012154185)),
 (1, ('30', 0.010042694969735701)),
 (1, ('32', 0.0005281333881601544))]

In [16]:
#Nzs: (z,(s,N(z,s)))
#Nzs_norm: (z,(s,N(z,s))) => (z,(s,N(z,s)/N(z)))
Nzs_norm = Nzs.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))
Nzs_norm.take(5)

[(1, ('1172', 0.00044880520808614763)),
 (1, ('1287', 0.00041244992025307286)),
 (1, ('3671', 0.0005798141051646732)),
 (1, ('47', 0.0019037539990893048)),
 (1, ('225', 0.0006063832206878866))]

In [17]:
#sanity checks
print(Nzs_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5))
print(Pzu_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5))

[(1, 1.001150988129259), (2, 1.000298279355365), (3, 1.0012002101640978), (4, 0.9989566079328333), (5, 1.0022440524790346)]
[(1, 0.9995886003004822), (2, 1.0007091944792883), (3, 0.9998248994910307), (4, 0.9997897344969366), (5, 0.9999563707234314)]


In [18]:
###Expectation

#(z, ((s,q*), (u,q*)))
joined = Pzu_norm.join(Nzs_norm)
#(z, ((s,q*), (u,q*))) ==> ((u,s), (z, Pzu * N(z,s)/N(z)))
temp_q = joined.map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1])))
print('(u,s),(z,Pzu * N(z,s)/N(z))):', temp_q.take(1))

#((u,s,z),q*)
q = temp_q.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])
print('(u,s,z),q)):', q.take(1))

(u,s),(z,Pzu * N(z,s)/N(z))): [(('3', '1172'), (1, 1.962485504904484e-07))]
(u,s,z),q)): [(('3', '31', 1), 0.17986899936450254)]


In [19]:
#final result 
temp_q.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

[(('3', '31'), 1.1177308759721276e-06),
 (('3', '1029'), 1.009527024957251e-06),
 (('3', '1263'), 1.2386458898008923e-06),
 (('3', '2294'), 1.3215719865072165e-06),
 (('3', '62'), 2.2229665092741207e-06),
 (('3', '168'), 1.0161878882654983e-06),
 (('3', '273'), 6.743859725465466e-07),
 (('3', '350'), 1.6956357378361594e-06),
 (('3', '592'), 4.853356941211625e-06),
 (('3', '593'), 7.749281489854942e-06)]

#### Full Broadcast EM Algorithm  

In [None]:
nz = 5
z_ids = sc.parallelize(list(range(1,nz+1)))
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
max_iter = 2

#Initialize
q = usz.map(lambda x :(x,np.random.rand()))

#sc.setCheckpointDir('/Users/alexmomeni/Documents/Alex/Polytechnique/MAP543/Project')

for i in range (max_iter):
    #Cache in memory
    q.persist()
    print(q.getNumPartitions())
    
    #print("Maximisation")
    norm = q.map(lambda x:(x[0][2],x[1])).reduceByKey(lambda x,y:x+y)
    smallDict = dict((x[0], x[1]) for x in norm.collect())
    bc=sc.broadcast(smallDict)
    
    #print("Calculating Nzs_norm")
    Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Nzs_norm = Nzs.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))
    #print('Nzs_norm ', Nzs_norm.take(3))
    
    #print("Calculating Pzu_norm")
    Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Pzu_norm = Pzu.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))
    #print('Pzu_norm ', Pzu_norm.take(3))
    
    #print("Expectation")
    temp_q = Pzu_norm.join(Nzs_norm).map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1])))
    q = temp_q.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])
    #print('q ', q.take(3))
    
    #Coalesce
    q = q.coalesce(8)
    
    #Checkpoint
    #if i % 2 == 0:
    #    q.checkpoint()

In [None]:
#final result
temp_q.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

## Multinomial Model 

#### Basic Implementation

Extract the data - multinom_data is the same dataset, but includes the ratings (ceonverted in integers). 

In [20]:
data_mutlinom = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header).map(parser).map(lambda tokens: (tokens[0],tokens[1], int(float(tokens[2])))).cache()
data_mutlinom.take(10)

[('1', '31', 2),
 ('1', '1029', 3),
 ('1', '1061', 3),
 ('1', '1129', 2),
 ('1', '1172', 4),
 ('1', '1263', 2),
 ('1', '1287', 2),
 ('1', '1293', 2),
 ('1', '1339', 3),
 ('1', '1343', 2)]

In [21]:
nz = 5
print('Number of communities:',nz)
z_ids = sc.parallelize(list(range(1,nz+1)))

#usz: (u, s, z)
usz = data_mutlinom.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
print("(u,s,z):",usz.take(1))

Number of communities: 5
(u,s,z): [('1', '31', 1)]


In [22]:
#N : ((u,s), Nus)
N = data_mutlinom.map(lambda x : ((x[0],x[1]),x[2]))
R = N.map(lambda x : x[1]).reduce(lambda x,y: x+y)
print( "((u,s), N(u,s))", N.take(1))
R

((u,s), N(u,s)) [(('1', '31'), 2)]


341626

In [23]:
#Initialization of q
np.random.seed(123)

#q: (u,s,z) => ((u,s,z), q)
q = usz.map(lambda x :(x,np.random.rand()))
print('(u,s,z),q)):', q.take(1))

(u,s,z),q)): [(('1', '31', 1), 0.7991169055789756)]


In [24]:
%%time
###Maximisation (1)

#Nzs: (z,(s,Nzs))
#((u,s,z),q) => ((z,s),q) => ((z,s),sum_q) => ((z,(s,N(z,s))))
Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z, (s, N(s,z))', Nzs.take(1))

#Nzs_norm: (z,(s,N(z,s))) => (z,(s,N(z,s)/N(z)))
Nzs_norm = Nzs.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([i[1] for i in x[1]]))) for i in x[1]])
print('(z, (s, N(s,z)/N(z))', Nzs_norm.take(1))

###Sanity Check: Probabilities add up to 1
print(Nzs_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5))

(z, (s, N(s,z)) [(1, ('1172', 22.3773845802829))]
(z, (s, N(s,z)/N(z)) [(1, ('1172', 0.00044871722964156944))]
[(1, 0.9999999999999989), (2, 0.9999999999999983), (3, 0.9999999999999989), (4, 1.0000000000000009), (5, 1.000000000000001)]
CPU times: user 76.2 ms, sys: 15.9 ms, total: 92.2 ms
Wall time: 31.1 s


In [25]:
%%time
###Maximsisation (2)

#Pzu: ((u,s,z),q) => ((z,u),q) => ((z,u),sum_q) => ((z,(u,q)))
Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
print('(z,(u,q))', Pzu.take(1))

#Pzu_norm: ((z,(u,q)) => (z, (u, p(z|u)))
#Pzu_norm = Pzu.groupByKey().flatMap(lambda x: [(i[0],(x[0],i[1]/sum([j[1] for j in x[1]]))) for i in x[1]])
Pzu_norm = Pzu.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([j[1] for j in x[1]]))) for i in x[1]])
print('z, (u, p(z|u))', Pzu_norm.take(1))

###Sanity Check: Probabilities add up to 1
print(Pzu_norm.map(lambda x : (x[0],x[1][1])).reduceByKey(lambda x,y : x+y).take(5))

(z,(u,q)) [(1, ('3', 24.55147097661122))]
z, (u, p(z|u)) [(1, ('3', 0.0004907296749332845))]
[(1, 0.9999999999999993), (2, 0.9999999999999991), (3, 1.0000000000000002), (4, 0.9999999999999994), (5, 1.0000000000000009)]
CPU times: user 126 ms, sys: 23.4 ms, total: 149 ms
Wall time: 2.44 s


In [None]:
%%time
###Expectation

#(z, ((s,q*), (u,q*)))
joined = Pzu_norm.join(Nzs)
print(joined.take(5))
#(z, ((s,q*), (u,q*))) ==> ((u,s), (z, Pzu * N(z,s)/N(z)))
temp_p = joined.map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1]/R)))
print('(u,s),(z,Pzu * N(z,s)/N(z))):', temp_p.take(1))

#((u,s,z),p*)
p = temp_p.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])
print('(u,s,z),p)):', p.take(1))

[(1, (('3', 0.0004907296749332845), ('1172', 22.3773845802829))), (1, (('3', 0.0004907296749332845), ('1287', 24.355663075997526))), (1, (('3', 0.0004907296749332845), ('3671', 27.612690278485033))), (1, (('3', 0.0004907296749332845), ('47', 101.7409165846745))), (1, (('3', 0.0004907296749332845), ('225', 22.71589088108749)))]
(u,s),(z,Pzu * N(z,s)/N(z))): [(('3', '1172'), (1, 3.2144060056726715e-08))]


In [None]:
#temp_q : ( (u,s), ((z,q*), N) ) 
temp_q = p.map(lambda x : ((x[0][0],x[0][1]),(x[0][2],x[1]))).join(N)

#q : ((u,s,z), q*)
q = temp_q.map(lambda x : ((x[0][0],x[0][1],x[1][0][0]),x[1][0][1]*x[1][1]))
print('(u,s,z),q)):', q.take(1))

In [None]:
#final result 
temp_p.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

#### Full Multinomial EM Algorithm

In [None]:
nz = 5
z_ids = sc.parallelize(list(range(1,nz+1)))
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
max_iter = 2
N = data_mutlinom.map(lambda x : ((x[0],x[1]),x[2]))
R = N.map(lambda x : x[1]).reduce(lambda x,y: x+y)

#Initialize
q = usz.map(lambda x :(x,np.random.rand()))

#sc.setCheckpointDir('/Users/alexmomeni/Documents/Alex/Polytechnique/MAP543/Project')

for i in range (max_iter):
    #Cache in memory
    q.persist()
    print(q.getNumPartitions())
    
    #print("Maximisation")
    
    #print("Calculating Nzs_norm")
    Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Nzs_norm = Nzs.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([i[1] for i in x[1]]))) for i in x[1]])

    #print("Calculating Pzu_norm")
    Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Pzu_norm = Pzu.groupByKey().flatMap(lambda x: [(x[0],(i[0],i[1]/sum([j[1] for j in x[1]]))) for i in x[1]])

    #print("Expectation")
    temp_p = Pzu_norm.join(Nzs).map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1]/R)))
    p = temp_p.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])

    temp_q = p.map(lambda x : ((x[0][0],x[0][1]),(x[0][2],x[1]))).join(N)
    q = temp_q.map(lambda x : ((x[0][0],x[0][1],x[1][0][0]),x[1][0][1]*x[1][1]))
    
    #Coalesce
    q = q.coalesce(8)
    
    #Checkpoint
    #if i % 2 == 0:
        #q.checkpoint()

In [None]:
#final result
temp_p.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)

#### Full Multinomial EM Algorithm (with broadcast)

In [None]:
import time

nz = 5
z_ids = sc.parallelize(list(range(1,nz+1)))
usz = data.cartesian(z_ids).map(lambda x: (x[0][0], x[0][1], x[1]))
max_iter = 2
N = data_mutlinom.map(lambda x : ((x[0],x[1]),x[2]))
R = N.map(lambda x : x[1]).reduce(lambda x,y: x+y)


#Initialize
q = usz.map(lambda x :(x,np.random.rand()))

#sc.setCheckpointDir('/Users/alexmomeni/Documents/Alex/Polytechnique/MAP543/Project')

for i in range (max_iter):
    #Cache in memory
    q.persist()
    print(q.getNumPartitions())
    
    #print("Maximisation")
    norm = q.map(lambda x:(x[0][2],x[1])).reduceByKey(lambda x,y:x+y)
    smallDict = dict((x[0], x[1]) for x in norm.collect())
    bc=sc.broadcast(smallDict)
    
    #print("Calculating Nzs_norm")
    Nzs = q.map(lambda x:((x[0][2],x[0][1]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    #Nzs_norm = Nzs.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))

    #print("Calculating Pzu_norm")
    Pzu =  q.map(lambda x:((x[0][2],x[0][0]),x[1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0][0],(x[0][1],x[1])))
    Pzu_norm = Pzu.map(lambda x : (x[0], (x[1][0],x[1][1]/bc.value[x[0]])))

    #print("Expectation")
    temp_p = Pzu_norm.join(Nzs).map(lambda x : ((x[1][0][0],x[1][1][0]), (x[0], x[1][0][1]*x[1][1][1]/R)))
    p = temp_p.groupByKey().flatMap(lambda x: [((x[0][0],x[0][1],i[0]),i[1]/sum([i[1] for i in x[1]])) for i in x[1]])

    temp_q = p.map(lambda x : ((x[0][0],x[0][1]),(x[0][2],x[1]))).join(N)
    q = temp_q.map(lambda x : ((x[0][0],x[0][1],x[1][0][0]),x[1][0][1]*x[1][1]))
    
    #Coalesce
    q = q.coalesce(8)
    
    #Checkpoint
    #if i % 2 == 0:
        #q.checkpoint()

In [None]:
#final result
temp_q.map(lambda x : ((x[0][0],x[0][1]),x[1][1])).reduceByKey(lambda x,y : x+y).take(10)