In [1]:
import os

# Root folder holding the train/test/valid review CSVs. Set the YELP_DATA_DIR
# environment variable to run the notebook on another machine; the default
# keeps the original location.
DATA_DIR = os.environ.get("YELP_DATA_DIR", "/Users/arun/Downloads/Restaurants")

train_dataset = DATA_DIR + "/train/PA_train_yelp_academic_dataset_review.csv"
test_dataset = DATA_DIR + "/test/PA_test_yelp_academic_dataset_review.csv"
valid_dataset = DATA_DIR + "/valid/PA_valid_yelp_academic_dataset_review.csv"

In [76]:
# train_dataset='/Users/arun/Downloads/datasets/yelp/NV/train/NV_train_yelp_academic_dataset_review.csv'
# #dev_dataset='/Users/arun/Downloads/datasets/yelp/PA/Restaurants/valid/PA_valid_yelp_academic_dataset_review.csv'
# test_dataset='/Users/arun/Downloads/datasets/yelp/NV/test/NV_test_yelp_academic_dataset_review.csv'

In [2]:
import pyspark
# Make sure pyspark version and spark version match.
# getOrCreate keeps this cell re-runnable: constructing a second SparkContext
# in the same kernel raises, whereas getOrCreate hands back the live one.
# NOTE: if a context already exists, its master setting is kept as-is.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [3]:
# Directory where RDD checkpoints are written.
sc.setCheckpointDir('/tmp/spark-checkpoints')

In [4]:
from pyspark.sql import SparkSession
# SQL entry point used below to read the review CSVs.
spark = (
    SparkSession.builder
    .appName("graph analysis")
    .getOrCreate()
)


In [5]:
def _load_reviews(path):
    """Read one review CSV using the parser settings shared by every split.

    The file is read with a header row, double-quote quoting/escaping and
    multi-line records enabled.
    """
    return spark.read.csv(path, header=True, quote='"', escape='"', multiLine=True)


train_df = _load_reviews(train_dataset)
valid_df = _load_reviews(valid_dataset)
test_df = _load_reviews(test_dataset)

In [8]:
# Inspect the parsed header. Note that the last column name comes back as
# 'percentile\r' (see the output below), i.e. with a trailing carriage return
# -- presumably the CSV uses CRLF line endings; confirm before using that column.
train_df.columns

['funny',
 'user_id',
 'review_id',
 'text',
 'business_id',
 'stars',
 'date',
 'useful',
 'cool',
 '1overN',
 '2overN',
 'percentile\r']

In [9]:
def _to_rating_triples(reviews_df):
    """Project a review DataFrame to an RDD of (user_id, business_id, stars).

    `stars` is converted with int(). Rows are accessed by position instead of
    a tuple-unpacking lambda parameter, because that lambda form is a syntax
    error on Python 3 (PEP 3113); positional indexing works on both 2 and 3.
    """
    selected = reviews_df.select("user_id", "business_id", "stars")
    return selected.rdd.map(lambda row: (row[0], row[1], int(row[2])))


train_rdd = _to_rating_triples(train_df)
valid_rdd = _to_rating_triples(valid_df)
test_rdd = _to_rating_triples(test_df)

In [10]:
# Sanity check: number of (user_id, business_id, stars) triples in the training split.
train_rdd.count()

84887

In [11]:
# Peek at a few triples to confirm the (user_id, business_id, stars) layout.
train_rdd.take(5)

[(u'eG6HneK9zLcuZpVuKcsCGQ', u'XqNDr54eLDLRfZwo4l4dVA', 4),
 (u'AlzerMK7z84E4KU6GjPzIQ', u'PyTHy9VPOhBCiGLsi-PA2Q', 3),
 (u'AlzerMK7z84E4KU6GjPzIQ', u'zzwhN7x37nyjP0ZM8oiHmw', 4),
 (u'AlzerMK7z84E4KU6GjPzIQ', u'Ul6JwluSTm12PVDIqnNaTg', 4),
 (u'AlzerMK7z84E4KU6GjPzIQ', u'2Ezp_HYCIVE-h7hpBBvtxw', 4)]

In [12]:
def _attach_indices(ratings, user_index, bus_index):
    """Replace the string ids in (user, business, stars) triples by their integer indices."""
    # (user, (business, stars)) joined with (user, user_idx)
    #   -> (user, ((business, stars), user_idx))
    with_user = ratings.map(lambda r: (r[0], (r[1], r[2]))).join(user_index)
    # Re-key by business: (business, (stars, user_idx)) joined with (business, bus_idx)
    #   -> (business, ((stars, user_idx), bus_idx))
    with_business = with_user.map(
        lambda kv: (kv[1][0][0], (kv[1][0][1], kv[1][1]))
    ).join(bus_index)
    # Final layout: (user_idx, bus_idx, stars)
    return with_business.map(lambda kv: (kv[1][0][1], kv[1][1], int(kv[1][0][0])))


def preprocess_data(train, dev, test):
    """Map string user/business ids to dense integer indices across all splits.

    Args:
        train, dev, test: RDDs of (user_id, business_id, stars) triples.

    Returns:
        A tuple (train_index, dev_index, test_index, user_index, bus_index):
        the first three are RDDs of (user_idx, business_idx, int(stars)); the
        last two are RDDs of (id, index) pairs built from the ids seen in any
        of the three splits.
    """
    # Chained RDD.union avoids relying on a module-level SparkContext.
    train_dev_test = train.union(dev).union(test)

    # zipWithIndex depends on element order, which is not guaranteed to be
    # stable after distinct() if the RDD is evaluated again. The index RDDs
    # are evaluated several times (three joins here, plus whatever the caller
    # does with them), so sort first to make the assignment deterministic and
    # cache to avoid recomputing it.
    user_index = (train_dev_test.map(lambda x: x[0]).distinct()
                  .sortBy(lambda key: key).zipWithIndex().cache())
    bus_index = (train_dev_test.map(lambda x: x[1]).distinct()
                 .sortBy(lambda key: key).zipWithIndex().cache())

    train_index = _attach_indices(train, user_index, bus_index)
    dev_index = _attach_indices(dev, user_index, bus_index)
    test_index = _attach_indices(test, user_index, bus_index)

    return (train_index, dev_index, test_index, user_index, bus_index)

In [14]:
# Convert the string ids in every split to integer indices and keep the
# (id, index) lookup RDDs for mapping results back to the original ids.
train, dev, test, user_index, bus_index = preprocess_data(train_rdd, valid_rdd, test_rdd)

In [15]:
# Mark each indexed split for checkpointing.
for _split in (train, dev, test):
    _split.checkpoint()

## Train ALS model

In [29]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import numpy as np

# Hyper-parameter grid read by grid_search as module-level globals.
# Number of latent factors to try.
ranks = [5, 10]
# Regularization strengths to try (passed as ALS's lambda_ argument).
lambda_ = [0.1, 0.2, 0.3, 0.4, 0.5]
# ALS iteration counts to try.
iterations = [30]

def grid_search(train, dev, dev_actual_kv):
    """Train one ALS model per hyper-parameter combination and print its RMSE.

    The grid is read from the module-level lists `ranks`, `lambda_` and
    `iterations`.

    Args:
        train: ratings RDD passed to ALS.train.
        dev: RDD of (user, product) pairs to predict.
        dev_actual_kv: RDD of ((user, product), actual_rating) used as ground truth.

    Returns:
        None. For every combination a progress line "rank lambda iterations"
        is printed before training and "rank lambda iterations rmse" after.
    """
    for rank in ranks:
        for l in lambda_:
            for it in iterations:
                # A single pre-formatted string prints identically on Python 2 and 3
                # (the bare `print a, b` statement is a syntax error on Python 3).
                print("%s %s %s" % (rank, l, it))

                model = ALS.train(train, rank, lambda_=l, iterations=it, seed=10)
                predictions = model.predictAll(dev)
                predictions_kv = predictions.map(lambda x: ((x[0], x[1]), x[2]))
                # NOTE(review): this is an inner join, so only pairs for which
                # predictAll returned a prediction contribute to the error --
                # confirm that dropping the others is intended.
                test_merged = predictions_kv.join(dev_actual_kv)
                mse = test_merged.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean()

                print("%s %s %s %s" % (rank, l, it, np.sqrt(mse)))

In [17]:
# (user_idx, business_idx) pairs to score, and the same pairs keyed to their true rating.
dev_data = dev.map(lambda rec: tuple(rec[:2]))
dev_actual_kv = dev.map(lambda rec: (tuple(rec[:2]), rec[2]))

# test_data = test.map(lambda x: (x[0], x[1]))
# test_actual_kv = test.map(lambda x: ((x[0], x[1]), x[2]))


In [30]:
# Run the sweep. For each configuration it prints a progress line
# ("rank lambda iterations") followed by the same line with the dev RMSE appended.
grid_search(train, dev_data, dev_actual_kv)


5 0.1 30
5 0.1 30 1.5111275867367675
5 0.2 30
5 0.2 30 1.421761059250058
5 0.3 30
5 0.3 30 1.3884958167873793
5 0.4 30
5 0.4 30 1.3785463561970674
5 0.5 30
5 0.5 30 1.3878138849424924
10 0.1 30
10 0.1 30 1.5126671549280883
10 0.2 30
10 0.2 30 1.424092589355232
10 0.3 30
10 0.3 30 1.3886587616343984
10 0.4 30
10 0.4 30 1.377688186847247
10 0.5 30
10 0.5 30 1.3860366337064651


In [43]:
# Hyper-parameters for the final model: rank, regularization (lambda_) and
# number of ALS iterations.
rank, l, it = 10, 0.4, 100

model = ALS.train(train, rank, iterations=it, lambda_=l, seed=10)

In [22]:
# Number of indexed test ratings.
test.count()


65591

In [19]:
## Performance in test set

In [44]:
# Score the final model on the test split.
# Pairs to predict, and the same pairs keyed to their actual rating.
test_data = test.map(lambda x: (x[0], x[1]))
test_actual_kv = test.map(lambda x: ((x[0], x[1]), x[2]))

predictions = model.predictAll(test_data)
predictions_kv = predictions.map(lambda x: ((x[0], x[1]), x[2]))
# Inner join: only pairs that received a prediction contribute to the error.
test_merged = predictions_kv.join(test_actual_kv)
mse = test_merged.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean()

# Pre-formatted string so the line prints identically on Python 2 and 3
# (the bare `print a, b` statement is a syntax error on Python 3).
print("%s %s %s %s" % (rank, l, it, np.sqrt(mse)))

10 0.4 100 1.3979137792599632


In [78]:
#10 0.4 100 1.3979137792599632
#HIT RATIO Score:  0.00506714801374


In [35]:
# Request 50 recommended products per user from the trained model.
recommendations = model.recommendProductsForUsers(50)

In [37]:
# Bring the recommendations to the driver. The name is rebound from an RDD to
# a list, so guard the call: re-running this cell would otherwise fail because
# a list has no collect().
if not isinstance(recommendations, list):
    recommendations = recommendations.collect()

In [47]:
# Number of collected recommendation entries (each is unpacked below as a (user, recs) pair).
len(recommendations)

12072

In [45]:
# (user_idx, business_idx) pairs already present in train or dev; despite the
# name, the set covers both splits.
train_set = {
    (row[0], row[1])
    for split in (train, dev)
    for row in split.collect()
}

In [52]:
# user index -> list of recommended business ids (filled in a later cell).
recommendation_list = {}

In [67]:
# Driver-side lookups between business id and business index, both directions.
_bus_pairs = bus_index.collect()
b_index = dict((key, idx) for key, idx in _bus_pairs)
reverse_b_index = dict((idx, key) for key, idx in _bus_pairs)


In [68]:
# Driver-side lookups between user id and user index, both directions.
_user_pairs = user_index.collect()
u_index = dict((key, idx) for key, idx in _user_pairs)
reverse_u_index = dict((idx, key) for key, idx in _user_pairs)

In [73]:
# For each user keep only recommendations whose (user, business) pair is not in
# train_set, translated back to the original business id. r[1] is the
# business index of a recommendation.
for user, rec in recommendations:
    recommendation_list[user] = [
        reverse_b_index[r[1]]
        for r in rec
        if (user, r[1]) not in train_set
    ]

In [66]:
## Convert to user id, product id and write to file

In [75]:
# One line per user: the original user id, a comma, then the recommended
# business ids joined by commas (the comma after the user id is always written).
with open("als_predictions.txt", "w") as outfile:
    for user, businesses in recommendation_list.items():
        outfile.write(reverse_u_index[user] + "," + ",".join(businesses) + "\n")
    

In [None]:
## Remove the ones in train and dev and write the remaining results?