## Collaborative Filtering (hotel recommendations for a specific user)

In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Hotel Recommendation")
    .getOrCreate()
)

# Review export (one row per review, plus a predicted_rating column) stored on GCS.
# No schema inference is requested, so every column arrives as a string.
data_path = "gs://ds5460-final-bucket/notebooks/jupyter/Master Files/reviews_with_predicted_ratings.csv"
reviews_df = spark.read.option("header", True).csv(data_path)

# Preview the first rows (show() defaults to 20).
reviews_df.show()


+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|listing_id|       id|      date|reviewer_id|   reviewer_name|                             comments|state|       city|predicted_rating|
+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|       109|   449036|2011-08-15|     927861|           Edwin|                 The host canceled...|   CA|Los Angeles|               2|
|       109| 74506539|2016-05-15|   22509885|            Jenn|                 Me and two friend...|   CA|Los Angeles|               5|
|      2708| 13994902|2014-06-09|   10905424|         Kuberan|                 i had a wonderful...|   CA|Los Angeles|               3|
|      2708| 14606598|2014-06-23|    2247288|         Camilla|                 Charles is just a...|   CA|Los Angeles|               5|
|      2708| 39597339|2015-07-25|   27974696|   

In [None]:
# Print the schema of the DataFrame.
# The CSV was read without inferSchema, so every column is a string (see output below);
# ids and the rating must be cast before they can be used for modelling.
reviews_df.printSchema()


root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- predicted_rating: string (nullable = true)



In [None]:
from pyspark.sql.functions import col

# Discard every row that has a null in any column.
reviews_df = reviews_df.dropna()

# ALS needs integer user/item ids and a numeric rating. A string that does not parse
# as an int becomes null after the cast.
for int_column in ("listing_id", "reviewer_id", "predicted_rating"):
    reviews_df = reviews_df.withColumn(int_column, col(int_column).cast("int"))

# Confirm the new column types.
reviews_df.printSchema()


root
 |-- listing_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- predicted_rating: integer (nullable = true)



In [None]:
from pyspark.sql.functions import col

# The int casts in the previous cell turn unparseable ids into nulls. Drop those rows,
# because every ALS training row needs both a user id and an item id.
# No re-cast is done here: both columns are already IntegerType (see the schema above).
reviews_df = reviews_df.filter(col("reviewer_id").isNotNull() & col("listing_id").isNotNull())


In [None]:
# Keep only rows whose predicted_rating is present (i.e. it survived the int cast).
reviews_df = reviews_df.where(reviews_df["predicted_rating"].isNotNull())

# Preview the remaining rows.
reviews_df.show()


+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|listing_id|       id|      date|reviewer_id|   reviewer_name|                             comments|state|       city|predicted_rating|
+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|       109|   449036|2011-08-15|     927861|           Edwin|                 The host canceled...|   CA|Los Angeles|               2|
|       109| 74506539|2016-05-15|   22509885|            Jenn|                 Me and two friend...|   CA|Los Angeles|               5|
|      2708| 13994902|2014-06-09|   10905424|         Kuberan|                 i had a wonderful...|   CA|Los Angeles|               3|
|      2708| 14606598|2014-06-23|    2247288|         Camilla|                 Charles is just a...|   CA|Los Angeles|               5|
|      2708| 39597339|2015-07-25|   27974696|   

In [None]:
from pyspark.sql.functions import col

# Distribution of predicted ratings. Sorting by rating makes the table read 1, 2, 3, ...,
# and any out-of-range value shows up at the end instead of at a random position.
counts = reviews_df.groupBy("predicted_rating").count().orderBy("predicted_rating")

# Show the counts
counts.show()


+----------------+-------+
|predicted_rating|  count|
+----------------+-------+
|               1|1498603|
|               3|1433432|
|               5|1303945|
|               4|1499924|
|             805|      1|
|               2|1580811|
+----------------+-------+



In [None]:
# Keep only ratings on the expected 1-5 scale.
# The distribution above contains a single stray value (805), presumably from a mis-parsed
# row. A range check removes it, and any other out-of-range value, without hard-coding
# the specific outlier or comparing the int column to a string.
# Null ratings are also excluded (between() on null is not true).
reviews_df = reviews_df.filter(reviews_df.predicted_rating.between(1, 5))

In [None]:
# Keep only what the recommender uses: the ids, the rating, and the location filters.
model_columns = ["listing_id", "reviewer_id", "predicted_rating", "state", "city"]
reviews_df = reviews_df.select(*model_columns)

In [None]:
# Verify the trimmed schema: integer listing_id/reviewer_id/predicted_rating plus string state/city.
reviews_df.printSchema()

root
 |-- listing_id: integer (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- predicted_rating: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)



In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import IntegerType, FloatType

def get_recommendations(reviews_df, user_state, user_city, user_id, num_recs=10, seed=42):
    """Train ALS on one city's reviews and recommend listings the user has not reviewed yet.

    Args:
        reviews_df: DataFrame with reviewer_id, listing_id, predicted_rating, state and city columns.
        user_state: State value to filter on (e.g. "CA").
        user_city: City value to filter on (e.g. "Los Angeles").
        user_id: reviewer_id to recommend for.
        num_recs: Number of recommendations to return (default 10).
        seed: Random seed passed to ALS so repeated calls train the same model (default 42).

    Returns:
        A list of Row(listing_id, rating) in the order ALS ranks them, excluding listings the
        user already reviewed in this city. The list is empty when the user has no reviews
        there, because ALS learns no factors for such a user.
    """
    filtered_df = reviews_df.filter((reviews_df['state'] == user_state) & (reviews_df['city'] == user_city))
    filtered_df = filtered_df.withColumn("reviewer_id", filtered_df["reviewer_id"].cast(IntegerType()))
    filtered_df = filtered_df.withColumn("listing_id", filtered_df["listing_id"].cast(IntegerType()))
    filtered_df = filtered_df.withColumn("predicted_rating", filtered_df["predicted_rating"].cast(FloatType()))

    # Listings this user already reviewed in the city. An empty set means the user is not in
    # the training data, so skip the (expensive) fit. A non-empty set is used to drop
    # already-reviewed listings from the recommendations.
    user_df = filtered_df.where(filtered_df['reviewer_id'] == user_id)
    seen_listings = {row['listing_id'] for row in user_df.select('listing_id').distinct().collect()}
    if not seen_listings:
        print(f"User {user_id} has no reviews in {user_city}, {user_state}; cannot generate recommendations.")
        return []

    # Train the ALS model
    als = ALS(maxIter=5, regParam=0.01, userCol="reviewer_id", itemCol="listing_id",
              ratingCol="predicted_rating", coldStartStrategy="drop", seed=seed)
    model = als.fit(filtered_df)

    # Over-fetch by the number of seen listings so that num_recs unseen listings remain
    # after filtering (when the city has that many).
    user_rows = model.recommendForUserSubset(user_df.select('reviewer_id').distinct(),
                                             num_recs + len(seen_listings)).collect()
    candidates = user_rows[0]['recommendations'] if user_rows else []
    recommendations = [rec for rec in candidates if rec['listing_id'] not in seen_listings][:num_recs]

    # Display recommendations to the user
    print(f"Top {num_recs} hotel recommendations for user {user_id} in {user_city}, {user_state}:")
    for recommendation in recommendations:
        print(recommendation)
    return recommendations


if __name__ == "__main__":

    # Input user state, city, and ID (stripped so stray spaces do not break the equality filter)
    user_state = input("Enter state: ").strip()
    user_city = input("Enter city: ").strip()
    user_id = int(input("Enter user ID: "))

    # Get recommendations
    get_recommendations(reviews_df, user_state, user_city, user_id)

#CA
#Los Angeles
#10905424

Enter state:  CA
Enter city:  Los Angeles
Enter user ID:  10905424


Top 10 hotel recommendations for user 10905424 in Los Angeles, CA:
Row(reviewer_id=10905424, recommendations=[Row(listing_id=1995651, rating=3.3418052196502686), Row(listing_id=34153402, rating=3.2698936462402344), Row(listing_id=2708, rating=2.999297857284546), Row(listing_id=44601934, rating=2.926595449447632), Row(listing_id=46817301, rating=2.9147450923919678), Row(listing_id=22689153, rating=2.900916576385498), Row(listing_id=32834116, rating=2.8662588596343994), Row(listing_id=43358109, rating=2.8515090942382812), Row(listing_id=53212342, rating=2.8079516887664795), Row(listing_id=37103436, rating=2.797819137573242)])


In [None]:
from pyspark.sql.types import IntegerType, FloatType

# Same preparation get_recommendations performs, done here for the Los Angeles, CA example:
# restrict to one city, then cast the ids to int and the rating to float.
target_types = {
    "reviewer_id": IntegerType(),
    "listing_id": IntegerType(),
    "predicted_rating": FloatType(),
}
filtered_df = reviews_df.where((reviews_df['state'] == 'CA') & (reviews_df['city'] == 'Los Angeles'))
for column_name, spark_type in target_types.items():
    filtered_df = filtered_df.withColumn(column_name, filtered_df[column_name].cast(spark_type))
filtered_df.show()

+----------+-----------+----------------+-----+-----------+
|listing_id|reviewer_id|predicted_rating|state|       city|
+----------+-----------+----------------+-----+-----------+
|       109|     927861|             2.0|   CA|Los Angeles|
|       109|   22509885|             5.0|   CA|Los Angeles|
|      2708|   10905424|             3.0|   CA|Los Angeles|
|      2708|    2247288|             5.0|   CA|Los Angeles|
|      2708|   27974696|             2.0|   CA|Los Angeles|
|      2708|   33226412|             5.0|   CA|Los Angeles|
|      2708|   23408691|             2.0|   CA|Los Angeles|
|      2708|  155985882|             3.0|   CA|Los Angeles|
|      2708|    6840784|             5.0|   CA|Los Angeles|
|      2708|  162469426|             2.0|   CA|Los Angeles|
|      2708|   93292025|             4.0|   CA|Los Angeles|
|      2708|  183639942|             2.0|   CA|Los Angeles|
|      2708|   37846734|             5.0|   CA|Los Angeles|
|      2708|   48408491|             5.0

In [None]:
# Number of rating rows available to ALS for Los Angeles, CA.
la_row_count = filtered_df.count()
la_row_count

1285066

In [None]:
from pyspark.ml.recommendation import ALS

# Fixed seed so the train/test split, the ALS initialisation, and therefore the reported
# RMSE are reproducible across runs.
SEED = 42

# 70/30 train/test split of the Los Angeles ratings.
(trainingData, testData) = filtered_df.randomSplit([0.7, 0.3], seed=SEED)

als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="reviewer_id", itemCol="listing_id",
          ratingCol="predicted_rating", coldStartStrategy="drop", seed=SEED)
model = als.fit(trainingData)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the known ratings and the model's predictions on the held-out split.
# coldStartStrategy="drop" on the estimator removes test rows whose user or listing was
# never seen in training, so only predictable pairs are scored.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="predicted_rating", predictionCol="prediction")
predictions = model.transform(testData)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 3.5872330951597426


Since our ratings are on a 1–5 scale, an RMSE above 3.5 is very high. RMSE is the square root of the mean squared error, so it weights large misses heavily. A value this large means predictions are frequently off by more than three points on a five-point scale, so the model is not predicting accurately.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

# Hyper-parameter grid for the `als` estimator (1 x 2 x 2 = 4 combinations):
#   maxIter  - number of ALS iterations (fixed at 20)
#   regParam - regularization parameter
#   rank     - number of latent factors
grid_builder = ParamGridBuilder()
grid_builder.addGrid(als.maxIter, [20])
grid_builder.addGrid(als.regParam, [0.05, 0.1])
grid_builder.addGrid(als.rank, [15, 18])
paramGrid = grid_builder.build()

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="predicted_rating", predictionCol="prediction")

# 3 folds instead of 5, matching the recent-year experiment later in the notebook.
# CV refits ALS once per fold per grid point, and the 5-fold run on this training set
# took ~40 minutes and crashed the kernel. The fold seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)


In [None]:
# Cross-validating on the full training set took ~40 min and crashed the kernel, so tune
# on a reproducible random subset of the training data instead. Raise the fraction if
# resources allow.
CV_SAMPLE_FRACTION = 0.2
cv_training_sample = trainingData.sample(withReplacement=False, fraction=CV_SAMPLE_FRACTION, seed=42)
cvModel = crossval.fit(cv_training_sample)

In [None]:
# CrossValidator refits the winning grid point on its whole input. Score that model on the
# held-out test split.
bestModel = cvModel.bestModel
rmse = evaluator.evaluate(bestModel.transform(testData))
print(f"Best model RMSE on test data: {rmse}")

## Most recent year (2023 onward): hotel recommendations for users

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if one exists, so this only starts a
# new session when this section is run on its own.
spark = (
    SparkSession.builder
    .appName("Hotel Recommendation")
    .getOrCreate()
)

# Re-read the raw review export; every column comes back as a string (no schema inference).
data_path = "gs://ds5460-final-bucket/notebooks/jupyter/Master Files/reviews_with_predicted_ratings.csv"
reviews_df = spark.read.option("header", True).csv(data_path)

# Preview the first rows (show() defaults to 20).
reviews_df.show()


+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|listing_id|       id|      date|reviewer_id|   reviewer_name|                             comments|state|       city|predicted_rating|
+----------+---------+----------+-----------+----------------+-------------------------------------+-----+-----------+----------------+
|       109|   449036|2011-08-15|     927861|           Edwin|                 The host canceled...|   CA|Los Angeles|               2|
|       109| 74506539|2016-05-15|   22509885|            Jenn|                 Me and two friend...|   CA|Los Angeles|               5|
|      2708| 13994902|2014-06-09|   10905424|         Kuberan|                 i had a wonderful...|   CA|Los Angeles|               3|
|      2708| 14606598|2014-06-23|    2247288|         Camilla|                 Charles is just a...|   CA|Los Angeles|               5|
|      2708| 39597339|2015-07-25|   27974696|   

In [None]:
from pyspark.sql.functions import year

# First calendar year included in the "most recent year" model.
RECENT_YEAR = 2023

# Keep reviews dated RECENT_YEAR or later. The year is computed inline from the date
# string, so reviews_df is not modified with a temporary column. Rows whose date does not
# parse get a null year and are excluded.
recent_reviews_df = reviews_df.filter(year("date") >= RECENT_YEAR)

# Count the number of rows
row_count = recent_reviews_df.count()

# Print the count
print("Number of rows:", row_count)

Number of rows: 2102862


In [None]:
# Column types of the recent-review subset: still all strings, because this section re-read
# the CSV and has not cast anything yet.
recent_reviews_df.printSchema()

root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- predicted_rating: string (nullable = true)



In [None]:
from pyspark.sql.functions import col  # imported here so this section does not rely on the first section's import

# Remove rows that have a null in any column
recent_reviews_df = recent_reviews_df.na.drop()

# Convert listing_id, reviewer_id, and predicted_rating to int for ALS. A string that does
# not parse as a 32-bit int becomes null, so nulls have to be re-checked after this cell.
# NOTE(review): the null check that follows reports many listing_id nulls even though
# na.drop() ran first. If those are listing ids larger than the int range (presumably newer,
# longer Airbnb ids), they are being silently discarded. Consider mapping ids with
# pyspark.ml.feature.StringIndexer instead of casting. TODO confirm against the raw values.
for id_column in ("listing_id", "reviewer_id", "predicted_rating"):
    recent_reviews_df = recent_reviews_df.withColumn(id_column, col(id_column).cast("int"))

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Count nulls in the three ALS columns with one aggregation (one Spark job instead of one
# full scan per column). count(when(...)) counts only matching rows and yields 0, not
# null, on an empty frame.
null_check_columns = ['reviewer_id', 'listing_id', 'predicted_rating']
null_counts = recent_reviews_df.select(
    [F.count(F.when(col(c).isNull(), 1)).alias(c) for c in null_check_columns]
).first()

for column in null_check_columns:
    null_count = null_counts[column]
    print(f"Number of nulls in {column}: {null_count}")


Number of nulls in reviewer_id: 0
Number of nulls in listing_id: 847868
Number of nulls in predicted_rating: 8335


In [None]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Clean the Data. Drop rows where the int casts produced nulls; ALS needs a user
# id, an item id, and a rating on every row.
cleaned_df = recent_reviews_df.dropna(subset=["reviewer_id", "listing_id", "predicted_rating"])

# Step 2: Split the Dataset (60% train / 10% validation / 30% test). The fixed seed keeps
# the split, and every metric computed from it, reproducible.
(trainingData, validationData, testData) = cleaned_df.randomSplit([0.6, 0.1, 0.3], seed=42)

In [None]:
# How many rows remain once nulls in reviewer_id / listing_id / predicted_rating are removed?
num_rows_after_dropping_nas = cleaned_df.count()
print("Number of rows after dropping NaNs: " + str(num_rows_after_dropping_nas))

Number of rows after dropping NaNs: 1249672


In [None]:
# Imported here so this section does not depend on imports from the first section.
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Base estimator. nonnegative=True applies a non-negativity constraint to the least-squares solves.
# Earlier configurations tried (kept for reference):
#   ALS(maxIter=25, rank=12, regParam=0.2, nonnegative=True, ...)
#   ALS(nonnegative=True, ...) with default hyper-parameters
als = ALS(maxIter=5, rank=10, regParam=0.1, nonnegative=True, userCol="reviewer_id", itemCol="listing_id", ratingCol="predicted_rating", coldStartStrategy="drop")

# Narrowed the range of each parameter. Previously tried: rank [10, 15, 18, 20, 30];
# maxIter [5, 15, 20]; regParam [0.05, 0.1].
param_grid = (ParamGridBuilder()
              .addGrid(als.rank, [15, 18])
              .addGrid(als.maxIter, [20])
              .addGrid(als.regParam, [0.05, 0.1])
              .build())

evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='predicted_rating', predictionCol='prediction')

# Seeded folds so the selected hyper-parameters are reproducible.
cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3,
        seed=42)

cv_model = cv.fit(trainingData)

best_model = cv_model.bestModel
# Read the winning grid point through the public API: the param map with the lowest mean
# RMSE across folds (first one on ties, as CrossValidator does). This avoids the private
# _java_obj handle.
best_index = min(range(len(cv_model.avgMetrics)), key=cv_model.avgMetrics.__getitem__)
best_params = cv_model.getEstimatorParamMaps()[best_index]
print('rank: ', best_model.rank)
print('MaxIter: ', best_params[als.maxIter])
print('RegParam: ', best_params[als.regParam])

rank:  18
MaxIter:  20
RegParam:  0.05


In [None]:
from pyspark.sql.functions import col, expr, rank, countDistinct, count
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test split with the best cross-validated model. No separate
# als.fit(...) is run here, because only best_model is used for predictions.
predictions = best_model.transform(testData)
# Ratings are on a 1-5 scale, so clip raw ALS predictions into that range before scoring.
predictions = predictions.withColumn("prediction", expr("CASE WHEN prediction < 1 THEN 1 WHEN prediction > 5 THEN 5 ELSE prediction END"))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='predicted_rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error (RMSE): {rmse}')

Root Mean Squared Error (RMSE): 1.9978335717142977


In [None]:
# Disabled baseline comparison. Uncomment to retrain the earlier, untuned ALS settings
# (maxIter=5, regParam=0.1, nonnegative=True) on the same training split, then compute its
# clipped test RMSE for comparison with the tuned model's RMSE.
# compared with previous rmse
#from pyspark.sql.functions import expr
#from pyspark.ml.evaluation import RegressionEvaluator

#previous_als = ALS(maxIter=5, regParam=0.1, nonnegative=True, userCol="reviewer_id", itemCol="listing_id", ratingCol="predicted_rating", coldStartStrategy="drop")
#previous_model = previous_als.fit(trainingData)

# Generate predictions on the test data using the previous model
#predictions = previous_model.transform(testData)
#predictions = predictions.withColumn("prediction", expr("CASE WHEN prediction < 1 THEN 1 WHEN prediction > 5 THEN 5 ELSE prediction END"))

# Evaluate the model using RMSE
#evaluator = RegressionEvaluator(metricName='rmse', labelCol='predicted_rating', predictionCol='prediction')
#rmse = evaluator.evaluate(predictions)
#print(f'Root Mean Squared Error (RMSE) of previous model: {rmse}')


In [None]:
# Stop SparkSession. This stops the underlying SparkContext, so any cell that uses `spark`
# afterwards needs a new session.
spark.stop()