In [1]:
# Make necessary imports
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.mllib.random import RandomRDDs
from pyspark.sql import SQLContext
from pyspark import sql
import numpy as np

In [2]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = sql.SQLContext(sc)

In [3]:
def normalization(x):
    s = sum(x[1])
    s_last = 0
    x_new = []
    for i in range(len(x[1])-1):
        x_new.append(x[1][i] / s)
        s_last += x_new[i]
    x_new.append(1 - s_last)
    return (x[0], np.array(x_new))

def create_uniform_rdd_rowindexed_matrix(sc, nrow, ncol, normalize=False):
    rdd = []
    for i in range(nrow):
        rdd.append((i, np.random.rand(ncol)))
    #print(rdd)
    rdd = sc.parallelize(rdd)
    if normalize:
        rdd = rdd.map(normalization)
    return rdd

In [4]:
class MRAlgorithm:
    def __init__(self, datapath, k):
        self.k = k
        self.n = 2649429
        self.m = 17770
        self.sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
        self.input = self.sc.textFile(datapath).map(lambda x: (int(x.split(',')[0])-1,
                                                               int(x.split(',')[1])-1,int(x.split(',')[2])))
        self.A = self.input
        self.H = create_uniform_rdd_rowindexed_matrix(self.sc, self.n, self.k, normalize=True)
        self.W = create_uniform_rdd_rowindexed_matrix(self.sc, self.m, self.k)
        
    def compute_W(self):
        print("Compute W")
        #M5: Compute matrix S (repartition join of H and A)
        tmp_M5 = self.H.join(self.A.map(lambda x: (x[1], (x[0], x[2]))))

        #R5: Emit each hj according to row i
        tmp_M5 = tmp_M5.map(lambda x: (x[1][1][0], x[1][1][1] * x[1][0]))

        
        
        #M6: Finish the computation of matrix S -- identity mapper so no code needed
        #R6: Emit each row in matrix S
        S = tmp_M5.reduceByKey(lambda x, y : x +y)

        
        #M7: Calculate matrix C (same as calculating B in compute_H)
        tmp_M7 = self.H.map(lambda x: (0, np.outer(x[1], x[1].T)))
        #R7: Emit matrix C from the sum of matrices
        C = tmp_M7.reduceByKey(lambda x, y : x +y).map(lambda x: x[1])
        
        #M8: Compute matrix T
        C_b = sc.broadcast(C.collect())
        T = self.W.map(lambda x: (x[0], x[1]@C_b.value))
        

        #M9: update W with broadcast join of S and T. W = W * S/T
        T_b = self.sc.broadcast(T.collectAsMap())
        S_b = self.sc.broadcast(S.collectAsMap())
        self.W = self.W.map(lambda x: (x[0], x[1]*np.divide(S_b.value[x[0]], T_b.value[x[0]])))

    
        
    def compute_H(self):
        print("Compute H")
        
        # Define dummy H to continue with next operations
        self.H = self.sc.parallelize([(1, [0.2,0.3,0.5]), (2, [0.1,0.7,0.2]), (3, [0.4,0.3,0.3]), (4, [0.5,0.1,0.4])])
        
    def assign_clusters(self):
        #M10: Map user to cluster with highest probability
        self.clusters = self.H.map(lambda x: (x[0], x[1].index(max(x[1]))))
        
        #M11: Emit a 1 for each user that is in a cluster
        self.clustersizes = self.clusters.map(lambda x: (x[1], 1))
        
        #R11: Count the number of users per cluster
        self.clustersizes = self.clustersizes.reduceByKey(lambda x,y: x+y)
        
    def RM2_distribution(self):
        #M12: Emit each rating that a user gave
        self.ratings = self.input.map(lambda x: (x[1], x[2]))
        
        #R12: Sum ratings given by user
        self.ratings = self.ratings.reduceByKey(lambda x,y: x+y)
        
        #Accumulator containing total count of ratings
        globalcount = self.sc.accumulator(0)
        self.ratings.foreach(lambda x: globalcount.add(x[1]))
        
        #M13: Emit each rating that an item received
        self.items = self.input.map(lambda x: (x[0], x[2]))
        
        #Broadcast the total count of ratings, to be used in next reduce
        globalcount = self.sc.broadcast(globalcount.value)
        
        #R13: Compute the probability of item in the collection
        self.items = self.items.reduceByKey(lambda x,y: x+y)
        
        #Divide previous values by the total count of ratings, to normalize probabilities
        self.items = self.items.map(lambda x: (x[0], x[1]/globalcount.value))
        
        #Broadcast all users and which cluster they are in
        clusters = self.sc.broadcast(self.clusters.collectAsMap())
        
        #M14-a: Map each rating from input to the cluster assigned to user
        self.input2cluster = self.input.map(lambda x: (clusters[x[1]], (x[1], x[0], x[2])))
        
        #M14-b: Map sum of ratings of a user to their assigned cluster
        self.ratings2cluster = self.ratings.map(lambda x: (clusters[x[0]], (x[0], x[1])))
        
        #Repartition join these two maps by cluster
        self.fullClusters = self.input2cluster.join(self.ratings2cluster)
        
        #Broadcast sizes of the clusters
        clustersizes = self.sc.broadcast(self.clustersizes.collectAsMap())
        
        #Broadcast probabilities of each item
        items = self.sc.broadcast(self.items.collectAsMap())
        
        #Final reduce combines all information from above, needs W and H

In [5]:
Algorithm = MRAlgorithm('../data/small_dataset.txt', k=5)
Algorithm.compute_W()
#Algorithm.compute_H()
Algorithm.assign_clusters()
Algorithm.RM2_distribution()

Compute W


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 34) (129.104.243.25 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-4-61371d6a0657>", line 53, in <lambda>
AttributeError: 'numpy.ndarray' object has no attribute 'index'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:835)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/meteor/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-4-61371d6a0657>", line 53, in <lambda>
AttributeError: 'numpy.ndarray' object has no attribute 'index'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [14]:
Algorithm.W.take(10)

[(0, array([[0.00030694, 0.00128272, 0.00029735, 0.000974  , 0.00091656]])),
 (1, array([[3.52945688e-04, 2.85686513e-05, 2.40647311e-04, 5.50402613e-05,
          2.49898556e-04]])),
 (2, array([[0.00271794, 0.00166009, 0.00410615, 0.00200646, 0.00319731]])),
 (3, array([[3.16297667e-04, 2.51517831e-04, 2.96652769e-05, 1.51227657e-05,
          6.29792357e-05]])),
 (4, array([[0.00179002, 0.00220713, 0.00106682, 0.0015383 , 0.00178472]])),
 (5, array([[0.00108035, 0.00046918, 0.00201009, 0.00031   , 0.00189063]])),
 (6, array([[6.32077518e-05, 4.19817395e-06, 7.38204116e-05, 1.31035957e-05,
          2.00018221e-04]])),
 (7, array([[0.0152467 , 0.02716752, 0.00918566, 0.0167177 , 0.02058918]])),
 (8, array([[1.20626375e-04, 1.14048387e-04, 1.25292078e-04, 2.72180765e-05,
          7.64198213e-05]])),
 (9, array([[9.48164555e-05, 4.48320719e-04, 1.51229196e-04, 2.99707951e-04,
          4.63426461e-04]]))]

In [52]:
Algorithm.clusters.collectAsMap()

{1: 2, 2: 1, 3: 0, 4: 0}

In [53]:
Algorithm.clustersizes.collect()

[(0, 2), (1, 1), (2, 1)]

In [54]:
Algorithm.ratings.take(5)

[(1488844, 9), (30878, 8), (893988, 3), (1842128, 4), (2207774, 5)]

In [55]:
Algorithm.items.take(5)

[(2, 0.0031263442977540004),
 (4, 0.002356875836872686),
 (6, 0.019042829705117875),
 (8, 0.2881568503898842),
 (10, 0.004798574968645675)]

In [22]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd_b = sc.broadcast(rdd.collect())