In [5]:
import pandas as pd
import numpy as np
import polars as pl
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F


# Initialize Spark session
spark = SparkSession.builder \
    .appName("HotelRecommendationALS") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# loading in the cleaned training data
train_data = pl.read_csv('data/cleaned_training_data.csv')
test_data = pl.read_csv('data/cleaned_test_data.csv')

# I will drop unnecessary columns
train_data = train_data.drop(['prop_log_historical_price', 'price_usd', 'parsed_date', 'year', 'month', 'day', 'search_hour', 'day_of_week', 'year_month', 'date_time', 'prop_historical_price', 'price_usd_per_night_test', 'price_ratio', 'position', 'gross_bookings_usd', 'click_bool']). \
    rename({'price_usd_without_promo': 'price_usd'})

test_data = test_data.drop(['prop_log_historical_price', 'price_usd', 'parsed_date', 'year', 'month', 'day', 'search_hour', 'day_of_week', 'year_month', 'date_time', 'prop_historical_price', 'price_usd_per_night_test', 'price_ratio']). \
    rename({'price_usd_without_promo': 'price_usd'})

In [6]:
# creating the dataframe for the CatBoostRanker
train_df = train_data.to_pandas()
test_df = test_data.to_pandas()

# # I will train the model on a subset
# train_df = train_df[train_df['srch_id'].isin(train_df['srch_id'].unique()[:4000])]
# print(train_df.shape) # 2492 observations

# # test model subset
# test_subset = test_data.to_pandas().copy()
# test_subset = test_subset[test_subset['srch_id'].isin(train_df['srch_id'])]

In [None]:
# We'll create an implicit feedback dataset
# For collaborative filtering, we need (user, item, rating) format
# Here, srch_id = user, prop_id = item, and we'll create a rating based on booking_bool
# If booking_bool=1, we'll give it a higher weight

# Create rating values based on booking behavior
# Rating scale: booking=5, click only=1, impression only=0.5
#train_df['rating'] = train_df['booking_bool'] * 4 + 1.0

# Keep only the columns needed for ALS
als_data = train_df[['srch_id', 'prop_id', 'booking_bool']].copy()

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(als_data)

# Ensure correct types and no nulls for ALS
from pyspark.sql.functions import col
spark_df = spark_df.dropna(subset=["srch_id", "prop_id", "booking_bool"])
spark_df = spark_df.withColumn("srch_id", col("srch_id").cast("integer"))
spark_df = spark_df.withColumn("prop_id", col("prop_id").cast("integer"))
spark_df = spark_df.withColumn("booking_bool", col("booking_bool").cast("float"))

# Split the data into training and validation sets
(training_data, validation_data) = spark_df.randomSplit([0.8, 0.2], seed=42)

# Create an ALS model
als = ALS(
    maxIter=15,
    regParam=0.01,
    userCol="srch_id",
    itemCol="prop_id",
    ratingCol="booking_bool",
    coldStartStrategy="drop",
    implicitPrefs=True,  # Use implicit feedback
    alpha=10.0,  # Confidence parameter for implicit feedback
    nonnegative=True,  # Use non-negative factorization for better interpretability
    rank=80  # Number of latent factors
)

# Train the model
model = als.fit(training_data)

# Make predictions on validation data
predictions = model.transform(validation_data)

# Evaluate the model
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Make predictions on test data
# First convert test data to Spark DataFrame with the needed columns
test_df = test_data.to_pandas()
test_spark_df = spark.createDataFrame(test_df[['srch_id', 'prop_id']])

# Generate predictions for all search-property pairs in test data
test_predictions = model.transform(test_spark_df)

# Generate top N recommendations for each search
N = 5  # Number of recommendations per search
search_recs = model.recommendForAllUsers(N)

# Explode the recommendations column to get a flat table
search_recs = search_recs.select(
    col("srch_id"),
    expr("explode(recommendations)").alias("recommendation")
).select(
    col("srch_id"),
    col("recommendation.prop_id").alias("prop_id"),
    col("recommendation.rating").alias("pred_score")
)

# Calculate softmax scores for ranking
# First, collect the data back to pandas
search_recs_pd = search_recs.toPandas()

# Apply softmax function to prediction scores
def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum()

# Group by search_id and apply softmax within each group
search_recs_pd['softmax_score'] = search_recs_pd.groupby('srch_id')['pred_score'].transform(
    lambda x: softmax(x.values)
)

# Sort by search_id and prediction score (descending)
ranked_recs = search_recs_pd.sort_values(by=['srch_id', 'pred_score'], ascending=[True, False])
print(ranked_recs.head(10))

# Save the results
ranked_recs[['srch_id', 'pred_score', 'softmax_score', 'prop_id']].to_csv('als_recommendations.csv', index=False)

# For full test dataset predictions (not just top N)
# This will generate a score for every search-property pair
full_test_predictions = test_predictions.select(
    col("srch_id"),
    col("prop_id"),
    col("prediction").alias("pred_score")
).toPandas()

# Apply softmax for full predictions
full_test_predictions['softmax_score'] = full_test_predictions.groupby('srch_id')['pred_score'].transform(
    lambda x: softmax(x.values)
)

# Sort by search_id and prediction score (descending)
full_ranked = full_test_predictions.sort_values(by=['srch_id', 'pred_score'], ascending=[True, False])

# Save full predictions
full_ranked[['srch_id', 'pred_score', 'softmax_score', 'prop_id']].to_csv('als_full_predictions.csv', index=False)

# Stop Spark session


In [None]:


als_full_predictions = pd.read_csv('als_full_predictions.csv')
als_full_predictions[['srch_id', 'prop_id']].to_csv('als_full_predictions_ids.csv', index=False)


Root-mean-square error = 1.2053121495196557
        srch_id  prop_id  pred_score  softmax_score
189095        1    53341    0.760762       0.204566
189096        1    95307    0.753495       0.203085
189097        1    88218    0.729622       0.198294
189098        1    65606    0.725878       0.197553
189099        1    41488    0.720551       0.196503
472925        4    48512    0.510664       0.204785
472926        4    51555    0.493643       0.201329
472927        4   114177    0.484217       0.199440
472928        4    45412    0.475473       0.197704
472929        4   140304    0.470590       0.196741
Best Rank: 80
Best RegParam: 0.01
Best MaxIter: 15
Best Alpha: 10.0