In [1]:
import json
import os
from datetime import datetime, timezone

import pytz
from pyspark.context import SparkContext

In [2]:
# Reuse an already-running SparkContext when this cell is re-executed; a bare
# SparkContext() raises "Cannot run multiple SparkContexts at once" on re-run.
sc = SparkContext.getOrCreate()
sc.setLogLevel('ERROR')

In [3]:
# Root folder holding the competition data files. Set the YELP_DATA_DIR
# environment variable to point elsewhere instead of editing this cell.
FOLDER_PATH = os.environ.get('YELP_DATA_DIR', '/Users/veersingh/Desktop/competition_files/')
TRAIN_FILE_PATH = os.path.join(FOLDER_PATH, 'yelp_train.csv')

BUSINESS_FILE_PATH = os.path.join(FOLDER_PATH, 'business.json')
CHECKIN_FILE_PATH = os.path.join(FOLDER_PATH, 'checkin.json')
PHOTO_FILE_PATH = os.path.join(FOLDER_PATH, 'photo.json')
TIP_FILE_PATH = os.path.join(FOLDER_PATH, 'tip.json')
USER_FILE_PATH = os.path.join(FOLDER_PATH, 'user.json')

# The validation file lives in the same folder as the training data.
TESTING_FILE_PATH = os.path.join(FOLDER_PATH, 'yelp_val.csv')
# Destination for the output CSV; override with YELP_OUTPUT_FILE.
OUTPUT_FILE_PATH = os.environ.get(
    'YELP_OUTPUT_FILE',
    '/Users/veersingh/Desktop/Recommendation-System-to-predict-Yelp-ratings/output.csv',
)

In [4]:
# Load the training ratings: drop the CSV header line, then turn each row into
# [user_id, business_id, rating] with the rating parsed as a float.
train_RDD = sc.textFile(TRAIN_FILE_PATH)
headers_train = train_RDD.first()
train_RDD = (
    train_RDD
    .filter(lambda line: line != headers_train)
    .map(lambda line: line.split(','))
    .map(lambda cols: [cols[0], cols[1], float(cols[2])])
)

In [5]:
def get_latitude(latitude_value):
    """Return the latitude unchanged, or 0 when it is falsy (None, '', 0)."""
    return latitude_value if latitude_value else 0

def get_longitude(longitude_value):
    """Return the longitude unchanged, or 0 when it is falsy (None, '', 0)."""
    return longitude_value if longitude_value else 0

def get_num_attributes(attributes_dict):
    """Number of entries in a business's attributes mapping; 0 when missing or empty.

    Note: business_RDD's feature list does not include this count.
    """
    return len(attributes_dict) if attributes_dict else 0

def get_rate_true_attributes(attributes_dict):
    """Fraction of boolean-valued attributes whose value is the string 'True'.

    Only values equal to the strings 'True' or 'False' are considered; all other
    values are ignored. Returns 0 when the mapping is missing/empty or contains
    no such boolean strings.
    """
    if not attributes_dict:
        return 0
    boolean_values = [value for value in attributes_dict.values()
                      if value in ('True', 'False')]
    if not boolean_values:
        return 0
    true_count = sum(1 for value in boolean_values if value == 'True')
    return true_count / len(boolean_values)
            

def get_num_categories(categories):
    """Number of comma-separated entries in the categories string; 0 when missing/empty."""
    return len(categories.split(',')) if categories else 0

In [6]:
# One feature vector per business, keyed by business_id:
# [latitude, longitude, stars, review_count, is_open,
#  rate of true attributes (num 'True' / num boolean attributes), number of categories].
# Falsy coordinates are replaced by 0 before the float conversion.
business_RDD = (
    sc.textFile(BUSINESS_FILE_PATH)
    .map(json.loads)
    .map(lambda biz: (
        biz['business_id'],
        [
            float(get_latitude(biz['latitude'])),
            float(get_longitude(biz['longitude'])),
            float(biz['stars']),
            int(biz['review_count']),
            int(biz['is_open']),
            get_rate_true_attributes(biz['attributes']),
            get_num_categories(biz['categories']),
        ],
    ))
)

In [7]:
def get_num_checkins(checkin_data):
    """Total check-ins: the sum of every value in the check-in mapping.

    Presumably maps a time slot to a count — TODO confirm against checkin.json.
    """
    total = 0
    for count in checkin_data.values():
        total += count
    return total

In [8]:
# Total check-ins per business as (business_id, [count]), ready to be appended
# to the business feature list.
checkIn_RDD = (
    sc.textFile(CHECKIN_FILE_PATH)
    .map(json.loads)
    .map(lambda rec: (rec['business_id'], [get_num_checkins(rec['time'])]))
)


In [9]:
# Number of photos per business as (business_id, [count]).
photo_RDD = (
    sc.textFile(PHOTO_FILE_PATH)
    .map(json.loads)
    .map(lambda photo: (photo['business_id'], 1))
    .reduceByKey(lambda left, right: left + right)
    .mapValues(lambda total: [total])
)



In [10]:
# Tip counts aggregated two ways — per business and per user. Each record is
# (id, [count]) so it can be appended to a feature list.
tip_RDD = sc.textFile(TIP_FILE_PATH).map(json.loads)

tips_business_RDD = (
    tip_RDD
    .map(lambda tip: (tip['business_id'], 1))
    .reduceByKey(lambda left, right: left + right)
    .mapValues(lambda total: [total])
)
tips_user_RDD = (
    tip_RDD
    .map(lambda tip: (tip['user_id'], 1))
    .reduceByKey(lambda left, right: left + right)
    .mapValues(lambda total: [total])
)



In [11]:
def get_yelping_since(yelping_since):
    """Convert a user's ``yelping_since`` string to a Unix timestamp in seconds.

    The value is interpreted as UTC. 'YYYY-MM-DD' is tried first;
    'YYYY-MM-DD HH:MM:SS' is accepted as a fallback.

    Raises:
        ValueError: if the string matches neither format.
    """
    try:
        date_obj = datetime.strptime(yelping_since, '%Y-%m-%d')
    except ValueError:
        date_obj = datetime.strptime(yelping_since, '%Y-%m-%d %H:%M:%S')
    # The stdlib UTC tzinfo gives the same result as pytz.utc.localize for UTC.
    return int(date_obj.replace(tzinfo=timezone.utc).timestamp())

def get_num_friends(friends):
    """Count friends in a comma-separated friends string.

    Returns 0 for the literal string 'None', an empty string, or None.
    Without the empty check, ''.split(',') would yield [''] and report 1 friend,
    and None would raise AttributeError.
    """
    if not friends or friends == 'None':
        return 0
    return len(friends.split(','))

def get_num_elites(elite):
    """Count the comma-separated entries in a user's ``elite`` string.

    Returns 0 for the literal string 'None', an empty string, or None.
    Without the empty check, ''.split(',') would yield [''] and report 1 entry,
    and None would raise AttributeError.
    """
    if not elite or elite == 'None':
        return 0
    return len(elite.split(','))

In [12]:
# One feature vector per user, keyed by user_id. Order: review_count,
# yelping_since (Unix seconds), number of friends, useful, funny, cool, fans,
# number of entries in `elite`, average_stars, then the 11 compliment_* counts.
user_RDD = (
    sc.textFile(USER_FILE_PATH)
    .map(json.loads)
    .map(lambda usr: (
        usr['user_id'],
        [
            int(usr['review_count']),
            get_yelping_since(usr['yelping_since']),
            get_num_friends(usr['friends']),
            *[int(usr[key]) for key in ('useful', 'funny', 'cool', 'fans')],
            get_num_elites(usr['elite']),
            float(usr['average_stars']),
            *[int(usr[key]) for key in (
                'compliment_hot', 'compliment_more', 'compliment_profile',
                'compliment_cute', 'compliment_list', 'compliment_note',
                'compliment_plain', 'compliment_cool', 'compliment_funny',
                'compliment_writer', 'compliment_photos',
            )],
        ],
    ))
)

In [64]:
def combine_lists(data_row, fill_width=1):
    """Flatten one ``leftOuterJoin`` result into ``[key, combined_list]``.

    Args:
        data_row: ``(key, (left, right))`` as produced by ``RDD.leftOuterJoin``.
            ``left`` is a feature list or a bare id string (a string is wrapped
            into a one-element list). ``right`` is a feature list, or None when
            the key had no match on the right side.
        fill_width: how many zeros replace a missing ``right``. The default of 1
            fits the (id, [count]) RDDs joined in this notebook (check-ins,
            photos, tips). NOTE(review): for the train/business join a missing
            business is only padded by ``fill_width`` zeros — pass the business
            feature width there if that case can occur.

    Returns:
        ``[key, left_list + right_list]`` (a list, not a tuple).
    """
    key = data_row[0]
    left, right = data_row[1][0], data_row[1][1]
    # Wrap a bare id BEFORE handling None; otherwise str + [0] raises TypeError.
    if isinstance(left, str):
        left = [left]
    if right is None:
        right = [0] * fill_width
    return [key, left + right]

In [14]:
# Business feature vector, keyed by business_id:
#   business_RDD features + check-in count + photo count + tip count.
# leftOuterJoin yields None for businesses missing from a count RDD;
# combine_lists turns that None into a 0 entry.
business_features_RDD = (
    business_RDD
    .leftOuterJoin(checkIn_RDD).map(combine_lists)
    .leftOuterJoin(photo_RDD).map(combine_lists)
    .leftOuterJoin(tips_business_RDD).map(combine_lists)
)




In [15]:
# User feature vector, keyed by user_id: user_RDD features + tip count.
# Users with no tips get a 0 via combine_lists.
user_features_RDD = (
    user_RDD
    .leftOuterJoin(tips_user_RDD)
    .map(combine_lists)
)

In [67]:
# Re-key training pairs by business_id to join them with the business features.
# Each result is [business_id, [user_id, *business_features]].
train_RDD_tmp = train_RDD.map(lambda row: (row[1], row[0]))
train_join_business_features_RDD = (
    train_RDD_tmp
    .leftOuterJoin(business_features_RDD)
    .map(combine_lists)
)



In [83]:
def switch_keys(data_row):
    """Re-key ``[business_id, [user_id, *features]]`` as ``(user_id, [business_id, *features])``."""
    business_id, business_side = data_row[0], data_row[1]
    return (business_side[0], [business_id] + business_side[1:])

In [92]:
# Re-key by user_id and attach the user features. Each result is
# (user_id, ([business_id, *business_features], user_features or None)).
train_join_business_features_RDD_tmp = train_join_business_features_RDD.map(switch_keys)
train_join_business_features_user_features_RDD = (
    train_join_business_features_RDD_tmp
    .leftOuterJoin(user_features_RDD)
)



In [110]:
def join_all(data_row, usr_fill_width=21):
    """Flatten a joined row into ``((user_id, business_id), features)``.

    Args:
        data_row: ``(user_id, ([business_id, *business_features], user_features))``.
            ``user_features`` is None when the user had no match in
            user_features_RDD (leftOuterJoin).
        usr_fill_width: number of zeros substituted for missing user features,
            so every vector has the same length. The default 21 is the width of
            user_features_RDD here (20 user_RDD fields + 1 tip count); update it
            if those features change.

    Returns:
        ``((user_id, business_id), business_features + user_features)``.
    """
    usr_id = data_row[0]
    bus_side = data_row[1][0]
    bus_id = bus_side[0]
    bus_features = bus_side[1:]
    usr_features = data_row[1][1]
    # list + None would raise TypeError for users absent from user.json.
    if usr_features is None:
        usr_features = [0] * usr_fill_width
    return ((usr_id, bus_id), bus_features + usr_features)

In [124]:
# Collect to the driver as a dict:
#   (user_id, business_id) -> [business features..., user features...]
train_all_joined_MAP = (
    train_join_business_features_user_features_RDD
    .map(join_all)
    .collectAsMap()
)

In [127]:
# Collect the training labels to the driver as a dict:
#   (user_id, business_id) -> rating
labels_MAP = (
    train_RDD
    .map(lambda row: ((row[0], row[1]), row[2]))
    .collectAsMap()
)