## Set Up
* Download the compressed dataset, extract it, and rewrite the records as JSON Lines (`data.jsonl`), the format the next section reads

In [1]:
import urllib.request
import gzip
import shutil
import json

# Fetch the gzip-compressed dataset into the working directory
filename = 'ratebeer.json.gz'
source_url = 'https://datarepo.eng.ucsd.edu/mcauley_group/data/beer/ratebeer.json.gz'
urllib.request.urlretrieve(source_url, filename)

# Extract gzip file into a json file
def unzip_gzip(input_file, output_file):
    """Decompress a gzip archive to a plain file.

    Args:
        input_file: Path of the gzip-compressed file to read.
        output_file: Path the decompressed bytes are written to
            (created or overwritten).
    """
    # Both handles are binary, so the payload is streamed across untouched.
    with gzip.open(input_file, 'rb') as compressed, open(output_file, 'wb') as extracted:
        shutil.copyfileobj(compressed, extracted)

def json_to_jsonlines(input_file):
    """Parse the extracted dump into a list of trimmed review records.

    Each line of ``input_file`` is treated as one record. Single quotes are
    replaced with double quotes before the line is handed to ``json.loads``.
    Lines that cannot be parsed, or that lack one of the required fields,
    are skipped.

    Args:
        input_file: Path of the text file to read, one record per line.

    Returns:
        A list of dicts with the keys ``"review/profileName"``, ``"rating"``
        (taken from ``"review/overall"``), ``"beer/beerId"`` and ``"name"``
        (taken from ``"beer/name"``).
    """
    records = []
    # A separate name for the handle keeps the path argument from being shadowed.
    with open(input_file, 'r') as source:
        for line in source:
            try:
                # NOTE(review): the blanket quote swap presumably breaks lines
                # whose text contains an apostrophe; such lines end up in the
                # skip path below - confirm how many records this drops.
                data_dict = json.loads(line.replace("'", "\""))
                records.append({
                    "review/profileName": data_dict["review/profileName"],
                    "rating": data_dict['review/overall'],
                    "beer/beerId": data_dict["beer/beerId"],
                    "name": data_dict["beer/name"],
                })
            except (ValueError, KeyError, TypeError):
                # Best-effort parsing: malformed JSON (ValueError), a missing
                # field (KeyError) or a non-dict value (TypeError) skips the
                # line. Anything else, e.g. KeyboardInterrupt, now propagates.
                continue
    return records

def convert_to_jsonl(entries=None, output_file='data.jsonl'):
    """Write records to a JSON Lines file, one JSON object per line.

    Args:
        entries: Iterable of JSON-serialisable records. When omitted, the
            module-level ``jsonHolder`` list is written, so the original
            zero-argument call keeps working.
        output_file: Destination path (created or overwritten). Defaults to
            ``'data.jsonl'``.
    """
    if entries is None:
        # Backward-compatible fallback to the notebook-level list.
        entries = jsonHolder
    with open(output_file, 'w') as f:
        for entry in entries:
            json.dump(entry, f)
            f.write('\n')

# 1) Decompress the downloaded archive to data.json
unzip_gzip(input_file='ratebeer.json.gz', output_file='data.json')
# 2) Parse it into trimmed review records (read as a global by convert_to_jsonl)
jsonHolder = json_to_jsonlines(input_file='data.json')
# 3) Write those records out, one JSON object per line
convert_to_jsonl()

## Set up the Dask cluster and run the beer recommendation algorithm

In [1]:
from dask.distributed import Client, LocalCluster, default_client
import dask
from dask import bag as db
import dask.dataframe as dd
import pandas as pd
import json
import numpy as np
import gc

# Close any client left over from an earlier run before starting a new cluster.
# A ValueError from the lookup is ignored.
try:
    previous_client = default_client()
    if previous_client is not None:
        previous_client.close()
except ValueError:
    pass

# Local cluster: 7 workers with one thread each and a 4GB memory limit
cluster = LocalCluster(n_workers=7, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)
dashboard_link = client.dashboard_link
print("Dask Dashboard link: ", dashboard_link)

# Read the JSON Lines file in 10MB blocks and decode every line into a dict
data_bag = db.read_text('data.jsonl', blocksize="10MB").map(json.loads)

# Optional: work on a sample of the data instead of the full set
# data_bag = data_bag.random_sample(0.2)
# print(data_bag.count().compute()) # Debug

# Filter the data so only relevant entries are used.
# Review counts per user and per beer are collected on the unfiltered bag first.
profile_name_counts = data_bag.pluck("review/profileName").frequencies().compute()
beer_id_counts = data_bag.pluck("beer/beerId").frequencies().compute()

profile_name_counts_dict = dict(profile_name_counts)
beer_id_counts_dict = dict(beer_id_counts)

# A review is kept when its beer has more than 50 reviews and its author
# has written more than 20 (the beer check runs first, as before).
filtered_bag = data_bag.filter(
    lambda x: beer_id_counts_dict[x["beer/beerId"]] > 50
    and profile_name_counts_dict[x["review/profileName"]] > 20
)

result = filtered_bag.compute()

print(f"Number of entries after filtering: {len(result)}") # Debug


# Spread the filtered reviews across 16 partitions
data_bag = db.from_sequence(result, npartitions=16)

# Lookup table from beer id to beer name
beer_with_name = dict(
    data_bag.map(lambda review: (review["beer/beerId"], review["name"])).compute()
)

# Distinct users and beers present in the filtered reviews
unique_profile_names = data_bag.pluck('review/profileName').distinct().compute()
unique_beer_ids = data_bag.pluck("beer/beerId").distinct().compute()

num_profiles = len(unique_profile_names)
num_beers = len(unique_beer_ids)

print(f"Number of unique beers: {num_beers}") # Debug
print(f"Number of unique users: {num_profiles}") # Debug

# One zero-initialised rating vector per user, with one slot per beer
utility_matrix = [np.zeros(num_beers) for _ in range(num_profiles)]

# Map each user name and beer id to its position in the utility matrix
user_index_map = dict(zip(unique_profile_names, range(len(unique_profile_names))))
beer_index_map = dict(zip(unique_beer_ids, range(len(unique_beer_ids))))

# client.restart() # May be required
def update_sinle_record(row, user_index_map, beer_index_map):
    """Translate one review into a ``(beer_index, user_index, rating)`` triple.

    Args:
        row: Review dict with the keys ``'beer/beerId'``,
            ``'review/profileName'`` and ``'rating'``.
        user_index_map: Mapping from profile name to an index.
        beer_index_map: Mapping from beer id to an index.

    Returns:
        Tuple ``(beer_index, user_index, rating)``, where ``rating`` is the
        integer before the first ``'/'`` of the rating string.
    """
    beer_position = beer_index_map[row['beer/beerId']]
    user_position = user_index_map[row['review/profileName']]
    # Only the text before the first '/' is converted to an int.
    numerator, _, _ = row['rating'].partition('/')
    return (beer_position, user_position, int(numerator))

def apply_update(row):
    """Convert one review using the module-level user and beer index maps.

    Args:
        row: Review dict as accepted by ``update_sinle_record``.

    Returns:
        The ``(beer_index, user_index, rating)`` tuple produced by
        ``update_sinle_record``.
    """
    # The module-level maps are only read, so no ``global`` declaration is needed.
    return update_sinle_record(
        row,
        user_index_map=user_index_map,
        beer_index_map=beer_index_map,
    )

# Spread the reviews over 128 partitions and convert each one into a
# (beer_index, user_index, rating) tuple; all tuples are collected locally.
partitioned_bag = data_bag.repartition(npartitions=128)
updates = partitioned_bag.map(apply_update).compute()

# Write every rating into its user's vector. If a user/beer pair occurs more
# than once, the last tuple in `updates` wins.
# NOTE(review): the loop variables stay defined at module level after the loop;
# keep their names stable in case later code reads them.
for beer_id, user_id, rating in updates:
    utility_matrix[user_id][beer_id] = rating

# print(utility_matrix) # Debug

def clear_worker_data():
    """Run a garbage-collection pass in the process that executes this function."""
    # Imported locally so the function carries its own dependency when it is
    # handed to ``client.run``.
    from gc import collect
    collect()


# Run clear_worker_data through the client, then call rebalance on the client
client.run(clear_worker_data)
client.rebalance()

test_user = utility_matrix[0] # Use the first user in the matrix as the test user (a reference to that row, not a copy)
# print(test_user) # Debug

# Wrap the per-user rating vectors in a bag for the similarity computation
utility_matrix_bag = db.from_sequence(utility_matrix)

# client.restart()

def cosine_similarity(u, v):
    """Return the cosine similarity of two vectors.

    Args:
        u: First vector (array-like accepted by ``np.dot``).
        v: Second vector, same length as ``u``.

    Returns:
        ``dot(u, v) / (||u|| * ||v||)``. When either vector has zero norm
        the similarity is undefined and ``0.0`` is returned.
    """
    dot_product = np.dot(u, v)
    denominator = np.linalg.norm(u) * np.linalg.norm(v)
    if denominator == 0:
        # An all-zero vector would otherwise give 0/0 = NaN, and NaN keys
        # make any later sort on the similarity unreliable.
        return 0.0
    return dot_product / denominator


def calculate_similarity(utility_matrix, test_user):
    """Pair every rating vector with its cosine similarity to ``test_user``.

    Args:
        utility_matrix: Object with a ``map`` method (here a bag of rating
            vectors).
        test_user: Rating vector every element is compared against.

    Returns:
        The result of ``utility_matrix.map``; each mapped element is a
        ``(rating_vector, similarity)`` tuple.
    """
    def pair_with_score(candidate):
        return (candidate, cosine_similarity(candidate, test_user))

    return utility_matrix.map(pair_with_score)

# Bag of (rating_vector, similarity) tuples, one per user
similarities = calculate_similarity(utility_matrix_bag, test_user)

# print(similarities.compute()) # Debug

# Sort by similarity, highest first; the slice drops the first entry and keeps the next ten.
# NOTE(review): presumably the dropped entry is the test user itself - confirm
# that a tie cannot put a different user first.
top_similar_users = sorted(similarities, key=lambda x: x[1], reverse=True)[1:11]
# print(top_similar_users) # Debug

def get_weighted_matrix(similar_users, utility_matrix):
    """Scale each similar user's rating vector by that user's similarity.

    Args:
        similar_users: Sequence of tuples whose first item is a rating
            vector and whose second item is the similarity score.
        utility_matrix: Not used; kept so existing calls keep working.

    Returns:
        A list of ``(rank, weighted_vector)`` tuples in the order of
        ``similar_users``. ``rank`` is the entry's position in
        ``similar_users``; it replaces a reference to an undefined local
        (``user_id``) that only resolved through an unrelated module-level
        variable.
    """
    weighted_matrix = []
    for rank, user_tup in enumerate(similar_users):
        user_matrix = user_tup[0]
        similarity = user_tup[1]
        # Build a new array rather than overwriting the caller's vector in place.
        weighted_vector = similarity * np.asarray(user_matrix, dtype=float)
        weighted_matrix.append((rank, weighted_vector))

    return weighted_matrix

# Weight the rating vectors of the selected similar users by their similarity scores
weighted_matrix = get_weighted_matrix(top_similar_users, utility_matrix)

# print(weighted_matrix) # Debug

def recommend_beer(weighted_matrix, num_similar, num_beers):
    """Sum similarity-weighted rating vectors into one score per beer.

    Args:
        weighted_matrix: Sequence of tuples whose item at index 1 is a
            weighted rating vector; only that item is read.
        num_similar: Number of leading entries of ``weighted_matrix`` to
            include in the sum.
        num_beers: Number of leading slots of each vector to include.

    Returns:
        1-D float array of length ``num_beers`` holding the summed weighted
        ratings.
    """
    weighted_sum = np.zeros(num_beers)
    for i in range(num_similar):
        # One vectorised add per user replaces the per-element Python loop;
        # users are still accumulated in the same order.
        weighted_sum += np.asarray(weighted_matrix[i][1])[:num_beers]
    return weighted_sum

# Score every beer with the weighted ratings of the similar users
num_similar = len(top_similar_users)
num_beers = len(utility_matrix[0])
weighted_sum = recommend_beer(weighted_matrix, num_similar, num_beers)

# Print the name of the highest-scoring beer as the recommendation
best_beer_position = np.argmax(weighted_sum)
print(beer_with_name[unique_beer_ids[best_beer_position]])



Dask Dashboard link:  http://127.0.0.1:8787/status
Number of entries after filtering: 1947817


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Number of unique beers: 8963
Number of unique users: 6479


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Westvleteren Extra 8
