In [2]:
import json
import operator
from sets import Set

checkins = sc.textFile('../YelpDataset/yelp_academic_dataset_checkin.json')
businesses = sc.textFile('../YelpDataset/yelp_academic_dataset_business.json')
tips = sc.textFile('../YelpDataset/yelp_academic_dataset_tip.json')
users = sc.textFile('../YelpDataset/yelp_academic_dataset_user.json')
reviews = sc.textFile('../YelpDataset/yelp_academic_dataset_review.json')
positive_words = sc.textFile('positive-words.txt').collect()
negative_words = sc.textFile('negative-words.txt').collect()
negative_words = negative_words[0].split(",")
positive_words = positive_words[0].split(",")

negative_words = Set(negative_words)
positive_words = Set(positive_words)

selectedCategories = ['Restaurants', 'Shopping', 'Beauty & Spas', 'Bars', 'Automotive', 'Event Planning & Services',
                     'Fast Food', 'Coffee & Tea', 'Hotels', 'Real Estate', 'Dentists', 'Gyms']


  app.launch_new_instance()


In [3]:
# Reviews

def mapper_reviews_filter(rows):
    for row in rows:
        row = json.loads(row)
        yield (row['user_id'], (row['business_id'], row['stars']))

def mapper_business_filter(rows):
    for row in rows:
        row = json.loads(row)
        for category in row['categories']:
            if(category in selectedCategories):
                yield (row['business_id'], category)

def mapper_users_filter(rows):
    for row in rows:
        row = json.loads(row)
        elite = 0
        if len(row['elite']) == 0:
            elite = 1
        else:
            elite = 4 + len(row['elite'])
        yield (row['user_id'], elite)

def mapper_business_rating(rows):
    for row in rows:
        business_id = row[0]
        ratings_value = 0
        ratings_count = 0
        for rating in row[1]:
            ratings_value += rating[0]*rating[1]
            ratings_count += rating[1]
        if(ratings_value < 0):
            rating_value = 0
        yield (business_id, (float(ratings_value)/float(ratings_count), ratings_count))        

def mapper_normalized_total_reviews(records):
    for category in records:
        maxRatingValue = max(category[1],key=operator.itemgetter(1))[1][0] 
        minRatingValue = min(category[1],key=operator.itemgetter(1))[1][0] 
        maxRatingCount = max(category[1],key=lambda item:item[1][1])[1][1]
        minRatingCount = min(category[1],key=lambda item:item[1][1])[1][1]
        for business in category[1]:
            normalizedRatingValue = (float(business[1][0]) - minRatingValue)/(maxRatingValue - minRatingValue) * 35
            normalizedRatingCount = (float(business[1][1]) - minRatingCount)/(maxRatingCount - minRatingCount) * 5
            yield ((category[0], business[0]), normalizedRatingValue + normalizedRatingCount)   
            
reviews_filtered = reviews.mapPartitions(mapper_reviews_filter)
business_filtered = businesses.mapPartitions(mapper_business_filter)
users_filtered = users.mapPartitions(mapper_users_filter)

joined_reviews = reviews_filtered.join(users_filtered).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1]))).groupByKey()
total_reviews = joined_reviews.mapPartitions(mapper_business_rating)
category_join = total_reviews.join(business_filtered).map(lambda x: (x[1][1], (x[0], x[1][0]))).groupByKey().mapValues(list)
total_normalized_reviews = category_join.mapPartitions(mapper_normalized_total_reviews)
# total_sortedByRatingsCount = total_normalized.map(lambda (k, v): (k, sorted(v, key=lambda x: x[1][1], reverse=True)))
# total_sortedByRatingsValue = total_normalized.map(lambda (k, v): (k, sorted(v, key=lambda x: x[1], reverse=True)))
total_normalized_reviews.take(20)


[((u'Restaurants', u'sOcZcXcNm8LmdoOYqEDqpg'), 29.01123150105708),
 ((u'Restaurants', u'51qNc8sg9kCD5GEJ3TUrrw'), 21.875330338266384),
 ((u'Restaurants', u'Yl7jil0y_1GK1Z82vmUQew'), 19.689812367864693),
 ((u'Restaurants', u'VZYMInkjRJVHwXVFqeoMWg'), 19.52319482842739),
 ((u'Restaurants', u'bZX-b9fInWIk8j3Y50uBWQ'), 20.76096194503171),
 ((u'Restaurants', u'DeiWR9aL18WpVyzuey7dGg'), 29.167327343199435),
 ((u'Restaurants', u'fvzr5GVCnNpnF9V-MvYrFg'), 21.890525898520085),
 ((u'Restaurants', u'JL5B_s1EoN4zIru4JSw4Qg'), 19.693776427061312),
 ((u'Restaurants', u'dzZnC5YVzZVzW0he28HwEA'), 0.0),
 ((u'Restaurants', u'83C1eqPfmQHTy_8MC4Ug6w'), 17.745369692299857),
 ((u'Restaurants', u'2n41w1fel0glITinZ25syQ'), 28.438491014799155),
 ((u'Restaurants', u'_8tUKyMRMMt1YSlac04eqg'), 35.00033033826639),
 ((u'Restaurants', u'W7xb2iS4cYL14Yi5LOGSkA'), 21.28449859485381),
 ((u'Restaurants', u'3yRq8DjiSt4lSPEonpSaLQ'), 20.366014799154335),
 ((u'Restaurants', u'p3PrAbM6GkPkUXcp0FouNQ'), 30.625330338266384),


In [4]:
# Tips

def mapper_tips_filter(records):
    for row in records:
        row = json.loads(row)
        if (row["type"] == 'tip'):
            text = row["text"].split(" ")
            pos_counter = 0
            neg_counter = 0
            for i in text:
                if i in positive_words:
                    pos_counter += 1
                elif i in negative_words:
                    neg_counter += 1
            if pos_counter > neg_counter:
                yield (row["user_id"], (row["business_id"], 1))  #good
            else:
                yield (row["user_id"], (row["business_id"], -1)) #bad tip

def mapper_normalized_total_tips(records):
    for category in records:
        maxRatingValue = max(category[1],key=operator.itemgetter(1))[1][0] 
        minRatingValue = min(category[1],key=operator.itemgetter(1))[1][0] 
        maxRatingCount = max(category[1],key=lambda item:item[1][1])[1][1]
        minRatingCount = min(category[1],key=lambda item:item[1][1])[1][1]
        for business in category[1]:
            normalizedRatingValue = (float(business[1][0]) - minRatingValue)/(maxRatingValue - minRatingValue) * 9
            normalizedRatingCount = (float(business[1][1]) - minRatingCount)/(maxRatingCount - minRatingCount) * 1
            yield ((category[0], business[0]), normalizedRatingValue + normalizedRatingCount)   
                
tips_filtered = tips.mapPartitions(mapper_tips_filter)
joined_tips = tips_filtered.join(users_filtered).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1]))).groupByKey()
total_tips = joined_tips.mapPartitions(mapper_business_rating)
category_join_tips = total_tips.join(business_filtered).map(lambda x: (x[1][1], (x[0], x[1][0]))).groupByKey().mapValues(list)
total_normalized_tips = category_join_tips.mapPartitions(mapper_normalized_total_tips)
total_normalized_tips.take(20)

[((u'Bars', u'nx8B4SQC4pQnMoTVsMs0rg'), 2.2646718146718148),
 ((u'Bars', u'XKwbA7WVROVbLlPPzHZHgw'), 4.503861003861004),
 ((u'Bars', u'W-UBevF6tFgX4WiA8PvdPA'), 2.9795272624540914),
 ((u'Bars', u'oqL1hS934CyZc3msupQNRA'), 0.49555984555984545),
 ((u'Bars', u'D3a0vrdXzT_UkbPNhK8f7w'), 6.513127413127413),
 ((u'Bars', u'g8dcjPzIJGwY-u4GAeHxnQ'), 0.0),
 ((u'Bars', u'MnHlTUSAPev7YRbdZDv5ew'), 1.2108108108108109),
 ((u'Bars', u'cexR4j7jfgnTMbbb_YEDWg'), 2.706949806949807),
 ((u'Bars', u'CBoMK_j_QG-DdMIsqznTkQ'), 0.36545886545886525),
 ((u'Bars', u'-SB5exmdSotbAFh-NwMmbQ'), 2.462267462267462),
 ((u'Bars', u'1ESuAwWok4cI3dmnZ4G1NQ'), 0.444015444015444),
 ((u'Bars', u'pNozqOyMv4gq7IUYEdhpHA'), 0.008494208494208495),
 ((u'Bars', u'stPGLQ8-zTCnAWcp-PgBGA'), 5.4069498069498065),
 ((u'Bars', u'JWcfWhjO_cpfGsEDv8j2xg'), 2.248181263970738),
 ((u'Bars', u'TzImzfIWkZTnetKl2a4-SQ'), 1.4315070678707043),
 ((u'Bars', u'RJhYVlSttYwMFZzY0woV4w'), 2.9695316434446872),
 ((u'Bars', u'-yTdxWCadi8Kn1H05getKQ'), 6

In [5]:
#Checkins

def mapper_checkins_filter(records):
    for line in records:
        line = json.loads(line)
        checkin_info = line['checkin_info']
        for key, val in checkin_info.iteritems():
            yield (line['business_id'], val)

def mapper_normalization(records):
    for category in records:
        minCheckins = min(category[1], key=operator.itemgetter(1))[1]
        maxCheckins = max(category[1], key=operator.itemgetter(1))[1]
        for business in category[1]:
            normalizedRatingValue = (float(business[1]) - minCheckins)/(maxCheckins - minCheckins) * 20
            yield ((category[0], business[0]), normalizedRatingValue)    
        
checkins_filtered = checkins.mapPartitions(mapper_checkins_filter).reduceByKey(operator.add)
joined_checkins = checkins_filtered.join(business_filtered).map(lambda x: (x[1][1],(x[0],x[1][0]))).groupByKey().mapValues(list)
total_normalized_checkins = joined_checkins.mapPartitions(mapper_normalization)
total_normalized_checkins.take(20)

[((u'Coffee & Tea', u'sOcZcXcNm8LmdoOYqEDqpg'), 0.021596328624133895),
 ((u'Coffee & Tea', u'4K7nR-e9staGTNJk1ez7MA'), 0.039593269144245484),
 ((u'Coffee & Tea', u'ycutyiy4ozAX1t2wCIeP2w'), 0.04319265724826779),
 ((u'Coffee & Tea', u'OYZOzzipfXdWrYe1A2zeKw'), 0.025195716728156212),
 ((u'Coffee & Tea', u'0EXVIF07vw-QTTy9V7-ebg'), 0.18356879330513814),
 ((u'Coffee & Tea', u'OG27njKJGhjpesJb9McGPQ'), 0.10618194906865833),
 ((u'Coffee & Tea', u'qAkk0DaKLotxX2UzSPg9fQ'), 0.017996940520111582),
 ((u'Coffee & Tea', u'ziA1FCZrt9SMjlBlPo5EaQ'), 0.35813911635022044),
 ((u'Coffee & Tea', u'JJGBx48I4yudiEhw9fGbZw'), 0.026995410780167373),
 ((u'Coffee & Tea', u'sB0yyxeBcVhRxGd-lwR4yg'), 0.01619724646810042),
 ((u'Coffee & Tea', u'toDZ6L_a_0lWU9O1JtQRGg'), 0.31854584720597495),
 ((u'Coffee & Tea', u'SOtYTZQIB5-3EqIlFjo7lA'), 0.03059479888418969),
 ((u'Coffee & Tea', u'y3VxXuUvrhURPbvtF0GkTg'), 0.20876451003329435),
 ((u'Coffee & Tea', u'PkPcOZh1tQfQUV6bJRTrng'), 0.2267614505534059),
 ((u'Coffee & Te

In [6]:
def mapper_business_information(rows):
    for row in rows:
        row = json.loads(row)
        yield(row['business_id'], (row['name'], row['city'], row['latitude'], row['longitude']))

score = total_normalized_checkins.join(total_normalized_reviews).join(total_normalized_tips)
score = score.map(lambda x: (x[0][1], ((x[0][0], (x[1][0][0], x[1][0][1], x[1][1]), x[1][0][0] + x[1][0][1] + x[1][1]))))
business_information = businesses.mapPartitions(mapper_business_information)
business_information_joined = business_information.join(score)
business_information_joined.take(2)

[(u'OhmRdo9XL2ZWtpt7ewvPAg',
  ((u'Subway', u'Henderson', 36.0288225, -115.0624726),
   (u'Restaurants',
    (0.04447649957927635, 17.500991014799155, 1.2871830739639796),
    18.83265058834241))),
 (u'OhmRdo9XL2ZWtpt7ewvPAg',
  ((u'Subway', u'Henderson', 36.0288225, -115.0624726),
   (u'Fast Food',
    (0.16968585186883742, 17.505192107995846, 1.2921108742004264),
    18.96698883406511)))]

In [22]:
# (u'OhmRdo9XL2ZWtpt7ewvPAg',
#   ((u'Subway', u'Henderson', 36.0288225, -115.0624726),
#    (u'Restaurants',
#     (0.04447649957927635, 17.500991014799155, 1.2871830739639796),
#     18.83265058834241)))

def mapper_business_information(rows):
    for row in rows:
        row = json.loads(row)
        yield(row['business_id'], (row['name'], row['city'], row['latitude'], row['longitude']))

score = total_normalized_checkins.join(total_normalized_reviews).join(total_normalized_tips)
score = score.map(lambda x: (x[0][1], ((x[0][0], (x[1][0][0], x[1][0][1], x[1][1]), x[1][0][0] + x[1][0][1] + x[1][1]))))
business_information = businesses.mapPartitions(mapper_business_information)
business_information_joined = business_information.join(score)
business_information_joined.take(3)

import os

business_information_joined = business_information_joined.collect()
cities = ['LasVegas', 'Phoenix', 'Montreal', 'Edinburgh', 'Karlsruhe', 'Waterloo', 'Charlotte', 'Urbana', 'Madison', 'Champaign']
cities = Set(cities)



business = (u'OhmRdo9XL2ZWtpt7ewvPAg', ((u'Subway', u'Henderson', 36.0288225, -115.0624726),
   (u'Restaurants',
    (0.04447649957927635, 17.500991014799155, 1.2871830739639796),
    18.83265058834241)))

# print business[1][0][0]


data = {}
for business in business_information_joined:
    city = business[1][0][1].split('/')[0].replace(" ", "")
    if city in cities:
        category = business[1][1][0]
        if city in data:
            inner = data[city]
            if category in inner:
                inner[category].append((business[1][0][0], business[1][0][2], business[1][0][3], business[1][1][1][0], \
                                business[1][1][1][1], business[1][1][1][2],business[1][1][2]))
            else:
                inner[category] = [(business[1][0][0], business[1][0][2], business[1][0][3], business[1][1][1][0], \
                                business[1][1][1][1], business[1][1][1][2],business[1][1][2])]
            data[city] = inner
        else:
            inner = {}
            inner[category] = [(business[1][0][0], business[1][0][2], business[1][0][3], business[1][1][1][0], \
                                business[1][1][1][1], business[1][1][1][2],business[1][1][2])]
            data[city] = inner
            
# print data["LasVegas"]

with open('data.json', 'w') as fp:
    json.dump(data, fp)
    


# print data['Bars']
    
#     city = business[1][0][1].split('/')[0].replace(" ", "")
#     if city in cities:
#         mode = 'a' if os.path.exists(city) else 'w'
#         with open(city, mode) as f:
#             f.write(str(business) + "\n")
        


In [28]:
with open('data.json') as data_file:    
    data = json.load(data_file)
    print data["LasVegas"]["Bars"]


[[u'Gipsy', 36.105943, -115.151948, 0.026666666666666665, 16.059241700303787, 0.0023166023166023165, 16.088224969287054], [u'The Lodge At Tenaya', 36.2645546, -115.252457, 0.1575757575757576, 14.825921854141194, 3.8617760617760624, 18.845273673493015], [u"PT's", 36.0922493, -115.208489, 0.610909090909091, 20.437812940547374, 0.7390977443609025, 21.787819775817365], [u"Applebee's", 36.0977772186437, -115.296672616709, 0.8727272727272728, 17.4862845224292, 2.006497786985592, 20.365509582142064], [u'Kopper Keg', 36.284287, -115.285839, 0.33454545454545453, 21.69656312284284, 0.8259038259038256, 22.85701240329212], [u"Yorky's", 36.092069, -115.298114, 0.07515151515151515, 31.51385445530115, 3.7182375652963886, 35.30724353574905], [u'The Bounty Tavern', 35.9985376397497, -115.244087519182, 0.10424242424242425, 24.2766764553807, 3.0038610038610045, 27.38477988348413], [u'Whiskey Licker Bar', 36.1713276785166, -115.144672154371, 0.012121212121212121, 9.854687727869331, 9.0007722007722, 18.867