# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Large Scale Data Analysis / Big Data Management | by Tobias Lysdal Hansen | tolh@itu.dk | October 2024

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this Jupyter notebook to your Spark cluster, we need to tell Jupyter how it can access the cluster. The code below accomplishes that. Do not worry about how it works; just run the cell once to connect. 

In [1]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
from IPython.display import Javascript, display
import jupyterlab
import os, json, pyspark
from pyspark.sql import SparkSession, functions as F
from pyspark.conf import SparkConf
from py4j.protocol import Py4JJavaError


def show_popup(message):
    """Show ``message`` in a browser ``alert`` dialog from the notebook.

    The message is serialised with ``json.dumps`` so that quotes, backslashes
    and newlines inside it become a valid JavaScript string literal. Pasting
    the raw text between double quotes breaks the generated script as soon as
    the message itself contains a double quote.
    """
    display(Javascript(f'alert({json.dumps(message)})'))

def check_correct_file_location():
    """Check the contents of ``/work`` and warn via popups about problems.

    Two independent checks are made:

    * any non-hidden entry that is not one of the expected items triggers a
      warning listing those entries;
    * a missing ``yelp`` entry triggers an error popup.
    """
    items = os.listdir('/work')
    items_expected = ['yelp', 'Home', 'JobParameters.json']

    # Ignore hidden files starting with "."
    items_to_be_moved = [
        item for item in items
        if item not in items_expected and not item.startswith('.')
    ]
    # Only warn when there is actually something to move; otherwise the popup
    # would list an empty set of files.
    if items_to_be_moved:
        show_popup(f"Warning: Found these files {items_to_be_moved} that should (most likely) be moved inside your Home folder. Make sure your Git repository and notebooks are all saved inside your Home folder and not at the 'root'/top of filesystem. Please move your files to prevent them from disappearing.")

    # Test the actual directory listing: testing the expected list would never
    # report a missing folder because 'yelp' is always part of it.
    if 'yelp' not in items:
        show_popup('Error: the folder "yelp" does not seem to be accessible - did you remeber to add it to the Spark Cluster job and JupyterLab job?')
    
def job_timeout_warning(APP_NAME):
    """Pop up a notice that the UCloud job ``APP_NAME`` shuts down in 2 minutes."""
    warning_text = (
        f"Warning: Your UCloud job {APP_NAME} will shut down in 2 minutes. "
        "Save your work and/or extend the job on https://cloud.sdu.dk/app/jobs to prevent data loss."
    )
    show_popup(warning_text)
    
# Warn (via popups) when the contents of /work do not look as expected.
check_correct_file_location()

# Versions the running JupyterLab / Spark cluster are compared against.
SUPPORTED_SPARK_VERSION = "3.3.1"
SUPPORTED_JUPYTERLAB_VERSION = "3.5.1"
if jupyterlab.__version__ != SUPPORTED_JUPYTERLAB_VERSION:
    show_popup(f"Wrong JupyterLab version :( When starting the UCloud job you selected {jupyterlab.__version__} but it should have been {SUPPORTED_JUPYTERLAB_VERSION}")
    show_popup("Please shutdown this JupyterLab job and follow the instructions carefully in the UCloud setup guide PDF on LearnIT")
elif '_EXECUTED_' in globals():  # Only execute this cell once.
    # '_EXECUTED_' is set at the very end of a successful run of this cell.
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before. Please restart the UCloud jobs if any error message pops up.")
    # /work/JobParameters.json is read twice: once with plain json to find the
    # hostname of the Spark master, and once through a temporary Spark session
    # to extract the cluster size.

    MASTER_HOST_NAME = None

    # Open the parameters the JupyterLab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # If several resources carry a hostname, the last one wins.
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    if MASTER_HOST_NAME != "spark-cluster":
        show_popup(f"The JupyterLab job was started using spark hostname {MASTER_HOST_NAME}. This is not recommended, please start it using spark-cluster instead")
    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    # Temporary session, only used to check the version and read the parameters
    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'),
            ("spark.master", MASTER_HOST),
        ])

    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    if spark.version != SUPPORTED_SPARK_VERSION:
        show_popup(f"Wrong Spark Cluster version :( When starting the UCloud job you selected {spark.version} but it should have been {SUPPORTED_SPARK_VERSION}")
        show_popup("Please shutdown this JupyterLab job, the Spark Cluster and follow the instructions carefully in the UCloud setup guide PDF on LearnIT")

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    # One CPU per node is left out of the budget
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES

    if CPUS_PER_NODE > 1:
        EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    else:
        EXECUTOR_CORES = CPUS_PER_NODE

    try:
        EXECUTOR_MEMORY = int(
            MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
        )  # set executor memory in GB on each worker node
    except ZeroDivisionError as err:
        show_popup(f"Please make sure you selected 3 nodes for the Spark Cluster, each with 24 GB of ram. You selected {MEM_PER_NODE} GB ram and {NODES} node(s)")
        # Stop here with a clear error: EXECUTOR_MEMORY is undefined at this
        # point, so continuing would only fail later with a NameError.
        raise RuntimeError(
            "Cannot derive the executor memory: the number of usable CPUs per node is zero."
        ) from err

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking
            ("spark.sql.caseSensitive", False), # Optional: False keeps SQL identifiers case-insensitive
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            # NOTE(review): the documented Spark property for the deploy mode
            # appears to be "spark.submit.deployMode" - confirm this key has any effect.
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into account 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext
    # Marker checked at the top of this cell to prevent a second execution
    _EXECUTED_ = True
    print("Success!")

Cell has not been executed before. Please restart the UCloud jobs if any error message pops up.


<IPython.core.display.Javascript object>

Success!


Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the Yelp datasets are located on UCloud and read them using the Spark session.

In [11]:
# Load the Yelp business, user and review files.
# The paths point to the shared dataset folder that is made available by adding
# the dataset input folder when submitting the Spark cluster job. The file:///
# prefix indicates we want to read from the cluster's filesystem.
#
# "business" and "reviews" are persisted because they are reused by many of the
# later computations:
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json').persist()

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json').persist()

## PySpark example usage

In [3]:
# Show PySpark dataframes:
#reviews.show()

In [4]:
#business.show()

In [10]:
# Get number of rows with no sampling (counts every row of "reviews").
# NOTE(review): the output recorded below this cell (139885) differs from the
# 6990280 reported in Task 3.1.1 - presumably left over from a run on sampled
# data; re-run the cell to confirm.
reviews.count()

139885

In [6]:
# OPTIONAL: work on a small random sample (1/50 of the rows, drawn without
# replacement) while developing, to reduce resource usage and make queries
# run faster. The sample is stored in its own variable "reviews_frac", so the
# full "reviews" DataFrame stays available; remember to use "reviews" again
# when producing final answers.
reviews_frac = reviews.sample(fraction=1/50, withReplacement=False)

# Number of rows after sampling:
#reviews_frac.count()

In [5]:
##sampling 1/50th of the total number
#business = business.sample(withReplacement=False, fraction=1/50)
#business.show()

Example: Say we're only interested in reviews of good Mexican restaurants in Arizona. You can delete this when you do your own thing. 

In [7]:
# Arizona businesses whose categories mention "Mexican"; keep id and name only
in_arizona = business.state == "AZ"
serves_mexican = business.categories.rlike("Mexican")
az_mex = business.where(in_arizona & serves_mexican).select("business_id", "name")

# Attach the reviews written for those businesses
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Keep only the 5 star reviews, with the business name and the review text
five_star_only = az_mex_rs.where(az_mex_rs.stars == 5)
good_az_mex_rs = five_star_only.select("name", "text")

# Print the top 20 rows of the DataFrame
#good_az_mex_rs.show()

# Bring the result to the driver as a pandas DataFrame and write it to a local CSV file
good_az_pandas = good_az_mex_rs.toPandas()
good_az_pandas.to_csv("good_az_reviews.csv", header=True, index=False, encoding='utf-8')


See assignment PDF for task descriptions.

### Task 3.1.1: Total Number of Reviews

In [8]:
# Task: total number of reviews for all businesses, returned as a Spark
# DataFrame holding a single value.

# Number of rows in the reviews DataFrame
total_reviews = reviews.count()

# Wrap the count in a one-row, one-column DataFrame named "total_reviews"
total_reviews_df = spark.createDataFrame([(total_reviews,)], schema=["total_reviews"])

total_reviews_df.show()

# Optionally, persist the result: total_reviews_df.persist()

+-------------+
|total_reviews|
+-------------+
|      6990280|
+-------------+



### Task 3.1.2: Businesses with 5 Stars and 500+ Reviews

In [12]:
# Task: all businesses that have received 5 stars and have been reviewed by
# 500 or more users, as a DataFrame of (name, stars, review count).

# The business table already carries the star rating and the number of reviews
has_five_stars = business["stars"] == 5
has_many_reviews = business["review_count"] >= 500
filter_business = business.where(has_five_stars & has_many_reviews)

# Keep only the columns asked for
star_review_df = filter_business.select("name", "stars", "review_count")

# truncate=False prints long business names in full
star_review_df.show(truncate=False)

+----------------------------------+-----+------------+
|name                              |stars|review_count|
+----------------------------------+-----+------------+
|Blues City Deli                   |5.0  |991         |
|Tumerico                          |5.0  |705         |
|Free Tours By Foot                |5.0  |769         |
|Yats                              |5.0  |623         |
|SUGARED + BRONZED                 |5.0  |513         |
|Nelson's Green Brier Distillery   |5.0  |545         |
|Smiling With Hope Pizza           |5.0  |526         |
|Carlillos Cocina                  |5.0  |799         |
|Barracuda Deli Cafe St. Pete Beach|5.0  |521         |
+----------------------------------+-----+------------+



### Task 3.1.3: Influencers with 1000+ Reviews

In [13]:
# Task: influencers, i.e. users who have written more than 1000 reviews, as a
# Spark DataFrame of user ids.

# Small sample of the users table, handy for inspecting the data
users_sample = users.sample(fraction=1/50, withReplacement=False)
#users_sample.show()

# The users table carries each user's review count
reviews_user = users.where(users["review_count"] > 1000)

reviews_user_df = reviews_user.select("user_id")

reviews_user_df.show()

+--------------------+
|             user_id|
+--------------------+
|j14WgRoU_-2ZE1aw1...|
|q_QQ5kBBwlCcbL1s4...|
|MGPQVLsODMm9ZtYQW...|
|NIhcRW6DWvk1JQhDh...|
|QJI9OSEn6ujRCtrX0...|
|AkBtT43dYcttxQ3qO...|
|2l0O1EI1m0yWjFo2z...|
|RgDVC3ZUBqpEe6Y1k...|
|lquc6IF6uGIeRomDL...|
|VHdY6oG2JPVNjihWh...|
|om5ZiponkpRqUNa3p...|
|K7thO1n-vZ9PFYiC7...|
|UQFE3BT1rsIYrcDvu...|
|jVYzrVblDFSuL3GHt...|
|3zxy3LVBV3ttxoYbY...|
|0G-QF457q_0Z_jKqh...|
|ITa3vh5ERI90G_WP4...|
|P5bUL3Engv-2z6kKo...|
|ouODopBKF3AqfCkuQ...|
|YMgZqBUAddmFErxLt...|
+--------------------+
only showing top 20 rows



### Task 3.1.4: Businesses Reviewed by 5+ Influencers

In [14]:
# Task: names of the businesses that have been reviewed by more than 5
# influencer users (influencers come from the answer to Task 3.1.3).
#users_sample.show()

# Keep only the reviews written by influencers
influencer_reviews = reviews.join(reviews_user_df, on="user_id")

# Per business, count the DISTINCT influencers that reviewed it. Counting rows
# instead would count an influencer once per review, so a single influencer
# with several reviews of the same business would be over-counted.
business_influencer_count = influencer_reviews.groupBy("business_id").agg(
    F.countDistinct("user_id").alias("count")
)

# More than 5 different influencers
business_with_influencers = business_influencer_count.filter("count > 5")

# Join with the business DataFrame to get the business names
business_names_with_influencers = business_with_influencers.join(business, on="business_id").select("name")

business_names_with_influencers.show(truncate=False)

+-------------------------------------+
|name                                 |
+-------------------------------------+
|Fleming’s Prime Steakhouse & Wine Bar|
|Fat Dan's Deli                       |
|BJ's Nevada Barbecue                 |
|The Cheesecake Factory               |
|Strangelove's                        |
|Polite Society                       |
|Peacemaker Lobster and Crab          |
|Houlihan's                           |
|Slice Pizzeria                       |
|Blue Willow Restaurant & Gift Shop   |
|Dim Sum Garden                       |
|Pho Saigon                           |
|Jeremiah's Italian Ice               |
|Flying Fish Brewing Co               |
|Regal Park Place & RPX               |
|Barista Parlor                       |
|Vanilla Bean Bakery                  |
|Daredevil Brewing                    |
|Antoine's Restaurant                 |
|Repo Records                         |
+-------------------------------------+
only showing top 20 rows



### Task 3.1.5: Ordered List of Users by Average Star Rating

In [15]:
# Task: an ordered list of users based on the average star counts they have
# given in all their reviews.

# The users table already contains an "average_stars" column, so sorting on it
# in descending order is enough.
ordered_users_by_avg_stars = users.orderBy(users["average_stars"].desc())

# Only show the user's name next to the average
presentable_df = ordered_users_by_avg_stars.select("name", "average_stars")

#ordered_users_by_avg_stars.show()
presentable_df.show()

+-------+-------------+
|   name|average_stars|
+-------+-------------+
|Brandon|          5.0|
|   Dave|          5.0|
|William|          5.0|
|Michael|          5.0|
|   Kari|          5.0|
|    Meg|          5.0|
|  Glenn|          5.0|
|Allison|          5.0|
|   Kyla|          5.0|
|  Emily|          5.0|
|Pierric|          5.0|
|Nichole|          5.0|
| Pierce|          5.0|
|  Peter|          5.0|
|Cameron|          5.0|
|  Scott|          5.0|
|  Craig|          5.0|
|Melissa|          5.0|
|   Alan|          5.0|
| Audrey|          5.0|
+-------+-------------+
only showing top 20 rows



### Task 3.2.1: Data Exploration

In [16]:
# Look in the data for the use of "authenticity language", as defined in the Eater New York article.
# First question: what is the percentage of reviews that contain a variant of the word "authentic"?

#reviews_sample = reviews.sample(withReplacement=False, fraction=1/50)
#reviews_sample.show()

# Column functions let us do this on the DataFrame directly, without converting to an RDD
from pyspark.sql.functions import lower, col

# Lower-cased review text, so the substring checks below ignore capitalisation
review_text_lower = lower(col("text"))

reviews_count = reviews.count()

# contains() is a substring test, so any word containing "authentic" matches as well
reviews_filter_authentic = reviews.filter(review_text_lower.contains("authentic"))

mentions_legit = review_text_lower.contains("legitimate") | review_text_lower.contains("legit")
reviews_filter_likelegitimate = reviews.filter(mentions_legit)

# Number of reviews matching each group of filter words
authentic_reviews_count = reviews_filter_authentic.count()

legitimate_reviews_count = reviews_filter_likelegitimate.count()

filterword_counts = {
    "contains_authentic": authentic_reviews_count,
    "contains_legitimate": legitimate_reviews_count,
}
overall_filterwords_df = spark.createDataFrame([filterword_counts])

overall_filterwords_df.show()

# Share of all reviews, in percent
percentage_with_authentic = (authentic_reviews_count/reviews_count)*100
percentage_with_legitimate = (legitimate_reviews_count/reviews_count)*100

filterword_percentages = {
    "%authentic": percentage_with_authentic,
    "%legitimate": percentage_with_legitimate,
}
percentage_with_filterwordsdf = spark.createDataFrame([filterword_percentages])

percentage_with_filterwordsdf.show()




+------------------+-------------------+
|contains_authentic|contains_legitimate|
+------------------+-------------------+
|            124634|              23187|
+------------------+-------------------+

+------------------+-------------------+
|        %authentic|        %legitimate|
+------------------+-------------------+
|1.7829614836601682|0.33170345107778226|
+------------------+-------------------+



In [17]:
# All reviews that use any of the filter words, gathered in one DataFrame that
# the later cells build on.
from pyspark.sql.functions import lower, col

text_lower = lower(col("text"))
uses_filterword = (
    text_lower.contains("legitimate")
    | text_lower.contains("legit")
    | text_lower.contains("authentic")
)
reviews_with_filterwords = reviews.filter(uses_filterword)
reviews_with_filterwords.count()

146428

In [5]:
# Distinct values of the "categories" column, used to look for countries and cuisines
unique_categories = business.select(business["categories"]).dropDuplicates()

# Show the unique categories
#unique_categories.show(truncate=False)

In [18]:
# How many reviews use authenticity language (the filter words collected in
# reviews_with_filterwords), grouped by type of cuisine?
from pyspark.sql import functions as func

# Cuisines to look for in the "categories" column. The order matters: a
# business gets the label of the FIRST cuisine in this list that matches
# (case-insensitively), even when its categories mention several.
CUISINES = [
    "Chinese", "American", "Korean", "Japanese", "Thai",
    "Mexican", "Mediterranean", "Irish", "Italian", "French",
    "Taiwanese", "Caribbean", "Indian", "Vietnamese", "African",
]

# Build the when(...).when(...) chain from the list instead of spelling out
# every branch by hand.
cuisine_label = None
for cuisine_name in CUISINES:
    mentions_cuisine = func.col("categories").rlike(f"(?i){cuisine_name}")
    if cuisine_label is None:
        cuisine_label = func.when(mentions_cuisine, cuisine_name)
    else:
        cuisine_label = cuisine_label.when(mentions_cuisine, cuisine_name)

target_businesses = business.withColumn("cuisine", cuisine_label)

# Businesses matching none of the cuisines have a NULL label; drop them with an
# explicit null check rather than a regex match against the text "null".
target_businesses = target_businesses.filter(func.col("cuisine").isNotNull())

# Join the labelled businesses with the reviews that use the filter words
totalauthentic_review_with_cuisine = reviews_with_filterwords.join(target_businesses, on="business_id")

# Count those reviews per cuisine
review_by_cuisine = totalauthentic_review_with_cuisine.groupBy("cuisine").count().withColumnRenamed("count", "authentic_reviews")

review_by_cuisine.show()




+-------------+-----------------+
|      cuisine|authentic_reviews|
+-------------+-----------------+
|      Mexican|            31739|
|         Thai|             5887|
|       Indian|             5366|
|      Chinese|            13432|
|      African|              258|
|    Taiwanese|              207|
|        Irish|              587|
|     Japanese|             4696|
|   Vietnamese|             4167|
|      Italian|            11337|
|    Caribbean|             1627|
|       Korean|             3090|
|       French|             1073|
|Mediterranean|             5145|
|     American|            20720|
+-------------+-----------------+



In [19]:
# How big a percentage of the total reviews are "authentic" reviews?

# All reviews of the targeted cuisines ...
table_incl_treviews = reviews.join(target_businesses, on="business_id")

# ... counted per cuisine
totalreviews_by_cuisine = (
    table_incl_treviews
    .groupBy("cuisine")
    .count()
    .withColumnRenamed("count", "total_reviews")
)

# Put the totals next to the authentic-review counts from review_by_cuisine
reviews_incl_percent = review_by_cuisine.join(totalreviews_by_cuisine, on="cuisine")

# Derive the percentage from the two count columns and store it as a new
# column, overwriting the previous table variable
authentic_share = func.col("authentic_reviews")/func.col("total_reviews")*100
reviews_incl_percent = reviews_incl_percent.withColumn("%authentic_reviews", authentic_share)

reviews_incl_percent.orderBy("%authentic_reviews").show()

+-------------+-----------------+-------------+------------------+
|      cuisine|authentic_reviews|total_reviews|%authentic_reviews|
+-------------+-----------------+-------------+------------------+
|     American|            20720|      1718517|1.2056907205456797|
|       French|             1073|        39742| 2.699914448190831|
|     Japanese|             4696|       165915|2.8303649459060365|
|      Italian|            11337|       338519| 3.348999613020244|
|        Irish|              587|        16750| 3.504477611940299|
|Mediterranean|             5145|        99829| 5.153813020264653|
|      Chinese|            13432|       224265| 5.989342964796112|
|       Korean|             3090|        44993| 6.867734980997044|
|         Thai|             5887|        82347| 7.149015750421994|
|   Vietnamese|             4167|        57331| 7.268319059496607|
|      African|              258|         3483|7.4074074074074066|
|       Indian|             5366|        70532| 7.607894289116

In [8]:
# Is there a difference in the amount of authenticity language used in the different areas? (e.g., by state, north/south, urban/rural)
#business.show()

# Latitude / longitude used as the dividing lines between the halves
USA_mid_latitude = 39.8283
USA_mid_longitude = -98.5795

# Businesses on each side of the dividing lines, selected with SQL-string
# conditions that reference the latitude / longitude columns directly.
# The comparisons are strict, so a business exactly on a line is in neither half.
USA_south = business.filter(f"latitude < {USA_mid_latitude}")
USA_north = business.filter(f"latitude > {USA_mid_latitude}")

USA_east = business.filter(f"longitude > {USA_mid_longitude}")
USA_west = business.filter(f"longitude < {USA_mid_longitude}")

# Total number of reviews per half
total_reviews_north, total_reviews_south = (
    reviews.join(USA_north, "business_id").count(),
    reviews.join(USA_south, "business_id").count(),
)
total_reviews_east, total_reviews_west = (
    reviews.join(USA_east, "business_id").count(),
    reviews.join(USA_west, "business_id").count(),
)

# Reviews using the filter words, per half (joined on 'business_id')
authentic_reviews_north = reviews_with_filterwords.join(USA_north, "business_id")
authentic_reviews_south = reviews_with_filterwords.join(USA_south, "business_id")
authentic_reviews_east = reviews_with_filterwords.join(USA_east, "business_id")
authentic_reviews_west = reviews_with_filterwords.join(USA_west, "business_id")

# Number of such reviews per half
reviews_WN_authentic_count, reviews_WS_authentic_count = (
    authentic_reviews_north.count(),
    authentic_reviews_south.count(),
)
reviews_WE_authentic_count, reviews_WW_authentic_count = (
    authentic_reviews_east.count(),
    authentic_reviews_west.count(),
)

# Percentage of reviews using the filter words, to compare north, south, east and west
percentage_with_filterwords_north = (reviews_WN_authentic_count/total_reviews_north)*100
percentage_with_filterwords_south = (reviews_WS_authentic_count/total_reviews_south)*100
percentage_with_filterwords_east = (reviews_WE_authentic_count/total_reviews_east)*100
percentage_with_filterwords_west = (reviews_WW_authentic_count/total_reviews_west)*100

# One single-row DataFrame per half; each dictionary describes that one row
north_row = {
    "amount_north": reviews_WN_authentic_count,
    "total_amount_north": total_reviews_north,
    "%authentic": percentage_with_filterwords_north,
}
USA_north_df = spark.createDataFrame([north_row])

south_row = {
    "amount_south": reviews_WS_authentic_count,
    "total_amount_south": total_reviews_south,
    "%authentic": percentage_with_filterwords_south,
}
USA_south_df = spark.createDataFrame([south_row])

east_row = {
    "amount_east": reviews_WE_authentic_count,
    "total_amount_east": total_reviews_east,
    "%authentic": percentage_with_filterwords_east,
}
USA_east_df = spark.createDataFrame([east_row])

west_row = {
    "amount_west": reviews_WW_authentic_count,
    "total_amount_west": total_reviews_west,
    "%authentic": percentage_with_filterwords_west,
}
USA_west_df = spark.createDataFrame([west_row])

# Print the four summaries with the columns in a fixed order
USA_north_df.select("total_amount_north", "amount_north", "%authentic").show()
USA_south_df.select("total_amount_south", "amount_south", "%authentic").show()
USA_east_df.select("total_amount_east", "amount_east", "%authentic").show()
USA_west_df.select("total_amount_west", "amount_west", "%authentic").show()


# Note: As part of answering this question, you could compute the full cube or rollup combining the location of the business and whether the review contains authenticity language,
# and use this to aggregate their counts per state and city.

+------------------+------------+------------------+
|total_amount_north|amount_north|        %authentic|
+------------------+------------+------------------+
|           2318184|       51555|2.2239390833514507|
+------------------+------------+------------------+

+------------------+------------+------------------+
|total_amount_south|amount_south|        %authentic|
+------------------+------------+------------------+
|           4672096|       94873|2.0306303637596486|
+------------------+------------+------------------+

+-----------------+-----------+-----------------+
|total_amount_east|amount_east|       %authentic|
+-----------------+-----------+-----------------+
|          5512078|     118118|2.142894204327297|
+-----------------+-----------+-----------------+

+-----------------+-----------+------------------+
|total_amount_west|amount_west|        %authentic|
+-----------------+-----------+------------------+
|          1478202|      28310|1.9151645039040672|
+------------

In [20]:
# Create quadrants rather than halving the USA into north/south and east/west:
# NorthEast, NorthWest, SouthEast and SouthWest.
USA_mid_latitude = 39.8283
USA_mid_longitude = -98.5795

# Side of each dividing line a business is on (strict comparisons)
is_north = func.col("latitude") > USA_mid_latitude
is_south = func.col("latitude") < USA_mid_latitude
is_east = func.col("longitude") > USA_mid_longitude
is_west = func.col("longitude") < USA_mid_longitude

# Label every business with its quadrant; rows matching none of the four
# conditions get a NULL label.
business_with_geo = business.withColumn(
    "geo_area",
    func.when(is_north & is_east, "NorthEast")
    .when(is_north & is_west, "NorthWest")
    .when(is_south & is_east, "SouthEast")
    .when(is_south & is_west, "SouthWest"),
)

# Drop businesses without a quadrant using an explicit null check rather than
# a regex match against the text "null".
business_with_geo = business_with_geo.filter(func.col("geo_area").isNotNull())


In [29]:
# Combine the quadrants with the total, authentic and percentage figures

# Reviews using the filter words, with the quadrant of the reviewed business
business_geo_authentic = business_with_geo.join(reviews_with_filterwords, on="business_id")

# Number of those reviews per quadrant
business_geo_authentic_sorted = (
    business_geo_authentic
    .groupBy("geo_area")
    .count()
    .withColumnRenamed("count", "authentic_per_area")
)

# All reviews, with the quadrant of the reviewed business
business_geo_total = business_with_geo.join(reviews, on="business_id")

# Number of reviews per quadrant
business_geo_total_sorted = (
    business_geo_total
    .groupBy("geo_area")
    .count()
    .withColumnRenamed("count", "total_per_area")
)

# Put both counts side by side and derive the percentage column from them
authentic_share_per_area = (func.col("authentic_per_area")/func.col("total_per_area"))*100
counts_per_area = business_geo_total_sorted.join(business_geo_authentic_sorted, on="geo_area")
business_geo_percent = counts_per_area.withColumn("%_authentic_review", authentic_share_per_area)
business_geo_percent.show()

+---------+--------------+------------------+------------------+
| geo_area|total_per_area|authentic_per_area|%_authentic_review|
+---------+--------------+------------------+------------------+
|SouthEast|       3460933|             72058| 2.082039727437659|
|SouthWest|       1211163|             22815|1.8837266329965494|
|NorthWest|        267039|              5495| 2.057751863959946|
|NorthEast|       2051145|             46060|2.2455750324818577|
+---------+--------------+------------------+------------------+



In [22]:
# Include the cuisines in the quadrant table to look for potential biases

# Quadrant of every business, reduced to the two columns needed for the join
quadrant_by_business = business_with_geo.select("business_id", "geo_area")

# Filter-word reviews (already labelled with a cuisine) plus their quadrant
totalauthentic_with_geo = totalauthentic_review_with_cuisine.join(quadrant_by_business, on="business_id")

# Number of those reviews for every (quadrant, cuisine) combination
geo_cuisine_reviews = (
    totalauthentic_with_geo
    .groupBy("geo_area", "cuisine")
    .count()
    .withColumnRenamed("count", "authentic_reviews")
)

# Print up to 100 rows
geo_cuisine_reviews.show(100)

+---------+-------------+-----------------+
| geo_area|      cuisine|authentic_reviews|
+---------+-------------+-----------------+
|SouthWest|       Indian|              709|
|SouthEast|       French|              557|
|SouthEast|      African|              119|
|SouthWest|         Thai|              752|
|NorthEast|     Japanese|             1544|
|SouthEast|      Chinese|             4610|
|NorthEast|       Indian|             2208|
|SouthEast|      Italian|             5383|
|SouthWest|    Taiwanese|               27|
|NorthWest|       Korean|              208|
|SouthEast|     American|            11631|
|NorthWest|       French|               71|
|NorthWest|      Italian|              482|
|SouthEast|    Caribbean|             1003|
|NorthEast|     American|             5957|
|SouthEast|   Vietnamese|             2152|
|NorthWest|Mediterranean|              159|
|SouthEast|         Thai|             3106|
|SouthEast|      Mexican|            14658|
|NorthWest|      Mexican|       

### Task 3.2.2: Hypothesis Testing

In [35]:
# Hypothesis: good reviews vs bad reviews.
# Start from the cuisine-tagged authentic reviews and keep those whose text
# matches at least one negatively loaded term. "(?i)" makes the match
# case-insensitive; rlike matches anywhere in the text, so the terms also hit
# as substrings of longer words.
# NOTE(review): "dirt(y)" and "kitsch(y)" only match "dirty" / "kitschy"; if
# the bare stems were meant to match too, the groups need a trailing "?" --
# confirm the intent before changing the pattern.
negative_authentic_reviews = totalauthentic_review_with_cuisine.where(
    func.col("text").rlike("(?i)dirt(y)|kitsch(y)|tacky|corny|dodgy|oily|greasy|cheap|simple|rude")
)

# Matching reviews per cuisine. The column is named "amount_negative_words",
# but it holds the number of matching reviews, not the number of word hits.
negative_authentic_reviews_filtered = negative_authentic_reviews.groupBy("cuisine").agg(
    func.count("*").alias("amount_negative_words")
)

# Bring in the per-cuisine authentic-review counts from review_by_cuisine and
# express the matching reviews as a percentage of them.
negative_authentic_reviews_stats = negative_authentic_reviews_filtered.join(
    review_by_cuisine, on="cuisine"
).withColumn(
    "%negative_reviews",
    func.col("amount_negative_words") / func.col("authentic_reviews") * 100,
)

# Lowest share of negative-word reviews first.
negative_authentic_reviews_stats.select(
    "cuisine", "authentic_reviews", "amount_negative_words", "%negative_reviews"
).orderBy("%negative_reviews").show()

+-------------+-----------------+---------------------+------------------+
|      cuisine|authentic_reviews|amount_negative_words| %negative_reviews|
+-------------+-----------------+---------------------+------------------+
|    Caribbean|             1627|                   87| 5.347264904732637|
|       Indian|             5366|                  379| 7.062989191203877|
|        Irish|              587|                   46| 7.836456558773425|
|Mediterranean|             5145|                  413| 8.027210884353742|
|      African|              258|                   21|  8.13953488372093|
|      Italian|            11337|                  927| 8.176766340301667|
|         Thai|             5887|                  490| 8.323424494649228|
|       Korean|             3090|                  289| 9.352750809061488|
|     American|            20720|                 2082| 10.04826254826255|
|    Taiwanese|              207|                   21|10.144927536231885|
|     Japanese|          

In [26]:
# Counterpart of the negative-word analysis: keep the cuisine-tagged authentic
# reviews whose text matches at least one positively loaded term. "(?i)" makes
# the match case-insensitive; rlike matches anywhere in the text.
positive_authentic_reviews = totalauthentic_review_with_cuisine.where(
    func.col("text").rlike("(?i)elegant|elegance|tasteful|cultivated|refined|beautiful|innovative")
)

# Matching reviews per cuisine. The column is named "amount_positive_words",
# but it holds the number of matching reviews, not the number of word hits.
positive_authentic_reviews_filtered = positive_authentic_reviews.groupBy("cuisine").agg(
    func.count("*").alias("amount_positive_words")
)

# Bring in the per-cuisine authentic-review counts from review_by_cuisine and
# express the matching reviews as a percentage of them.
positive_authentic_reviews_stats = positive_authentic_reviews_filtered.join(
    review_by_cuisine, on="cuisine"
).withColumn(
    "%positive_reviews",
    func.col("amount_positive_words") / func.col("authentic_reviews") * 100,
)

# Lowest share of positive-word reviews first.
positive_authentic_reviews_stats.select(
    "cuisine", "authentic_reviews", "amount_positive_words", "%positive_reviews"
).orderBy("%positive_reviews").show()

+-------------+-----------------+---------------------+------------------+
|      cuisine|authentic_reviews|amount_positive_words| %positive_reviews|
+-------------+-----------------+---------------------+------------------+
|      Mexican|            31739|                  687|2.1645294432716846|
|       Korean|             3090|                   68|2.2006472491909386|
|   Vietnamese|             4167|                   97| 2.327813774898008|
|      Chinese|            13432|                  326|2.4270399047051816|
|        Irish|              587|                   16|  2.72572402044293|
|      African|              258|                    8|  3.10077519379845|
|    Taiwanese|              207|                    7|3.3816425120772946|
|    Caribbean|             1627|                   57| 3.503380454824831|
|       Indian|             5366|                  194|3.6153559448378685|
|     Japanese|             4696|                  191| 4.067291311754684|
|     American|          

In [41]:
# Look at the negative-word reviews by geographic area.
# (F is pyspark.sql.functions, imported in the setup cell.)

# Attach the geo information of each business to the negative-word reviews.
negative_authentic_reviews_for_geo = negative_authentic_reviews.join(
    business_with_geo, on="business_id"
)

# Number of negative-word reviews per geographic area.
negative_authentic_reviews_grouped = negative_authentic_reviews_for_geo.groupBy("geo_area").agg(
    F.count("*").alias("amount_reviews_with_negative_words")
)

# Combine with the per-area totals and express the negative-word reviews as a
# percentage of the authentic reviews in that area.
# NOTE(review): the numerator comes from the cuisine-tagged authentic reviews,
# the denominator from business_geo_percent -- confirm both cover the same
# set of reviews.
overall_geo_area_and_negative_words = negative_authentic_reviews_grouped.join(
    business_geo_percent, on="geo_area"
).withColumn(
    "%negative_reviews",
    (F.col("amount_reviews_with_negative_words") / F.col("authentic_per_area")) * 100,
)

# Lowest share of negative-word reviews first.
overall_geo_area_and_negative_words.select(
    "geo_area",
    "authentic_per_area",
    "amount_reviews_with_negative_words",
    "%negative_reviews",
).orderBy("%negative_reviews").show()

+---------+------------------+----------------------------------+-----------------+
| geo_area|authentic_per_area|amount_reviews_with_negative_words|%negative_reviews|
+---------+------------------+----------------------------------+-----------------+
|SouthEast|             72058|                              4853| 6.73485247994671|
|NorthWest|              5495|                               411|7.479526842584168|
|SouthWest|             22815|                              1846|8.091168091168091|
|NorthEast|             46060|                              4009|8.703864524533216|
+---------+------------------+----------------------------------+-----------------+



In [42]:
# Look at the positive-word reviews by geographic area.
# (F is pyspark.sql.functions, imported in the setup cell.)

# Attach the geo information of each business to the positive-word reviews.
positive_authentic_reviews_for_geo = positive_authentic_reviews.join(
    business_with_geo, on="business_id"
)

# Number of positive-word reviews per geographic area.
positive_authentic_reviews_grouped = positive_authentic_reviews_for_geo.groupBy("geo_area").agg(
    F.count("*").alias("amount_reviews_with_positive_words")
)

# Combine with the per-area totals and express the positive-word reviews as a
# percentage of the authentic reviews in that area.
# NOTE(review): the numerator comes from the cuisine-tagged authentic reviews,
# the denominator from business_geo_percent -- confirm both cover the same
# set of reviews.
overall_geo_area_and_positive_words = positive_authentic_reviews_grouped.join(
    business_geo_percent, on="geo_area"
).withColumn(
    "%positive_reviews",
    (F.col("amount_reviews_with_positive_words") / F.col("authentic_per_area")) * 100,
)

# Lowest share of positive-word reviews first.
overall_geo_area_and_positive_words.select(
    "geo_area",
    "authentic_per_area",
    "amount_reviews_with_positive_words",
    "%positive_reviews",
).orderBy("%positive_reviews").show()

+---------+------------------+----------------------------------+------------------+
| geo_area|authentic_per_area|amount_reviews_with_positive_words| %positive_reviews|
+---------+------------------+----------------------------------+------------------+
|SouthEast|             72058|                              1653| 2.293985400649477|
|NorthWest|              5495|                               141|2.5659690627843497|
|SouthWest|             22815|                               610|2.6736795967565197|
|NorthEast|             46060|                              1337|2.9027355623100304|
+---------+------------------+----------------------------------+------------------+



### Task 3.3: Building a Rating Prediction Model

In [None]:
# Write your code here...