In [60]:
from pyspark.sql import SparkSession
import pickle
from pyspark.sql.types import Row
from nltk.corpus import wordnet as wn
from textblob import TextBlob

# Build (or fetch the already-running) Spark session used by the notebook.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Read the Yelp review dump into a Spark DataFrame.
df = spark.read.json("yelp_dataset/yelp_academic_dataset_review.json")

In [61]:
# Load the business ids we want to keep.
# NOTE(review): pickle.load can execute arbitrary code while unpickling;
# only load this file if it comes from a trusted source.
# The `with` block guarantees the file handle is closed after loading.
with open("util_data/las_vegas_business_ids.pkl", "rb") as ids_file:
    las_vegas_business_ids = pickle.load(ids_file)

# Keep only the reviews whose business_id is one of the loaded ids
# (presumably the Las Vegas businesses, going by the file name).
df = df[df['business_id'].isin(las_vegas_business_ids)]
df.show(5)



+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|      date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|iCQpiavjjPzJ5_3gP...|   0|2011-02-25|    0|x7mDIiDB3jEiPGPHO...|    2|The pizza was oka...|     0|msQe1u7Z_XuqjGoqh...|
|pomGBqfbxcqPv14c3...|   0|2012-11-13|    0|dDl8zu1vWPdKGihJr...|    5|I love this place...|     0|msQe1u7Z_XuqjGoqh...|
|jtQARsP6P-LbkyjbO...|   1|2014-10-23|    1|LZp4UX5zK3e-c5ZGS...|    1|Terrible. Dry cor...|     3|msQe1u7Z_XuqjGoqh...|
|elqbBhBfElMNSrjFq...|   0|2011-02-25|    0|Er4NBWCmCD4nM8_p1...|    2|Back in 2005-2007...|     2|msQe1u7Z_XuqjGoqh...|
|Ums3gaP2qM3W1XcA5...|   0|2014-09-05|    0|jsDu6QEJHbwP2Blom...|    5|Delicious healthy...|     0|msQe1u7Z_XuqjGoqh...|
+--------------------+----+-----

In [62]:
# Gather every WordNet food name: take the closure of the hyponym relation
# starting at the 'food.n.02' synset and collect the lemma names of each
# synset reached, lower-cased and de-duplicated.

food = wn.synset('food.n.02')
food_list = list({
    lemma.lower()
    for synset in food.closure(lambda node: node.hyponyms())
    for lemma in synset.lemma_names()
})

In [65]:
# Extracting the utility matrix

def extract_food_in_text(text):
    """Return the food words found in ``text`` as one space-separated string.

    ``text`` is split on whitespace. A token is kept, with its original
    casing and position, when its lower-cased form is a member of the
    module-level ``food_list``. Tokens that occur several times are kept
    several times.
    """
    food_tokens = (token for token in text.split() if token.lower() in food_list)
    return " ".join(food_tokens)

def find_count(text):
    """Count the occurrences of each whitespace-separated word in ``text``.

    Returns a dict mapping every distinct word (case-sensitive) to the
    number of times it appears. Text without any word yields an empty dict.
    """
    counts = {}

    for token in text.split():
        counts[token] = counts.get(token, 0) + 1

    return counts

def mix_dict(a,b):
    """Merge the counts of ``a`` into ``b`` and return ``b``.

    For every key of ``a``: if ``b`` already has the key, the value from
    ``a`` is added to it; otherwise the key is inserted with the value from
    ``a``. ``b`` is modified in place; ``a`` is left untouched.
    """
    for key, amount in a.items():
        if key not in b:
            b[key] = amount
            continue
        b[key] += amount

    return b

def map_to_category(food_counts, rating):
    """Return ``food_counts`` unchanged.

    ``rating`` is accepted but not used by the current implementation; the
    very same object that was passed in is handed back.
    """
    mapped = food_counts
    return mapped

def extract_food_and_sentiment(review):
    """Map each food word mentioned in ``review`` to its accumulated sentiment.

    The review is split into sentences on ".". For every sentence that
    contains at least one food word (as found by ``extract_food_in_text``),
    the sentence polarity returned by ``get_sentiment`` is added once to
    each distinct food word of that sentence, no matter how often the word
    is repeated inside the sentence.

    Returns a dict mapping the food word (original casing) to the sum of the
    polarities of the sentences it appears in. A review without any food
    word yields an empty dict.
    """
    global_dict = {}

    for sentence in review.split("."):
        food_det = find_count(extract_food_in_text(sentence))
        if not food_det:
            # No food word in this sentence: its sentiment would never be
            # used, so skip the sentiment analysis entirely.
            continue

        sentiment = get_sentiment(sentence)

        # Only the distinct words (the dict keys) matter here, not the counts.
        for food_word in food_det:
            if food_word in global_dict:
                global_dict[food_word] += sentiment
            else:
                global_dict[food_word] = sentiment

    return global_dict
        
def get_sentiment(sentence):
    """Return the TextBlob sentiment polarity of ``sentence``."""
    blob = TextBlob(sentence)
    return blob.sentiment.polarity

def _review_to_food_row(row):
    """Turn one review row into (business_id, food -> sentiment dict, user_id)."""
    food_sentiment = map_to_category(extract_food_and_sentiment(row['text']),
                                     row['stars'])
    return Row(row['business_id'], food_sentiment, row['user_id'])


# Note: the column named "stars" holds the per-food sentiment mapping, not
# the review's star rating.
food_rows = df.rdd.map(_review_to_food_row)
food_review_df = spark.createDataFrame(food_rows).toDF("business_id", "stars", "user_id")
food_review_df.show(5)

+--------------------+--------------------+--------------------+
|         business_id|               stars|             user_id|
+--------------------+--------------------+--------------------+
|iCQpiavjjPzJ5_3gP...|                  []|msQe1u7Z_XuqjGoqh...|
|pomGBqfbxcqPv14c3...|     [cheese -> 0.5]|msQe1u7Z_XuqjGoqh...|
|jtQARsP6P-LbkyjbO...|[bread -> -0.0666...|msQe1u7Z_XuqjGoqh...|
|elqbBhBfElMNSrjFq...|[plate -> 0.35, c...|msQe1u7Z_XuqjGoqh...|
|Ums3gaP2qM3W1XcA5...|[Fish -> 1.0, ste...|msQe1u7Z_XuqjGoqh...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [31]:
# Get the frequency of every food word extracted from the reviews.

# Count the food words of each review, then merge all per-review counts
# into a single dict.
per_review_counts = df.rdd.map(lambda row: find_count(extract_food_in_text(row['text'])))
z = per_review_counts.reduce(mix_dict)

# Fold case variants together: key by the lower-cased food name and sum the
# counts of every spelling that maps to it.

food = {}

for raw_name, raw_count in z.items():
    normalized = raw_name.lower()
    food[normalized] = food.get(normalized, 0) + raw_count

food_names = set(food)
food_frequency = food
list(food_frequency.items())[:5]

[('salmon', 25140),
 ('pork', 56494),
 ('chicken', 183237),
 ('lox', 930),
 ('potato', 26958)]

In [34]:
# Store the frequency of the different foods mentioned in the reviews.

# Use a context manager so the pickle file is closed (and therefore fully
# flushed) as soon as the dump finishes.
with open("util_data/food_frequency_in_review.pkl", "wb") as pickle_file:
    pickle.dump(food_frequency, pickle_file)


# Also write a plain-text "name:count" listing, most frequent food first.
with open("util_data/food_frequency.txt","w+") as food_file:
    # The loop variable is `food_name` rather than `food` so that running
    # this cell does not rebind the module-level `food` name.
    for food_name, count in sorted(food_frequency.items(), key=lambda item: item[1], reverse=True):
        food_file.write(str(food_name)+":"+str(count)+"\n")