In [5]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import VectorAssembler

In [6]:
# Obtain the Spark session used by the DataFrame reads in this notebook
spark = (
    SparkSession.builder
    .appName("Collaborative_Filtering")
    .getOrCreate()
)

23/01/26 23:31:17 WARN Utils: Your hostname, Hasti-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
23/01/26 23:31:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/26 23:31:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Load the ratings CSV: the first line is treated as the header and column
# types are inferred from the data
ratings_data = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("./Dataset/ratings.csv")
)

                                                                                

In [8]:
# Configure an ALS estimator (pyspark.ml.recommendation.ALS) over the
# user_id / game_id / rating columns. This is not a cosine-similarity model.
# NOTE(review): the cosine-similarity cells visible below never reference `als`;
# confirm whether it is fitted elsewhere or can be removed.
als = ALS(userCol="user_id", itemCol="game_id", ratingCol="rating", coldStartStrategy="drop")

In [9]:
# Largest user id and game id present in the ratings table. They are used as the
# number of users / games, i.e. ids are treated as 1-based row/column positions.
# One named aggregation replaces two describe().collect() passes (each of which
# also computed count/mean/stddev/min) and avoids reading the maxima by position.
_id_maxima = ratings_data.selectExpr(
    "max(user_id) AS max_user_id",
    "max(game_id) AS max_game_id",
).first()
# number all of users
n = int(_id_maxima["max_user_id"])
# number all of games
m = int(_id_maxima["max_game_id"])

                                                                                

In [10]:
# Dense user x game rating matrix: rating[user_id - 1][game_id - 1] holds the
# rating that user gave that game; 0 marks "not rated".
# The matrix is cached in rating.npy. When the cache file is missing it is
# rebuilt from ratings_data, so the cell also works on a fresh checkout.
try:
    # load the matrix
    rating = np.load("rating.npy")
except FileNotFoundError:
    _ratings_pd = ratings_data.select("user_id", "game_id", "rating").toPandas()
    _user_rows = _ratings_pd["user_id"].to_numpy() - 1
    _game_cols = _ratings_pd["game_id"].to_numpy() - 1

    # create a matrix of ratings, sized by the largest ids present in the data
    # (dense float64: with the ids shown by describe() this is several GB)
    rating = np.zeros((_user_rows.max() + 1, _game_cols.max() + 1))
    rating[_user_rows, _game_cols] = _ratings_pd["rating"].to_numpy()

    # save the matrix
    np.save("rating.npy", rating)

In [11]:
# Print summary statistics (count, mean, stddev, min, max) for each column of
# the ratings table.

ratings_data.describe().show()

+-------+-----------------+------------------+------------------+
|summary|          game_id|           user_id|            rating|
+-------+-----------------+------------------+------------------+
|  count|           981548|            981548|            981548|
|   mean|4943.316270829343|25616.592174809586|3.8564950465998606|
| stddev|2873.219878464294|15228.359436640887|0.9839536925631067|
|    min|                1|                 1|                 1|
|    max|            10000|             53424|                 5|
+-------+-----------------+------------------+------------------+



In [12]:
# Euclidean norm of every row (one row per user) of the rating matrix.
# keepdims=True keeps the reduced axis, so norm[i] is a length-1 array.
norm = np.linalg.norm(rating, axis=1, keepdims=True)

In [14]:
def cosine_similarity(v1, v2, norm1, norm2):
    """Return the cosine similarity of two vectors from their precomputed norms.

    Args:
        v1, v2: vectors accepted by ``np.dot``.
        norm1, norm2: norms of ``v1`` and ``v2``; the caller must make sure
            their product is non-zero.

    Returns:
        ``np.dot(v1, v2)`` divided by ``norm1 * norm2``.
    """
    numerator = np.dot(v1, v2)
    denominator = norm1 * norm2
    return numerator / denominator

# calc cosine similarity of specific user
def calc_similarity(user_id, ratings=None, norms=None):
    """Cosine similarity between one user and every user in the rating matrix.

    Args:
        user_id: 1-based user id; row ``user_id - 1`` of the matrix is the query.
        ratings: optional (users, games) matrix. Defaults to the notebook-level
            ``rating`` matrix.
        norms: optional per-row Euclidean norms of ``ratings``, shaped (users,)
            or (users, 1). Defaults to the notebook-level ``norm`` when
            ``ratings`` is omitted, otherwise they are computed from ``ratings``.

    Returns:
        Array of shape (users, 1). Rows of users without any rating (zero norm)
        are 0, and the whole result is 0 when the query user has no rating.

    Raises:
        IndexError: if ``user_id`` is not in ``1..users``. Previously
            ``user_id=0`` silently wrapped around to the last row.
    """
    if ratings is None:
        ratings = rating
        if norms is None:
            norms = norm
    else:
        ratings = np.asarray(ratings, dtype=float)
        if norms is None:
            norms = np.linalg.norm(ratings, axis=1)
    row_norms = np.asarray(norms, dtype=float).reshape(-1)

    # Size the result from the matrix itself rather than a separate global count.
    n_users = ratings.shape[0]
    if not 1 <= user_id <= n_users:
        raise IndexError(f"user_id {user_id!r} is outside 1..{n_users}")

    similarity = np.zeros((n_users, 1))
    target_norm = row_norms[user_id - 1]
    if target_norm == 0:
        return similarity

    # One matrix-vector product replaces the per-user Python loop.
    dot_products = ratings @ ratings[user_id - 1]
    has_ratings = row_norms != 0
    similarity[has_ratings, 0] = dot_products[has_ratings] / (row_norms[has_ratings] * target_norm)
    return similarity

In [88]:
# find the most similar users to a specific user
def find_similar_users(user_id, k, similarity=None):
    """Return the ``k`` users most similar to ``user_id``, best first.

    The query user is excluded from the result. Previously they always took the
    first slot with similarity 1.0, leaving only ``k - 1`` real neighbours.

    Args:
        user_id: 1-based id of the query user.
        k: number of neighbours to return.
        similarity: optional precomputed similarity of every user to
            ``user_id`` (flat or shaped (users, 1)). Defaults to
            ``calc_similarity(user_id)``. The input is never modified.

    Returns:
        Float array of shape (k, 2); each row is ``[user id, similarity]``.
        Ties are ordered by ascending user id.

    Raises:
        IndexError: if ``user_id`` is not in ``1..users``.
        ValueError: if ``k`` is negative or larger than the number of other users.
    """
    if similarity is None:
        similarity = calc_similarity(user_id)

    # Private flat copy, so that masking the query user does not leak to the caller.
    scores = np.array(similarity, dtype=float).reshape(-1)
    n_users = scores.shape[0]
    if not 1 <= user_id <= n_users:
        raise IndexError(f"user_id {user_id!r} is outside 1..{n_users}")
    if not 0 <= k <= n_users - 1:
        raise ValueError(f"k must be between 0 and {n_users - 1}, got {k!r}")

    # A user is not their own neighbour.
    scores[user_id - 1] = -np.inf

    # A stable sort on the negated scores gives descending similarity with ties
    # broken by lowest index, the same order repeated argmax produced.
    ranked = np.argsort(-scores, kind="stable")[:k]

    similar_users = np.zeros((k, 2))
    similar_users[:, 0] = ranked + 1
    similar_users[:, 1] = scores[ranked]
    return similar_users

In [55]:
# Example: look up the 10 nearest neighbours of user 1.
user_id = 1
k = 10
# One row per neighbour: column 0 is the user id, column 1 the similarity.
similar_users = find_similar_users(user_id, k)

In [89]:
# recommend games to a specific user
# k: number of similar users
# m: number of games to recommend
def recommend_games(user_id, k, m, similar_users=None, ratings=None):
    """Recommend up to ``m`` games that ``user_id`` has not rated yet.

    A game's score is the mean of ``neighbour rating * neighbour similarity``
    over the neighbours that contribute a non-zero value for that game. Games
    without any such contribution score 0.

    Args:
        user_id: 1-based id of the user to recommend for.
        k: number of similar users to consult.
        m: number of games to recommend.
        similar_users: optional (rows, 2) array of ``[user id, similarity]``;
            only the first ``k`` rows are used. Defaults to
            ``find_similar_users(user_id, k)``.
        ratings: optional (users, games) matrix where 0 means "not rated".
            Defaults to the notebook-level ``rating`` matrix. When given,
            ``similar_users`` must be given too so both describe the same data.

    Returns:
        Float array with one ``[game id, score]`` row per recommendation, best
        first, ties ordered by ascending game id. It has ``m`` rows unless the
        user has fewer than ``m`` unrated games. Games the user already rated
        are never returned (previously they could fill the tail of the result).

    Raises:
        IndexError: if ``user_id`` is not in ``1..users``.
        ValueError: if ``ratings`` is given without ``similar_users``.
    """
    if ratings is None:
        ratings = rating
    else:
        if similar_users is None:
            raise ValueError("similar_users must be supplied together with ratings")
        ratings = np.asarray(ratings, dtype=float)

    n_users = ratings.shape[0]
    if not 1 <= user_id <= n_users:
        raise IndexError(f"user_id {user_id!r} is outside 1..{n_users}")

    # find the most similar users with shape (k, 2)
    if similar_users is None:
        similar_users = find_similar_users(user_id, k)
    neighbours = np.asarray(similar_users, dtype=float).reshape(-1, 2)[:k]
    neighbour_rows = neighbours[:, 0].astype(int) - 1
    weights = neighbours[:, 1]

    # Similarity-weighted ratings, one row per neighbour. The number of games
    # comes from the matrix itself instead of a hard-coded 10000.
    weighted = ratings[neighbour_rows] * weights[:, np.newaxis]
    score_sum = weighted.sum(axis=0)
    # number of neighbours that actually contribute to each game
    support = np.count_nonzero(weighted, axis=0)

    # calculate the average score of each game (0 where nobody contributed)
    mean_score = np.divide(
        score_sum, support, out=np.zeros_like(score_sum, dtype=float), where=support > 0
    )

    # only games that the user has not rated are eligible
    candidates = np.flatnonzero(ratings[user_id - 1] == 0)
    candidate_scores = mean_score[candidates]

    # Stable sort: descending score, ties by lowest game index.
    best = np.argsort(-candidate_scores, kind="stable")[:max(m, 0)]

    recommended_games = np.zeros((best.size, 2))
    recommended_games[:, 0] = candidates[best] + 1
    recommended_games[:, 1] = candidate_scores[best]
    return recommended_games

In [83]:
# Load the games CSV: the first line is treated as the header and column types
# are inferred from the data
games_data = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("./Dataset/games.csv")
)

# get name of game
def get_game_name(game_id):
    """Look up a game in ``games_data`` by its ``game_id``.

    Args:
        game_id: value matched against the ``game_id`` column.

    Returns:
        The value in the second column (index 1) of the first matching row,
        presumably the game's title; confirm against the games.csv header.

    Raises:
        IndexError: if no row has this ``game_id``. The exception type matches
            the previous behaviour, but the message now names the missing id.
    """
    # first() fetches a single row instead of collecting every match.
    match = games_data.filter(games_data.game_id == game_id).first()
    if match is None:
        raise IndexError(f"no game with game_id={game_id!r} in games_data")
    return match[1]

In [90]:
# Example: recommend 5 games to user 1000 using their 50 nearest neighbours.
user_id = 1000
k = 50
# NOTE: an earlier cell assigned the number of games to `m`; from here on `m`
# is the number of recommendations instead.
m = 5

recommend_game = recommend_games(user_id, k, m)
# Each row of recommend_game is [game id, score]; print the looked-up name and the score.
for i in range(m):
    print(get_game_name(int(recommend_game[i][0])), recommend_game[i][1])

Halcyon 6: Starbase Commander 3.14970394174356
Spring Break 2.439750182371333
AirForce Delta Strike 2.439750182371333
Senran Kagura Shinovi Versus 2.3741786762424573
Squad Assault: West Front 2.0447945297729913
