Set up pyspark and SparkSession

In [58]:
import pyspark
from pyspark.sql.types import IntegerType
from pyspark import SparkContext
import json

spark = pyspark.sql.SparkSession.builder \
    .master("local") \
    .appName("movies") \
    .getOrCreate()

Load data

In [59]:
df = spark.read.csv(path="/home/jovyan/movielens/ratings.csv", header=True).limit(1000)
df = df.withColumn("rating", df["rating"].cast(IntegerType()))

Calculate average for each user

In [60]:
averages = df \
    .groupBy("user_id") \
    .avg("rating") \
    .select("*")

averages.take(3)

[Row(user_id='1', avg(rating)=4.188679245283019),
 Row(user_id='2', avg(rating)=3.7131782945736433),
 Row(user_id='3', avg(rating)=3.9019607843137254)]

Subtract the average from each users each rating

In [62]:
joined = averages \
    .join(df, df["user_id"] == averages["user_id"]) \
    .select(df["user_id"], "avg(rating)", "rating", "movie_id")\
    .collect()

joined[:3]

[Row(user_id='1', avg(rating)=4.188679245283019, rating=5, movie_id='1193'),
 Row(user_id='1', avg(rating)=4.188679245283019, rating=3, movie_id='661'),
 Row(user_id='1', avg(rating)=4.188679245283019, rating=3, movie_id='914')]

Convert DataFrame to RDD

In [63]:
sc = SparkContext.getOrCreate()
rdd = sc.parallelize(joined)

Map the whole RDD to a python dictionary for easy look up

In [89]:
def seq_op(acc, obj):
    user_id = obj["user_id"]
    movie_id = obj["movie_id"]
    average = obj["average"]
    rating = obj["rating"]
    diff = obj["diff"]
    
    if user_id not in acc:
        acc[user_id] = {
            "average": average,
            "ratings": {
                movie_id: {
                    "rating": rating,
                    "diff": diff
                }
            }
        }
    else:
        acc[user_id]["ratings"][movie_id] = {
            "rating": rating,
            "diff": diff
        }
    return acc


combOp = (lambda x, y: {**x, **y})


mapped = rdd.map(lambda row: {"user_id": row["user_id"], 
                              "rating": row["rating"],
                              "movie_id": row["movie_id"],
                              "average": row["avg(rating)"],
                              "diff": row["rating"] - row["avg(rating)"]
                              })\
            .aggregate({}, seq_op, combOp)


print(json.dumps(mapped['1'], indent=4))



{
    "average": 4.188679245283019,
    "ratings": {
        "1193": {
            "rating": 5,
            "diff": 0.8113207547169807
        },
        "661": {
            "rating": 3,
            "diff": -1.1886792452830193
        },
        "914": {
            "rating": 3,
            "diff": -1.1886792452830193
        },
        "3408": {
            "rating": 4,
            "diff": -0.18867924528301927
        },
        "2355": {
            "rating": 5,
            "diff": 0.8113207547169807
        },
        "1197": {
            "rating": 3,
            "diff": -1.1886792452830193
        },
        "1287": {
            "rating": 5,
            "diff": 0.8113207547169807
        },
        "2804": {
            "rating": 5,
            "diff": 0.8113207547169807
        },
        "594": {
            "rating": 4,
            "diff": -0.18867924528301927
        },
        "919": {
            "rating": 4,
            "diff": -0.18867924528301927
        },
        "595

Define function for calculating cosine similarity

In [82]:
import numpy as np


def cosine_similarity(first_list, second_list):

    a = np.array(first_list)
    b = np.array(second_list)
    
    dot_product = np.dot(a, b)
    
    a_length = np.linalg.norm(a)
    b_length = np.linalg.norm(b)
    
    similarity = dot_product / (a_length * b_length)
    
    return similarity


Define the compare method that compares the user to every other user

In [85]:
def compare(user_id, mapped):
    comparison = set()
    user_movies = mapped[user_id]['ratings']

    for other_user_id in mapped:
        user = []
        other = []
        other_user_movies = mapped[other_user_id]['ratings']
        for movie_id in other_user_movies:
            if movie_id in user_movies:  # movie rated by both
                user.append(user_movies[movie_id]['diff'])
                other.append(other_user_movies[movie_id]['diff'])

        smaller_id = user_id if user_id < other_user_id else other_user_id
        bigger_id = user if user_id > other_user_id else other_user_id

        similarity = cosine_similarity(user, other)

        comparison.add((smaller_id, bigger_id, similarity))
        print(comparison)

    return comparison


In [None]:
stream = sc.parallelize(mapped) \
    .map(lambda user_id: compare(user_id, mapped)) \
    .aggregate(set(), lambda x, y: x.update(y), lambda x, y: x.update(y))


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 278, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 948, in func
    acc = seqOp(acc, obj)
  File "<ipython-input-91-f4571563b5a9>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'update'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 948, in func
    acc = seqOp(acc, obj)
  File "<ipython-input-91-f4571563b5a9>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'update'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [36]:
def common_movie_ids(user_id1, user_id2, rows):
    movies_1 = rows.filter(lambda row: row["user_id"] == user_id1).map(lambda row: row["movie_id"]).collect()
    movies_2 = rows.filter(lambda row: row["user_id"] == user_id2).map(lambda row: row["movie_id"]).collect()
    return set(movies_1).intersection(movies_2)


In [59]:
user1_id = 1
user2_id = 2

movie_ids = common_movie_ids(user1_id, user2_id, mapped)
print(movie_ids)

{3105, 1193, 1962, 2028, 2321, 1207, 1246}


In [60]:
user1_rating_vector = mapped\
    .filter(lambda row: row["movie_id"] in movie_ids and row["user_id"] is user1_id)\
    .map(lambda row: row["diff"])\
    .collect()

user2_rating_vector = mapped\
    .filter(lambda row: row["movie_id"] in movie_ids and row["user_id"] is user2_id)\
    .map(lambda row: row["diff"])\
    .collect()