In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession 

# Root of the extracted Yelp academic dataset; change this one constant to run on another machine.
YELP_DIR = "/Users/elgin/Downloads/INF553/yelp_dataset"

spark = SparkSession.builder.appName("item similarity").getOrCreate()

# Load only the columns the recommender uses from each Yelp JSON file.
user_raw_data = spark.read.json(YELP_DIR + "/yelp_academic_dataset_user.json").select("user_id", "review_count")
review_raw_data = spark.read.json(YELP_DIR + "/yelp_academic_dataset_review.json").select("user_id", "business_id", "stars")
business_raw_data_count = spark.read.json(YELP_DIR + "/yelp_academic_dataset_business.json").select("business_id", "review_count")

# Functions

## Calculate business (item) similarity and predict ratings

In [2]:
import math
import pandas as pd
def ItemSimilarity(x, y, reviews=None):
    """
    Cosine similarity between two businesses based on users' star ratings.

    Each business is treated as a vector of star ratings indexed by user_id
    (users who did not review it contribute 0). The vector norms use every
    review of the business; the dot product uses only users who reviewed both.

    Input:
        x : restaurant1 id (business_id)
        y : restaurant2 id (business_id)
        reviews : optional DataFrame with "user_id", "business_id" and "stars"
            columns; defaults to the notebook-global ``review_bus_panda``.
    Output:
        similarity : cosine similarity between x and y, or 0 when either
            business has no reviews in ``reviews``.
    """
    if reviews is None:
        # Preserve the original behaviour of reading the notebook-global frame.
        reviews = review_bus_panda

    x_reviews = reviews.loc[reviews["business_id"] == x, ["user_id", "stars"]]
    y_reviews = reviews.loc[reviews["business_id"] == y, ["user_id", "stars"]]

    # L2 norms over all reviews of each business.
    norm_x = math.sqrt(float((x_reviews["stars"] ** 2).sum()))
    norm_y = math.sqrt(float((y_reviews["stars"] ** 2).sum()))

    # A business with no reviews has a zero vector: similarity is undefined, report 0.
    if norm_x * norm_y == 0:
        return 0

    # One rating per user for the dot product; if a user reviewed the same
    # business more than once, the last review wins.
    x_by_user = x_reviews.drop_duplicates(subset="user_id", keep="last").set_index("user_id")["stars"]
    y_by_user = y_reviews.drop_duplicates(subset="user_id", keep="last").set_index("user_id")["stars"]

    common_users = x_by_user.index.intersection(y_by_user.index)
    product_sum = float((x_by_user.loc[common_users] * y_by_user.loc[common_users]).sum())
    return product_sum / (norm_x * norm_y)


def Prediction(business_id, user_id, number_review_to_use=20):
    """
    Predict the rating ``user_id`` would give ``business_id`` (item-based CF).

    Reads the notebook-global ``review_bus_panda``. The user's reviews are
    ordered by business popularity (``breview_count``, descending), then a
    sample of about ``number_review_to_use`` reviews (at most one more, because
    of rounding up) is drawn. The sample is split across low (< 2), medium
    ([2, 4)) and high (>= 4) star buckets in proportion to how often the user
    gives each. The prediction is the similarity-weighted average of the sampled
    ratings, with similarities from ItemSimilarity.

    NOTE(review): if the (user_id, business_id) pair being predicted is itself
    in review_bus_panda, its own rating is used with similarity 1.

    Input:
        business_id: business to predict a rating for
        user_id: user whose past reviews are used as neighbours
        number_review_to_use: target number of the user's reviews to sample
    Output:
        prediction: predicted rating, or 0 when the user has no reviews in
            review_bus_panda or every similarity is 0. The prediction cell
            treats 0 as "no prediction" and falls back to the business average.
    """
    user_reviews = review_bus_panda.loc[review_bus_panda["user_id"] == user_id]
    count_reviews = len(user_reviews.index)
    if count_reviews == 0:
        # Unknown user, or all their reviews were filtered out: nothing to
        # base a prediction on (this used to raise ZeroDivisionError).
        return 0

    # Prefer reviews of popular businesses inside each bucket.
    user_reviews = user_reviews.sort_values(by=["breview_count"], ascending=False)
    review_few_rating = user_reviews.loc[user_reviews["stars"] < 2]
    review_medium_rating = user_reviews.loc[(user_reviews["stars"] >= 2) & (user_reviews["stars"] < 4)]
    review_high_rating = user_reviews.loc[user_reviews["stars"] >= 4]

    num_few_rating = math.ceil((len(review_few_rating.index) / count_reviews) * number_review_to_use)
    num_medium_rating = math.ceil((len(review_medium_rating.index) / count_reviews) * number_review_to_use)
    # The two ceilings can together exceed the budget. Clamp the remainder so
    # head() never receives a negative n; head(-k) means "all rows but the last k".
    num_high_rating = max(0, number_review_to_use - num_few_rating - num_medium_rating)

    review_processed = pd.concat(
        [
            review_few_rating.head(num_few_rating),
            review_medium_rating.head(num_medium_rating),
            review_high_rating.head(num_high_rating),
        ],
        axis=0,
        ignore_index=True,
    )

    # Similarity-weighted average. Each sampled review uses its own rating, so a
    # business the user reviewed twice no longer reuses one rating for both rows.
    weighted_sum = 0.0
    sim_sum = 0.0
    for b_id, rating in zip(review_processed["business_id"], review_processed["stars"]):
        sim = ItemSimilarity(business_id, b_id)
        weighted_sum = weighted_sum + rating * sim
        sim_sum = sim_sum + sim

    if sim_sum == 0:
        return 0
    return weighted_sum / sim_sum
    

## load test file

In [11]:
def LoadData(fileName):
    """Read a comma-separated text file into a list of rows.

    Every line is stripped of surrounding whitespace and split on ','.
    Returns one list of string fields per line, in file order.
    """
    with open(fileName) as handle:
        return [raw_line.strip().split(',') for raw_line in handle.readlines()]


## load prediction file

In [21]:
def LoadPred(prediction_file_name):
    """Read a space-separated prediction file into a list of rows.

    Every line is stripped of surrounding whitespace and split on single
    spaces, so consecutive spaces produce empty fields.
    """
    with open(prediction_file_name) as handle:
        return [raw_line.strip().split(' ') for raw_line in handle.readlines()]

## Compute confusion-matrix counts (TP, FP, FN, TN) for accuracy, precision and recall

In [45]:
def CalResult(test_file_name, pred_file_name, threshold=4):
    """
    Count binary-classification outcomes of predicted vs. actual ratings.

    A rating is "positive" when it is >= ``threshold``. Rows are paired by
    position: row i of the prediction file is scored against row i of the test
    file. Only as many rows as the prediction file holds are scored, so a
    prediction file that covers a prefix of the test file is evaluated on that
    prefix. The rating is the third field of each row.

    Input:
        test_file_name: path of the comma-separated test file
        pred_file_name: path of the space-separated prediction file
        threshold: rating at or above which a rating counts as positive (default 4)
    Output:
        [TP,FP,FN,TN,LN]: LN is the number of scored rows (len of prediction file)
    Raises:
        ValueError: if the prediction file has more rows than the test file.
    """
    test_data = LoadData(test_file_name)
    prediction_data = LoadPred(pred_file_name)
    if len(prediction_data) > len(test_data):
        raise ValueError(
            "prediction file has %d rows but test file only %d"
            % (len(prediction_data), len(test_data))
        )

    TP = 0
    FP = 0
    FN = 0
    for pred_row, test_row in zip(prediction_data, test_data):
        pred_positive = float(pred_row[2]) >= threshold
        test_positive = float(test_row[2]) >= threshold
        if pred_positive and test_positive:
            TP = TP + 1
        elif pred_positive:
            FP = FP + 1
        elif test_positive:
            FN = FN + 1

    LN = len(prediction_data)
    # Every scored row that is not TP/FP/FN is a true negative.
    TN = LN - TP - FP - FN
    return [TP, FP, FN, TN, LN]

# Clean data

In [3]:
# Attach each reviewer's total review_count (from the user file) to every review.
review_user = review_raw_data.join(user_raw_data, "user_id", "left")


In [4]:
# Keep only reviews written by users with more than this many reviews in total.
# Rows whose review_count is null (no match in the left join) are dropped too,
# because a null comparison never passes the filter.
MIN_USER_REVIEW_COUNT = 20
review_user = review_user.filter(review_user["review_count"] > MIN_USER_REVIEW_COUNT)


In [5]:
# The user and business files both provide a "review_count" column. Rename the
# business one so the join below does not yield two columns with the same name.
business_raw_data_count = business_raw_data_count.withColumnRenamed(
    "review_count",
    "breview_count",
)
review_bus = review_user.join(business_raw_data_count, "business_id", "left")


In [6]:
# Keep only reviews of businesses with more than this many reviews in total.
MIN_BUSINESS_REVIEW_COUNT = 10
review_bus = review_bus.filter(review_bus['breview_count'] > MIN_BUSINESS_REVIEW_COUNT)


In [7]:
# Collect the filtered Spark join to the driver as a pandas DataFrame.
# ItemSimilarity and Prediction read this global directly (they use the
# user_id, business_id, stars and breview_count columns).
review_bus_panda = review_bus.toPandas()


In [10]:
# NOTE(review): user_avg_data is not referenced in any other visible cell, and
# this re-reads the full user JSON. Confirm it is still needed.
user_avg_data = (
    spark.read
    .json("/Users/elgin/Downloads/INF553/yelp_dataset/yelp_academic_dataset_user.json")
    .select("user_id", "average_stars")
    .toPandas()
)

In [8]:
# Business-level average stars, used later as the fallback prediction.
business_avg = (
    spark.read
    .json("/Users/elgin/Downloads/INF553/yelp_dataset/yelp_academic_dataset_business.json")
    .select("business_id", "stars", "review_count")
)

In [9]:
# pandas copy of the business table (business_id, stars, review_count). The
# prediction cell uses its "stars" value when Prediction returns 0.
business_avgStars = business_avg.toPandas()

In [15]:
# Rows of the comma-separated test file. The prediction cell passes item[1] as
# the business_id and item[0] as the user_id to Prediction.
test_data = LoadData("/Users/elgin/Downloads/test_small_2.txt")

# Predict ratings for the test data and write the output file

In [35]:
import time

# Business-average stars, used when Prediction returns 0 (no usable neighbours).
# Build the lookup once instead of scanning business_avgStars for every test row.
business_stars = dict(zip(business_avgStars['business_id'], business_avgStars['stars']))

start_time = time.time()
# `with` flushes and closes the output file even if the loop is interrupted
# (e.g. KeyboardInterrupt), so the rows already written are not lost.
with open("projectOutput1.txt", "w") as text_file:
    for item in test_data:
        # item = [user_id, business_id, stars]
        ipred = Prediction(item[1], item[0])
        pred = str(ipred)
        if ipred == 0:
            pred = str(business_stars[item[1]])
        res_s = item[0] + " " + item[1] + " " + pred
        print(res_s)
        text_file.write(res_s + "\n")
end_time = time.time()
print(end_time - start_time)

QkO16gr4QaPaKPWeUTAEqA eDJ0aUIAzc6PNuZ_6ELR1g 4.999999999999999


KeyboardInterrupt: 

# Calculate accuracy, precision, recall and RMSE

In [46]:
small_test_res = CalResult("/Users/elgin/Downloads/test_small.txt", "/Users/elgin/Desktop/item_ouput/test_small_item_output.txt")
TP, FP, FN, TN, LN = small_test_res
# Report 0.0 when a ratio is undefined (empty denominator) instead of raising ZeroDivisionError.
accuracy = (TP + TN) / LN if LN else 0.0
precision = TP / (TP + FP) if (TP + FP) else 0.0
recall = TP / (TP + FN) if (TP + FN) else 0.0
# Compute the metrics first, then open the report, so a failure above leaves no
# half-open file; `with` closes it in all cases.
with open("small_test_res.txt", "w") as small_test_res_file:
    small_test_res_file.write("accuracy : " + str(accuracy) + "\n")
    small_test_res_file.write("precision : " + str(precision) + "\n")
    small_test_res_file.write("recall : " + str(recall))

In [47]:
small_test_res = CalResult("/Users/elgin/Downloads/test_small_2.txt", "/Users/elgin/Desktop/item_ouput/test_small2_item_output.txt")
TP, FP, FN, TN, LN = small_test_res
# Report 0.0 when a ratio is undefined (empty denominator) instead of raising ZeroDivisionError.
accuracy = (TP + TN) / LN if LN else 0.0
precision = TP / (TP + FP) if (TP + FP) else 0.0
recall = TP / (TP + FN) if (TP + FN) else 0.0
# Compute the metrics first, then open the report, so a failure above leaves no
# half-open file; `with` closes it in all cases.
with open("small_test2_res.txt", "w") as small_test_res_file:
    small_test_res_file.write("accuracy : " + str(accuracy) + "\n")
    small_test_res_file.write("precision : " + str(precision) + "\n")
    small_test_res_file.write("recall : " + str(recall))

In [53]:
# Root-mean-square error between actual and predicted ratings, paired row by row.
test_data_small = LoadData("/Users/elgin/Downloads/test_small.txt")
pred_data_small = LoadPred("/Users/elgin/Desktop/item_ouput/test_small_item_output.txt")
if len(pred_data_small) != len(test_data_small):
    raise ValueError(
        "test file has %d rows but prediction file has %d"
        % (len(test_data_small), len(pred_data_small))
    )
if not test_data_small:
    raise ValueError("no rows to evaluate")

dif_square_sum = 0.0
for test_row, pred_row in zip(test_data_small, pred_data_small):
    dif = float(test_row[2]) - float(pred_row[2])
    dif_square_sum = dif_square_sum + pow(dif, 2)
# Mean squared error; its square root is the RMSE printed below.
dif_square_sum = dif_square_sum / len(test_data_small)
print(math.sqrt(dif_square_sum))

0.8011191927817288
