In [19]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Hotel Recommendation System") \
    .getOrCreate()

# Load the dataset
df = spark.read.csv('hotel_reviews.csv', header=True, inferSchema=True)

# Show the first few rows of the dataset to understand what data looks like
df.show(5)

# Print the schema to understand the data types and column names
df.printSchema()

+--------------------+----------+--------+-------+---------+---------+------------------+----------+--------+-------------------+-------------------+-------------------+----------+--------------+--------------------+--------------------+----------------+----------------+--------------------+
|             address|categories|    city|country| latitude|longitude|              name|postalCode|province|       reviews.date|  reviews.dateAdded|reviews.doRecommend|reviews.id|reviews.rating|        reviews.text|       reviews.title|reviews.userCity|reviews.username|reviews.userProvince|
+--------------------+----------+--------+-------+---------+---------+------------------+----------+--------+-------------------+-------------------+-------------------+----------+--------------+--------------------+--------------------+----------------+----------------+--------------------+
|Riviera San Nicol...|    Hotels|Mableton|     US|45.421611|12.376187|Hotel Russo Palace|     30126|      GA|2013-09-22 0

## Data Preprocessing

In [20]:
from pyspark.sql.functions import col

# Assuming you've already loaded the dataset into DataFrame df
# Rename columns to replace '.' with '_'
for column in df.columns:
    df = df.withColumnRenamed(column, column.replace('.', '_'))

# Show the updated schema to confirm changes
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- province: string (nullable = true)
 |-- reviews_date: timestamp (nullable = true)
 |-- reviews_dateAdded: timestamp (nullable = true)
 |-- reviews_doRecommend: string (nullable = true)
 |-- reviews_id: string (nullable = true)
 |-- reviews_rating: double (nullable = true)
 |-- reviews_text: string (nullable = true)
 |-- reviews_title: string (nullable = true)
 |-- reviews_userCity: string (nullable = true)
 |-- reviews_username: string (nullable = true)
 |-- reviews_userProvince: string (nullable = true)



In [21]:
from pyspark.sql.functions import col, when, isnull, count

# Check for missing values in each column
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+----+-------+--------+---------+----+----------+--------+------------+-----------------+-------------------+----------+--------------+------------+-------------+----------------+----------------+--------------------+
|address|categories|city|country|latitude|longitude|name|postalCode|province|reviews_date|reviews_dateAdded|reviews_doRecommend|reviews_id|reviews_rating|reviews_text|reviews_title|reviews_userCity|reviews_username|reviews_userProvince|
+-------+----------+----+-------+--------+---------+----+----------+--------+------------+-----------------+-------------------+----------+--------------+------------+-------------+----------------+----------------+--------------------+
|      0|         0|   0|      0|      86|       86|   0|        55|       0|         259|                0|              35912|     35912|           862|          20|         1612|           19581|              75|               18392|
+-------+----------+----+-------+--------+---------+

In [22]:
# Drop columns with a large number of missing values
df = df.drop("reviews_doRecommend", "reviews_id", "reviews_userCity", "reviews_userProvince")

# Drop rows where 'reviews_text' or 'reviews_rating' is null as these are crucial for analysis
df = df.na.drop(subset=["reviews_text", "reviews_rating"])

# You can opt to fill missing 'reviews_title' with 'No Title' if not crucial
df = df.na.fill({"reviews_title": "No Title"})

# Show updated counts of missing values for verification
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()


+-------+----------+----+-------+--------+---------+----+----------+--------+------------+-----------------+--------------+------------+-------------+----------------+
|address|categories|city|country|latitude|longitude|name|postalCode|province|reviews_date|reviews_dateAdded|reviews_rating|reviews_text|reviews_title|reviews_username|
+-------+----------+----+-------+--------+---------+----+----------+--------+------------+-----------------+--------------+------------+-------------+----------------+
|      0|         0|   0|      0|      76|       76|   0|        55|       0|         257|                0|             0|           0|            0|              75|
+-------+----------+----+-------+--------+---------+----+----------+--------+------------+-----------------+--------------+------------+-------------+----------------+



## Sentiment Analysis

In [23]:
from pyspark.sql.functions import udf
from textblob import TextBlob
from pyspark.sql.types import StringType

# Define a UDF to compute sentiment category
def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

sentiment_udf = udf(sentiment_analysis, StringType())

# Apply the sentiment analysis UDF to the review texts
df = df.withColumn("sentiment_score", sentiment_udf(df.reviews_text))
df = df.withColumn("sentiment_category", when(col("sentiment_score") > 0, "Positive")
                                         .when(col("sentiment_score") == 0, "Neutral")
                                         .otherwise("Negative"))

In [24]:
from pyspark.sql.functions import length

# Add a new column that represents the length of each review
df = df.withColumn("review_length", length(df.reviews_text))

In [29]:
df.show(3)

+--------------------+----------+--------+-------+---------+---------+------------------+----------+--------+-------------------+-------------------+--------------+--------------------+--------------------+----------------+------------------+------------------+-------------+
|             address|categories|    city|country| latitude|longitude|              name|postalCode|province|       reviews_date|  reviews_dateAdded|reviews_rating|        reviews_text|       reviews_title|reviews_username|   sentiment_score|sentiment_category|review_length|
+--------------------+----------+--------+-------+---------+---------+------------------+----------+--------+-------------------+-------------------+--------------+--------------------+--------------------+----------------+------------------+------------------+-------------+
|Riviera San Nicol...|    Hotels|Mableton|     US|45.421611|12.376187|Hotel Russo Palace|     30126|      GA|2013-09-22 05:00:00|2016-10-24 05:00:25|           4.0|Pleasant

## Feature Standardization for Clustering

In [7]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=["reviews_rating", "review_length"], outputCol="features")
df = assembler.transform(df)

# Normalize the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)

## KMeans Clustering

In [17]:
from pyspark.ml.clustering import KMeans

# Apply K-means clustering
kmeans = KMeans(featuresCol='scaledFeatures', k=5)
model = kmeans.fit(df)
predictions = model.transform(df)

# Show the results
predictions.select("prediction").show(20)

+----------+
|prediction|
+----------+
|         3|
|         0|
|         0|
|         0|
|         0|
|         0|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         2|
+----------+
only showing top 20 rows



# Developing a Recommendation System

## Collaborative Filtering

In [9]:
from pyspark.sql.functions import hash
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize Spark session (assuming it's already done in your environment)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hotel Recommendation System").getOrCreate()

# Load the data (assuming it's already loaded into DataFrame df)

# Convert usernames and hotel names to numeric IDs using Spark's hash function
df = df.withColumn("userId", hash("reviews_username"))
df = df.withColumn("hotelId", hash("name"))

# Split data into training and test sets
(training, test) = df.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="hotelId", ratingCol="reviews_rating", coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="reviews_rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error before tuning = " + str(rmse))

# Parameter tuning using cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 50]) \
    .addGrid(als.maxIter, [5, 10]) \
    .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
    .build()

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse", labelCol="reviews_rating", predictionCol="prediction"),
                          numFolds=3)  # Use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters
cvModel = crossval.fit(training)
bestModel = cvModel.bestModel

# Evaluate the best model on the test set
bestPredictions = bestModel.transform(test)
bestRmse = evaluator.evaluate(bestPredictions)
print("Best model Root-mean-square error = " + str(bestRmse))


24/05/12 01:22:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

Root-mean-square error before tuning = 2.0850366333726944


[Stage 4850:>                                                       (0 + 4) / 4]

Best model Root-mean-square error = 1.4421299497904674


                                                                                

## Content-Based Filtering

### Step 1: Feature Extraction Using TF-IDF

In [11]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline

# Text processing to convert reviews into features
tokenizer = Tokenizer(inputCol="reviews_text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="tfidfFeatures")

# Pipeline to process text
text_pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
model = text_pipeline.fit(df)
featured_data = model.transform(df)

# Displaying the transformed DataFrame to check new columns
featured_data.select("tfidfFeatures").show(5)

                                                                                

+--------------------+
|       tfidfFeatures|
+--------------------+
|(1000,[17,49,56,1...|
|(1000,[16,17,25,1...|
|(1000,[14,77,99,1...|
|(1000,[16,17,50,5...|
|(1000,[16,17,50,5...|
+--------------------+
only showing top 5 rows



### Step 2: Calculating Cosine Similarity

In [30]:
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Normalizer

# Normalize the feature vectors
normalizer = Normalizer(inputCol="tfidfFeatures", outputCol="normFeatures")
normalized_data = normalizer.transform(featured_data)

# Self-join to compute pairwise similarities (conceptual, adjust based on resources)
# This might be very large; consider using approximations or limiting the number of comparisons
pairwise_similarity = normalized_data.alias("hotels1").join(normalized_data.alias("hotels2"), "hotelId") \
    .select(
        col("hotels1.name").alias("hotel1"),
        col("hotels2.name").alias("hotel2"),
        col("hotels1.normFeatures").alias("features1"),
        col("hotels2.normFeatures").alias("features2")
    )

# Define a UDF to calculate cosine similarity between feature vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import numpy as np

def cosine_similarity(features1, features2):
    return float(np.dot(features1, features2) / (np.linalg.norm(features1) * np.linalg.norm(features2)))

similarity_udf = udf(cosine_similarity, FloatType())

pairwise_similarity = pairwise_similarity.withColumn("similarity", similarity_udf(col("features1"), col("features2")))


### Step 3: Recommend Based on Similarity

In [32]:
# Filter to show recommendations for a specific hotel
# For instance, recommendations for the first hotel in the DataFrame
target_hotel = pairwise_similarity.filter(col("hotel1") == "Hotel Russo Palace")
top_recommendations = target_hotel.sort(col("similarity").desc()).limit(50)

top_recommendations.show()



+------------------+------------------+--------------------+--------------------+----------+
|            hotel1|            hotel2|           features1|           features2|similarity|
+------------------+------------------+--------------------+--------------------+----------+
|Hotel Russo Palace|Hotel Russo Palace|(1000,[16,17,21,7...|(1000,[16,17,21,7...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[48,76,139,...|(1000,[48,76,139,...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[18,28,30,3...|(1000,[18,28,30,3...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[18,28,83,9...|(1000,[18,28,83,9...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[16,17,50,5...|(1000,[16,17,50,5...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[0,1,40,55,...|(1000,[0,1,40,55,...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[14,77,99,1...|(1000,[14,77,99,1...|       1.0|
|Hotel Russo Palace|Hotel Russo Palace|(1000,[16,17,50,5...|(1000,[16,

                                                                                