<a href="https://colab.research.google.com/github/Vasiliki655/DSC511-Introduction/blob/main/DSC511_GroupProject_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DSC 511 - Big Data Analytics - Group Semester Project
## Reviews of restaurants for pre and post-Covid-19 periods
### Spring Semester 2025
##### Team: Rafaela Christou, Emili Rousou, Christiana Zorzi, Vasiliki Christodoulou

The dataset used for this project was sourced from Kaggle and includes two CSV files containing restaurant reviews from the pre-COVID and post-COVID periods, allowing for a comparative analysis of customer sentiment and behavior over time.
Through this analysis, we aim to understand how user behavior and sentiment changed due to the impact of COVID-19 on the restaurant industry.

We tried to answer questions like:

- How did customer sentiment change between the pre-COVID and post-COVID periods?

- How did the total number of reviews per restaurant change from the pre-COVID period to the post-COVID period?

- Did the average customer star ratings for the top 15 highest-rated restaurants decline after COVID-19, and if so, which restaurants were most affected?

- Which U.S. states had the highest number of restaurants pre-COVID vs. post-COVID?

- Which states had the highest number of restaurant reviews in the pre-COVID and post-COVID periods? Did the top states in number of reviews change over time?

- How accurately does the classification model (logistic regression) predict the sentiment of customer reviews?

- What are the main topics of the reviews pre- and post-COVID?


## Loading libraries

In [1]:
# Installing pyspark
! pip3 install pyspark



In [2]:
# Importing Libraries
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import seaborn as sns
from google.colab import drive
from pyspark.sql.window import Window
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import IDF
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.types import MapType, StringType
import plotly.express as px
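# RegexTokenizer and VectorAssembler are used later in the classification section
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, VectorAssembler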
import datetime
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

Creating the Spark Session

In [3]:
spark = SparkSession.builder \
    .appName("Group_Project_Reviews_Covid") \
    .master("local") \
    .getOrCreate()

## Loading the datasets

In [4]:
# Google drive and reading the csv of post dataset
drive.mount('/content/drive')
google_drive_path = "/content/drive/MyDrive/postcovid_reviews.csv"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Google drive and reading the csv of pre dataset
drive.mount('/content/drive')
google_drive = "/content/drive/MyDrive/precovid_reviews.csv"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
post_df = spark.read.parquet("/content/drive/MyDrive/post_parquet")

In [None]:
#post_df.show(10)

In [None]:
#drive.mount('/content/drive')
#google_drive = "/content/drive/MyDrive/precovid_reviews.csv"
#pre_df = spark.read.options(header='True', inferSchema='True', delimiter=',',multiline = True, escape = '"').csv(google_drive)

In [None]:
#pre_df.write.mode("overwrite").parquet("/content/drive/MyDrive/pre_parquet")
#os.listdir("/content/drive/MyDrive/pre_parquet")

In [7]:
pre_df = spark.read.parquet("/content/drive/MyDrive/pre_parquet")


In [None]:
#pre_df.show(10)

## Data Preprocessing

Checking for missing values


In [8]:
# check for missing values
pre_df = pre_df.replace("NULL", None)
pre_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in pre_df.columns]).show()

+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+
|business_id|name|address|state_|city|postal_code|latitude|longitude|stars|review_count|is_open|categories| hours|review_id|user_id|customer_stars|useful|funny|cool|text_|date_|
+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+
|          0|   0|  13409|     0|   0|        384|       0|        0|    0|           0|      0|         0|216501|        0|      0|             0|     0|    0|   0|    1|    0|
+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+------+---------+-------+--------------+------+-----+----+-----+-----+



In [9]:
# check for missing values
post_df = post_df.replace("NULL", None)
post_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in post_df.columns]).show()

+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+-----+---------+-------+--------------+------+-----+----+-----+-----+
|business_id|name|address|state_|city|postal_code|latitude|longitude|stars|review_count|is_open|categories|hours|review_id|user_id|customer_stars|useful|funny|cool|text_|date_|
+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+-----+---------+-------+--------------+------+-----+----+-----+-----+
|          0|   0|    992|     0|   0|          4|       0|        0|    0|           0|      0|         0|12232|        0|      0|             0|     0|    0|   0|    0|    0|
+-----------+----+-------+------+----+-----------+--------+---------+-----+------------+-------+----------+-----+---------+-------+--------------+------+-----+----+-----+-----+



Both datasets have missing values in the address, hours and postal_code columns.
Since we focus mostly on the reviews, we decided not to drop those rows.
Only the single row with a missing review text (in the pre-COVID dataset) was removed.

In [10]:
pre_df = pre_df.dropna(subset=["text_"])

Removing duplicated reviews

In [11]:
# Checking for duplicated values
#print(pre_df.count())
pre_df = pre_df.dropDuplicates()
#print(pre_df.count())
# there are no duplicated rows

In [12]:
# Checking for duplicated values
#print(post_df.count())
post_df = post_df.dropDuplicates()
#print(post_df.count())
# there are no duplicated rows

Removing identical reviews made from the same user about a specific business

In [13]:
# Drop duplicate reviews (same user, business, and text)
post_df = post_df.dropDuplicates(["user_id", "business_id", "text_"])
#post_df.count()
# Number of rows before removing duplicated reviews: 400295

In [14]:
# Drop duplicate reviews (same user, business, and text)
pre_df = pre_df.dropDuplicates(["user_id", "business_id", "text_"])
#pre_df.count()
# Number of rows before removing duplicated reviews: 5172198

Date Range of Post and Pre Covid dataset

In [None]:
print('Date Range Pre-Covid:', pre_df.select(min('date_')).collect()[0][0], '-', pre_df.select(max('date_')).collect()[0][0])
print('Date Range Post-Covid:', post_df.select(min('date_')).collect()[0][0], '-', post_df.select(max('date_')).collect()[0][0])

The first American case was reported on January 20, 2020, and Health and Human Services Secretary Alex Azar declared a public health emergency on January 31, 2020.
 https://en.wikipedia.org/wiki/COVID-19_pandemic_in_the_United_States

Finding reviews before covid started in USA

In [17]:
pre_covid_reviews = post_df.filter(col("date_")< to_timestamp(lit("2020-01-20 00:00:01")))
#pre_covid_reviews.count()

There are 31633 reviews in the post-COVID dataset that were written before January 20, 2020, the date the first COVID-19 case was reported in the United States.

Removing those reviews from the post covid dataset

In [15]:
post_df =post_df.filter(col("date_") >= to_timestamp(lit("2020-01-20 00:00:01")))
#post_df.count()

Adding those reviews in the pre covid dataset

In [18]:
pre_df = pre_df.union(pre_covid_reviews)
#pre_df.show()
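
The three cells above move early reviews from one dataset to the other. A plain-Python sketch of the same idea is shown below; it is an illustration rather than the Spark code used in this notebook, and the sample rows and the `split_by_cutoff` helper are hypothetical. The point it shows is that the `<` and `>=` filters share one cutoff, so every review lands in exactly one subset, and that the early subset has to be captured from the unfiltered data before that data is overwritten.

```python
from datetime import datetime

# Same cutoff value as in the cells above
CUTOFF = datetime(2020, 1, 20, 0, 0, 1)

def split_by_cutoff(reviews, cutoff=CUTOFF):
    """Return (before, on_or_after); every review lands in exactly one list."""
    before = [r for r in reviews if r["date_"] < cutoff]
    on_or_after = [r for r in reviews if r["date_"] >= cutoff]
    return before, on_or_after

# Made-up rows, for illustration only
reviews = [
    {"review_id": "a", "date_": datetime(2019, 12, 31, 18, 0)},
    {"review_id": "b", "date_": datetime(2020, 1, 20, 0, 0, 1)},
    {"review_id": "c", "date_": datetime(2021, 6, 1, 12, 30)},
]

# Both subsets are derived from the same unfiltered input
early, late = split_by_cutoff(reviews)
print([r["review_id"] for r in early], [r["review_id"] for r in late])
```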

Checking if each business id has a unique name

In [None]:
print('unique names:', pre_df.select("name").distinct().count())
print('unique ids:', pre_df.select('business_id').distinct().count())

In [None]:
print('unique names:', post_df.select("name").distinct().count())
print('unique ids:', post_df.select('business_id').distinct().count())

#### Location

##### Location - Preprocessing - Post df

In [None]:
# Printing unique Cities
post_df.select(col("city")).distinct().count()

At first the post-COVID dataset contains 392 distinct cities, but a closer inspection shows that some cities appear several times under slightly different spellings.

In [19]:
# Let's clean and normalize the city column
# trim removes leading and trailing spaces
# initcap converts the column to title case, where the first letter of each
# word is capitalized
post_df=post_df.withColumn("city",initcap(trim(col("city"))))
#post_df.show()

In [None]:
post_df.select(col("city")).distinct().count()

After this step, the number of unique cities drops to 358.

In [20]:
# Removing any punctuations and extra spaces for comparison
# e.g. St.Cloud now will be stcloud
post_df = post_df.withColumn("city_key", lower(regexp_replace(col("city"), r"[^a-zA-Z0-9]", "")))

In [None]:
# Let's see how many cities we have with the same key
post_df.groupBy("city_key", "city").count().orderBy("city_key").show(360, truncate=False)

In [21]:
# Creating window to rank cities by count per city_key
# For each group of rows with the same city_key sort them by how often they appear
# By using window we create a subset of dataframe where all rows have the same key
# and within each window rows are ordered by the count in descending order
windowSpec = Window.partitionBy("city_key").orderBy(col("count").desc())

# Counting how many times city name appears per city
city_counts = post_df.groupBy("city_key", "city").count()

# Picking the most common city_clean for each city_key
standard_cities = city_counts.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") == 1)

### Now we have a standardized column, e.g. stcloud -> St. Cloud
#### we no longer have multiple variations like stcloud, St.  Cloud and St.CLoud

###### Explanation of window
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html


The window breaks the data into groups based on the city key (all rows with the same city key are grouped together).
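
The grouping and ranking logic can also be sketched in plain Python. This is an illustration of the idea only, not the Spark code used here; the helper names and the sample city values are made up. Each name is reduced to a key, and the most frequent spelling per key is kept, which plays the role of `row_number() == 1` over the ordered window.

```python
import re
from collections import Counter, defaultdict

def city_key(city: str) -> str:
    """Drop every non-alphanumeric character and lowercase the rest."""
    return re.sub(r"[^a-zA-Z0-9]", "", city).lower()

def standardize_cities(cities):
    """Map each raw city name to the most frequent spelling sharing its key."""
    counts = defaultdict(Counter)
    for city in cities:
        counts[city_key(city)][city] += 1
    # most_common(1) corresponds to keeping rank 1 within each city_key group
    best = {key: variants.most_common(1)[0][0] for key, variants in counts.items()}
    return [best[city_key(city)] for city in cities]

# Made-up sample values, for illustration only
sample = ["St. Cloud", "St. Cloud", "St.Cloud", "St Cloud", "Orlando"]
standardized = standardize_cities(sample)
print(standardized)
```

Note that ties between equally frequent spellings are broken arbitrarily, both in this sketch and with `row_number()` when no secondary ordering is given.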

In [22]:
# Renaming the selected column
standard_cities = standard_cities.withColumnRenamed("city", "city_standardized")

# Performing a join to have a final dataframe
post_df = post_df.join(
    standard_cities.select("city_key", "city_standardized"),
    on="city_key",
    how="left"
)

In [23]:
# Dropping city key and city columns, because they are not useful
post_df=post_df.drop("city_key", "city")

In [None]:
post_df.show()

In [None]:
post_df.select("city_standardized").distinct().count()

The final number of unique cities in the post-COVID dataset is 349.

##### Location - Preprocessing - Pre df

In [None]:
# Finding the unique cities
pre_df.select(col("city")).distinct().count()

At first sight the pre-COVID dataset has a total of 450 cities.
Let's check it as before.

We use the same approach as for the post-COVID dataframe.

In [24]:
# Let's clean and normalize city column
# Converting the first letters to capital ones and by using trim we remove
# extra spaces
pre_df=pre_df.withColumn("city",initcap(trim(col("city"))))
#pre_df.show()

In [None]:
# Finding the unique cities
pre_df.select(col("city")).distinct().count()

As we expected the unique cities are reduced. Now we have a total of 415.

In [25]:
# Removing any punctuation and spaces for comparison
# e.g. St.Cloud, St.  Cloud and St. Cloud all become stcloud, so they are
# no longer treated as distinct cities

pre_df = pre_df.withColumn("city_key", lower(regexp_replace(col("city"), r"[^a-zA-Z0-9]", "")))

In [None]:
# Let's see how many cities we have with the same key
pre_df.groupBy("city_key", "city").count().orderBy("city_key").show(360, truncate=False)

In [26]:
# Creating window to rank cities by count per city_key
# For each group of rows with the same city_key sort them by how often they appear
windowSpec = Window.partitionBy("city_key").orderBy(col("count").desc())

# Counting how many times city name appears per city
city_counts = pre_df.groupBy("city_key", "city").count()

# Picking the most common city_clean for each city_key
standard_cities = city_counts.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") == 1)

### Now we have a standardized column, e.g. stcloud -> St. Cloud

In [27]:
# Renaming the selected column
standard_cities = standard_cities.withColumnRenamed("city", "city_standardized")

# Performing a join to have a final dataframe
pre_df = pre_df.join(
    standard_cities.select("city_key", "city_standardized"),
    on="city_key",
    how="left"
)


In [28]:
# Dropping city key and city columns, because they are not useful
pre_df=pre_df.drop("city_key", "city")

In [None]:
pre_df.select("city_standardized").distinct().count()

The final total number of cities in the pre covid dataframe is 404.

#### State

##### State - Post df

In [None]:
# Finding the unique states
post_df.select(col("state_")).distinct().count()

ABE is not a recognized U.S. state, so let's investigate further.

In [None]:
# Finding rows with ABE state
post_df.filter(col("state_")=="ABE").count()

In [None]:
# Displaying the details for those 3 rows
post_df.filter(col("state_") == "ABE") \
    .select("name", "address", "city_standardized", "postal_code", "latitude", "longitude") \
    .show(truncate=False)

There is no state called ABE. The restaurants above are actually in Vancouver, British Columbia, Canada, so the correct province is BC.

In [29]:
# Changing to the correct province
post_df=post_df.withColumn("state_",
                          when((col("state_") == "ABE") & (col("city_standardized") == "Vancouver"), "BC")
    .otherwise(col("state_"))
)

In [None]:
# Checking if the changes are applied
post_df.filter(col("state_")=="ABE").count()

In [None]:
post_df.select("state_").distinct().count()

In [None]:
post_df.select("state_").distinct().show()

In conclusion, the post-COVID dataset has a total of 13 unique states.

##### State - Pre df

In [None]:
# Finding the unique states
pre_df.select(col("state_")).distinct().count()

The pre-COVID dataset contains 16 states.
Let's check it as before.

In [None]:
# Printing Unique States
pre_df.select(col("state_")).distinct().show()

We follow the same approach as for the post-COVID dataset.

In [None]:
# Finding rows with ABE state
pre_df.filter(col("state_")=="ABE").count()

In [None]:
# Displaying the details for those 11 rows
pre_df.filter(col("state_") == "ABE") \
    .select("name", "address", "city_standardized", "postal_code", "latitude", "longitude") \
    .show(truncate=False)

In [30]:
# Changing to the correct province
pre_df=pre_df.withColumn("state_",
                          when((col("state_") == "ABE") & (col("city_standardized") == "Vancouver"), "BC")
    .otherwise(col("state_"))
)

In [None]:
# Checking if the changes are applied
pre_df.filter(col("state_")=="ABE").count()

In [None]:
pre_df.select("state_").distinct().count()

In [None]:
pre_df.select(col("state_")).distinct().show()

In conclusion we have 15 unique states in the pre dataset.

# Classification Modelling on Reviews column

We perform sentiment analysis with Logistic Regression to predict the sentiment category of each customer review.
1. We first create a new column that serves as our label. This column is called "customer_stars_category" and is derived from "customer_stars" as follows:
- if "customer_stars" == 4 OR 5 => "customer_stars_category" == "positive"
- if "customer_stars" == 3 => "customer_stars_category" == "neutral"
- if "customer_stars" == 1 OR 2 => "customer_stars_category" == "negative"

2. Then, we filter out all reviews that are not written in English.

3. The next step preprocesses the review text column ("text_"). We apply TF-IDF to measure the importance of each word in the review collection. We start by tokenizing the text and removing stop words. Next, we fit a CountVectorizer and use it to transform the data. This converts the text into a numerical format, since it generates a matrix of term frequency counts for each review. Finally, we use IDF to weight the term frequencies.

4. Afterwards, we continue with preprocessing steps on other columns:
  - Changing to the correct province (Vancouver is in BC, not ABE).
  - One-hot encoding the state column.

5. Then, we split the dataset into train (80%) and test (20%) sets.

6. Then, we create the VectorAssembler that includes all the features used to train the Logistic Regression model, and we label encode "customer_stars_category" on the train data.

7. Train and fit Logistic Regression on the train data.\
   *Note: we set `elasticNetParam` to 1. This gives Lasso (L1) regularization, which effectively performs feature selection by shrinking some feature coefficients to exactly zero. Features with non-zero coefficients are the ones the model has found useful for predicting the target variable.*

8. Create the VectorAssembler for the test data and label encode "customer_stars_category" on the test data.


9. Make predictions

10. Calculate metrics


*Note: The above process is performed on a random sample of 10% of the pre-COVID dataset.*
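
To make the TF-IDF weighting in step 3 concrete, here is a small plain-Python sketch. It is an illustration only and not the Spark pipeline used below: it assumes the smoothed formula documented for Spark MLlib's `IDF`, log((N + 1) / (df + 1)), and it ignores the `vocabSize` and `minDF` limits of `CountVectorizer`. The `tf_idf` helper and the sample reviews are made up.

```python
import math
from collections import Counter

def tf_idf(docs):
    """Return one {term: weight} dict per tokenized document."""
    n_docs = len(docs)
    # Document frequency: number of documents that contain each term
    df = Counter(term for doc in docs for term in set(doc))
    # Smoothed IDF (assumed to match Spark MLlib): log((N + 1) / (df + 1))
    idf = {term: math.log((n_docs + 1) / (count + 1)) for term, count in df.items()}
    # Weight = raw term count in the document times the term's IDF
    return [{term: tf * idf[term] for term, tf in Counter(doc).items()} for doc in docs]

# Made-up tokenized reviews with stop words already removed
docs = [["great", "food", "great", "service"], ["bad", "food"], ["food", "ok"]]
weights = tf_idf(docs)
print(weights[0])
```

A term such as "food" that occurs in every document receives weight 0, while a term concentrated in few documents is weighted up.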

In [None]:
"""
First we start by taking a proportion of our dataset - we take the 10%
"""

sampled_pre_df3 = pre_df.sample(fraction=0.1, seed=42)

In [None]:
"""
Printing the schema of the sampled_pre_df3 dataset and showing the first 5 rows
"""
sampled_pre_df3.printSchema()
#sampled_pre_df3.show(5)

In [None]:
"""
1)
"""
"""
Creating a new column named "customer_stars_category" and categorizing the customer_stars column as follows: 1,2 => negative, 3 => neutral, 4,5 => positive
Then, count how many reviews fall in each category.
"""
from pyspark.sql.functions import when, col

sampled_pre_df3 = sampled_pre_df3.withColumn(
    "customer_stars_category",
    when(col("customer_stars").isin([4, 5]), "positive")
    .when(col("customer_stars").isin([1, 2]), "negative")
    .otherwise("neutral")
)

sampled_pre_df3.show(20, truncate=False)
sampled_pre_df3.groupBy("customer_stars_category").count().show()

In [None]:
!pip install langdetect

In [None]:
"""
2)
"""
"""
Filter out all non-English reviews from the "text_" column
"""
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from langdetect import detect, DetectorFactory
import langdetect

# Set seed for consistent results
DetectorFactory.seed = 42

# Define UDF to detect language
def detect_language(text):
    try:
        return detect(text)
    except Exception:  # detection fails on empty, missing or undetectable text
        return "unknown"

detect_language_udf = udf(detect_language, StringType())

# Apply UDF to create a new column for language
sampled_pre_df3 = sampled_pre_df3.withColumn("language", detect_language_udf(sampled_pre_df3["text_"]))

# Filter the DataFrame to only include English-language reviews
sampled_pre_df3 = sampled_pre_df3.filter(sampled_pre_df3["language"] == "en")


# Show rows where the text is not English
#sampled_pre_df_with_sentiment.filter(sampled_pre_df_with_sentiment["language"] != "en").select("text_", "language").show()

In [None]:
#sampled_pre_df3.show(10)

In [None]:
"""
3) A)
"""
"""
We preprocess the review column ("text_") (1)
"""
"""
We start by tokenizing the reviews column and removing stop words
"""
sampled_pre_df_dataset_filtered3 = sampled_pre_df3.dropna() #filtering out NAs
tokenizer2 = RegexTokenizer(inputCol="text_", outputCol="words", pattern="\\W")

tokenized_raw3 = tokenizer2.transform(sampled_pre_df_dataset_filtered3)
print("--- tokenized reviews ---")
tokenized_raw3.select("words").show(5)

remover3 = StopWordsRemover(inputCol="words", outputCol="filtered")
df_cleaned3 = remover3.transform(tokenized_raw3)
print("--- remove stop words from reviews ---")
df_cleaned3.select("filtered").show(5)

In [None]:
"""
3) B)
"""
"""
Now, we create a CountVectorizer, fit & transform
This allows us to convert the text data into a numerical format.
It will generate a matrix of term frequency counts for each review
"""
"""
For CountVectorizer we use as input the column that was the output from the StopWordsRemover. In addition, set vocabSize=5000, minDF=10.0
"""
from pyspark.ml.feature import CountVectorizer

cv3 = CountVectorizer(inputCol="filtered", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel3 = cv3.fit(df_cleaned3)
result_cv3 = cvmodel3.transform(df_cleaned3)
#result_cv3.show(5)

In [None]:
"""
3) C)
"""
"""
Continue with the preprocessing of the review column ("text_") (3)
Here, we use IDF to weight the word frequencies.
"""
idf3 = IDF(inputCol="raw_features", outputCol="reviews")
idfModel3 = idf3.fit(result_cv3)
result_tfidf3 = idfModel3.transform(result_cv3)
#result_tfidf3.show(5)

In [None]:
"""
4) A)
"""
"""
Changing to the correct province
"""
# The original "city" column was dropped earlier, so we use "city_standardized"
result_tfidf3=result_tfidf3.withColumn("state_",
                          when((col("state_") == "ABE") & (col("city_standardized") == "Vancouver"), "BC")
    .otherwise(col("state_"))
)

In [None]:
"""
4) B)
"""
"""
state_: Apply one-hot encoding to the "state_" column
"""
from pyspark.ml.feature import OneHotEncoder
# Build the lookup from the states actually present in the data, so every row
# gets an encoding after the left join below (a hard-coded list can miss states)
states_df = result_tfidf3.select("state_").distinct()

states_indexer = StringIndexer(inputCol="state_", outputCol="state_index")

states_encoder = OneHotEncoder(inputCol="state_index", outputCol="state_ohe")

pipeline = Pipeline(stages=[states_indexer, states_encoder])

pipeline_model = pipeline.fit(states_df)
df_encoded = pipeline_model.transform(states_df)
result_tfidf3 = result_tfidf3.join(
    df_encoded.select("state_", "state_index", "state_ohe"),
    on="state_",
    how="left"
)

In [None]:
"""
5)
"""
"""
We split to train and test
"""
train_data, test_data = result_tfidf3.randomSplit([0.8, 0.2], seed=42)

In [None]:
"""
6)
"""
"""
We create the VectorAssembler that includes all the features on train data
"""
assembler_lr = VectorAssembler(
    inputCols=["latitude","longitude","useful","funny","cool","review_count","stars", "state_ohe", "is_open","reviews"],
    outputCol="all_features"
)
df_final_train = assembler_lr.transform(train_data)

In [None]:
#df_final_train.show(5)

In [None]:
#Label Encoding the "customer_stars_category"
df_final_train = df_final_train.withColumn(
    "customer_stars_category_labeled",when((col("customer_stars_category") == "positive"), 0).when((col("customer_stars_category") == "neutral"), 1).otherwise(2)
)

In [None]:
"""
7)
"""
"""
Perform logistic regression
"""
#Train logistic regression
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=15,regParam=0.03,elasticNetParam=1,featuresCol="all_features", labelCol="customer_stars_category_labeled")
lr_model = lr.fit(df_final_train)
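
For reference, the elastic-net penalty described in the Spark MLlib documentation combines an L1 and an L2 term. Writing $\lambda$ for `regParam` and $\alpha$ for `elasticNetParam` (symbols chosen here for illustration), the penalty on the coefficient vector $w$ is:

$$
\alpha \, \lambda \, \lVert w \rVert_1 \;+\; (1 - \alpha) \, \frac{\lambda}{2} \, \lVert w \rVert_2^2 , \qquad \alpha \in [0, 1], \; \lambda \ge 0
$$

With $\alpha = 1$ and $\lambda = 0.03$, as in the cell above, the L2 term vanishes and only the L1 penalty $0.03 \, \lVert w \rVert_1$ remains, which is what pushes some coefficients to exactly zero.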

In [None]:
"""
8)
"""
"""
We create the VectorAssembler that includes all the features on test data
"""
assembler_lr_test = VectorAssembler(
    inputCols=["latitude","longitude","useful","funny","cool","review_count","stars", "state_ohe", "is_open","reviews"],
    outputCol="all_features"
)
df_final_test = assembler_lr_test.transform(test_data)

In [None]:
df_final_test = df_final_test.withColumn(
    "customer_stars_category_labeled",when((col("customer_stars_category") == "positive"), 0).when((col("customer_stars_category") == "neutral"), 1).otherwise(2)
)

In [None]:
"""
9)
"""
"""
Make predictions
"""

predictions = lr_model.transform(df_final_test)
predictions.select("customer_stars_category_labeled", "prediction").show(10)

In [None]:
"""
10)
"""
"""
Calculate metrics
"""
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="customer_stars_category_labeled",
                                              predictionCol="prediction",
                                              metricName="f1")

f1_score = evaluator.evaluate(predictions)

print(f"Test F1-score: {f1_score}")
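
As far as we understand, the `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure: the per-class F1 scores averaged with weights proportional to how often each class appears among the true labels. The plain-Python sketch below illustrates that definition under this assumption; the `weighted_f1` helper and the sample labels are hypothetical and not part of the notebook's results.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's share of true labels."""
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for cls, n_true in support.items():
        # True positives and number of predictions for this class
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        n_pred = sum(1 for p in predictions if p == cls)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += (n_true / total) * f1
    return score

# Made-up labels using the notebook's encoding: 0 = positive, 1 = neutral, 2 = negative
y_true = [0, 0, 0, 1, 2, 2]
y_pred = [0, 0, 1, 1, 2, 0]
print(weighted_f1(y_true, y_pred))
```

Because the weights follow class frequency, a model can reach a high weighted F1 while still doing poorly on a rare class such as "neutral", so per-class scores are worth inspecting as well.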

# Recommendation system Pre-Covid

In [38]:
sampled_pre_df3 = pre_df.sample(fraction=0.1, seed=42)
sampled_pre_df3 = sampled_pre_df3.sample(fraction=0.5, seed=42)

In [40]:
sampled_pre_df3.count()

259049

In [41]:
# Index the `user_id` and `business_id` columns to take numerical values
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index", handleInvalid="keep")
business_indexer = StringIndexer(inputCol="business_id", outputCol="business_index", handleInvalid="keep")

# Fit and transform to get indexed columns
df_indexed = user_indexer.fit(sampled_pre_df3).transform(sampled_pre_df3)
df_indexed = business_indexer.fit(df_indexed).transform(df_indexed)

#  Check the schema after indexing to ensure proper data types
df_indexed.printSchema()

# Prepare data for ALS (Use indexed columns for user and business)
df_clean = df_indexed.select("user_index", "business_index", "customer_stars")

#  Split the data into training and test sets
(training_data, test_data) = df_clean.randomSplit([0.8, 0.2], seed=1234)

# Train the ALS model
als = ALS(maxIter=10, regParam=0.01, userCol="user_index", itemCol="business_index",
          ratingCol="customer_stars", coldStartStrategy="drop")

# Fit the model to the training data
model = als.fit(training_data)

#  Generate recommendations for all users
user_recs = model.recommendForAllUsers(10)

# Explode recommendations and select relevant columns
user_recs = user_recs.selectExpr("user_index", "explode(recommendations) as recommendations")
user_recs = user_recs.selectExpr("user_index", "recommendations.business_index as business_index",
                                 "recommendations.rating as rating")

# Show the results
user_recs.show()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state_: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- hours: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- customer_stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text_: string (nullable = true)
 |-- date_: timestamp (nullable = true)
 |-- city_standardized: string (nullable = true)
 |-- user_index: double (nullable = false)
 |-- business_index: double (nullable = false)

+----------+--------------+---------+
|user_index|busine

In [42]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="customer_stars",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 2.1714276503917826
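
To help interpret this number: RMSE is the square root of the mean squared difference between the actual star rating and the predicted one, so it is expressed in stars. Since `coldStartStrategy="drop"` is set, test rows whose user or business was not seen during training are excluded before the metric is computed. The sketch below is a plain-Python illustration of the formula; the `rmse` helper and the sample values are made up.

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(actual))

# Made-up star ratings and predictions, for illustration only
stars = [5, 3, 1, 4]
preds = [4.0, 3.0, 3.0, 2.0]
print(rmse(stars, preds))
```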


In [43]:
# Show the recommendations for a specific user
user_id = 2

user_rec = user_recs.filter(user_recs.user_index == user_id)

print("Businesses rated by user with index " + str(user_id))
# Show the businesses rated by the user
user_ratings = df_indexed.filter(df_indexed.user_index == user_id)\
    .select("name", "categories", "customer_stars")

user_ratings.show(100,truncate=False)

# Show the top 10 recommendations for the specific user
user_rec = user_recs.filter(user_recs.user_index== user_id)

# Print and show the recommendations
user_rec.show(truncate=False)

Businesses rated by user with index 2
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|name                                      |categories                                                                                                                                       |customer_stars|
+------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|Freshslice Pizza                          |Restaurants, Pizza                                                                                                                               |3             |
|Ramen Danbo                               |Ramen, Japanese, Noodles, Restaurants                                                         

# Recommendation system Pre-Covid: second run on the first 100,000 rows


In [None]:
sampled_pre_df3 = pre_df.limit(100000)

In [None]:
# Index the `user_id` and `business_id` columns to take numerical values
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index", handleInvalid="keep")
business_indexer = StringIndexer(inputCol="business_id", outputCol="business_index", handleInvalid="keep")

# Fit and transform to get indexed columns
df_indexed = user_indexer.fit(sampled_pre_df3).transform(sampled_pre_df3)
df_indexed = business_indexer.fit(df_indexed).transform(df_indexed)

#  Check the schema after indexing to ensure proper data types
df_indexed.printSchema()

# Prepare data for ALS (Use indexed columns for user and business)
df_clean = df_indexed.select("user_index", "business_index", "customer_stars")

#  Split the data into training and test sets
(training_data, test_data) = df_clean.randomSplit([0.8, 0.2], seed=1234)

# Train the ALS model
als = ALS(maxIter=10, regParam=0.01, userCol="user_index", itemCol="business_index",
          ratingCol="customer_stars", coldStartStrategy="drop")

# Fit the model to the training data
model = als.fit(training_data)

#  Generate recommendations for all users
user_recs = model.recommendForAllUsers(10)

# Explode recommendations and select relevant columns
user_recs = user_recs.selectExpr("user_index", "explode(recommendations) as recommendations")
user_recs = user_recs.selectExpr("user_index", "recommendations.business_index as business_index",
                                 "recommendations.rating as rating")

# Show the results
user_recs.show()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state_: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- hours: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- customer_stars: integer (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text_: string (nullable = true)
 |-- date_: timestamp (nullable = true)
 |-- city_standardized: string (nullable = true)
 |-- user_index: double (nullable = false)
 |-- business_index: double (nullable = false)

+----------+--------------+---------+
|user_index|busine

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="customer_stars",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 4.222957018480883


In [None]:
# Show the recommendations for a specific user
user_id = 2

user_rec = user_recs.filter(user_recs.user_index == user_id)

print("Businesses rated by user with index " + str(user_id))
# Show the businesses rated by the user
user_ratings = df_indexed.filter(df_indexed.user_index == user_id)\
    .select("name", "categories", "customer_stars")

user_ratings.show(100,truncate=False)

# Show the top 10 recommendations for the specific user
user_rec = user_recs.filter(user_recs.user_index== user_id)

# Print and show the recommendations
user_rec.show(truncate=False)

Businesses rated by user with index 2
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|name                                       |categories                                                                                                                                    |customer_stars|
+-------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|New Town Bakery & Restaurant               |Restaurants, Chinese, Dim Sum, Food, Bakeries                                                                                                 |3             |
|Mad Greek Restaurant                       |Restaurants, Mediterranean, Specialty Food, Ethnic Food, Greek, Food                                 

# Clusters