<a href="https://colab.research.google.com/github/giovanniunimi1/CollaborativeFiltering/blob/main/CFIB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#DOWNLOAD SPARK AND SPARK DEPENDENCIES (JDK AND FINDSPARK)
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Install the Kaggle command-line client used to download the dataset.
! pip install -q kaggle
# Configure the Kaggle API: upload a kaggle.json token through the Colab file
# picker and copy it to ~/.kaggle with owner-only permissions.
from google.colab import files
uploaded = files.upload()
# NOTE(review): plain `mkdir` reports an error when the cell is re-run and the
# directory already exists; `mkdir -p` would make this step repeatable.
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

# NOTE(review): credentials are hard-coded here in addition to the uploaded
# kaggle.json. "XXXXXXXX" looks like a redacted/placeholder key -- presumably
# these variables take precedence over kaggle.json in the Kaggle client, which
# would make authentication fail; confirm and keep only one mechanism.
os.environ['KAGGLE_USERNAME'] = "giovannibuscemi"
os.environ['KAGGLE_KEY'] = "XXXXXXXX"
# Download the Yelp dataset archive and extract it into ./yelp-dataset, the
# directory the spark.read.json calls later in this notebook read from.
!kaggle datasets download -d yelp-dataset/yelp-dataset
! unzip yelp-dataset.zip -d yelp-dataset

#NGROK FOR MONITORING PYSPARK EXECUTION
# Downloads and unpacks the ngrok binary, then installs the pyngrok wrapper
# that is used further down to open the tunnel.
# NOTE(review): pyngrok presumably manages its own ngrok binary, so the manual
# download/unzip may be redundant -- confirm before removing.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok
import getpass
from pyngrok import ngrok, conf

import json
import pandas as pd
import numpy as np
import re
import string
import sys
import findspark

# Called before the pyspark imports below. Presumably it uses the SPARK_HOME
# environment variable set earlier to make pyspark importable -- confirm
# against the findspark documentation.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
from pyspark import SparkContext, SparkConf

# Port of the Spark web UI. Defined once and used both for the Spark
# configuration and for the ngrok tunnel so the two cannot drift apart.
ui_port = 4050

# Local Spark session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config('spark.ui.port', str(ui_port))
    .appName("CFIB")
    .getOrCreate()
)

# Ask for the ngrok authtoken interactively (getpass does not echo the input).
print("Enter your authtoken, can be copied "
"from https://dashboard.ngrok.com")
conf.get_default().auth_token = getpass.getpass()

# Expose the Spark UI through a public ngrok URL and report it.
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

#UTIL FOR COMPUTING
def create_dict(values):
    """Build a dictionary from an iterable of ``(key, value)`` pairs.

    Args:
        values: Iterable whose items unpack into exactly two elements.

    Returns:
        A dict mapping each key to its value. When a key occurs more than
        once, the value of its last occurrence is kept.
    """
    # A comprehension avoids a local variable named ``dict`` that would
    # shadow the builtin inside this function.
    return {key: value for key, value in values}
def normalize_row(values):
    """Mean-center the ratings of one row.

    Args:
        values: Iterable of ``(index, rating)`` pairs. It is materialized
            into a list first, so one-shot iterables are accepted.

    Returns:
        A list of ``(index, rating - mean)`` pairs in input order, where
        ``mean`` is the average rating of the row. An empty input yields an
        empty list.
    """
    row_values = list(values)
    if not row_values:
        # Nothing to normalize; also avoids dividing by zero below.
        return []
    mean = sum(value[1] for value in row_values) / len(row_values)
    return [(value[0], value[1] - mean) for value in row_values]
#USED TO MAP NORMALIZED UTILITY MATRIX IN A COLUMN REPRESENTATION FOR SIMILARITY COMPUTING
def transform_row(row):
    """Re-key one user row by business.

    ``row`` is a ``(user, ratings)`` pair where ``ratings`` is an iterable of
    ``(business, rating)`` pairs. The result is a list with one
    ``(business, (user, rating))`` entry per rating, in the same order.
    """
    user, business_ratings = row
    by_business = []
    for business, rating in business_ratings:
        by_business.append((business, (user, rating)))
    return by_business
#########################################
############# MAIN FUNCTION #############
#########################################

#Create Utility Matrix in sparse form
def map_to_indices(row,user_map,business_map):
    """Translate one review row into integer matrix coordinates.

    Args:
        row: Object exposing ``user_id``, ``business_id`` and ``stars``
            attributes.
        user_map: Dict mapping user ids to integer indices.
        business_map: Dict mapping business ids to integer indices.

    Returns:
        ``(user_index, (business_index, stars))``. An id missing from its map
        is reported with the index ``-1`` so that the caller can filter the
        record out.
    """
    user_index = user_map.get(row.user_id, -1)
    business_index = business_map.get(row.business_id, -1)
    # Always return the tuple: the downstream filter subscripts the result and
    # compares against -1, so an implicit None for unknown ids would raise a
    # TypeError there instead of being dropped.
    return (user_index, (business_index, row.stars))

def calculate_similarity(pair,columns_broadcast):
    """Cosine similarity of two business columns over their shared keys.

    Args:
        pair: ``(business1, business2)`` indices.
        columns_broadcast: Object whose ``value`` attribute is a dict mapping
            each business index to a ``{key: rating}`` dict.

    Returns:
        ``(business1, (business2, similarity))``, or ``None`` when the two
        indices are equal, when the columns share no key, or when either
        column has a zero norm on the shared keys.
    """
    ratings_by_business = columns_broadcast.value
    first, second = pair
    if first == second:
        return None

    ratings_a = ratings_by_business[first]
    ratings_b = ratings_by_business[second]
    shared = set(ratings_a.keys()) & set(ratings_b.keys())
    if not shared:
        return None

    # Dot product and squared norms, restricted to the shared keys only.
    dot = 0
    sq_norm_a = 0
    sq_norm_b = 0
    for key in shared:
        dot += ratings_a[key] * ratings_b[key]
        sq_norm_a += ratings_a[key] ** 2
        sq_norm_b += ratings_b[key] ** 2

    if sq_norm_a == 0 or sq_norm_b == 0:
        return None
    return (first, (second, dot / (sq_norm_a ** 0.5 * sq_norm_b ** 0.5)))

#Create Blank Prediction Matrix
def upgrade_prediction(row,global_index,similarity_rdd,k):
    """Predict scores for the businesses missing from one user row.

    For every index in ``global_index`` that is not a key of ``row``, the
    ``k`` most similar businesses are looked up and the score is computed as
    ``avg + sum(sim * rating) / sum(|sim|)`` over those neighbours that are
    present in ``row``, where ``avg`` is the mean of the values in ``row``.

    Args:
        row: Dict mapping business index to the user's rating.
        global_index: Iterable of all business indices.
        similarity_rdd: Object whose ``value`` attribute is a dict mapping a
            business index to a ``{other_business: similarity}`` dict.
        k: Number of most similar businesses considered per prediction.

    Returns:
        A list of single-entry dicts ``{business_index: score}`` sorted by
        score in descending order. Businesses without a usable neighbourhood
        (no shared neighbour, zero total weight, or a zero weighted sum) are
        omitted. An empty ``row`` yields an empty list.
    """
    import heapq

    if not row:
        # No ratings to build on; also avoids dividing by zero for the mean.
        return []

    similarity_matrix = similarity_rdd.value
    avg = sum(row.values()) / len(row)
    result = []

    for i in set(global_index) - set(row.keys()):
        if i not in similarity_matrix:
            print('negative result')
            continue

        # Top-k neighbours by similarity; nlargest is equivalent to a full
        # descending sort followed by a slice, without sorting every entry.
        top_k = dict(heapq.nlargest(k, similarity_matrix[i].items(),
                                    key=lambda item: item[1]))

        numerator = 0
        denominator = 0
        for key in set(top_k.keys()) & set(row.keys()):
            numerator += row[key] * top_k[key]
            # Normalize by the absolute weights: summing signed similarities
            # lets positive and negative neighbours cancel, which either
            # discards the prediction or inflates/flips it.
            denominator += abs(top_k[key])

        # Skip businesses without a usable weighted average.
        if denominator == 0 or numerator == 0:
            continue
        result.append({i: avg + numerator / denominator})

    return sorted(result, key=lambda x: list(x.values())[0], reverse=True)



#READ DATAFRAMES AND CREATE DICTS FOR USER AND BUSINESS MAP :
df_review = spark.read.json("yelp-dataset/yelp_academic_dataset_review.json")
#PREPROCESS :
df_u = spark.read.json("yelp-dataset/yelp_academic_dataset_user.json")
df_b = spark.read.json("yelp-dataset/yelp_academic_dataset_business.json")
#GLOBAL VARIABLE-HYPERPARAMETER
k = 10          # neighbourhood size passed to upgrade_prediction
t = 100         # not referenced in this section
frac = 0.001    # fraction of the heavy reviewers sampled as seed users

#SEED USERS : users with more than 500 reviews, randomly sub-sampled.
starting_user = df_u.filter(df_u["review_count"]>500).select("user_id")
# The sampling fraction is the `frac` hyperparameter defined above.
starting_users = starting_user.sample(fraction=frac)


#BUSINESSES reviewed by the seed users, USERS who reviewed those businesses,
#and the reviews restricted to both sets.
business = df_review.join(starting_users,df_review.user_id == starting_users.user_id,"inner")\
            .select("business_id").distinct().withColumnRenamed("business_id","business_id1")
users = df_review .join(business,df_review.business_id == business.business_id1,"inner")\
          .select("user_id").distinct().withColumnRenamed("user_id","user_id1")
df_filtered = df_review.join(business,df_review.business_id == business.business_id1,"inner")\
                .join(users,df_review.user_id == users.user_id1,"inner")\
                .select("user_id","business_id","stars")
#CREATE MAP FOR ID TO INTEGER
unique_user_ids = users.select('user_id1').rdd.map(lambda row: row[0]).collect()
unique_business_ids = business.select('business_id1').rdd.map(lambda row: row[0]).collect()

user_map = {user_id: idx for idx, user_id in enumerate(unique_user_ids)}
business_map = {business_id: idx for idx, business_id in enumerate(unique_business_ids)}

#UTILITY MATRIX BY USER (one mean-centered row of (business, rating) per user) :
# The filter also drops None so that a record map_to_indices could not map is
# discarded instead of raising a TypeError when it is subscripted.
utility_row = df_filtered.rdd.map(lambda row: map_to_indices(row, user_map, business_map)) \
                         .filter(lambda pair: pair is not None and pair[0] != -1 and pair[1][0] != -1) \
                         .groupByKey().mapValues(normalize_row)

#UTILITY MATRIX BY BUSINESS (column representation used for the similarities) :
columns = utility_row.flatMap(transform_row).groupByKey().mapValues(create_dict)

#convert tuple into dict to optimize the operation
utility_row = utility_row.mapValues(create_dict)

#All ordered pairs of business indices
business_combinations = columns.keys().cartesian(columns.keys())
#Broadcast of Utility matrix by column
columns_broadcast = spark.sparkContext.broadcast(columns.collectAsMap())
#similarity matrix :
similarities = business_combinations.map(lambda pair: calculate_similarity(pair, columns_broadcast)).filter(lambda x: x is not None)
similarities_row = similarities.groupByKey().mapValues(create_dict)

#Broadcast of similarity matrix for prediction computing
similarity_rdd = spark.sparkContext.broadcast(similarities_row.collectAsMap())
#index for blank value computing
# Materialized as a list: a plain list is a safer object to capture in the
# closure shipped to the workers than a live dict view.
global_index = list(business_map.values())

upgraded_matrix = utility_row.map(lambda row: (row[0],upgrade_prediction(row[1],global_index,similarity_rdd,k)))
upgraded_matrix.saveAsTextFile("output")

^C
Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
