In [None]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import UserDefinedFunction, col, udf
from pyspark.sql.types import IntegerType, FloatType, StructField, StructType, StringType, MapType
import itertools
import pandas as pd
import builtins as p
import pickle

spark = SparkSession.builder.getOrCreate()
ratingDF = spark.read.option("header", "true").option("delimiter", ",")\
    .option("inferSchema", "true").csv("../ml-latest-small/ratings.csv")
ratingDF = ratingDF.select("userId", "movieId","rating")


In [None]:
# In order to process this data which is suitable for spark format some modifications at df should be done before
# data processing. Following steps are for converting our df into a format which enables us to process more efficiently.

# Running this takes time(about 10 min in my computer), thats why i saved the result with pickle and loading that dict
# run this cell if you want to see the creation of usersWithDict yourself, but I added the pickle object with my submission
# so you don't need to run this cell if you don't want to wait
def returnUserDict(user):
    ratingDF.createOrReplaceTempView('rating') 
    user1DF = spark.sql("select * from rating where userId = {}".format(user))
    userdict = {user: dict(zip(user1DF.select("movieId").collect(),user1DF.select("rating").collect()))}
    return userdict

usersWithDict = [returnUserDict(user.__getattr__("userId")) for user in ratingDF.select("userId").distinct().collect()]


In [33]:
#usersWithDict, run this cell for saving usersWithDict dict since calculating this parameeter takes about 10 min.
with open('usersWithDict.pickle', 'wb') as handle:
    pickle.dump(usersWithDict, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [2]:
# loading the dict from the disk instead of calculating again
with open('usersWithDict.pickle', 'rb') as handle:
    usersWithDict = pickle.load(handle)

In [3]:
userDict = {}
# convert our usersWithDict to plain dict object and create a sparkdf from that dict 
for user in usersWithDict:
    ratingDict = list(user.values())[0]
    ratingDict = {str(k.__getattr__("movieId")): v.__getattr__("rating") for k,v in ratingDict.items()}
    userDict[list(user.keys())[0]] = ratingDict

userDict


{148: {'356': 4.0,
  '1197': 3.0,
  '4308': 4.0,
  '4886': 3.0,
  '4896': 4.0,
  '4993': 3.0,
  '5618': 3.0,
  '5816': 4.0,
  '5952': 3.0,
  '6377': 3.0,
  '7153': 3.0,
  '8368': 4.0,
  '30816': 5.0,
  '31658': 4.0,
  '40629': 5.0,
  '40815': 4.0,
  '44191': 4.0,
  '50872': 3.0,
  '54001': 4.0,
  '60069': 4.5,
  '68954': 4.0,
  '69757': 3.5,
  '69844': 4.0,
  '72998': 4.0,
  '76093': 3.0,
  '79091': 3.5,
  '79132': 1.5,
  '79702': 4.0,
  '81834': 4.0,
  '81847': 4.5,
  '88125': 4.0,
  '89745': 4.0,
  '98243': 4.5,
  '98491': 5.0,
  '99149': 3.0,
  '108932': 4.0,
  '110102': 4.0,
  '112175': 2.5,
  '112852': 3.5,
  '115617': 3.5,
  '116797': 4.5,
  '122882': 4.0,
  '122886': 3.5,
  '122920': 3.5,
  '134853': 4.0,
  '152081': 4.0,
  '157296': 3.0,
  '160718': 4.5},
 463: {'110': 4.5,
  '296': 4.0,
  '356': 4.0,
  '520': 4.0,
  '527': 4.0,
  '552': 3.5,
  '780': 3.5,
  '1088': 3.5,
  '1092': 3.0,
  '1221': 4.5,
  '1320': 4.0,
  '1552': 4.5,
  '1690': 4.0,
  '2006': 3.0,
  '2019': 4.0,
  '

In [4]:
def returnDictElem(key):
    return userDict[key]

returnDictElemUDF = UserDefinedFunction(returnDictElem,MapType(StringType(), FloatType()))

usersList = [x.userId for x in
    ratingDF.select("userId").distinct().collect()]
usersDF = spark.createDataFrame(usersList,IntegerType())
usersDF = usersDF.withColumnRenamed("value","id")

# take combinations of the dataset and eliminate same combinations.
# (a,b) = (b,a)
usersDF = usersDF.toDF('user1').crossJoin(usersDF.toDF('user2')).filter('user1 != user2').filter("user1 <= user2")
usersDF = usersDF.withColumn("user1 ratings", returnDictElemUDF(usersDF["user1"]))
usersDF = usersDF.withColumn("user2 ratings", returnDictElemUDF(usersDF["user2"]))
usersDF.show()

# At this point, our data is ready for processing for finding similar user pairs.

[Stage 5:>                                                          (0 + 1) / 1]

+-----+-----+--------------------+--------------------+
|user1|user2|       user1 ratings|       user2 ratings|
+-----+-----+--------------------+--------------------+
|  148|  463|{110102 -> 4.0, 6...|{7320 -> 4.0, 637...|
|  148|  471|{110102 -> 4.0, 6...|{68157 -> 4.0, 71...|
|  148|  496|{110102 -> 4.0, 6...|{4993 -> 4.0, 109...|
|  148|  243|{110102 -> 4.0, 6...|{44 -> 4.0, 48 ->...|
|  148|  392|{110102 -> 4.0, 6...|{2694 -> 4.0, 277...|
|  148|  540|{110102 -> 4.0, 6...|{47 -> 4.5, 8874 ...|
|  148|  516|{110102 -> 4.0, 6...|{4333 -> 4.0, 104...|
|  148|  251|{110102 -> 4.0, 6...|{47 -> 5.0, 5971 ...|
|  148|  451|{110102 -> 4.0, 6...|{25 -> 5.0, 593 -...|
|  148|  580|{110102 -> 4.0, 6...|{44665 -> 4.0, 90...|
|  148|  458|{110102 -> 4.0, 6...|{48 -> 3.0, 590 -...|
|  148|  255|{110102 -> 4.0, 6...|{1220 -> 5.0, 123...|
|  148|  481|{110102 -> 4.0, 6...|{1483 -> 1.0, 118...|
|  148|  588|{110102 -> 4.0, 6...|{47 -> 3.0, 590 -...|
|  148|  296|{110102 -> 4.0, 6...|{166528 -> 2.0

                                                                                

In [5]:
# Similarity is calculated by adding common movie ratings differences between two users
# for ex: if user1 rated movies as a = 3, b = 4, c = 2, d = 1, e=5 and 
# user2 rated movies c = 1,d = 4, e = 2, f = 5, g = 1
# rating score will be = (5 - abs(2 - 1)) + (5 - abs(4 - 1)) + (5 - abs(1 - 2))
# by subtracting the absolute value of difference from 5, I'm penalising the difference between ratings
# and awarding the similar rates. 

def compareRatings(dict1, dict2):
    similarity_score = 0
    common_movies = list(dict1.keys() & dict2.keys())
    for movie in common_movies:
        similarity_score += (5 - abs(dict1[movie] - dict2[movie]))
    return (similarity_score)

compareRatingsUDF = UserDefinedFunction(compareRatings, FloatType())

# create a small partition of data so i can test my approach
# usersDF_mock = spark.createDataFrame(usersDF.head(50), usersDF.schema)
# usersDF_mock.show()


# usersDF_mock = usersDF_mock.withColumn("Similarity Score", compareRatingsUDF(usersDF_mock["user1 ratings"], usersDF_mock["user2 ratings"]))
# usersDF_mock = usersDF_mock.sort("Similarity Score",ascending=False)
# usersDF_mock.show()


usersDF = usersDF.withColumn("Similarity Score", compareRatingsUDF(usersDF["user1 ratings"], usersDF["user2 ratings"]))
usersDF = usersDF.sort("Similarity Score",ascending=False)
usersDF.show()

                                                                                

+-----+-----+--------------------+--------------------+----------------+
|user1|user2|       user1 ratings|       user2 ratings|Similarity Score|
+-----+-----+--------------------+--------------------+----------------+
|  414|  599|{180497 -> 4.0, 4...|{112326 -> 1.0, 9...|          5443.5|
|  414|  474|{180497 -> 4.0, 4...|{4970 -> 4.0, 279...|          4585.0|
|  414|  448|{180497 -> 4.0, 4...|{136778 -> 2.0, 9...|          3868.5|
|   68|  414|{44665 -> 4.0, 38...|{180497 -> 4.0, 4...|          3817.5|
|  274|  414|{44665 -> 3.5, 38...|{180497 -> 4.0, 4...|          3599.5|
|  474|  599|{4970 -> 4.0, 279...|{112326 -> 1.0, 9...|          3444.5|
|  448|  599|{136778 -> 2.0, 9...|{112326 -> 1.0, 9...|          3293.5|
|  274|  599|{44665 -> 3.5, 38...|{112326 -> 1.0, 9...|          3222.0|
|   68|  599|{44665 -> 4.0, 38...|{112326 -> 1.0, 9...|          3210.0|
|  288|  414|{42004 -> 3.5, 25...|{180497 -> 4.0, 4...|          3052.5|
|  380|  414|{44665 -> 4.0, 13...|{180497 -> 4.0, 4