In [2]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt
import dask.array as da
from numpy import cov
from numpy import std
from sklearn.metrics.pairwise import cosine_similarity
import random
import math

# User User based Collaborative filtering

In [3]:
def init_spark():
    spark = SparkSession \
        .builder \
        .config("spark.driver.memory", "15g") \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [4]:
spark = init_spark()

In [5]:
# Reading Data from anime.csv
df2 = pd.read_csv("D:/Courses/big data/project/Anime/anime.csv")

In [6]:
# Redaing from rating.csv and transforming data
df = spark.read.text("D:/Courses/big data/project/Anime/rating.csv").rdd
data = df.map(lambda x: x[0].split(','))
data2 = data.groupBy(lambda x : x[0])

In [7]:
data3 = data2.collect()

In [8]:
# Preparing data : a dictionary utility = {users: {anime : ratings}}
utility = {}
all_animes = []
all_users = []
for user in range(len(data3)):
    if data3[user][0] == 'user_id':
        continue
    utility[int(data3[user][0])] = {}
    #if int(data3[user][0]) not in all_users:
    #    all_users.append(int(i[0]))
    for i in data3[user][1]:        
        if int(i[1]) not in all_animes:
            all_animes.append(int(i[1]))
        if int(i[2]) == -1:
            continue            
        utility[int(i[0])][int(i[1])] = int(i[2])

In [9]:
utility

{1: {8074: 10, 11617: 10, 11757: 10, 15451: 10},
 2: {11771: 10},
 3: {20: 8,
  154: 6,
  170: 9,
  199: 10,
  225: 9,
  341: 6,
  430: 7,
  527: 7,
  552: 7,
  813: 10,
  1119: 7,
  1121: 7,
  1122: 7,
  1132: 8,
  1292: 6,
  1313: 8,
  1526: 7,
  1535: 10,
  1564: 7,
  1689: 8,
  1764: 6,
  1943: 8,
  2201: 7,
  2404: 7,
  2847: 7,
  3588: 8,
  4026: 7,
  5114: 10,
  5231: 8,
  6178: 7,
  6702: 8,
  6880: 6,
  7695: 7,
  8074: 6,
  9107: 6,
  9135: 7,
  9760: 9,
  9917: 8,
  9919: 8,
  9989: 10,
  10408: 7,
  10507: 8,
  11111: 8,
  11703: 6,
  11737: 7,
  11757: 9,
  11759: 7,
  11771: 10,
  12671: 3,
  14075: 8,
  14093: 7,
  14345: 8,
  14513: 10,
  16498: 10,
  16512: 5,
  16782: 7,
  16894: 10,
  16918: 8,
  17265: 7,
  18097: 7,
  18115: 10,
  18393: 7,
  19315: 3,
  19815: 9,
  20021: 5,
  20159: 10,
  20507: 8,
  20583: 9,
  21507: 6,
  21881: 7,
  22199: 8,
  22297: 8,
  22319: 6,
  22547: 6,
  22729: 7,
  23301: 8,
  23321: 6,
  23333: 4,
  23755: 8,
  24415: 10,
  26243: 6

In [10]:
all_users = list(utility.keys())

In [11]:
del data3

In [1]:
import gc
gc.collect()

80

In [12]:
def sim2(matrix, user1, user2):
    #This method returns the similarity between two users
    #It uses Pearson Correlation Coefficient for similarity calculation
    matched = []
    for anime in matrix[user1].keys():
        if anime in matrix[user2].keys():
            matched.append(anime) 
    x = []
    y = []    
    for anime in matched:
        x.append(matrix[user1][anime])
        y.append(matrix[user2][anime])        

    f = (lambda i, j: i - j)
    x = f(x, np.mean(x))
    y = f(y, np.mean(y))
    # Pearson Correlation Coefficient formula below
    numerator = (lambda x, y : x*y)
    denominator = (lambda x: x*x)
    d = math.sqrt(sum(denominator(x))) * math.sqrt(sum(denominator(y)))
    if d == 0:
        return 0
    similarity = sum(numerator(x, y))/ d
    return similarity

In [13]:
def most_similar_user(matrix, user, n=500):
    # This method returna a list of n most similar users
    similarity = {}
    for p in matrix.keys():
        similarity[p] = sim2(matrix, user, p)
    sorted_list = sorted(similarity.items(), key = lambda kv:(kv[1], kv[0]))
    return sorted_list[-n:]     

In [14]:
def recommend(matrix, user):
    # this method returns the predictions wrt most similar users
    similar_users = most_similar_user(matrix, user)
    predictions = {}
    for anime in all_animes:
        num = 0
        den = 0
        for user in similar_users:
            if anime in matrix[user[0]].keys():
                num += matrix[user[0]][anime] * user[1]
                den += user[1]
        if den == 0:
            continue
        if anime in matrix[user[0]].keys():
            predictions[anime] = float(num/den)
    return predictions

In [24]:
# Creating Test data of randomly selecting 50 users
test= int(0.05 * len(all_users))
test_data_user = []
for _ in range(50):
    random_user = random.randint(1, len(all_users))
    if all_users[random_user] not in test_data_user:
        test_data_user.append(all_users[random_user])

In [16]:
# Creating test data of ramdomly selecting 30% of the total animes list
test_animes = int(0.3 * len(all_animes))
test_data_animes = []
for _ in range(test_animes):
    random_anime = random.randint(1, len(all_animes))
    if all_animes[random_anime] not in test_data_animes:
        test_data_animes.append(all_users[random_anime])

In [25]:
# making a deep copy of the data 
import copy
training_data = copy.deepcopy(utility)

In [26]:
#Creating training data by deleting the test data from the original data
for user in test_data_user:
    for anime in test_data_animes:
        if anime in training_data[user].keys():        
            del training_data[user][anime]                        

In [27]:
def start():
    predicted = {}
    for user in test_data_user:
        predicted[user] = recommend(training_data, user)
        print(user)
    return predicted

In [28]:
predicted = start()

40810
1666
19742
66694
8822
55424
50160
63723
14400
60088
43200
64191
22872
30539
47327
2307
8050
25680
36184
43676
159
36359
36608
46219
53694
41228
29142
69565
15495
36653
41529
50145
71212
48531
27471
45677
55439
36202
61599
46397
39009
8835
47192
4461
66867
45745
25171
45459
41926
70867


In [29]:
RMSE(utility, predicted)

1.8329778971646173

## Applying the common practice 

In [None]:
avg_rating_of_user = {}
for user in all_users:
    count = 0
    summ = 0
    for anime in utility[user].keys():
        summ += utility[user][anime]
        count += 1
    if count != 0:
        avg_rating_of_user[user] = summ/count
    else:
        avg_rating_of_user[user] = 0
avg_rating_of_animes = {}
for anime in all_animes:
    count = 0
    summ = 0
    for user in all_users:
        if anime in utility[user].keys():
            summ += utility[user][anime]
            count += 1
    if count != 0:
        avg_rating_of_animes[anime] = summ/count
    else:
        avg_rating_of_animes[anime] = 0
    

In [30]:
print(utility[70867])

{9253: 10, 14741: 8, 2904: 10, 10793: 10, 269: 10, 4181: 8}


In [31]:
print(predicted[70867])

{1536: 7.851851851851852, 1538: 7.428571428571429, 12291: 7.6938775510204085, 12293: 7.609090909090909, 22535: 8.788990825688073, 31240: 8.771929824561404, 25099: 6.955555555555556, 5507: 7.822222222222222, 27821: 8.585106382978724, 31251: 7.9, 3089: 7.529411764705882, 6675: 8.430769230769231, 20: 7.914027149321267, 11285: 7.195121951219512, 6166: 7.035087719298246, 18277: 7.560975609756097, 1569: 6.958333333333334, 431: 8.623076923076923, 9756: 8.673387096774194, 12317: 6.862068965517241, 1566: 8.714285714285714, 11013: 7.609756097560975, 376: 7.338235294117647, 5667: 7.484848484848484, 548: 7.4, 9253: 9.651270207852194, 550: 7.613636363636363, 1575: 9.401847575057737, 3087: 7.5, 10793: 7.755813953488372, 43: 8.36046511627907, 257: 6.377358490566038, 13357: 7.285714285714286, 4654: 8.06, 47: 8.109756097560975, 5680: 7.92972972972973, 5681: 8.465648854961833, 50: 7.809523809523809, 18295: 7.708333333333333, 565: 7.937499999999999, 13367: 7.0, 16009: 7.781818181818182, 57: 8.63333333333

In [22]:
def RMSE(actual, predicted):
    error = 0
    n = 0
    for user in test_data_user:
        for anime in test_data_animes:
            if anime in predicted[user].keys() and anime in actual[user].keys():
                error += (actual[user][anime] - predicted[user][anime])**2
                n += 1
    return math.sqrt(error/n)

In [52]:
def recommend_animes(for_user, n = 10):
    predictions = recommend(training_data, for_user)
    sorted_list = sorted(predictions.items(), key = lambda kv:(kv[1], kv[0]))
    for anime in sorted_list[-n:]:
        values = df2[['anime_id','name', 'genre', 'type']].loc[df2['anime_id']==anime[0]]
        print(str(values['anime_id']) + "," + str(values['name']) + "," + str(values['genre']) + "," + str(values['type']) + "\n")
    

In [53]:
recommend_animes(3)

458    4898
Name: anime_id, dtype: int64,458    Kuroshitsuji
Name: name, dtype: object,458    Action, Comedy, Demons, Fantasy, Historical, S...
Name: genre, dtype: object,458    TV
Name: type, dtype: object

248    7054
Name: anime_id, dtype: int64,248    Kaichou wa Maid-sama!
Name: name, dtype: object,248    Comedy, Romance, School, Shoujo
Name: genre, dtype: object,248    TV
Name: type, dtype: object

192    9617
Name: anime_id, dtype: int64,192    K-On! Movie
Name: name, dtype: object,192    Comedy, Music, Slice of Life
Name: genre, dtype: object,192    Movie
Name: type, dtype: object

778    2994
Name: anime_id, dtype: int64,778    Death Note Rewrite
Name: name, dtype: object,778    Mystery, Police, Psychological, Supernatural, ...
Name: genre, dtype: object,778    Special
Name: type, dtype: object

173    6811
Name: anime_id, dtype: int64,173    InuYasha: Kanketsu-hen
Name: name, dtype: object,173    Action, Adventure, Comedy, Demons, Fantasy, Ma...
Name: genre, dtype: object,173 