# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

course name | by ___ | ____@itu.dk | date

#### Flags & imports

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LinearSVC, OneVsRest, LogisticRegression
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

In [2]:
code_development = False

#### Words and strings

In [3]:
auth_words = ['authentic', 'legit', 'credible', 'original'] #Note - these would also capture 'authenticity' 'legitimately' etc. 
bad_words = ['dirty', 'kitsch', 'cheap', 'rude', 'simple', 'tacky', 'sleazy', 'crude', 'messy', 'vulgar']

auth_string = ""
for i, w in enumerate(auth_words):
    auth_string += f'lower(text) LIKE "%{w}%"'
    if i < len(auth_words)-1:
        auth_string += " OR "

bad_string = ""
for i, w in enumerate(bad_words):
    bad_string += f'lower(text) LIKE "%{w}%"'
    if i < len(bad_words)-1:
        bad_string += " OR "

In [4]:
asian_cuisine_keywords = ['asian', 'chinese', 'sushi', 'vietnamese', 'thai', 'japanese', 'korean', 'indian', 'cantonese',
                          'afghan', 'poke', 'filipino', 'indonesian', 'russian']
southamer_cuisine_keywords = ['mexican', 'caribbean', 'brazilian', 'latin american', 'cuban', 'argentine', 'honduran', 'peruvian']
middleeast_cuisine_keywords = ['middle eastern', 'persian', 'turkish', 'arabic', 'lebanese', 'halal', 'moroccan', 'africa', 'ethiopian']
euro_cuisine_keywords = ['italian', 'greek', 'spanish', 'european', 'french', 'tapas', 'british', 'creperies', 'pizza', 'pretzel',
                        'fondue', 'german', 'gelato', 'ukrainian', 'brasseries', 'patisserie', 'portuguese']
american_cuisine_keywords = ['fast food', 'burgers', 'barbeque', 'diner', 'cheesesteak', 'frozen yogurt', 'donut', 'waffle', 'brunch',
                            'steakhouse', 'hot dog', 'cajun', 'american', 'cupcake', 'canadian', 'southern', 'hawaii']

asia_string = ""
for i, w in enumerate(asian_cuisine_keywords):
    asia_string += f'lower(categories) LIKE "%{w}%"'
    if i < len(asian_cuisine_keywords)-1:
        asia_string += " OR "
        
southamer_string = ""
for i, w in enumerate(southamer_cuisine_keywords):
    southamer_string += f'lower(categories) LIKE "%{w}%"'
    if i < len(southamer_cuisine_keywords)-1:
        southamer_string += " OR "

middleeast_string = ""
for i, w in enumerate(middleeast_cuisine_keywords):
    middleeast_string += f'lower(categories) LIKE "%{w}%"'
    if i < len(middleeast_cuisine_keywords)-1:
        middleeast_string += " OR "

euro_string = ""
for i, w in enumerate(euro_cuisine_keywords):
    euro_string += f'lower(categories) LIKE "%{w}%"'
    if i < len(euro_cuisine_keywords)-1:
        euro_string += " OR "
        
amer_string = ""
for i, w in enumerate(american_cuisine_keywords):
    amer_string += f'lower(categories) LIKE "%{w}%"'
    if i < len(american_cuisine_keywords)-1:
        amer_string += " OR "

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. 

In [5]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
FIRST_EXECUTION = False
# Only execute this cell once.
if not FIRST_EXECUTION:
    FIRST_EXECUTION = True
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 

    EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext

Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the yelp datasets are located on UCloud and read then using the spark session.

In [6]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()
# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

In [7]:
# OPTIONAL:
# Reduce resource usage and make queries run faster
# by only using a small sample of the dataframe
# and overwriting previous variable "df".
# Useful while developing, not so much to
# provide final answers. Therefore: Remember to 
# to re-read the df when done developing code using
# df = spark.read etc like above.
if code_development:
    reviews = reviews.sample(withReplacement=False, fraction=1/50)
    users = users.sample(withReplacement=False, fraction=1/50)

Example: Say we're only interested in reviews of good mexican restaurants in Arizona. You can delete this when you do your own thing. 

## Part 1 - Definitive Questions
#### 1 - Analyze business.json to find the total number of reviews for all businesses. The output should be in the form of a Spark DataFrame with one value representing the count

In [8]:
p1_1 = business.agg({"review_count":"sum"})
p1_1.toPandas().to_csv("results/p1_1.csv", header=False, index=False, encoding='utf-8')
p1_1.show()

+-----------------+
|sum(review_count)|
+-----------------+
|          6745508|
+-----------------+



#### 2 - Analyze business.json to find all businesses that have received 5 stars and that have been reviewed by 500 or more users. The output should be in the form of DataFrame of (name, stars, review count).

In [9]:
p1_2 = business.filter(business.stars == 5.0).filter(business.review_count >= 500).select('name', 'stars', 'review_count')
p1_2.toPandas().to_csv("results/p1_2.csv", header=True, index=False, encoding='utf-8')
p1_2.show(5)

+------------------+-----+------------+
|              name|stars|review_count|
+------------------+-----+------------+
|   Blues City Deli|  5.0|         991|
|          Tumerico|  5.0|         705|
|Free Tours By Foot|  5.0|         769|
|              Yats|  5.0|         623|
| SUGARED + BRONZED|  5.0|         513|
+------------------+-----+------------+
only showing top 5 rows



#### 3 - Analyze user.json to find the influencers who have written more than 1000 reviews. The output should be in the form of DataFrame of user id.

In [10]:
p1_3 = users.filter(users.review_count > 1000).select('user_id')
p1_3.toPandas().to_csv("results/p1_3.csv", header=True, index=False, encoding='utf-8')
p1_3.show(5)

+--------------------+
|             user_id|
+--------------------+
|j14WgRoU_-2ZE1aw1...|
|q_QQ5kBBwlCcbL1s4...|
|MGPQVLsODMm9ZtYQW...|
|NIhcRW6DWvk1JQhDh...|
|QJI9OSEn6ujRCtrX0...|
+--------------------+
only showing top 5 rows



#### 4 - Analyze review.json, business.json, and a view created from your answer to Q3 to find the businesses that have been reviewed by more than 5 influencer users.

In [11]:
influencer_reviews = business.join(reviews, 'business_id', 'left').join(p1_3, 'user_id', 'inner')
influencer_reviews = influencer_reviews.groupby(business.name).agg({'review_id':'count'}).withColumnRenamed("count(review_id)", "no_reviews")

p1_4 = influencer_reviews.filter(influencer_reviews.no_reviews >= 5)
p1_4.toPandas().to_csv("results/p1_4.csv", header=True, index=False, encoding='utf-8')
p1_4.show(5)

+--------------------+----------+
|                name|no_reviews|
+--------------------+----------+
|  Costa's Restaurant|         6|
|Applebee's Grill ...|        94|
|  Fat Bottom Brewing|        10|
|    McAlister's Deli|        60|
|     Pinewood Social|        27|
+--------------------+----------+
only showing top 5 rows



#### 5 - Analyze review.json and user.json to find an ordered list of users based on the average star counts they have given in all their reviews.

In [12]:
p1_5 = reviews.groupby('user_id').agg({'stars':'avg'}).withColumnRenamed("avg(stars)", "avg_stars")
p1_5 = p1_5.orderBy('avg(stars)', ascending=False)
#this fucking collect in the to_pandas method doesn't work on this bullshit
#p1_5.toPandas().to_csv("results/p1_5.csv", header=True, index=False, encoding='utf-8') 
p1_5.show(5)

+--------------------+---------+
|             user_id|avg_stars|
+--------------------+---------+
|HTNGyJVoSGHgY5G9A...|      5.0|
|dsJ60MzCrBUo6MKVA...|      5.0|
|6TPO2yFzqUO-XL4gj...|      5.0|
|ygLSJmIj3bVFLBdnP...|      5.0|
|aFUiejZyIBzjojR2x...|      5.0|
+--------------------+---------+
only showing top 5 rows



## Part 2 - Authenticity Study
### Data Cleaning

In [13]:
business.createOrReplaceTempView('business')

In [14]:
spark.sql(f"""
    SELECT address, business_id, city, latitude, longitude, name, postal_code, review_count, stars, state,
        CASE WHEN {asia_string} THEN 'Asian Cuisine'
            WHEN {southamer_string} THEN 'South American Cuisine'
            WHEN {middleeast_string} Then 'Middle Eastern / African Cuisine'
            WHEN {euro_string} THEN 'European Cuisine' 
            WHEN {amer_string} THEN 'American Cuisine'
            ELSE 'Unknown'
        END AS cuisine
    FROM business
    WHERE (lower(          categories) LIKE '%restaurant%')
        AND lower(          categories) NOT LIKE '%bar%' AND lower(          categories) NOT LIKE '%hotel%' 
        AND lower(          categories) NOT LIKE '%arts & entertainment%' AND lower(          categories) NOT LIKE '%shopping%'
        AND lower(          categories) NOT LIKE '%services%' AND lower(          categories) NOT LIKE '%store%'
    """).toPandas().to_csv('results/business_with_cuisines.csv', index=False)

### Creating Views

In [15]:
reviews.createOrReplaceTempView('reviews')
business.createOrReplaceTempView('business')
users.createOrReplaceTempView('users')

restaurants = spark.read.option("header", "true").csv('file:////work/results/business_with_cuisines.csv')
restaurants.createOrReplaceTempView('restaurants')

### Question Answering
#### Q1: What is the percentage of reviews containing a variant of the word "authentic"?

In [16]:
P2_1 = spark.sql("""
    SELECT count(*) as total_reviews, sum(auth) as with_authentic, sum(auth) / count(*) * 100 as pct
    FROM (
        SELECT CASE WHEN lower(text) LIKE "%authentic%" THEN 1 ELSE 0 END as auth
        FROM reviews
        )
    """)

P2_1.toPandas().to_csv("results/p2_1.csv", header=True, index=False, encoding='utf-8')
P2_1.show()

+-------------+--------------+------------------+
|total_reviews|with_authentic|               pct|
+-------------+--------------+------------------+
|      6990280|        124634|1.7829614836601682|
+-------------+--------------+------------------+



#### Q2: How many reviews contain the string "legitimate" grouped by type of cuisine?

In [17]:
P2_2 = spark.sql("""
    SELECT cuisine, count(r.text) as legit_cnt, count(*) as tot_reviews, count(r.text) / count(*) * 100 as percent
    FROM restaurants b
        LEFT JOIN (select business_id, text FROM reviews WHERE text LIKE "%legitimate%") r
            ON b.business_id = r.business_id
    GROUP BY cuisine
    """)

P2_2.toPandas().to_csv("results/p2_2.csv", header=True, index=False, encoding='utf-8')
P2_2.orderBy(P2_2.legit_cnt, ascending=False).show()

+--------------------+---------+-----------+------------------+
|             cuisine|legit_cnt|tot_reviews|           percent|
+--------------------+---------+-----------+------------------+
|    American Cuisine|      501|      12730|3.9355852317360567|
|    European Cuisine|      319|       8005| 3.985009369144285|
|       Asian Cuisine|      273|       5738|  4.75775531544092|
|South American Cu...|      219|       4624| 4.736159169550173|
|             Unknown|      120|       4149|2.8922631959508314|
|Middle Eastern / ...|       39|        692| 5.635838150289017|
+--------------------+---------+-----------+------------------+



#### Q3 Is there a difference in the amount of authenticity language used in the different areas? (e.g., by state, north/south, urban/rural)

In [18]:
P2_3 = spark.sql("""
    SELECT b.city, b.state, count(*) as total_reviews, sum(r.auth) as with_authentic, sum(r.auth) / count(*) * 100 as pct
    FROM (
        SELECT *, CASE WHEN lower(text) LIKE "%authentic%" THEN 1 ELSE 0 END as auth
        FROM reviews
        ) r
        LEFT JOIN business b ON b.business_id = r.business_id
    GROUP BY b.city, b.state
    """)

P2_3.toPandas().to_csv("results/p2_3.csv", header=True, index=False, encoding='utf-8')
P2_3.orderBy(P2_3.pct).show()

+-------------------+-----+-------------+--------------+---+
|               city|state|total_reviews|with_authentic|pct|
+-------------------+-----+-------------+--------------+---+
|      Pass-a-Grille|   FL|            5|             0|0.0|
|              Bosie|   ID|           21|             0|0.0|
|            Delanco|   NJ|           55|             0|0.0|
|              Lemay|   MO|           33|             0|0.0|
|        TWN N CNTRY|   FL|            8|             0|0.0|
|              Erial|   NJ|           52|             0|0.0|
|Holland Southampton|   PA|           15|             0|0.0|
|        Talleyville|   DE|          253|             0|0.0|
|            Bellair|   FL|           11|             0|0.0|
|         Petersburg|   FL|           68|             0|0.0|
|            Truckee|   CA|          198|             0|0.0|
|           Lavergne|   TN|           13|             0|0.0|
|      Carneys Point|   NJ|          154|             0|0.0|
|    Monroe Township|   

### Hypothesis Testing
#### We want to test whether it is actually true that authenticity-implying words and down-grading words are generally present within the reviews of restaurants of specific cuisines

In [19]:
query_text = f"""
    SELECT b.cuisine,
        count(*) as total_reviews, 
        sum(r.auth) as with_authentic, 
        sum(r.bad_review) as bad_reviews, 
        sum(r.auth_in_bad_review) as bad_review_with_auth,
        sum(r.auth) / count(*) * 100 as with_authentic_pct, 
        sum(r.bad_review) / count(*) * 100 as bad_reviews_pct, 
        sum(r.auth_in_bad_review) / count(*) * 100 as bad_review_with_auth_pct,
        sum(r.auth_in_bad_review) / sum(r.bad_review) * 100 as bad_bad_review_with_auth_pct
    FROM (
        SELECT business_id, 
            CASE WHEN {auth_string} THEN 1 ELSE 0 END as auth,
            CASE WHEN {bad_string} THEN 1 ELSE 0 END as bad_review,
            CASE WHEN ({auth_string}) AND ({bad_string}) THEN 1 ELSE 0 END as auth_in_bad_review
        FROM reviews
        ) r
        LEFT JOIN restaurants b ON b.business_id = r.business_id
    WHERE b.cuisine is not null
    GROUP BY b.cuisine
    """
hypo_test = spark.sql(query_text)

In [20]:
hypo_test.orderBy(hypo_test.bad_bad_review_with_auth_pct, ascending=False).toPandas().to_csv('results/bad_and_auth_cuisines.csv')
hypo_test.orderBy(hypo_test.bad_bad_review_with_auth_pct, ascending=False).show()

+--------------------+-------------+--------------+-----------+--------------------+------------------+------------------+------------------------+----------------------------+
|             cuisine|total_reviews|with_authentic|bad_reviews|bad_review_with_auth|with_authentic_pct|   bad_reviews_pct|bad_review_with_auth_pct|bad_bad_review_with_auth_pct|
+--------------------+-------------+--------------+-----------+--------------------+------------------+------------------+------------------------+----------------------------+
|South American Cu...|       338048|         40129|      28059|                3624|11.870799413101098| 8.300300549034457|      1.0720371071563801|           12.91564203998717|
|       Asian Cuisine|       432387|         37533|      34451|                3178| 8.680418236440966| 7.967630849216095|      0.7349897198574425|           9.224695944965312|
|Middle Eastern / ...|        57644|          5614|       3821|                 342|  9.73908819651655| 6.628617028

### By State & Cuisine

In [21]:
query_text = f"""
    SELECT b.cuisine, b.state,
        count(*) as total_reviews, 
        sum(r.auth) as with_authentic, 
        sum(r.bad_review) as bad_reviews, 
        sum(r.auth_in_bad_review) as bad_review_with_auth,
        sum(r.auth) / count(*) * 100 as with_authentic_pct, 
        sum(r.bad_review) / count(*) * 100 as bad_reviews_pct, 
        sum(r.auth_in_bad_review) / count(*) * 100 as bad_review_with_auth_pct,
        sum(r.auth_in_bad_review) / sum(r.bad_review) * 100 as bad_bad_review_with_auth_pct
    FROM 
        restaurants b
        LEFT JOIN(
        SELECT business_id, 
            CASE WHEN {auth_string} THEN 1 ELSE 0 END as auth,
            CASE WHEN {bad_string} THEN 1 ELSE 0 END as bad_review,
            CASE WHEN ({auth_string}) AND ({bad_string}) THEN 1 ELSE 0 END as auth_in_bad_review
        FROM reviews
        ) r ON b.business_id = r.business_id
    WHERE b.review_count > 0
    GROUP BY b.cuisine, b.state
    """
hypo_test_by_state = spark.sql(query_text)

In [22]:
hypo_test_by_state.toPandas().to_csv('results/bad_and_auth_cuisines_by_state.csv', index=False)
hypo_test_by_state.show()

+--------------------+-----+-------------+--------------+-----------+--------------------+------------------+------------------+------------------------+----------------------------+
|             cuisine|state|total_reviews|with_authentic|bad_reviews|bad_review_with_auth|with_authentic_pct|   bad_reviews_pct|bad_review_with_auth_pct|bad_bad_review_with_auth_pct|
+--------------------+-----+-------------+--------------+-----------+--------------------+------------------+------------------+------------------------+----------------------------+
|South American Cu...|   PA|        56176|          7554|       4865|                 765|13.447023639988606| 8.660281970948448|      1.3617915123896325|          15.724563206577596|
|       Asian Cuisine|   AB|        14862|          1225|       1297|                 108| 8.242497645000672| 8.726954649441529|      0.7266855066612838|           8.326908249807248|
|    European Cuisine|   ID|        10721|           696|        811|                

### Ranking Cuisines By State, 
Based On percentage of bad reviews containing authenticity language

In [23]:
by_state = spark.read.option("header", "true").csv('file:////work/results/bad_and_auth_cuisines_by_state.csv')
by_state.createOrReplaceTempView('by_state')

In [24]:
rank_by_state = spark.sql("""
SELECT state, cuisine,
    RANK() OVER (PARTITION BY state ORDER BY float(bad_bad_review_with_auth_pct) DESC) as rank,
    bad_bad_review_with_auth_pct
FROM by_state
""")

rank_by_state.show()

+-----+--------------------+----+----------------------------+
|state|             cuisine|rank|bad_bad_review_with_auth_pct|
+-----+--------------------+----+----------------------------+
|   AB|South American Cu...|   1|           23.83419689119171|
|   AB|    European Cuisine|   2|           9.069767441860465|
|   AB|Middle Eastern / ...|   3|                     8.59375|
|   AB|       Asian Cuisine|   4|           8.326908249807248|
|   AB|             Unknown|   5|           6.388888888888888|
|   AB|    American Cuisine|   6|           3.494837172359015|
|   AZ|South American Cu...|   1|           8.911483253588516|
|   AZ|Middle Eastern / ...|   2|                        8.75|
|   AZ|       Asian Cuisine|   3|           8.035714285714286|
|   AZ|    European Cuisine|   4|           5.328504914640455|
|   AZ|             Unknown|   5|           3.898635477582846|
|   AZ|    American Cuisine|   6|          3.2616402741668638|
|   CA|South American Cu...|   1|          13.648528099

In [25]:
rank_by_state_pivot = rank_by_state.toPandas()
rank_by_state_pivot = rank_by_state_pivot.pivot(index='state', columns='rank', values='cuisine')

In [26]:
rank_by_state_pivot.to_csv('results/ranked_cuisines.csv')

In [37]:
cuisine_avg_rank = rank_by_state.groupBy('cuisine').agg({'rank':'avg', 'bad_bad_review_with_auth_pct':'avg'}).orderBy('avg(rank)')

In [38]:
cuisine_avg_rank.toPandas().to_csv('results/ranked_cuisines_avg.csv')
cuisine_avg_rank.show()

+--------------------+---------------------------------+------------------+
|             cuisine|avg(bad_bad_review_with_auth_pct)|         avg(rank)|
+--------------------+---------------------------------+------------------+
|South American Cu...|               13.310548375405846|1.2142857142857142|
|       Asian Cuisine|                8.536128832271766|2.7142857142857144|
|Middle Eastern / ...|                7.533637451779849|               3.0|
|    European Cuisine|                6.079933772364769| 3.642857142857143|
|             Unknown|                4.462931935634457|               4.4|
|    American Cuisine|                4.091874099768016|            5.1875|
+--------------------+---------------------------------+------------------+



# Part 3 - Machine Learning
#### We want to predict the rating in a review given the review text etc.
## Data Gathering
#### We want to extract meaningful features in order to get good predictions on the rating

In [48]:
review_features = spark.sql(f"""
    SELECT 
        text, 
        float(u.average_stars) as user_avg_stars, 
        int(u.review_count) as user_review_cnt,
        b.state as business_state,
        float(b.stars) as business_avg_stars,
        int(b.review_count) as business_review_cnt,
        CASE WHEN {auth_string} THEN 1 ELSE 0 END as auth,
        CASE WHEN {bad_string} THEN 1 ELSE 0 END as bad_review,
        int(r.stars) as label
    FROM reviews r
        LEFT JOIN users u ON r.user_id = u.user_id
        LEFT JOIN business b ON r.business_id = b.business_id
    """)
review_features.show()

+--------------------+--------------+---------------+--------------+------------------+-------------------+----+----------+-----+
|                text|user_avg_stars|user_review_cnt|business_state|business_avg_stars|business_review_cnt|auth|bad_review|label|
+--------------------+--------------+---------------+--------------+------------------+-------------------+----+----------+-----+
|Good beer but pri...|          4.33|              9|            DE|               4.5|                 49|   0|         0|    3|
|Off the beaten pa...|          4.23|             43|            PA|               4.5|                139|   0|         0|    5|
|Ocean View on a b...|          4.06|             15|            FL|               3.5|                662|   0|         0|    4|
|I have been searc...|          4.17|             29|            PA|               4.0|                120|   1|         0|    5|
|My favorite nail ...|           5.0|              1|            FL|               2.5|   

## Data Splitting & Preprocessing

In [49]:
train, test = review_features.randomSplit([0.9, 0.1], seed=12345)

In [50]:
assembler = VectorAssembler(
    inputCols=["user_avg_stars", "user_review_cnt", "business_avg_stars", "business_review_cnt", "auth", "bad_review"],
    outputCol="features", handleInvalid="skip")

indexer = StringIndexer(inputCol="business_state", outputCol="business_state_index", handleInvalid="keep")
encoder = OneHotEncoder(inputCols=["business_state_index"], outputCols=["business_state_1hot"], handleInvalid="keep")
assembler_w_state = VectorAssembler(
    inputCols=["user_avg_stars", "user_review_cnt", "business_avg_stars", "business_review_cnt", "auth", "bad_review", "business_state_1hot"],
    outputCol="features", handleInvalid="skip")

In [51]:
pre_pipe = Pipeline(stages=[assembler])
m = pre_pipe.fit(train)
train_pre = m.transform(train)
test_pre = m.transform(test)

pre_pipe_w_state = Pipeline(stages=[indexer, encoder, assembler_w_state])
m_w_state = pre_pipe_w_state.fit(train)
train_pre_w_state = m_w_state.transform(train)
test_pre_w_state = m_w_state.transform(test)

## Classifier 

In [52]:
lr = LinearRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(train_pre)

In [53]:
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
# print("numIterations: %d" % trainingSummary.totalIterations)
# print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
# trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.5814306339455517,0.0,0.41635519001146554,0.0,0.0,0.0]
Intercept: 0.008366747345772422
RMSE: 1.148190
r2: 0.397000


In [54]:
predictions = lrModel.transform(test_pre)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|3.8472007116727878|    5|[3.38000011444091...|
| 2.833109357552433|    1|[2.71000003814697...|
|  4.46933131255598|    5|[4.44999980926513...|
|3.9355527344343315|    5|[3.89000010490417...|
| 3.250601919784063|    1|[3.06999993324279...|
+------------------+-----+--------------------+
only showing top 5 rows



In [55]:
print("Test Error\nRMSE: ", rmse, "\nr2: ", r2)

Test Error
RMSE:  1.1491631769556325 
r2:  0.39669710339085373


## Classifier with state

In [56]:
lr_w_state = LinearRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8)
lrModel_w_state = lr_w_state.fit(train_pre_w_state)

In [57]:
print("Coefficients: %s" % str(lrModel_w_state.coefficients))
print("Intercept: %s" % str(lrModel_w_state.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel_w_state.summary
# print("numIterations: %d" % trainingSummary.totalIterations)
# print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
# trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.5814304877605618,0.0,0.4163546147294982,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 0.00836945309654996
RMSE: 1.148190
r2: 0.397000


In [59]:
predictions = lrModel_w_state.transform(test_pre_w_state)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|3.8472003345494294|    5|(34,[0,1,2,3,6],[...|
|2.8331099412959806|    1|(34,[0,1,2,3,10],...|
| 4.469330779014727|    5|(34,[0,1,2,3,5,7]...|
| 3.935552570397614|    5|(34,[0,1,2,3,15],...|
| 3.250602163260046|    1|(34,[0,1,2,3,13],...|
+------------------+-----+--------------------+
only showing top 5 rows



In [60]:
print("Test Error\nRMSE: ", rmse, "\nr2: ", r2)

Test Error
RMSE:  1.1491633010564772 
r2:  0.3966969730866381


## T-test

In [7]:
from statsmodels.stats.weightstats import ttest_ind
import numpy as np
import pandas as pd

In [13]:
rank_by_state = pd.melt(rank_by_state_pivot, id_vars='state', value_vars=['1', '2', '3', '4', '5', '6'])

In [14]:
rank_by_state

Unnamed: 0,state,variable,value
0,AB,1,Unknown
1,AZ,1,South American Cuisine
2,CA,1,South American Cuisine
3,CO,1,Unknown
4,DE,1,American Cuisine
...,...,...,...
97,NC,6,
98,NJ,6,Middle Eastern / African Cuisine
99,NV,6,Middle Eastern / African Cuisine
100,PA,6,Middle Eastern / African Cuisine


In [25]:
n_western_ranks = []
n_western_ranks += list(rank_by_state[rank_by_state['value'] == 'South American Cuisine']['variable'])
n_western_ranks += list(rank_by_state[rank_by_state['value'] == 'Middle Eastern / African Cuisine']['variable'])
n_western_ranks += list(rank_by_state[rank_by_state['value'] == 'Asian Cuisine']['variable'])

In [26]:
western_ranks = []
western_ranks += list(rank_by_state[rank_by_state['value'] == 'American Cuisine']['variable'])
western_ranks += list(rank_by_state[rank_by_state['value'] == 'European Cuisine']['variable'])

In [35]:
n_western_ranks = np.array(n_western_ranks, dtype=int)
western_ranks = np.array(western_ranks, dtype=int)
print(np.var(n_western_ranks), np.var(western_ranks))

3.4285714285714284 2.062222222222222


In [38]:
ttest_ind(n_western_ranks, western_ranks, alternative='larger', usevar='unequal')

(2.711656775104988, 0.004215939262446738, 69.41344616522615)