# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Large Scale Data Analysis | by Maciej Jalocha | macja@itu.dk | 10.03.2025

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. Do not worry about how it works, just run the cell once to connect. 

In [2]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY
#####################################################################
import json
import os

import jupyterlab
import pyspark
from IPython.display import Javascript, display
from py4j.protocol import Py4JJavaError
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def show_popup(message):
    display(Javascript(f'alert("{message}")'))


def check_correct_file_location():
    items = os.listdir("/work")
    items_expected = ["yelp", "Home", "JobParameters.json", "emails"]
    if sorted(items) != sorted(items_expected):
        items_to_be_moved = [
            item for item in items if item not in items_expected and item[0] != "."
        ]  # Ignore hidden files starting with .
        show_popup(
            f"Warning: Found these files {items_to_be_moved} that should (most likely) be moved inside your Home folder. Make sure your Git repository and notebooks are all saved inside your Home folder and not at the 'root'/top of filesystem. Please move your files to prevent them from disappearing."
        )
    if "emails" not in items_expected:
        show_popup(
            f'Error: the folder "emails" does not seem to be accessible - did you remeber to add it to the Spark Cluster job and JupyterLab job?'
        )


check_correct_file_location()

SUPPORTED_SPARK_VERSION = "3.3.1"
SUPPORTED_JUPYTERLAB_VERSION = "3.5.1"
if jupyterlab.__version__ != SUPPORTED_JUPYTERLAB_VERSION:
    show_popup(
        f"Wrong JupyterLab version :( When starting the UCloud job you selected {jupyterlab.__version__} but it should have been {SUPPORTED_JUPYTERLAB_VERSION}"
    )
    show_popup(
        "Please shutdown this JupyterLab job and follow the instructions carefully in the UCloud setup guide PDF on LearnIT"
    )
elif "_EXECUTED_" in globals():  # Only execute this cell once.
    # check if variable '_EXECUTED_' exists in the global variable namespace
    print("Already been executed once, not running again!")
else:
    print(
        "Cell has not been executed before. Please restart the UCloud jobs if any error message pops up. Running setup cell now."
    )
    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster.

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open("/work/JobParameters.json", "r") as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS)
        for resource in JUPYTER_LAB_JOB_PARAMS["request"]["resources"]:
            if "hostname" in resource.keys():
                MASTER_HOST_NAME = resource["hostname"]

    if MASTER_HOST_NAME != "spark-cluster":
        msg = f"The JupyterLab job was started using spark hostname {MASTER_HOST_NAME}. This is not recommended, please start it using spark-cluster instead"
        show_popup(msg)
        print(msg)
    else:
        MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

        conf = SparkConf().setAll(
            [
                ("spark.app.name", "reading_job_params_app"),
                ("spark.master", MASTER_HOST),
            ]
        )

        spark = SparkSession.builder.config(conf=conf).getOrCreate()

        if spark.version != SUPPORTED_SPARK_VERSION:
            show_popup(
                f"Wrong Spark Cluster version :( When starting the UCloud job you selected {spark.version} but it should have been {SUPPORTED_SPARK_VERSION}"
            )
            show_popup(
                "Please shutdown this JupyterLab job, the Spark Cluster and follow the instructions carefully in the UCloud setup guide PDF on LearnIT"
            )

        CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline", "true").json(
            "/work/JobParameters.json"
        )

        # Extract cluster info from the specific JobParameters.json
        NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
        CPUS_PER_NODE = (
            CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
        )
        MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select(
            "machineType.memoryInGigs"
        ).first()[0]

        CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
        CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES

        if CPUS_PER_NODE > 1:
            EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
        else:
            EXECUTOR_CORES = CPUS_PER_NODE

        try:
            EXECUTOR_MEMORY = int(
                MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
            )  # set executor memory in GB on each worker node
        except ZeroDivisionError:
            show_popup(
                f"Please make sure you selected 3 nodes for the Spark Cluster, each with 24 GB of ram. You selected {MEM_PER_NODE} GB ram and {NODES} node(s)"
            )

        # Make sure there is a dir for spark logs
        if not os.path.exists("spark_logs"):
            os.mkdir("spark_logs")
        conf = SparkConf().setAll(
            [
                ("spark.app.name", "spark_assignment"),  # Change to your liking
                (
                    "spark.sql.caseSensitive",
                    False,
                ),  # Optional: Make queries strings sensitive to captialization
                ("spark.master", MASTER_HOST),
                ("spark.cores.max", CLUSTER_CORES_MAX),
                ("spark.executor.cores", EXECUTOR_CORES),
                ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
                ("spark.eventLog.enabled", True),
                ("spark.eventLog.dir", "spark_logs"),
                ("spark.history.fs.logDirectory", "spark_logs"),
                ("spark.deploy.mode", "cluster"),
            ]
        )

        ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
        CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
            EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
        )

        assert (
            int(CHECK) <= CLUSTER_MEMORY_MAX
        ), "Executor memory larger than cluster total memory!"

        # Stop previous session that was just for loading cluster params
        spark.stop()

        # Start new session with above config, that has better resource handling
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        sc = spark.sparkContext
        _EXECUTED_ = True
        print("Success!")

<IPython.core.display.Javascript object>

Cell has not been executed before. Please restart the UCloud jobs if any error message pops up. Running setup cell now.
Success!


Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the yelp datasets are located on UCloud and read then using the spark session.

In [3]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json(
    "file:////work/yelp/yelp_academic_dataset_business.json"
)  # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()
# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")
users = users.persist()

reviews = spark.read.json("file:////work/yelp/yelp_academic_dataset_review.json")
reviews = reviews.persist()

# checkin = spark.read.json('file:////work/yelp/yelp_academic_dataset_checkin.json')
# tip = spark.read.json('file:////work/yelp/yelp_academic_dataset_tip.json')

## PySpark example usage

In [9]:
# Show PySpark dataframes:
reviews.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [None]:
business.show()

In [None]:
# Get number of rows with no sampling:
reviews.count()

In [None]:
# OPTIONAL:
# Reduce resource usage and make queries run faster
# by only using a small sample of the dataframe
# and overwriting previous variable "df".
# Useful while developing, not so much to
# provide final answers. Therefore: Remember to
# to re-read the df when done developing code using
# df = spark.read etc like above.


# Get number of rows after sampling:
reviews.count()

In [97]:
business.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{null, null, null...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                null|      0|   34.4266787|  -119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1|   

Example: Say we're only interested in reviews of good mexican restaurants in Arizona. You can delete this when you do your own thing. 

### Example

In [110]:
# Filter to only Arizona businesses with "Mexican" as part of their categories
az_mex = (
    business.filter(business.state == "AZ")
    .filter(business.categories.rlike("Mexican"))
    .select("business_id", "name", "categories")
)
az_mex.toPandas().to_csv("az_mex.csv", header=True, index=False, encoding="utf-8")
# Join with the reviews
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Filter to only 5 star reviews
good_az_mex_rs = az_mex_rs.filter(az_mex_rs.stars == 5).select("name", "text")

# Print the top 20 rows of the DataFrame
good_az_mex_rs.show()

# Convert to pandas (local object) and save to local file system
good_az_mex_rs.toPandas().to_csv(
    "good_az_reviews.csv", header=True, index=False, encoding="utf-8"
)

+--------------------+--------------------+
|                name|                text|
+--------------------+--------------------+
|Casa Molina Del N...|We've been coming...|
|               Penca|Wow. That was inc...|
|               Penca|Street Tacos in T...|
|               Penca|Last minute decis...|
|             Micha's|The best Carne Se...|
|Street- Taco and ...|Cool lil spot dow...|
|               Penca|A little pricey b...|
|Taqueria Pico De ...|LOVE THIS PLACE!\...|
|Casa Molina Del N...|I love the margar...|
|      El Charro Cafe|Really happy I fo...|
|            BK Tacos|Absolutely love t...|
|St Mary's Mexican...|Best tortillas I'...|
|Charro Steak & De...|Visiting from out...|
|            BK Tacos|Most people go fo...|
|             Micha's|We travel to Tucs...|
|      El Charro Cafe|Had a great dinne...|
|Charro Steak & De...|Excellent night a...|
|Street- Taco and ...|This is a fabulou...|
|Indian Frybread-M...|I love this place...|
|               Penca|It was del

In [99]:
business_sam.select(
    explode(split(col("categories"), ",")).alias("category")
).withColumn("category", trim(col("category"))).distinct().sort("category").show()

+--------------------+
|            category|
+--------------------+
|          Acai Bowls|
|         Accessories|
|         Accountants|
|         Active Life|
|         Acupuncture|
|               Adult|
|     Adult Education|
| Adult Entertainment|
|         Advertising|
|       Aestheticians|
|             African|
|   Air Duct Cleaning|
|            Airlines|
|    Airport Shuttles|
|   Airport Terminals|
|            Airports|
|          Allergists|
|Alternative Medicine|
|Amateur Sports Teams|
|      American (New)|
+--------------------+
only showing top 20 rows



See assignment PDF for task descriptions.

### Samplling

In [12]:
reviews = reviews.sample(withReplacement=False, fraction=1 / 50)
business = business.sample(withReplacement=False, fraction=1 / 50)

# Exercises

### Task 3.1.1:

In [53]:
# Write your code here...
business.count()

150346

In [None]:
business.columns

### Task 3.1.2:

In [6]:
business.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{null, null, null...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...|Santa Barbara|                null|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|       Affton|{8:0-18:30, 0:0-0...|      1| 38.551126|  -90.335695|    

In [17]:
reviews.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [15]:
# Write your code here...
business.filter((business.stars >= 5) & (business.review_count >= 500)).select(
    "name", "stars", "review_count"
).count()

9

In [37]:
respected_businesses.toPandas().to_csv(
    "3_1_2_respected_businesses.csv", header=True, index=False, encoding="utf-8"
)

In [34]:
from pyspark.sql.functions import col, countDistinct

c_name = "unique_reviewers"
business.filter((business.stars >= 5)).join(
    reviews, on="business_id", how="inner"
).groupby("business_id", "name").agg(countDistinct("user_id").alias(c_name)).filter(
    col(c_name) >= 500
).select(
    "name", "unique_reviewers"
).show()
# .show()

+--------------------+----------------+
|                name|unique_reviewers|
+--------------------+----------------+
|Barracuda Deli Ca...|             521|
|    Carlillos Cocina|             799|
|  Free Tours By Foot|             769|
|Smiling With Hope...|             526|
|   SUGARED + BRONZED|             513|
|     Blues City Deli|             991|
|Nelson's Green Br...|             545|
|            Tumerico|             705|
|                Yats|             623|
+--------------------+----------------+



In [None]:
respected_businesses.toPandas().to_csv(
    "3_1_2_respected_businesses.csv", header=True, index=False, encoding="utf-8"
)

### Task 3.1.3: 

In [7]:
# Write your code here...
influencers = users.filter(users.review_count >= 1000).select("user_id")
influencers.toPandas().to_csv(
    "3_1_3_influencers.csv", header=True, index=False, encoding="utf-8"
)

### Task 3.1.4: 

In [None]:
business

In [2]:
# Write your code here...

reviews = reviews.sample(withReplacement=False, fraction=1 / 50)
business_sam = business.sample(withReplacement=False, fraction=1 / 50)

NameError: name 'reviews' is not defined

In [9]:
from pyspark.sql.functions import col

counts = (
    business.join(reviews, on="business_id", how="inner")
    .join(influencers, on="user_id", how="inner")
    .select("name", "user_id")
    .groupBy("name")
    .count()
    .filter(col("count") >= 5)
    .toPandas()
    .to_csv("3_1_4_occupied_businesses.csv", header=True, index=False, encoding="utf-8")
)

### Task 3.1.5: 

Find anordered list of users based on the average star counts they have given in all their reviews.

In [None]:
# Write your code here...
from pyspark.sql.functions import col

counts = (
    reviews.join(users, on="user_id", how="right")
    .select("stars", "user_id")
    .groupBy("user_id")
    .mean()
    .sort(col("avg(stars)"), ascending=False)
    .show()
)
# .toPandas().to_csv("3_1_5_ordered_users.csv", header=True, index=False, encoding='utf-8')
# too big to save haha.
# include all users, including with no corresponding reviews.

### Task 3.2.1: Data Exploration

 What is the percentage of reviews that contain a variant of the word "authentic"?

##### Total counts

In [14]:
reviews.count()

6990280

In [9]:
business.count()

150346

In [11]:
users.count()

1987897

In [4]:
reviews.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

### Percentage of authentic and legitimate language

In [4]:
# Write your code here...
from pyspark.sql.functions import asc, col, desc, lower, when

In [9]:
c = lower(col("text"))
n_authentic = reviews.filter(c.contains("authentic")).count()
total = reviews.count()
percentage_authentic = n_authentic / total
print(n_authentic)
print(percentage_authentic)

124634
0.01782961483660168


In [14]:
c = lower(col("text"))
n_authentic = reviews.filter(c.contains("legitimate")).count()
total = reviews.count()
percentage_authentic = n_authentic / total
print(n_authentic)
print(percentage_authentic)

5066
0.000724720612049875


In [10]:
c = lower(col("text"))
n_authentic = reviews.filter(c.contains("authentic") | c.contains("legitimate")).count()
total = reviews.count()
percentage_authentic = n_authentic / total
print(n_authentic)
print(percentage_authentic)

129503
0.018526153458802794


In [7]:
print(percentage_authentic)

0.018526153458802794


In [6]:
n_authentic

124634

Grouped by cuisine

In [4]:
# Filter to only Arizona businesses with "Mexican" as part of their categories
from pyspark.sql.functions import asc, col, desc, lower, when


def has(cuisine: str):
    c = col("categories")
    return c.contains(cuisine) & c.contains("Restaurants")


business = business.withColumn(
    "cuisine",
    when(has("Mexican"), "Mexican")
    .when(has("Chinese"), "Chinese")
    .when(has("Thai"), "Thai")
    .when(has("Japanese"), "Japanese")
    .when(has("Indian"), "Indian")
    .when(has("French"), "French")
    .when(has("Italian"), "Italian")
    .when(has("Korean"), "Korean")
    .when(has("Mediterranean"), "Mediterranean")
    .when(has("Soul"), "Soul")  # .when(has("German"), "German")\
    # .when(has("Greek"), "Greek")\
    # .when(has("Spanish"), "Spanish")\
    .otherwise("non-restaurant"),
)

# TODO: OVERLAP BETWEEN DIFFERENT CUSINES! especially French/italian!

In [16]:
business.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state',
 'cuisine']

### For the final report

In [9]:
from pyspark.sql.functions import col, rand

sample_df = (
    (
        business.filter(col("cuisine") != "non-restaurant")
        .orderBy(rand())  # randomize
        .limit(100)  # pick 20
    )
    .toPandas()
    .to_csv("sample_business_3.csv", header=True, index=False, encoding="utf-8")
)

In [47]:
reviews_cuisine_text.columns

['cuisine', 'text', 'review_id', 'state', 'city']

In [14]:
from pyspark.sql.functions import round

c = lower(col("text"))
condition = c.contains("legitimate") | c.contains("authentic")
reviews_cuisine_text = (
    business.filter(col("cuisine") != "non-restaurant")
    .join(reviews, on="business_id", how="inner")
    .select("cuisine", "text", "review_id", "state", "city")
    .cache()
)

review_mentions_authentic = reviews_cuisine_text.withColumn(
    "mentions_authentic", condition
).cache()

review_mentions_authentic.filter(col("mentions_authentic") == True).groupBy(
    "cuisine"
).count().withColumnRenamed("count", "mentions_count").show()

AnalysisException: Column 'cuisine' does not exist. Did you mean one of the following? [city, hours, is_open, latitude, name, state, address, business_id, longitude, stars, attributes, categories, postal_code, review_count];
'Filter NOT ('cuisine = non-restaurant)
+- Relation [address#963,attributes#964,business_id#965,categories#966,city#967,hours#968,is_open#969L,latitude#970,longitude#971,name#972,postal_code#973,review_count#974L,stars#975,state#976] json


### Comeback

In [5]:
from pyspark.sql.functions import round


def a_l():
    c = lower(col("text"))
    return c.contains("legitimate") | c.contains("authentic")

In [42]:
reviews_cuisine_text = (
    business.filter(col("cuisine") != "non-restaurant")
    .join(reviews, on="business_id", how="inner")
    .select("cuisine", "text", "review_id", "state", "city")
    .cache()
)

review_mentions_authentic = reviews_cuisine_text.withColumn(
    "mentions_authentic", a_l()
).cache()

In [42]:
review_mentions_authentic.filter(col("mentions_authentic") == True).groupBy(
    "cuisine"
).count().withColumnRenamed("count", "mentions_count").join(
    reviews_cuisine_text.groupBy("cuisine")
    .count()
    .withColumnRenamed("count", "total_count"),
    on="cuisine",
).withColumn(
    "ratio", round(col("mentions_count") / col("total_count"), 2)
).show()

+-------------+--------------+-----------+-----+
|      cuisine|mentions_count|total_count|ratio|
+-------------+--------------+-----------+-----+
|      Mexican|         34175|     432248| 0.08|
|         Thai|          6031|      93006| 0.06|
|       Indian|          5502|      75821| 0.07|
|      Chinese|         12640|     220481| 0.06|
|     Japanese|          4688|     177474| 0.03|
|      Italian|         12097|     424605| 0.03|
|       Korean|          2271|      30851| 0.07|
|       French|          1427|      89780| 0.02|
|         Soul|          1144|      63684| 0.02|
|Mediterranean|          4883|     108168| 0.05|
+-------------+--------------+-----------+-----+



In [110]:
from pyspark.sql.functions import max_by, struct

restaurant_reviews_on_business = reviews_cuisine_text
window = Window.partitionBy("state").orderBy(col("count").desc())

totals = (
    restaurant_reviews_on_business.rollup("state", "city")
    .count()
    .filter(col("state").isNotNull())
    .withColumn("rank", row_number().over(window))
    .filter(col("rank") <= 2)
    .select("state", "city", "count")
)

city_df = totals.filter(col("city").isNotNull()).select(
    "state", "city", col("count").alias("city_count")
)
state_df = totals.filter(col("city").isNull()).select(
    "state", col("count").alias("state_count")
)
totals = city_df.join(state_df, on="state")

auth_totals = (
    restaurant_reviews_on_business.filter(a_l())
    .rollup("state", "city")
    .count()
    .filter(col("state").isNotNull())
    .select("state", "city", "count")
)

auth_city_df = auth_totals.filter(col("city").isNotNull()).select(
    "state", "city", col("count").alias("auth_city_count")
)
auth_state_df = auth_totals.filter(col("city").isNull()).select(
    "state", col("count").alias("auth_state_count")
)
auth_totals = auth_city_df.join(auth_state_df, on="state")

combined = totals.join(auth_totals, on=["state", "city"], how="left")
combined = combined.withColumn(
    "city_auth_ratio", round(col("auth_city_count") / col("city_count"), 2)
).withColumn(
    "state_auth_ratio", round(col("auth_state_count") / col("state_count"), 2)
)  # .select('state', 'state_auth_ratio', 'city', 'city_auth_ratio')

combined.show()

NameError: name 'Window' is not defined

### Task 3.2.2: Hypothesis Testing

In [3]:
def l_l():
    c = lower(col("text"))
    return (
        c.contains("dirty")
        | c.contains("kitsch")
        | c.contains("cheap")
        | c.contains("rude")
        | c.contains("simple")
    )

In [9]:
review_mentions_dirty = reviews_cuisine_text.withColumn("mentions_dirty", l_l()).cache()

NameError: name 'reviews_cuisine_text' is not defined

In [8]:
review_mentions_dirty.columns

NameError: name 'review_mentions_dirty' is not defined

In [55]:
# now it contains cuisine and boolean variable if two conditions are fulfilled. We can easily operate on it!
review_mentions_authentic_or_dirty = (
    review_mentions_dirty.select("review_id", "mentions_dirty")
    .join(review_mentions_authentic, on="review_id", how="inner")
    .withColumn(
        "mentions_authentic_dirty", col("mentions_dirty") & col("mentions_authentic")
    )
    .cache()
)
authentic_review_mentions_dirty = review_mentions_authentic_or_dirty.filter(
    col("mentions_authentic")
)
authentic_review_mentions_dirty.show(n=5)

+--------------------+--------------+-------------+--------------------+-----+---------------+------------------+------------------------+
|           review_id|mentions_dirty|      cuisine|                text|state|           city|mentions_authentic|mentions_authentic_dirty|
+--------------------+--------------+-------------+--------------------+-----+---------------+------------------+------------------------+
|-GehB4C8_DeDOjToM...|         false|Mediterranean|The wife and I ag...|   IN|   Indianapolis|              true|                   false|
|-LcqwMWUQXjVjE21F...|         false|      Mexican|Taco Bus man o ma...|   FL|Treasure Island|              true|                   false|
|-WN9tM07OQF8BjyiV...|         false|      Italian|By far the best a...|   PA|   Philadelphia|              true|                   false|
|0AHkb3ZVcGY-bXNNL...|         false|Mediterranean|Tried this place ...|   PA|   Philadelphia|              true|                   false|
|0CrvG-_VaGmFbKLJU...|     

For the 2x2 contigency table we need:
- to merge them into European and South America + Asian + Soul (SAAS)
- get total
- get margin counts for European
- get margin counts for SAAS
- get margin for 'authenticity + dirty' language
- get margin for non 'authenticity + dirty' language

##### Total

In [56]:
total = authentic_review_mentions_dirty.count()
total

84858

##### Merge

In [58]:
# Write your code here...
def c_c():
    c = col("cuisine")
    return (
        c.contains("Italian") | c.contains("French") | c.contains("Mediterranean")
    )  # already filtered - no need to check for restaurant


european_saas_review_mentions_authentic_dirty = (
    authentic_review_mentions_dirty.withColumn(
        "global_cuisine", when(c_c(), "european").otherwise("asian_or_south_american")
    )
    .select("mentions_authentic_dirty", "global_cuisine")
    .cache()
)

european_saas_review_mentions_authentic_dirty.show(n=5)

+------------------------+--------------------+
|mentions_authentic_dirty|      global_cuisine|
+------------------------+--------------------+
|                   false|            european|
|                   false|asian_or_south_am...|
|                   false|            european|
|                   false|            european|
|                   false|asian_or_south_am...|
+------------------------+--------------------+
only showing top 5 rows



##### Cell Counts

In [59]:
tmp = european_saas_review_mentions_authentic_dirty.groupby(
    "global_cuisine", "mentions_authentic_dirty"
).count()
tmp.show()
tmp = tmp.collect()
print(tmp)

# i = 0
i = 0
european_true = tmp[i][-1]
assert tmp[i][0] == "european" and tmp[i][1] == True

# i = 1
i = 1
asian_or_south_american_false = tmp[i][-1]
assert tmp[i][0] == "asian_or_south_american" and tmp[i][1] == False

# i = 2
i = 2
asian_or_south_american_true = tmp[i][-1]
assert tmp[i][0] == "asian_or_south_american" and tmp[i][1] == True

# i = 3
i = 3
european_false = tmp[i][-1]
assert tmp[i][0] == "european" and tmp[i][1] == False

+--------------------+------------------------+-----+
|      global_cuisine|mentions_authentic_dirty|count|
+--------------------+------------------------+-----+
|            european|                    true| 1319|
|asian_or_south_am...|                   false|60845|
|asian_or_south_am...|                    true| 5606|
|            european|                   false|17088|
+--------------------+------------------------+-----+

[Row(global_cuisine='european', mentions_authentic_dirty=True, count=1319), Row(global_cuisine='asian_or_south_american', mentions_authentic_dirty=False, count=60845), Row(global_cuisine='asian_or_south_american', mentions_authentic_dirty=True, count=5606), Row(global_cuisine='european', mentions_authentic_dirty=False, count=17088)]


In [60]:
!pip install scipy

Collecting scipy
  Downloading scipy-1.15.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (37.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.7/37.7 MB[0m [31m37.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: scipy
Successfully installed scipy-1.15.3


#### Proper Tests

#### test for independnce
This is just test for independence

$H_0$: The two variables are independent
i.e.
$H_0: (\forall i, j)[p_{ij} = p_i \cdot q_j]$

In [62]:
import numpy as np
from scipy.stats import chi2_contingency

table = np.array(
    [
        [european_true, european_false],
        [asian_or_south_american_true, asian_or_south_american_false],
    ]
)
chi2, p, dof, expected = chi2_contingency(table)

print("Chi-square statistic:", chi2)
print("p-value:", p)
print("Degrees of freedom:", dof)
print("Expected frequencies:")
print(expected)
print("Observed frequencies:")
print(table)

Chi-square statistic: 30.877183496580166
p-value: 2.748846254431418e-08
Degrees of freedom: 1
Expected frequencies:
[[ 1502.13857267 16904.86142733]
 [ 5422.86142733 61028.13857267]]
Observed frequencies:
[[ 1319 17088]
 [ 5606 60845]]


Conclusion, is that we reject $H_0$ in favour of $H_1$ so actually these are dependent.

##### Directional test
So here, I treat european and asian_or_south_american as population (for 2x2 it's fine), and start with hypothesis that:

Probability of Authenticity+Dirty language in SAAS is higher than in European restaurant i.e. 

$H_0: p_{11} = p_{21}$ vs $H_A:p_{11}<p_{21}$

In [63]:
probs = table / table.sum(axis=1)[:, np.newaxis]
(p_11, p_12), (p_21, p_22) = probs
probs

array([[0.07165752, 0.92834248],
       [0.08436291, 0.91563709]])

1 check: if indeed the prob for SAAS is higher

In [64]:
assert p_11 < p_21, "H_A is rejected"

2nd check: divide $p$ value by half and get info

In [65]:
print(p / 2)

1.374423127215709e-08


In [66]:
p_11

0.07165752159504536

In [68]:
from scipy.stats import norm

# Compute sample sizes for each group
n1 = european_true + european_false
n2 = asian_or_south_american_true + asian_or_south_american_false
p2 = p_21
p1 = p_11
diff = p2 - p1
SE_unpooled = np.sqrt(p1 * (1 - p1) / n1 + p2 * (1 - p2) / n2)
z_star = norm.ppf(1 - 0.05 / 2)
margin = z_star * SE_unpooled
CI_lower = diff - margin
CI_upper = diff + margin
print("Group 1 (European):")
print("  n =", n1, "; p1 =", p1)
print("Group 2 (Asian/South American):")
print("  n =", n2, "; p2 =", p2)
print("Difference in proportions (p1 - p2):", diff)
print("Unpooled Standard Error:", SE_unpooled)
print(
    "95% Confidence Interval for the difference: ({:.4f}, {:.4f})".format(
        CI_lower, CI_upper
    )
)

Group 1 (European):
  n = 18407 ; p1 = 0.07165752159504536
Group 2 (Asian/South American):
  n = 66451 ; p2 = 0.08436291402687694
Difference in proportions (p1 - p2): 0.01270539243183158
Unpooled Standard Error: 0.0021855063725427235
95% Confidence Interval for the difference: (0.0084, 0.0170)


As we see, we have enough info to accept H_A in favour of H_0

### Task 3.3: Building a Rating Prediction Model

### Let's recall fields again

In [39]:
users.columns

['average_stars',
 'compliment_cool',
 'compliment_cute',
 'compliment_funny',
 'compliment_hot',
 'compliment_list',
 'compliment_more',
 'compliment_note',
 'compliment_photos',
 'compliment_plain',
 'compliment_profile',
 'compliment_writer',
 'cool',
 'elite',
 'fans',
 'friends',
 'funny',
 'name',
 'review_count',
 'useful',
 'user_id',
 'yelping_since']

In [5]:
reviews.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [73]:
business.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state',
 'cuisine']

### Model Training

In [4]:
from pyspark.sql.functions import udf


def zero_center_stars(star):
    return star - 1


zero_center_stars_udf = udf(zero_center_stars)

In [5]:
import tempfile

from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import (
    HashingTF,
    OneHotEncoder,
    StringIndexer,
    Tokenizer,
    VectorAssembler,
    IDF,
    StopWordsRemover
)
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as f
from pyspark.sql.functions import asc, col, desc, lower, when, udf

In [6]:
def l_l():
    c = lower(col("text"))
    return (
        c.contains("dirty")
        | c.contains("kitsch")
        | c.contains("cheap")
        | c.contains("rude")
        | c.contains("simple")
    )

def a_l():
    c = lower(col("text"))
    return c.contains("legitimate") | c.contains("authentic")
def has(cuisine: str):
    c = col("categories")
    return c.contains(cuisine) & c.contains("Restaurants")


business = business.withColumn(
    "cuisine",
    when(has("Mexican"), "Mexican")
    .when(has("Chinese"), "Chinese")
    .when(has("Thai"), "Thai")
    .when(has("Japanese"), "Japanese")
    .when(has("Indian"), "Indian")
    .when(has("French"), "French")
    .when(has("Italian"), "Italian")
    .when(has("Korean"), "Korean")
    .when(has("Mediterranean"), "Mediterranean")
    .when(has("Soul"), "Soul")  # .when(has("German"), "German")\
    # .when(has("Greek"), "Greek")\
    # .when(has("Spanish"), "Spanish")\
    .otherwise("non-restaurant"),
)



data = (
    business.filter(col("cuisine") != "non-restaurant")
    .select("business_id", "state")
    .join(reviews, on="business_id", how="inner")
    .withColumn("mentions_dirty", (l_l()).cast("integer"))
    .withColumn("mentions_authentic", (a_l()).cast("integer"))
    .select(
        "text", "stars", "state", "mentions_dirty", "mentions_authentic", "review_id"
    )
    .cache()
)


def stratified_train_test_split(df, frac, label, join_on, seed=42):
    """stratfied split of a dataframe in train and test set."""
    fractions = (
        df.select(label)
        .distinct()
        .withColumn("fraction", f.lit(frac))
        .rdd.collectAsMap()
    )
    df_frac = df.stat.sampleBy(label, fractions, seed)
    df_remaining = df.join(df_frac, on=join_on, how="left_anti")
    return df_frac, df_remaining



In [7]:
# reviews_sam = data.sample(withReplacement=False, fraction=1/100)
# reviews_sam = reviews_sam.withColumn('stars', zero_center_stars(reviews_sam['stars']))
# reviews_sam.count()

raw_train_valid, raw_test = stratified_train_test_split(
    data, 0.8, label="stars", join_on="review_id"
)
raw_train, raw_valid = stratified_train_test_split(
    raw_train_valid, 0.75, label="stars", join_on="review_id"
)

In [8]:
raw_train_valid=raw_train_valid.cache()
raw_test=raw_test.cache()
raw_train=raw_train.cache()
raw_valid=raw_valid.cache()

print(
    raw_train_valid.count(),
    raw_test.count(),
    raw_train.count(),
    raw_valid.count()
)

1371571 344547 1028658 342913


In [21]:
# text
tokenizer = Tokenizer(inputCol =  "text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="clean_text")

hashingtf = HashingTF(
    numFeatures=2**14, inputCol=remover.getOutputCol(), outputCol="rawTextFeatures"
)
idf = IDF(inputCol = hashingtf.getOutputCol(), outputCol="textFeatures")

feats = {"featuresCol": "allFeatures", "labelCol": "stars"}

### Old approach

In [None]:
# combine
assembler = VectorAssembler(inputCols=["textFeatures"], outputCol="allFeatures")

In [17]:
lsvc = LinearSVC(maxIter=10, regParam=0.1, **feats)
ovr = OneVsRest(classifier=lsvc, **feats)
revaluator = RegressionEvaluator(predictionCol="prediction", labelCol="stars")

### Optional params

In [44]:
state_indexer = StringIndexer(
    inputCol="state", outputCol="stateIndex", handleInvalid="keep"
)
state_encoder = OneHotEncoder(inputCols=["stateIndex"], outputCols=["stateVec"])

### New

In [10]:
test_lr.dtypes

NameError: name 'test_lr' is not defined

In [23]:
import math

from pyspark.sql.types import DoubleType

# Models\# -- **Multinomial Logistic Regression** --

assembler = VectorAssembler(inputCols=["textFeatures"], outputCol="allFeatures")

mlr = LogisticRegression(
    family="multinomial",
    featuresCol="allFeatures",
    labelCol="stars",
)
# -- **Linear Regression (with sigmoid head)** --
lr = LinearRegression(
    featuresCol="allFeatures",
    labelCol="stars",
)
def squash(z):
    size = 4
    s = float(1 / (1 + math.exp(-z)) * size) + 1
    return s
squash_udf = udf(squash, DoubleType())
pipe_mlr = Pipeline(stages=[tokenizer, remover, hashingtf, idf, assembler, mlr])
pipe_lr = Pipeline(stages=[tokenizer, remover, hashingtf, idf, assembler, lr])

# Fit
trained_mlr = pipe_mlr.fit(raw_train)
trained_lr = pipe_lr.fit(raw_train)

IllegalArgumentException: requirement failed: Column textFeatures already exists.

In [None]:
# Predict
mlr_valid_preds = trained_mlr.transform(raw_valid)

# Post-process lr prediction
lr_valid_preds = trained_lr.transform(raw_valid)
# lr_valid_preds = lr_valid_preds.withColumn("prediction", squash_udf(col("prediction")))

In [None]:
# Evaluators
eval_mae = RegressionEvaluator(
    predictionCol="prediction", labelCol="stars", metricName="mae"
)
eval_rmse = RegressionEvaluator(
    predictionCol="prediction", labelCol="stars", metricName="rmse"
)
eval_acc = MulticlassClassificationEvaluator(
    predictionCol="prediction", labelCol="stars", metricName="accuracy"
)

In [None]:
# Evaluate
metrics = {
    "MLR": {
        "MAE": eval_mae.evaluate(mlr_valid_preds),
        "RMSE": eval_rmse.evaluate(mlr_valid_preds),
        "Acc": eval_acc.evaluate(mlr_valid_preds),
    },
    "LR": {"MAE": eval_mae.evaluate(lr_valid_preds), 
           "RMSE": eval_rmse.evaluate(lr_valid_preds)},
}

print(metrics)

In [None]:
lr_valid_preds = trained_lr.transform(raw_valid)
lr_valid_preds = lr_valid_preds.withColumn("prediction", squash_udf(col("prediction")))

In [None]:
eval_mae.evaluate(lr_valid_preds) 

In [None]:
eval_rmse.evaluate(lr_valid_preds)

#### Baseline (val)

In [33]:
mean_star = float(raw_valid.agg({"stars": "avg"}).first()[0])

In [34]:
eval_mae.evaluate(raw_valid.withColumn("prediction", f.lit(mean_star)))

1.1408135381241062

In [35]:
eval_rmse.evaluate(raw_valid.withColumn("prediction", f.lit(mean_star)))

1.379874173289605

### Below I compute four model for the four different setups.

In [40]:
def train_and_eval(pipeline, train, test):
    trained_model_pipeline = pipeline.fit(train)
    preds = trained_model_pipeline.transform(test)

    metrics = {
        "MLR": {
            "MAE": eval_mae.evaluate(preds),
            "RMSE": eval_rmse.evaluate(preds),
            "accuracy": eval_acc.evaluate(preds),
        }
    }
    print(metrics)
    return pipeline, metrics

In [41]:
# combine
assembler = VectorAssembler(inputCols=["textFeatures"], outputCol="allFeatures")

pipeline = Pipeline(stages=[tokenizer, hashingtf, assembler, mlr])

pipeline,metrics=train_and_eval(pipeline, raw_train, raw_valid)

{'MLR': {'MAE': 0.41020608725828417, 'RMSE': 0.7899612334498372, 'accuracy': 0.6659969146693184}}


In [42]:
assembler = VectorAssembler(
    inputCols=["textFeatures", "mentions_dirty", "mentions_authentic"],
    outputCol="allFeatures",
)

pipeline = Pipeline(stages=[tokenizer, hashingtf, assembler, mlr])

pipeline,metrics=train_and_eval(pipeline, raw_train, raw_valid)

{'MLR': {'MAE': 0.41021483583299556, 'RMSE': 0.7898929365841179, 'accuracy': 0.6659590041789025}}


In [45]:
assembler = VectorAssembler(
    inputCols=["textFeatures", "stateVec"], outputCol="allFeatures"
)

pipeline = Pipeline(
    stages=[tokenizer, hashingtf, state_indexer, state_encoder, assembler, mlr]
)

pipeline,metrics=train_and_eval(pipeline, raw_train, raw_valid)

{'MLR': {'MAE': 0.4101535958100159, 'RMSE': 0.7898873987415991, 'accuracy': 0.6660173280103117}}


In [46]:
assembler = VectorAssembler(
    inputCols=["textFeatures", "stateVec", "mentions_dirty", "mentions_authentic"],
    outputCol="allFeatures",
)


pipeline = Pipeline(
    stages=[tokenizer, hashingtf, state_indexer, state_encoder, assembler, mlr]
)

pipeline,metrics=train_and_eval(pipeline, raw_train, raw_valid)

{'MLR': {'MAE': 0.4101215177027409, 'RMSE': 0.7897674026184484, 'accuracy': 0.6660056632440299}}


In [47]:
assembler = VectorAssembler(
    inputCols=["stateVec", "mentions_dirty", "mentions_authentic"],
    outputCol="allFeatures",
)


pipeline = Pipeline(stages=[state_indexer, state_encoder, assembler, mlr])

pipeline,metrics=train_and_eval(pipeline, raw_train, raw_valid)

{'MLR': {'MAE': 1.1616182530262777, 'RMSE': 1.7993792950109329, 'accuracy': 0.4568243257036041}}


### Cross Val

In [51]:
import math

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import avg, col, explode, expr, lit, udf
from pyspark.sql.types import DoubleType

In [85]:
assembler = VectorAssembler(inputCols=["textFeatures"], outputCol="allFeatures")
pipeline = Pipeline(stages=[tokenizer, hashingtf, assembler, mlr])
paramGrid = (
    ParamGridBuilder().addGrid(mlr.regParam, [1e-5, 1e-4, 1e-3])
    .addGrid(hashingtf.numFeatures, [2**12, 2**13, 2**14])
    .build()
)
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol="stars", predictionCol="prediction", metricName="rmse"
    ),
    numFolds=3,
    parallelism=1,
)

# Fit CV model and select best
cv_model = cv.fit(raw_train_valid)
best_model = cv_model.bestModel

IllegalArgumentException: clean_text does not exist. Available: text, stars, state, mentions_dirty, mentions_authentic, review_id, CrossValidator_4e62581a6a7b_rand, words

In [57]:
cv_model.avgMetrics

[1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103]

In [56]:
cv_model.getEstimatorParamMaps()

[{Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 16384},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 32768},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 65536},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 0.0001,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 16384},
 {Param(parent='LogisticRegression_c0720e4be8fd

In [55]:
best_model.stages

[StringIndexerModel: uid=StringIndexer_1b7666625488, handleInvalid=keep,
 OneHotEncoderModel: uid=OneHotEncoder_f8ce09f92290, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1,
 VectorAssembler_a14fd2c14970,
 LogisticRegressionModel: uid=LogisticRegression_c0720e4be8fd, numClasses=6, numFeatures=16]

In [145]:
best_model.stages[-1].getRegParam()
best_numFeatures = best_model.stages[1].getNumFeatures()
print(f"Best MLR regParam: {best_regParam}, numFeatures: {best_numFeatures}")

Best MLR regParam: 1e-05, numFeatures: 16384


In [146]:
import numpy as np

cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]

{Param(parent='LogisticRegression_bb8ab41a3990', name='regParam', doc='regularization parameter (>= 0).'): 0.001}

In [52]:
cv_model.getEstimatorParamMaps()

[{Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 16384},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 32768},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 1e-05,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 65536},
 {Param(parent='LogisticRegression_c0720e4be8fd', name='regParam', doc='regularization parameter (>= 0).'): 0.0001,
  Param(parent='HashingTF_09da511e2252', name='numFeatures', doc='Number of features. Should be greater than 0.'): 16384},
 {Param(parent='LogisticRegression_c0720e4be8fd

In [54]:
cv_model.avgMetrics

[1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103,
 1.7981911068677103]

In [134]:
# Evaluate best model on test set
test_preds = best_model.transform(raw_test)
# Create numeric prediction via expectation over probability vector
metrics = {
    "MLR": {
        "MAE": eval_mae.evaluate(test_preds),
        "RMSE": eval_rmse.evaluate(test_preds),
        "accuracy": eval_acc.evaluate(test_preds),
    }
}

print(metrics)

{'MLR': {'MAE': 0.6873790166473094, 'RMSE': 1.155287127463761, 'accuracy': 0.5307781649245064}}
