<a href="https://colab.research.google.com/github/ChristianaZorzi/DSC511-Group-Project-Final/blob/main/Classification_Modelling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Logistic Regression Model for predicting the sentiment of the reviews

## Loading libraries

In [None]:
# Installing pyspark
! pip3 install pyspark



In [None]:
# Installing langdetect
!pip install langdetect

Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/981.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m122.9/981.5 kB[0m [31m4.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m972.8/981.5 kB[0m [31m15.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langdetect
  Building wheel for langdetect (setup.py) ... [?25l[?25hdone
  Created wheel for langdetect: filename=langdetect-1.0.9-py3-none-any.whl size=993223 sha256=ee3500a67516e61b887c7f092aaae24416b7ced2b6ec4bd99c073eaabdf6137e
  Stored in directory: /root/.cache/pip/wheels/0a/f2/b2/e5ca405801e05eb7c8ed5b3b4bcf1fcabcd62

In [None]:
# Importing Libraries
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import seaborn as sns
# from google.colab import drive
from pyspark.sql.window import Window
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import StringType
from langdetect import detect, DetectorFactory
import langdetect
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Build (or reuse) the notebook's Spark session.
# NOTE(review): the master is local[24], so the spark.executor.* settings below
# presumably have no effect (everything runs inside the driver JVM) — confirm.
_spark_settings = {
    "spark.driver.memory": "70g",
    "spark.executor.instances": "6",
    "spark.executor.cores": "7",
    "spark.executor.memory": "70g",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",
    "spark.default.parallelism": "288",
    "spark.sql.autoBroadcastJoinThreshold": "-1",
    "spark.kryoserializer.buffer.max": "128m",
    "spark.driver.extraJavaOptions": "-Xss1024m",
    "spark.executor.extraJavaOptions": "-Xss1024m",
}

_builder = SparkSession.builder.master("local[24]")
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)

spark = _builder.appName("Group_Project_Reviews_Covid").getOrCreate()

## Loading the datasets

In [None]:
parquet_path_post = "data/parquet"
post_df = spark.read.parquet(parquet_path_post)

In [None]:
parquet_path_pre = "data/pre_parquet"
pre_df = spark.read.parquet(parquet_path_pre)

## Data Preprocessing

Checking for missing values


In [None]:
# check for missing values
pre_df = pre_df.replace("NULL", None)
#pre_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in pre_df.columns]).show()

In [None]:
# check for missing values
post_df = post_df.replace("NULL", None)
#post_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in post_df.columns]).show()

Both datasets have some missing values in address,hours and postal code columns.
Since we will mostly focus on the reviews we decided to not drop them.
Only the one missing review was removed.

In [None]:
pre_df = pre_df.dropna(subset=["text_"])

Removing duplicated reviews

In [None]:
# Checking for duplicated values
#print(pre_df.count())
pre_df = pre_df.dropDuplicates()
#print(pre_df.count())
# there are no duplicated rows

In [None]:
# Checking for duplicated values
#print(post_df.count())
post_df = post_df.dropDuplicates()
#print(post_df.count())
# there are no duplicated rows

In [None]:
# Drop duplicate reviews (same user, business, and text)
post_df = post_df.dropDuplicates(["user_id", "business_id", "text_"])
#post_df.count()
# Number of rows before removing duplicated reviews: 400295

In [None]:
# Drop duplicate reviews (same user, business, and text)
pre_df = pre_df.dropDuplicates(["user_id", "business_id", "text_"])
#pre_df.count()
# Number of rows before removing duplicated reviews: 5172198

Date Range of Post and Pre Covid dataset

In [None]:
# Print the first and last review timestamp of each dataset.
# Both bounds are computed in a single aggregation, so each DataFrame is
# scanned once instead of once for the minimum and once for the maximum.
# The functions module is imported under an alias so this cell does not depend
# on the wildcard import having replaced the Python builtins min/max.
from pyspark.sql import functions as F

pre_first, pre_last = pre_df.agg(F.min('date_'), F.max('date_')).first()
post_first, post_last = post_df.agg(F.min('date_'), F.max('date_')).first()

print('Date Range Pre-Covid:', pre_first, '-', pre_last)
print('Date Range Post-Covid:', post_first, '-', post_last)

Date Range Pre-Covid: 2004-10-13 00:03:20 - 2019-12-30 23:59:16
Date Range Post-Covid: 2020-01-01 00:00:01 - 2021-01-27 23:59:54


 The first American case was reported on January 20, and Health and Human Services Secretary Alex Azar declared a public health emergency on January 31.
 https://en.wikipedia.org/wiki/COVID-19_pandemic_in_the_United_States

In [None]:
pre_covid_reviews = post_df.filter(col("date_")< to_timestamp(lit("2020-01-20 00:00:01")))
#pre_covid_reviews.count()

 There are 31633 reviews that were written before 20/01, which is when the first COVID case was reported in America.

Removing those reviews from the post covid dataset

In [None]:
post_df =post_df.filter(col("date_") >= to_timestamp(lit("2020-01-20 00:00:01")))
post_df.count()

368193

Adding those reviews in the pre covid dataset

In [None]:
# Append the reviews written before the cutoff date to the pre-COVID dataset.
# unionByName matches columns by name; plain union() matches them by position
# and would silently misalign values if the two column orders ever differed.
pre_df = pre_df.unionByName(pre_covid_reviews)
#pre_df.show()

#### Location

##### Location - Preprocessing - Pre df

At a first sight in the pre covid dataset we have a total of 450 cities.
But lets check like before.

Same approach we used in post covid dataframe.

In [None]:
# Let's clean and normalize city column
# Converting the first letters to capital ones and by using trim we remove
# extra spaces
pre_df=pre_df.withColumn("city",initcap(trim(col("city"))))
#pre_df.show()

In [None]:
# Finding the unique cities
pre_df.select(col("city")).distinct().count()

415

As we expected the unique cities are reduced. Now we have a total of 415.

In [None]:
# Removing any punctuations and extra spaces for comparison
# e.g. St.Cloud now will be stcloud or St.  Cloud and St. Cloud are treated
# as two distincts cities

pre_df = pre_df.withColumn("city_key", lower(regexp_replace(col("city"), r"[^a-zA-Z0-9]", "")))

In [None]:
# Creating window to rank city spellings by count per city_key.
# For each group of rows with the same city_key sort them by how often they appear.
# The spelling itself is a secondary sort key so that equal counts are ordered
# deterministically; with the count alone, row_number() may pick a different
# spelling among tied ones from one run to the next.
windowSpec = Window.partitionBy("city_key").orderBy(col("count").desc(), col("city").asc())

# Counting how many times each spelling appears for each city_key
city_counts = pre_df.groupBy("city_key", "city").count()

# Picking the most common spelling for each city_key
standard_cities = city_counts.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") == 1)

### Now we have a standarized column, e.g. stcloud-> St. Cloud

In [None]:
# Renaming the selected column
standard_cities = standard_cities.withColumnRenamed("city", "city_standardized")

# Preforming join to have a final dataframe
pre_df = pre_df.join(
    standard_cities.select("city_key", "city_standardized"),
    on="city_key",
    how="left"
)


In [None]:
# Dropping city key and city columns, because they are not useful
pre_df=pre_df.drop("city_key", "city")

In [None]:
pre_df.select("city_standardized").distinct().count()

404

The final total number of cities in the pre covid dataframe is 404.

##### Location - Preprocessing - Post df

In [None]:
post_df.show(5)

+--------------------+--------------------+--------------------+------+--------+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+--------------------+--------------+------+-----+----+--------------------+-------------------+
|         business_id|                name|             address|state_|    city|postal_code|     latitude|     longitude|stars|review_count|is_open|          categories|               hours|           review_id|             user_id|customer_stars|useful|funny|cool|               text_|              date_|
+--------------------+--------------------+--------------------+------+--------+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+--------------------+--------------+------+-----+----+--------------------+-------------------+
|iTLwEi5Kc9fDGlKNs...|JP's Pancake Company|  2512 Rio Grande St|    TX|  Austin

In [None]:
# Printing unique Cities
post_df.select(col("city")).distinct().count()

As we can see, the post covid dataset initially contained 394 distinct cities, but on closer inspection we observed that some of them were the same city, just written differently.

In [None]:
# Let's clean and normalize the city column.
# trim removes leading and trailing spaces, and initcap converts the value to
# title case (the first letter of each word is capitalized).
# The cleaned value overwrites the original "city" column.
post_df=post_df.withColumn("city",initcap(trim(col("city"))))
post_df.show()

In [None]:
post_df.select(col("city")).distinct().count()

We can notice that the unique cities
are reduced to 360.

In [None]:
# Removing any punctuations and extra spaces for comparison
# e.g. St.Cloud now will be stcloud
from pyspark.sql.functions import lower, regexp_replace

post_df = post_df.withColumn("city_key", lower(regexp_replace(col("city"), r"[^a-zA-Z0-9]", "")))


In [None]:
# Let's see how many cities we have with the same key
post_df.groupBy("city_key", "city").count().orderBy("city_key").show(360, truncate=False)

+---------------------+-----------------------+-----+
|city_key             |city                   |count|
+---------------------+-----------------------+-----+
|acworth              |acworth                |2    |
|alafaya              |Alafaya                |79   |
|alamontesprings      |Alamonte Springs       |7    |
|allston              |Allston                |991  |
|aloha                |Aloha                  |304  |
|aloma                |Aloma                  |19   |
|altamontesprings     |Altamonte Springs      |1748 |
|andover              |Andover                |12   |
|apopka               |Apopka                 |702  |
|arlington            |Arlington              |590  |
|arvada               |Arvada                 |2    |
|ashville             |Ashville               |14   |
|atlanta              |Atlanta                |44349|
|atlanta              |ATLANTA                |53   |
|auburndale           |Auburndale             |18   |
|austell              |Auste

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Creating window to rank city spellings by count per city_key.
# For each group of rows with the same city_key sort them by how often they appear.
# By using window we create a subset of dataframe where all rows have the same key
# and within each window rows are ordered by the count in descending order.
# The spelling itself is a secondary sort key so that equal counts are ordered
# deterministically; with the count alone, row_number() may pick a different
# spelling among tied ones from one run to the next.
windowSpec = Window.partitionBy("city_key").orderBy(col("count").desc(), col("city").asc())

# Counting how many times each spelling appears for each city_key
city_counts = post_df.groupBy("city_key", "city").count()

# Picking the most common spelling for each city_key
standard_cities = city_counts.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") == 1)

### Now we have a standarized column, e.g. stcloud-> St. Cloud
#### we don't have multiple variations like stcloud, St.  Cloud and St.CLoud etc

###### Explanation of window
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html


Break data into groups based on city key (all rows same city key are groupped together).

In [None]:
# Renaming the selected column
standard_cities = standard_cities.withColumnRenamed("city", "city_standardized")

# Preforming join to have a final dataframe
post_df = post_df.join(
    standard_cities.select("city_key", "city_standardized"),
    on="city_key",
    how="left"
)


In [None]:
# Dropping city key and city columns, because they are not useful
post_df=post_df.drop("city_key", "city")

In [None]:
post_df.show()

+--------------------+--------------------+--------------------+------+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+--------------------+--------------+------+-----+----+--------------------+-------------------+-----------------+
|         business_id|                name|             address|state_|postal_code|     latitude|     longitude|stars|review_count|is_open|          categories|               hours|           review_id|             user_id|customer_stars|useful|funny|cool|               text_|              date_|city_standardized|
+--------------------+--------------------+--------------------+------+-----------+-------------+--------------+-----+------------+-------+--------------------+--------------------+--------------------+--------------------+--------------+------+-----+----+--------------------+-------------------+-----------------+
|OvfeSj9IPZPsyub5u...|Jose's Torta Mexi...|793 Massa

In [None]:
post_df.select("city_standardized").distinct().count()

The final unique cities  of our dataset
are 351.

#### States

##### States - Post *Df*

In [None]:
# Finding the unique states
post_df.select(col("state_")).distinct().count()

14

ABE is not a recognized US state, so let's investigate further.

In [None]:
# Finding rows with ABE state
post_df.filter(col("state_")=="ABE").count()

3

In [None]:
# Displaying the details for those 3 rows
post_df.filter(col("state_") == "ABE") \
    .select("name", "address", "city_standardized", "postal_code", "latitude", "longitude") \
    .show(truncate=False)

+-------------------+------------------+-----------------+-----------+--------+-----------+
|name               |address           |city_standardized|postal_code|latitude|longitude  |
+-------------------+------------------+-----------------+-----------+--------+-----------+
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
+-------------------+------------------+-----------------+-----------+--------+-----------+



There is no state called ABE. In fact, the rows above all refer to a restaurant in Vancouver, British Columbia, Canada, so the correct province is BC.

In [None]:
# Changing to the correct province: rows tagged "ABE" whose standardized city
# is Vancouver are relabelled "BC"; every other row keeps its state_ value.
_is_abe_vancouver = (col("state_") == "ABE") & (col("city_standardized") == "Vancouver")
post_df = post_df.withColumn(
    "state_",
    when(_is_abe_vancouver, "BC").otherwise(col("state_")),
)

In [None]:
# Checking if the changes are applied
post_df.filter(col("state_")=="ABE").count()

0

In [None]:
post_df.select("state_").distinct().count()

13

In [None]:
post_df.select("state_").distinct().show()

+------+
|state_|
+------+
|    OR|
|    VA|
|    WY|
|    KY|
|    BC|
|    NH|
|    WA|
|    OH|
|    TX|
|    GA|
|    MA|
|    FL|
|    CO|
+------+



In conclusion in the post covid dataset we have a total of  13  unique states.

In [None]:
post_df.groupBy("state_").agg(countDistinct("business_id").alias("business_count")).orderBy("state_", ascending=False).show()

+------+--------------+
|state_|business_count|
+------+--------------+
|    WY|             1|
|    WA|           524|
|    VA|             1|
|    TX|          3163|
|    OR|          4209|
|    OH|          2875|
|    NH|             1|
|    MA|          6067|
|    KY|             1|
|    GA|          3750|
|    FL|          5127|
|    CO|           480|
|    BC|          3862|
+------+--------------+



Massachusetts, Florida, Oregon and British Columbia have the most restaurants, followed by Georgia and Texas.

In [None]:
# Review count of each business in each state, largest first.
# NOTE(review): review_count appears to be a business-level attribute that is
# repeated on every review row of that business. If so, sum() would multiply it
# by the number of review rows in the group, so max() is used to keep the
# per-business value — confirm against the data source.
# The functions module is imported under an alias so this cell does not depend
# on the wildcard import having replaced the Python builtin max.
from pyspark.sql import functions as F

state_reviews = (
    post_df.groupBy("state_", "business_id")
    .agg(F.max("review_count").alias("review_count"))
    .orderBy("review_count", ascending=False)
)

The top reviewed states in terms of businesses are Texas, Georgia, Oregon and Florida.

##### States - Pre Df

In [None]:
# Finding the unique states
pre_df.select(col("state_")).distinct().count()

16

The pre covid dataset contains 16 states. Two more than the post covid one.
But let's check like before.

In [None]:
# Printing Unique States
pre_df.select(col("state_")).distinct().show()

+------+
|state_|
+------+
|    OR|
|    WY|
|    BC|
|    WA|
|    OH|
|    TX|
|    GA|
|    MA|
|    KS|
|    FL|
|    CO|
|    VA|
|    NH|
|    MN|
|    KY|
|   ABE|
+------+



We keep the same approach as with the post covid dataset.

In [None]:
# Finding rows with ABE state
pre_df.filter(col("state_")=="ABE").count()

11

In [None]:
# Displaying the details for those 11 rows
pre_df.filter(col("state_") == "ABE") \
    .select("name", "address", "city_standardized", "postal_code", "latitude", "longitude") \
    .show(truncate=False)

+-------------------+------------------+-----------------+-----------+--------+-----------+
|name               |address           |city_standardized|postal_code|latitude|longitude  |
+-------------------+------------------+-----------------+-----------+--------+-----------+
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-123.133761|
|Kitanoya Guu Garlic|1698 Robson Street|Vancouver        |V6G 1C7    |49.29065|-

In [None]:
# Changing to the correct province: rows tagged "ABE" whose standardized city
# is Vancouver are relabelled "BC"; every other row keeps its state_ value.
_is_abe_vancouver = (col("state_") == "ABE") & (col("city_standardized") == "Vancouver")
pre_df = pre_df.withColumn(
    "state_",
    when(_is_abe_vancouver, "BC").otherwise(col("state_")),
)

In [None]:
# Checking if the changes are applied
pre_df.filter(col("state_")=="ABE").count()

In [None]:
pre_df.select("state_").distinct().count()

15

In [None]:
pre_df.select(col("state_")).distinct().show()

+------+
|state_|
+------+
|    MN|
|    OR|
|    VA|
|    WY|
|    KY|
|    BC|
|    NH|
|    WA|
|    OH|
|    TX|
|    GA|
|    MA|
|    KS|
|    FL|
|    CO|
+------+



In conclusion we have 15 unique states in the pre dataset.

In [None]:
state_counts=pre_df.groupBy("state_").agg(countDistinct("business_id").alias("business_count")).orderBy("state_", ascending=False)

In [None]:
# Finding NULL string in columns
#post_df = post_df.replace("NULL", None)
pre_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in pre_df.columns]).show()

+-----------+----+-------+------+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+-----------------+
|business_id|name|address|state_|postal_code|latitude|longitude|stars|review_count|is_open|categories| hours|review_id|user_id|customer_stars|useful|funny|cool|text_|date_|city_standardized|
+-----------+----+-------+------+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+-----------------+
|          0|   0|  13453|     0|        384|       0|        0|    0|           0|      0|         0|217149|        0|      0|             0|     0|    0|   0|    0|    0|                0|
+-----------+----+-------+------+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+-----------------+



In [None]:
# Review count of each business in each state, largest first.
# NOTE(review): review_count appears to be a business-level attribute that is
# repeated on every review row of that business. If so, sum() would multiply it
# by the number of review rows in the group, so max() is used to keep the
# per-business value — confirm against the data source.
# The functions module is imported under an alias so this cell does not depend
# on the wildcard import having replaced the Python builtin max.
from pyspark.sql import functions as F

state_reviews = (
    pre_df.groupBy("state_", "business_id")
    .agg(F.max("review_count").alias("review_count"))
    .orderBy("review_count", ascending=False)
)

The states in the pre covid dataset that contain the most reviews are: Texas, Oregon, Massachusetts and Georgia.

#### Hours column

In [None]:
post_df.select("hours").printSchema()

root
 |-- hours: string (nullable = true)



In [None]:
# Convert the hours column of post_df from a JSON string into a map
# (only post_df is converted in this cell; pre_df is not touched here).

from pyspark.sql.types import MapType, StringType

# Define the schema for the 'hours' column as a Map[String, String]
schema = MapType(StringType(), StringType())

# Convert the 'hours' column from string to a MapType; the parsed map replaces
# the original string column under the same name
post_df = post_df.withColumn("hours", from_json(post_df["hours"], schema))

# Verify the schema ('hours' should now be reported as a map)
post_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state_: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- hours: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- customer_stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text_: string (nullable = true)
 |-- date_: timestamp (nullable = true)
 |-- city_standardized: string (nullable = true)



Count how many businesses have null hours for all days of the week.

In [None]:
## Find records with null values in the hours column or where all days have null values

from pyspark.sql.functions import col, size, map_values

# Build "every weekday entry is null" by AND-ing the per-day null checks together
_all_days_null = None
for _weekday in ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']:
    _day_is_null = col('hours').getItem(_weekday).isNull()
    _all_days_null = _day_is_null if _all_days_null is None else (_all_days_null & _day_is_null)

# A row is kept when the whole 'hours' value is null, or when the map has
# entries but none of the seven weekday keys holds a value
_hours_missing = col('hours').isNull()
_has_entries = size(map_values(col('hours'))) > 0
df_filtered = post_df.filter(_hours_missing | (_has_entries & _all_days_null))

df_filtered.count()

11327

Define periods:
- Morning: 06:00-12:00
- Afternoon: 12:00-17:00
- Evening: 17:00 -23:00
- LateNight: 23:00- 06:00

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import datetime

# Define a UDF to categorize the period based on opening and closing times
def categorize_period(opening_closing_str):
    """Map an "H:M-H:M" opening-hours string to a period of the day.

    Args:
        opening_closing_str: opening and closing time separated by '-', each
            written as hour:minute (e.g. "8:0-11:30"), or None when the
            business has no hours for that day.

    Returns:
        'Morning', 'Afternoon' or 'Evening' when the whole interval lies inside
        06:00-12:00, 12:00-17:00 or 17:00-23:00 respectively; 'Late Night' when
        the interval runs past midnight, closes at or after 23:00, or opens at
        or before 06:00; 'Unknown' when it fits none of these; 'Closed' when the
        value is missing or cannot be parsed.
    """
    # No hours recorded for this day.
    if opening_closing_str is None:
        return 'Closed'

    try:
        opening, closing = opening_closing_str.split('-')
        opening_hour, opening_minute = map(int, opening.split(':'))
        closing_hour, closing_minute = map(int, closing.split(':'))

        # Build time objects so the bounds can be compared directly
        opening_time = datetime.time(opening_hour, opening_minute)
        closing_time = datetime.time(closing_hour, closing_minute)
    except (AttributeError, TypeError, ValueError):
        # Not a string, wrong number of parts, non-numeric or out-of-range values
        return 'Closed'

    # A closing time earlier than the opening time means the business stays
    # open past midnight (e.g. "11:0-2:0"). Comparing the raw clock times would
    # wrongly put such hours in Morning/Afternoon/Evening, so handle them first.
    if closing_time < opening_time:
        return 'Late Night'

    if opening_time >= datetime.time(6, 0) and closing_time <= datetime.time(12, 0):
        return 'Morning'
    elif opening_time >= datetime.time(12, 0) and closing_time <= datetime.time(17, 0):
        return 'Afternoon'
    elif opening_time >= datetime.time(17, 0) and closing_time <= datetime.time(23, 0):
        return 'Evening'
    elif closing_time >= datetime.time(23, 0) or opening_time <= datetime.time(6, 0):
        return 'Late Night'
    else:
        return 'Unknown'

# Wrap categorize_period as a Spark UDF that returns a string
categorize_period_udf = udf(categorize_period, StringType())

# Add a 'period_Monday' column by applying the UDF to the Monday entry of the
# 'hours' map; the result is stored in a new DataFrame, df_with_period
df_with_period = post_df.withColumn(
    'period_Monday',
    categorize_period_udf(col('hours').getItem('Monday'))  # Monday only in this statement
)

df_with_period.show(truncate=False)

+----------------------+-----------------------------------+-----------------------+------+-----------+-------------+--------------+-----+------------+-------+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+----------------------+--------------+------+-----+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Add a period_<Day> column for each remaining weekday by applying the
# categorisation UDF to that day's entry in the 'hours' map.
for _weekday in ['Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']:
    df_with_period = df_with_period.withColumn(
        'period_' + _weekday,
        categorize_period_udf(col('hours').getItem(_weekday)),
    )

In [None]:
from pyspark.sql.functions import col

# Select only columns whose name contains the word 'period'
df_with_period.select([col(column) for column in df_with_period.columns if 'period' in column.lower()]).show(truncate=False)

+-------------+--------------+----------------+---------------+-------------+---------------+-------------+
|period_Monday|period_Tuesday|period_Wednesday|period_Thursday|period_Friday|period_Saturday|period_Sunday|
+-------------+--------------+----------------+---------------+-------------+---------------+-------------+
|Late Night   |Closed        |Unknown         |Unknown        |Unknown      |Unknown        |Unknown      |
|Late Night   |Morning       |Morning         |Morning        |Morning      |Morning        |Morning      |
|Late Night   |Late Night    |Late Night      |Late Night     |Late Night   |Late Night     |Late Night   |
|Unknown      |Unknown       |Unknown         |Morning        |Unknown      |Unknown        |Unknown      |
|Unknown      |Unknown       |Unknown         |Unknown        |Unknown      |Unknown        |Unknown      |
|Late Night   |Late Night    |Late Night      |Late Night     |Morning      |Morning        |Morning      |
|Late Night   |Unknown      

# Classification Modelling on Reviews column (Approach 1) - Manually chosen features for the logistic regression




We are going to perform sentiment analysis, through Logistic Regression, in order to predict the category at which each review of each customer falls.
1. We will first create a new columns that will be our label. This column will be called "customer_stars_category" and will be generated by the "customer_stars" as follows:
- if "customer_star" == 4 OR 5 => "customer_stars_category" == "Positive"
- if "customer_star" == 3 => "customer_stars_category" == "Neutral"
- if "customer_star" == 1 OR 2 => "customer_stars_category" == "Negative"

2. Then, we will filter out all the reviews that are not in English language.

3. The next step will be a preprocessing step of the "Reviews" column. We will use TF-IDF in order to measure the importance of each word in the reviews collection. We will start by tokenizing the text and removing stopwords. Afterwards, we will create a CountVectorizer, then fit and transform it. This will allow us to convert the text data into a numerical format, since it generates a matrix of term frequency counts for each review. Finally, we will use IDF to weight the word frequencies.

4. Afterwards, we will continue with more pre-processing steps in other columns:
  - Changing to the correct province (Vancouver is in BC not ABE).
  - One-hot-encoding at the states column.

5. Then, we split the dataset in train(80%) and test(20%) datasets.

6. Then, we create the VectorAssembler that includes all the features that we will use to train the Logistic Regression model and label encode the label, "customer_stars_category" on train data.

7. Train and fit Logistic Regression on train data.

8. Create VectorAssembler for test data and label encode the label, "customer_stars_category" on test data.

9. Make predictions

10. Calculate metrics


*Note: The above process will be performed on a random sample of the pre-covid dataset: a 10% sample is drawn first and then halved, i.e. roughly 5% of the reviews.*

In [None]:
pre_df.sample(fraction=0.1, seed=42).write.mode('overwrite').parquet("sample_data")

In [None]:
"""
First we start by taking a proportion of our dataset - we take the 10%
"""

# sampled_pre_df3 = pre_df.sample(fraction=0.1, seed=42)
sampled_pre_df3 = spark.read.parquet("sample_data")
# sampled_pre_df3.count()

In [None]:
sampled_pre_df3 = sampled_pre_df3.sample(fraction=0.5, seed=42)

In [None]:
sampled_pre_df3.count()

259586

In [None]:
sampled_pre_df3 = sampled_pre_df3.cache()

In [None]:
sampled_pre_df3.count()

259123

In [None]:
"""
Printing schema of my sampled_pre_df dataset and shoe the first 5 cols
"""
sampled_pre_df3.printSchema()
#sampled_pre_df3.show(5)

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state_: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- hours: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- customer_stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text_: string (nullable = true)
 |-- date_: timestamp (nullable = true)
 |-- city_standardized: string (nullable = true)



In [None]:
"""
1)
"""
"""
Creating a new column named "customer_stars_category" and categorizing customer_stars column as it follows: 1,2 => negative, 3 => neutral, 4,5 => positive
Then, cout how much i have from each category.
"""

# Derive the sentiment label from the star rating:
# 4 or 5 -> "positive", 1 or 2 -> "negative", anything else -> "neutral".
# NOTE(review): the otherwise() branch also labels a null or out-of-range
# rating as "neutral", not only a 3-star rating — confirm that is acceptable.
sampled_pre_df3 = sampled_pre_df3.withColumn(
    "customer_stars_category",
    when(col("customer_stars").isin([4, 5]), "positive")
    .when(col("customer_stars").isin([1, 2]), "negative")
    .otherwise("neutral")
)

# Display the first 20 rows, untruncated, to inspect the new column
sampled_pre_df3.show(20, truncate=False)
# sampled_pre_df3.groupBy("customer_stars_category").count().show()

+----------------------+----------------------------------+-----------------------+------+-----------+------------------+------------------+-----+------------+-------+--------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+----------------------+--------------+------+-----+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
"""
2)
"""
"""
Filter out all non-Enlgish reviews from the "text_" columns
"""
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from langdetect import detect, DetectorFactory
import langdetect

# Set seed for consistent results: fixes langdetect's seed so repeated runs
# classify the same text the same way.
# NOTE(review): this assignment runs in the driver process only -- confirm that
# it also takes effect in the Spark Python workers that execute the UDF.
DetectorFactory.seed = 42

# Define UDF to detect language
def detect_language(text):
    """Return the language code langdetect assigns to ``text``.

    Parameters
    ----------
    text : str or None
        Raw review text. Null, non-string and blank values are accepted.

    Returns
    -------
    str
        The code returned by ``langdetect.detect`` (e.g. ``"en"``), or
        ``"unknown"`` when the input is missing/blank or langdetect cannot
        extract any usable features from it.
    """
    # Null, non-string and whitespace-only reviews have nothing to detect;
    # answer directly instead of relying on an exception from langdetect.
    if not isinstance(text, str) or not text.strip():
        return "unknown"

    # Imported here so the function carries its own dependency when Spark
    # ships it to a worker process.
    from langdetect.lang_detect_exception import LangDetectException

    # Spark runs this UDF in separate Python worker processes that import
    # langdetect on their own, so a seed assigned only on the driver may not
    # apply there. Re-applying it here keeps detection repeatable.
    DetectorFactory.seed = 42

    try:
        return detect(text)
    except LangDetectException:
        # Raised by langdetect when the text has no detectable features
        # (e.g. only digits or punctuation). Other errors are not swallowed.
        return "unknown"

# Expose the Python detector to Spark as a UDF producing a string column.
detect_language_udf = udf(detect_language, StringType())

# Tag each review with its detected language code, then keep only the rows
# whose code is "en" (English).
sampled_pre_df3 = (
    sampled_pre_df3
    .withColumn("language", detect_language_udf(col("text_")))
    .filter(col("language") == "en")
)


# Show rows where the text is not English
#sampled_pre_df_with_sentiment.filter(sampled_pre_df_with_sentiment["language"] != "en").select("text_", "language").show()

In [None]:
#sampled_pre_df3.show(10)

In [None]:
# 3) A) Pre-process the review column ("text_"), part 1:
#       tokenize the reviews, then remove stop words from the tokens.

# Discard every row that has a null in any column before tokenizing.
sampled_pre_df_dataset_filtered3 = sampled_pre_df3.na.drop()

# Break the review text into tokens using the "\W" (non-word character) pattern.
tokenizer2 = RegexTokenizer(pattern="\\W", inputCol="text_", outputCol="words")
tokenized_raw3 = tokenizer2.transform(sampled_pre_df_dataset_filtered3)
print("--- tokenized reviews ---")
# tokenized_raw3.select("words").show(5)

# Strip stop words from "words", writing the remaining tokens to "filtered".
remover3 = StopWordsRemover(outputCol="filtered", inputCol="words")
df_cleaned3 = remover3.transform(tokenized_raw3)
print("--- remove stop words from reviews ---")
# df_cleaned3.select("filtered").show(5)

--- tokenized reviews ---
--- remove stop words from reviews ---


In [None]:
# 3) B) Convert the filtered tokens into numerical term-frequency vectors.
#
# HashingTF hashes every token of the "filtered" column (the StopWordsRemover
# output) into one of `numFeatures` buckets and stores the per-review counts
# in "raw_features". No vocabulary is fitted, so there is no vocabSize/minDF
# setting here; distinct tokens that hash to the same bucket share one feature.
#
# NOTE(review): numFeatures=1000 is not a power of two; the Spark docs advise a
# power of two so tokens map evenly to indices. Left unchanged so the reported
# scores stay reproducible.
hashingTF = HashingTF(inputCol="filtered", outputCol="raw_features", numFeatures=1000)
result_cv3 = hashingTF.transform(df_cleaned3)
#result_cv3.show(5)

In [None]:
# 3) C) Pre-processing of the review column, part 3:
#       weight the hashed term frequencies with IDF.
#
# NOTE(review): the IDF model is fitted on result_cv3, i.e. before the
# train/test split performed in step 5, so rows that later land in the test set
# also contribute to the IDF weights.
idf3 = IDF(
    inputCol="raw_features",
    outputCol="reviews",
)
idfModel3 = idf3.fit(result_cv3)
result_tfidf3 = idfModel3.transform(result_cv3)
#result_tfidf3.show(5)

In [None]:
# """
# 4) A)
# """
# """
# Changing to the correct province
# """
# result_tfidf3=result_tfidf3.withColumn("state_",
#                           when((col("state_") == "ABE") & (col("city") == "Vancouver"), "BC")
#     .otherwise(col("state_"))
# )

In [None]:
"""
4) B)
"""
"""
state_: Apply one-hot encoding at the "states" column
"""
from pyspark.ml.feature import OneHotEncoder
# Lookup table of the state/province codes to encode, one single-column row each.
states = [("MN",), ("OR",), ("KY",), ("BC",), ("NH",), ("WA",), ("OH",), ("TX",),("GA",), ("MA",), ("KS",), ("FL",), ("CO",)]
states_df = spark.createDataFrame(states, ["state_"])

# state_ -> numeric index -> one-hot vector.
states_indexer = StringIndexer(inputCol="state_", outputCol="state_index")

states_encoder = OneHotEncoder(inputCol="state_index", outputCol="state_ohe")

pipeline = Pipeline(stages=[states_indexer, states_encoder])

# The encoding is fitted on the lookup table itself, not on the review data.
pipeline_model = pipeline.fit(states_df)
df_encoded = pipeline_model.transform(states_df)

# Attach the encoding to every review via a left join on "state_".
# Any "state_index"/"state_ohe" columns left over from a previous run of this
# cell are dropped first (DataFrame.drop is a no-op when they are absent), so
# re-running the cell does not produce duplicate, ambiguous column names.
# Rows whose "state_" is not in the lookup table get nulls in the new columns.
result_tfidf3 = result_tfidf3.drop("state_index", "state_ohe").join(
    df_encoded.select("state_", "state_index", "state_ohe"),
    on="state_",
    how="left"
)

In [None]:
# result_tfidf3.write.mode('overwrite').parquet("features")

In [None]:
# sampled_pre_df3 = spark.read.parquet("features")
# sampled_pre_df3.count()

In [None]:
# 5) Split the featurized reviews into train (80%) and test (20%) sets,
#    passing a fixed seed to the random split.
train_data, test_data = result_tfidf3.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# 6) Combine all model inputs of the training rows into one vector column,
#    "all_features": the two numeric columns, the one-hot state vector and the
#    TF-IDF review vector.
assembler_lr = VectorAssembler(
    outputCol="all_features",
    inputCols=["review_count", "stars", "state_ohe", "reviews"],
)
df_final_train = assembler_lr.transform(train_data)

In [None]:
# Label-encode "customer_stars_category" for the training rows:
# positive -> 0, neutral -> 1, everything else -> 2.
df_final_train = df_final_train.withColumn(
    "customer_stars_category_labeled",
    when(col("customer_stars_category") == "positive", 0)
    .when(col("customer_stars_category") == "neutral", 1)
    .otherwise(2),
)

In [None]:
"""
7)
"""
"""
Perform logistic regression
"""
#Train logistic regression
from pyspark.ml.classification import LogisticRegression

# Keep only the feature vector and the numeric label, and cache that frame
# before fitting.
df_train = df_final_train.select("all_features", "customer_stars_category_labeled").cache()

# Logistic regression on the assembled features, library defaults otherwise.
lr = LogisticRegression(
    labelCol="customer_stars_category_labeled",
    featuresCol="all_features",
)
lr_model = lr.fit(df_train)

In [None]:
# 8) Assemble the same four inputs into "all_features" for the test rows,
#    mirroring the training-side assembler.
assembler_lr_test = VectorAssembler(
    outputCol="all_features",
    inputCols=["review_count", "stars", "state_ohe", "reviews"],
)
df_final_test = assembler_lr_test.transform(test_data)

In [None]:
# Label-encode "customer_stars_category" for the test rows with the same
# mapping as the training set: positive -> 0, neutral -> 1, everything else -> 2.
df_final_test = df_final_test.withColumn(
    "customer_stars_category_labeled",
    when(col("customer_stars_category") == "positive", 0)
    .when(col("customer_stars_category") == "neutral", 1)
    .otherwise(2),
)

In [None]:
# 9) Score the held-out test rows with the fitted logistic regression model.
predictions = lr_model.transform(dataset=df_final_test)
# predictions.select("customer_stars_category_labeled", "prediction").show(10)

In [None]:
"""
10)
"""
"""
Calculate metrics
"""
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the numeric label with the model's "prediction" column using the
# evaluator's "f1" metric.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="customer_stars_category_labeled",
    predictionCol="prediction",
)

f1_score = evaluator.evaluate(predictions)

print(f"Test F1-score: {f1_score}")

Test F1-score: 0.7798786547563238


# Classification Modelling on Reviews column (Approach 2) - Used just the reviews in the model, and used HashingTF instead of CountVectorizer.


We are going to perform sentiment analysis, through Logistic Regression, in order to predict the category into which each customer review falls. We create a pipeline for this as follows:
1. We will first create a new column that will be our label. This column will be called "customer_stars_category" and will be derived from "customer_stars" as follows:
- if "customer_stars" == 4 OR 5 => "customer_stars_category" == "positive"
- if "customer_stars" == 3 => "customer_stars_category" == "neutral"
- if "customer_stars" == 1 OR 2 => "customer_stars_category" == "negative"

2. Then, we will filter out all the reviews that are not in English language.

3. The next step will be a preprocessing step of the "Reviews" column. We will apply TF-IDF to measure the importance of each word in the collection of reviews. First, we will tokenize the text into individual words and remove common stopwords. Then, we will use HashingTF to convert the filtered tokens into fixed-length numerical feature vectors using a hashing function. Finally, we will use IDF to weight the word frequencies.

4. We encode the target label "customer_stars_category" into numerical values, which are required for machine learning algorithms. We assign: 0 to "positive", 1 to "neutral", and 2 to "negative" reviews.


5. Define the Logistic Regression, which will predict the sentiment category based on the features extracted from the text using HashingTF and IDF.

6. Build a pipeline that tokenizes the text, removes stopwords, converts the text into numerical features using HashingTF, applies IDF for weighting, and trains a Logistic Regression model to predict sentiment, as created above.

7. Split the data into training (80%) and testing (20%) sets. Use the training data to train the model, and the test data to evaluate it.

8. Use the trained pipeline to make predictions on the test data. The model will predict the sentiment for each review.

9. Evaluate the model's performance using the F1-score, which gives a measure of how well the model balances precision and recall.


*Note: The above process will be performed on a random sample of 1% of the pre-covid dataset.*

In [None]:

# Step 1: Derive the sentiment label from the star rating
# (4-5 -> "positive", 1-2 -> "negative", everything else -> "neutral").
rated_high = col("customer_stars").isin([4, 5])
rated_low = col("customer_stars").isin([1, 2])
sampled_pre_df3 = sampled_pre_df3.withColumn(
    "customer_stars_category",
    when(rated_high, "positive").when(rated_low, "negative").otherwise("neutral"),
)

# Step 2: Language detection
def detect_language(text):
    """Return the language code langdetect assigns to ``text``.

    Parameters
    ----------
    text : str or None
        Raw review text. Null, non-string and blank values are accepted.

    Returns
    -------
    str
        The code returned by ``langdetect.detect`` (e.g. ``"en"``), or
        ``"unknown"`` when the input is missing/blank or langdetect cannot
        extract any usable features from it.
    """
    # Null, non-string and whitespace-only reviews have nothing to detect;
    # answer directly instead of relying on an exception from langdetect.
    if not isinstance(text, str) or not text.strip():
        return "unknown"

    # Imported here so the function carries its own dependency when Spark
    # ships it to a worker process.
    from langdetect.lang_detect_exception import LangDetectException

    # Spark runs this UDF in separate Python worker processes that import
    # langdetect on their own, so a seed assigned only on the driver may not
    # apply there. Re-applying it here keeps detection repeatable.
    DetectorFactory.seed = 42

    try:
        return detect(text)
    except LangDetectException:
        # Raised by langdetect when the text has no detectable features
        # (e.g. only digits or punctuation). Other errors are not swallowed.
        return "unknown"

# Register the detector as a string-returning UDF, tag every review with its
# language code, keep only the English ones, and cache the filtered frame.
detect_language_udf = udf(detect_language, StringType())
sampled_pre_df3 = (
    sampled_pre_df3
    .withColumn("language", detect_language_udf("text_"))
    .filter(col("language") == "en")
    .cache()
)

# Step 3: Text-preprocessing stages, chained by column name:
#   text_ -> words -> filtered -> raw_features -> reviews
tokenizer = RegexTokenizer(pattern="\\W", inputCol="text_", outputCol="words")
remover = StopWordsRemover(outputCol="filtered", inputCol="words")
hashingTF = HashingTF(numFeatures=1000, inputCol="filtered", outputCol="raw_features")
idf = IDF(outputCol="reviews", inputCol="raw_features")

# Step 4: Numeric label for the classifier:
# positive -> 0, neutral -> 1, everything else -> 2.
category = col("customer_stars_category")
sampled_pre_df3 = sampled_pre_df3.withColumn(
    "customer_stars_category_labeled",
    when(category == "positive", 0).when(category == "neutral", 1).otherwise(2),
)

# Step 5: Multinomial logistic regression over the TF-IDF "reviews" vector,
# limited to 10 iterations with a regularization parameter of 0.01.
lr = LogisticRegression(
    labelCol="customer_stars_category_labeled",
    featuresCol="reviews",
    regParam=0.01,
    maxIter=10,
    family="multinomial",
)

# Step 6: Chain the preprocessing stages and the classifier into one pipeline
# (tokenize -> remove stop words -> hash -> IDF -> logistic regression).
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Step 7: 80/20 train/test split with a fixed seed; every pipeline stage
# (including IDF) is fitted on the training rows only.
train_data, test_data = sampled_pre_df3.randomSplit(weights=[0.8, 0.2], seed=42)
model = pipeline.fit(train_data)

# Step 8: Run the fitted pipeline over the held-out rows.
predictions = model.transform(test_data)

# Step 9: Score the predictions with the evaluator's "f1" metric.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="customer_stars_category_labeled",
)
f1_score = evaluator.evaluate(predictions)
print(f"Test F1-score: {f1_score}")


Test F1-score: 0.7568601442610665
