# Project: A Scalable Content-Based Recommendation System for Sri Lankan Tourism
Dataset: Sri Lanka Airbnb Listings
### Objective:
This project implements a content-based recommendation system using a real-world
Airbnb dataset for Sri Lanka. The system leverages Apache Spark to build a scalable
data processing pipeline, engineer features from listing attributes, and provide
relevant accommodation recommendations.


In [6]:
# ===================================================================
# Section 1: Environment Setup and Spark Initialization
# ===================================================================

# First, we need to install the PySpark library in our notebook environment.
# The '!' prefix allows us to run a shell command directly from the notebook.
# The '-q' flag is used for a "quiet" installation, which reduces the amount of output.
!pip install pyspark -q

# Here, we are importing the essential classes and functions we will need from PySpark.
# Rationale for each import:
#   - SparkSession: This is the main entry point to all Spark functionality. We can't do anything
#     in Spark without creating a SparkSession first.
#   - F (pyspark.sql.functions): This module contains a large collection of built-in functions
#     for transforming and manipulating DataFrame columns. Importing it as 'F' is a standard
#     convention that makes the code cleaner.
#   - FloatType: Part of pyspark.sql.types, this allows us to explicitly define a column's
#     data type, which is crucial for ensuring data quality during transformations.
#   - ml.feature & ml.Pipeline: These are components of Spark's Machine Learning (ML) library.
#     We need them to build a standardized workflow (a 'Pipeline') for converting our raw
#     data attributes into numerical feature vectors that algorithms can understand.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml import Pipeline

# This is where we create our SparkSession object, which controls our Spark application.
# We use the builder pattern, which is a flexible way to configure the session.
#   - .appName(): Assigns a name to our application. This name appears in the Spark UI,
#     making it easy to monitor our job on a cluster.
#   - .getOrCreate(): This is a smart method that checks if a SparkSession already exists.
#     If it does, it returns the existing one; otherwise, it creates a new one. This
#     prevents errors if we accidentally try to create multiple sessions.
spark = SparkSession.builder \
    .appName("AirbnbPySparkRecommender") \
    .getOrCreate()

print("SparkSession initialized successfully.")

SparkSession initialized successfully.


In [7]:
# -----------------------------------------------------------------------
# Section 2: Data Ingestion and Preprocessing (The ETL Process)
# -----------------------------------------------------------------------

# This section covers the 'Extract' and 'Transform' stages of our ETL pipeline.
# The goal is to load the raw data and prepare it for feature engineering.

# --- Stage 1: Extract ---

# Action: We are ingesting the raw Airbnb dataset from the uploaded CSV file.
# Rationale: `spark.read.csv` is the standard method for loading structured text data
# into a distributed Spark DataFrame. We've configured it with several important options:
#   - `header=True`: This tells Spark to use the first row of the file as the column headers.
#   - `inferSchema=True`: Spark will automatically scan the data to guess the data type for
#     each column (e.g., Integer, String). This is very convenient for exploration.
#   - `multiLine=True` & `escape='\"'`: These options are vital for data integrity. They
#     ensure that fields which might contain special characters or span multiple lines
#     (like long listing names) are parsed correctly into a single field.
file_path = "sri_lanka_airbnb.csv"
print(f"\nIngesting data from '{file_path}'...")
raw_df = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True, escape='\"')

# --- Stage 2: Transform ---

# Action: Here, we select the columns relevant to our model and standardize their names.
# Rationale: A key principle in data processing is to work only with the data you need.
# By selecting a subset of columns, we reduce the memory footprint and improve performance.
# We use the `.alias()` method to rename columns to a consistent `snake_case` format, which is a
# widely adopted coding standard that improves readability.
df = raw_df.select(
    F.col("name"),
    F.col("roomType").alias("room_type"),
    F.col("stars").alias("star_rating"),
    F.col("numberOfGuests").alias("number_of_guests")
)

# Note on price data: The 'price' feature, while potentially valuable, is not included in the
# current selection. If it were present as a string (e.g., '$95'), a transformation step
# involving `regexp_replace` to remove non-numeric symbols and a `.cast()` to convert it to a
# numeric type would be necessary before it could be used in modeling.

# Action: We are removing records that have null values in critical feature columns.
# Rationale: Missing data in key fields like `star_rating` or `room_type` can lead to errors
# or biases in our recommendation logic. Using `.na.drop()` is a straightforward strategy to
# ensure our dataset contains only complete, high-quality records for our chosen features.
df = df.na.drop(subset=["star_rating", "room_type"])


# Action: Displaying the schema and a sample of the transformed DataFrame.
# Rationale: It's crucial to verify the results of our transformation steps.
#   - `printSchema()` confirms that our column names and data types are correct.
#   - `show()` allows us to inspect a sample of the data to ensure the cleaning
#     and selection logic worked as expected.
print("\n--- Data after cleaning and preprocessing ---")
df.show(5, truncate=False)
df.printSchema()


Ingesting data from 'sri_lanka_airbnb.csv'...

--- Data after cleaning and preprocessing ---
+-------------------------------------------------+---------------------------+-----------+----------------+
|name                                             |room_type                  |star_rating|number_of_guests|
+-------------------------------------------------+---------------------------+-----------+----------------+
|D'Villa Garden House                             |Private room in guest suite|4.61       |2               |
|Suppar villa, a vacation home                    |Entire home                |4.5        |6               |
|Brinthavanam days inn /Non Ac/Free Veg Break fast|Private room in guesthouse |4.44       |2               |
|"Mathura" Home / Two rooms for rental in Jaffna  |Entire home                |4.0        |4               |
|Tony's Garden House 🏡 inn                       |Private room in home       |4.08       |16              |
+----------------------------------
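
The price note in the cell above describes stripping non-numeric symbols and then casting to a numeric type. A minimal pure-Python sketch of that idea follows. `clean_price` is a hypothetical helper that does not appear in the notebook, and the pattern `[^0-9.]` is an assumption about how prices such as '$95' are formatted. In Spark, the same pattern could be passed to `F.regexp_replace` and followed by `.cast("float")`.

```python
import re

def clean_price(raw):
    """Strip currency symbols and separators, then convert to float.

    Hypothetical helper for illustration; returns None when no number remains.
    """
    if raw is None:
        return None
    digits = re.sub(r"[^0-9.]", "", raw)  # keep only digits and the decimal point
    try:
        return float(digits)
    except ValueError:
        return None  # e.g. an empty string after stripping

print(clean_price("$95"))        # 95.0
print(clean_price("$1,250.50"))  # 1250.5
print(clean_price("N/A"))        # None
```

Returning None for unparseable values mirrors how a failed Spark cast yields null, so such rows could then be handled by the same `.na.drop()` step used for the other columns.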

In [8]:
# ----------------------------------------------------------------
# Section 3: Content-Based Feature Engineering
# ----------------------------------------------------------------
# Objective: The primary goal of this section is to transform the descriptive,
# raw attributes of each listing into a standardized numerical representation,
# known as a "feature vector". This vector format is essential because
# mathematical operations, like similarity calculations, can only be
# performed on numerical data.

# Action: We are defining a multi-stage feature engineering pipeline.
# Rationale: Using a `Pipeline` from Spark's ML library is a best practice for
# organizing our data transformation workflow. It allows us to chain multiple
# steps together in a specific order. This makes the code more readable,
# reusable, and ensures that the same transformations are applied consistently,
# which is critical for both training and later, for making predictions on new data.

# --- Pipeline Stage 1: StringIndexer ---
# Purpose: To convert our categorical feature 'room_type' (which contains text
# like "Private room") into a numerical format.
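# How it works: It assigns a unique numerical index to each distinct category. By default,
# indices follow descending frequency, so the most common room type receives 0.0.
# With `handleInvalid="keep"`, labels not seen during fitting are placed in one extra
# index instead of raising an error.
# For example, in this dataset: "Entire home" -> 1.0, "Private room in home" -> 8.0.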
room_type_indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index", handleInvalid="keep")

# --- Pipeline Stage 2: OneHotEncoder ---
# Purpose: To convert the numerical indices into a binary vector format.
# Rationale: Using the raw indices (0.0, 1.0, etc.) could mislead an algorithm into
# treating them as ordered (e.g., that 1.0 > 0.0). One-hot encoding prevents this by
# creating a sparse vector in which each category has its own dimension.
# For example, "Private room" (index 0) might become a vector like [1.0, 0.0, 0.0].
# Spark prints sparse vectors as (size, [indices], [values]), so (62,[30],[1.0]) is a
# 62-dimensional vector with a single 1.0 at position 30.
room_type_encoder = OneHotEncoder(inputCol="room_type_index", outputCol="room_type_vec")

# --- Pipeline Stage 3: VectorAssembler ---
# Purpose: To gather all our feature columns into a single vector column.
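# Rationale: Spark's ML algorithms expect all features for a given data point
# to be consolidated into a single vector. Only 'room_type_vec' is assembled here, so
# similarity will depend on room type alone; 'star_rating' and 'number_of_guests' are
# selected earlier but are not part of the feature vector. Further columns can be
# added to `inputCols` later.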
final_assembler = VectorAssembler(inputCols=["room_type_vec"], outputCol="features")

# --- Pipeline Stage 4: Normalizer ---
# Purpose: To scale our feature vectors to have a unit length (L2 norm).
# Rationale: Cosine similarity measures the angle between two vectors and is defined as
# their dot product divided by the product of their magnitudes. Once every vector has a
# magnitude of 1, that denominator is 1, so a plain dot product gives the cosine similarity.
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)

# Action: We now define the full pipeline by listing the stages in order.
pipeline = Pipeline(stages=[
    room_type_indexer, room_type_encoder,
    final_assembler, normalizer
])

# Action: We fit the pipeline to our DataFrame and then transform the data.
# Rationale:
#   - `.fit(df)`: This step trains our pipeline. The `StringIndexer`, for instance,
#     scans the 'room_type' column to learn all the unique categories and create its
#     internal index mapping.
#   - `.transform(df)`: This step applies the learned transformations to every row
#     in the DataFrame, executing all stages in order and adding the new columns,
#     including our final 'norm_features' vector.
feature_pipeline = pipeline.fit(df)
items_with_features = feature_pipeline.transform(df)

# Action: Display the results to verify our feature engineering was successful.
# Rationale: We inspect the output to confirm that a 'norm_features' vector has
# been created for each listing, representing its content in a numerical format.
print("\n--- Data with final engineered feature vectors ---")
items_with_features.select("name", "norm_features").show(5, truncate=False)


--- Data with final engineered feature vectors ---
+-------------------------------------------------+---------------+
|name                                             |norm_features  |
+-------------------------------------------------+---------------+
|D'Villa Garden House                             |(62,[30],[1.0])|
|Suppar villa, a vacation home                    |(62,[1],[1.0]) |
|Brinthavanam days inn /Non Ac/Free Veg Break fast|(62,[9],[1.0]) |
|"Mathura" Home / Two rooms for rental in Jaffna  |(62,[1],[1.0]) |
|Tony's Garden House 🏡 inn                       |(62,[8],[1.0]) |
+-------------------------------------------------+---------------+
only showing top 5 rows
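
To make the indexing, one-hot, and normalization stages concrete, here is a small pure-Python sketch of the same three steps on a handful of room types. The helper names (`fit_index`, `one_hot`, `l2_normalize`) are hypothetical and are not Spark APIs. The sketch also keeps every category as its own dimension, whereas Spark's `OneHotEncoder` drops the last category by default, so treat it as an approximation of the pipeline rather than a reimplementation.

```python
from collections import Counter
import math

def fit_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, size):
    """Dense one-hot vector with a single 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]

def l2_normalize(vec):
    """Scale a vector to unit Euclidean length; all-zero vectors are returned unchanged."""
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm > 0 else list(vec)

room_types = [
    "Entire home",
    "Private room in home",
    "Entire home",
    "Private room in guest suite",
]
mapping = fit_index(room_types)
features = [l2_normalize(one_hot(mapping[r], len(mapping))) for r in room_types]

print(mapping["Entire home"])  # 0, because it is the most frequent label
print(features[0])             # [1.0, 0.0, 0.0]
```

A one-hot vector already has unit length, so normalization leaves it unchanged here. The step starts to matter once numeric columns are added to the feature vector.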



In [9]:
# -----------------------------------------------------------------------
# Section 4: Recommendation Logic Implementation
# -----------------------------------------------------------------------
# Objective: This section contains the core algorithm of our content-based
# recommender. We will define a function that, given a specific item, can
# find and rank the 'top k' most similar items from our dataset based on the
# feature vectors we engineered in the previous section.

# Action: Define a function to generate content-based recommendations.
# Rationale: This function encapsulates the recommendation logic, making it reusable
# and easy to test. It takes a target item's name, the full DataFrame of items
# with their feature vectors, and the number of recommendations to return (top_k).
def recommend_spark(item_name, items_df, top_k=5):
    """
    This function implements the recommendation logic by calculating cosine similarity
    between a target item and all other items in the dataset.
    """
    # Step 1: Isolate the feature vector of the target item.
    # We filter the DataFrame to find the specific item we want recommendations for
    # and extract its pre-computed 'norm_features' vector. `first()` returns None when
    # no row matches, so we check for that case explicitly instead of catching an exception.
    target_row = items_df.filter(F.col("name") == item_name).select("norm_features").first()
    if target_row is None:
        print(f"Error: Item with name '{item_name}' not found.")
        return None
    target_vector = target_row[0]

    # Step 2: Define a User-Defined Function (UDF) to compute similarity.
    # Rationale: Since the feature vectors are normalized (scaled to a unit length),
    # the cosine similarity is equivalent to their dot product. A UDF lets us apply this
    # vector operation to every row of the distributed DataFrame, although Python UDFs
    # add serialization overhead compared with built-in column functions.
    dot_product_udf = F.udf(lambda x: float(x.dot(target_vector)), "float")

    # Step 3: Compute similarity for all items and rank them.
    # A new 'similarity' column is created by applying our UDF to every item's
    # feature vector. This is a highly parallelized transformation.
    similarity_df = items_df.withColumn("similarity", dot_product_udf(F.col("norm_features")))

    # Step 4: Filter, sort, and select the top recommendations.
    #   - `filter()`: We exclude the input item itself from the results, as its
    #     similarity to itself will always be a perfect 1.0.
    #   - `orderBy()`: We sort the remaining rows by the 'similarity' column in
    #     descending order to bring the most similar items to the top.
    #   - `limit()`: We select only the top 'k' items from the sorted list.
    recommendations = similarity_df.filter(F.col("name") != item_name) \
                                   .orderBy(F.col("similarity").desc()) \
                                   .limit(top_k)

    # Finally, we select a few readable columns to present the recommendations.
    return recommendations.select("name", "room_type", "star_rating", "number_of_guests", "similarity")

# Action: Execute the recommendation function to test its functionality.
# Rationale: To demonstrate that our function works, we select a sample item from
# our dataset and generate recommendations for it. We programmatically get the first
# item's name to ensure the code is runnable without manual input.
target_item_name = items_with_features.first()["name"]
print(f"\n--- Top 5 Recommendations for '{target_item_name}' ---")
recs = recommend_spark(target_item_name, items_with_features, top_k=5)

# We check that the function returned a DataFrame (not None) before trying to display it.
if recs is not None:
    recs.show(truncate=False)


--- Top 5 Recommendations for 'D'Villa Garden House' ---
+---------------------------------------------+---------------------------+-----------+----------------+----------+
|name                                         |room_type                  |star_rating|number_of_guests|similarity|
+---------------------------------------------+---------------------------+-----------+----------------+----------+
|Luxury Oceanfront private Suite              |Private room in guest suite|4.92       |2               |1.0       |
|Into The Blue - Yoga included | Walk to Beach|Private room in guest suite|4.86       |2               |1.0       |
|Ambalama Pavillion                           |Private room in guest suite|5.0        |2               |1.0       |
|Sudu Neluma Home Stay (Foreigners Only)      |Private room in guest suite|4.55       |5               |1.0       |
|Ivory Villa with a private Pool and Garden   |Private room in guest suite|4.75       |2               |1.0       |
+-------------
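
The ranking logic can also be sketched without Spark, which may help when reasoning about what the similarity scores mean. The example below assumes every vector is already unit length, so the dot product equals the cosine similarity. The `recommend` function and the toy `items` dictionary are hypothetical stand-ins for `recommend_spark` and the feature DataFrame, not part of the notebook.

```python
def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def recommend(target_name, items, top_k=5):
    """Rank all other items by dot product with the target's unit-length vector.

    `items` maps a listing name to its unit vector (toy stand-in for the DataFrame).
    Returns None when the target name is unknown.
    """
    if target_name not in items:
        return None
    target = items[target_name]
    scored = [
        (name, dot(vec, target))
        for name, vec in items.items()
        if name != target_name  # never recommend the item to itself
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]

items = {
    "A": [1.0, 0.0],
    "B": [1.0, 0.0],
    "C": [0.0, 1.0],
    "D": [0.6, 0.8],
}
print(recommend("A", items, top_k=2))  # [('B', 1.0), ('D', 0.6)]
```

With purely one-hot vectors, as in the pipeline above, every score is either 1.0 (same category) or 0.0 (different category). Intermediate values such as the 0.6 for "D" only appear once the vectors mix several features.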

In [5]:
# --------------------------------------------------------------------------
# Section 5: Offline Model Validation Framework
# --------------------------------------------------------------------------
# Objective: This section is dedicated to the critical process of model evaluation.
# It is insufficient to simply build a model; we must quantitatively measure its
# performance to understand its effectiveness. Here, we construct a harness to
# calculate Precision@K, a standard information retrieval metric adapted for
# assessing recommendation quality.

# Action: Define a function to calculate the average Precision@K for our model.
# Rationale: This function serves as our evaluation protocol. A key challenge in a
# Big Data context is that evaluating a model across the entire dataset can be
# computationally expensive. Therefore, a common academic and industry practice
# is to estimate performance by testing on a smaller, random, and statistically
# representative sample of the data.
def evaluate_model_spark(items_df, top_k=5, sample_size=20):
    """
    Calculates average Precision@K across a random sample of items. This metric
    is a measure of the accuracy of the top-K recommendations.
    """
    # Step 1: Sub-sampling the dataset for evaluation.
    # Rationale: We take a small fraction of the data to create our test set.
    # `sample()` returns approximately 5% of the rows (the fraction is not exact), and
    # `limit()` caps the result at `sample_size`. A fixed 'seed' makes the sample
    # repeatable across runs on the same data, which supports reproducibility.
    test_items = items_df.sample(withReplacement=False, fraction=0.05, seed=42).limit(sample_size).collect()

    # A simple check to ensure the sampling process yielded enough data.
    if len(test_items) < sample_size:
        print(f"Warning: The evaluation sample is smaller than desired ({len(test_items)} items).")

    total_precision = 0.0
    evaluated_count = 0

    # Step 2: Iterating through the test set to calculate precision for each item.
    for item_row in test_items:
        item_name = item_row["name"]

        # Step 3: Defining the "Ground Truth" using a relevance proxy.
        # Rationale: The most significant challenge in offline evaluation is the lack
        # of explicit user feedback. To address this, we define a heuristic-based
        # proxy for relevance. Our assumption is that `room_type` is a primary
        # decision factor for users. Therefore, if a user is viewing a specific item,
        # other recommended items with the same `room_type` are considered relevant.
        # This is a simplification of true user preference. Because `room_type` is also
        # the only feature in `norm_features`, the proxy largely restates what the model
        # already encodes, so a very high score is expected by construction.
        ground_truth_room_type = item_row["room_type"]

        # Step 4: Generating recommendations for the item under test.
        recommendations_df = recommend_spark(item_name, items_df, top_k=top_k)

        if recommendations_df is None:
            continue

        # Step 5: Computing Precision@K for the generated recommendations.
        # The number of "hits" is the count of recommended items that satisfy our
        # ground truth criterion. Precision@K is then calculated as:
        # (Number of Relevant Items in Top-K) / K.
        hits = recommendations_df.filter(F.col("room_type") == ground_truth_room_type).count()
        precision = hits / top_k
        total_precision += precision
        evaluated_count += 1

    # Step 6: Aggregating the results.
    # Rationale: A single precision score can be noisy. By averaging the precision
    # across our entire test sample, we obtain a more stable and generalizable
    # estimate of the model's overall performance.
    avg_precision = total_precision / evaluated_count if evaluated_count > 0 else 0
    return avg_precision

# Action: Execute the evaluation protocol and report the final score.
# Rationale: This is the final step where we run our defined logic to produce a
# single, quantitative metric. This score serves as a benchmark for our model's
# accuracy and can be used to compare different versions of the recommendation algorithm.
avg_precision_at_5 = evaluate_model_spark(items_with_features, top_k=5, sample_size=20)
print("\n--- Evaluation Results ---")
print(f"Average Precision@5 (based on a sample of 20 items): {avg_precision_at_5:.3f}")


--- Evaluation Results ---
Average Precision@5 (based on a sample of 20 items): 1.000
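
The Precision@K arithmetic used in the evaluation can be checked by hand with a small pure-Python sketch. The functions `precision_at_k` and `mean_precision_at_k` and the two toy cases are hypothetical and only illustrate the formula (relevant items in the top K, divided by K) under the same room-type relevance proxy.

```python
def precision_at_k(recommended_types, target_type, k):
    """Fraction of the top-k recommendations whose room type matches the target's."""
    if k <= 0:
        raise ValueError("k must be positive")
    top = recommended_types[:k]
    hits = sum(1 for room_type in top if room_type == target_type)
    return hits / k

def mean_precision_at_k(cases, k):
    """Average precision@k over (target_type, recommended_types) pairs; 0.0 if empty."""
    if not cases:
        return 0.0
    return sum(precision_at_k(recs, target, k) for target, recs in cases) / len(cases)

cases = [
    ("Entire home", ["Entire home"] * 5),
    ("Private room in home", [
        "Private room in home", "Entire home", "Private room in home",
        "Entire home", "Entire home",
    ]),
]
print(precision_at_k(cases[1][1], cases[1][0], 5))  # 2 of 5 match -> 0.4
print(mean_precision_at_k(cases, 5))                # (1.0 + 0.4) / 2 -> 0.7
```

Dividing by `k` rather than by the number of recommendations actually returned means a short list is penalized, which matches the `hits / top_k` calculation in the evaluation cell.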


In [None]:
# ----------------------------------------------------
# Section 6: Shutdown
# ----------------------------------------------------
# Action: Stop the SparkSession.
# Rationale: It is best practice to release the cluster resources when the
# application is finished.
spark.stop()