# **Recommendation Systems**

The main idea of this file is to create a "ground truth" with KMeans clustering, get recommendations for a given property, and then test whether the recommendations fall in the same cluster as that property. We do not analyze the results in depth, because we are not sure they are reliable: the data has no label column for evaluating recommendations, and we could not find a better way to implement the evaluation.

In [1]:
! pip install pyspark seaborn



In [2]:
# libraries

import numpy as np  # used by the cosine-similarity UDF below
import pandas as pd
import sklearn
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.clustering import KMeans
from scipy.spatial.distance import cosine

In [3]:
from google.colab import drive
drive.mount('/content/gdrive')
google_drive_path = "/content/gdrive/MyDrive/Colab Notebooks/Project/"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [4]:
spark = SparkSession.builder.appName("Project511_RS").master("local[*]") \
          .config("spark.driver.memory", "10g") \
          .config("spark.executor.memory", "10g") \
          .config("spark.driver.maxResultSize", "2g") \
          .config("spark.sql.shuffle.partitions", "200") \
          .config("spark.default.parallelism", "100") \
          .getOrCreate()

# Read Dataset

In [5]:
sale_df = spark.read.format("parquet").load(google_drive_path + "sale_ml.parquet")

In [6]:
sale_df.printSchema()

root
 |-- price_bucket: double (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- end_year: integer (nullable = true)
 |-- is_active: integer (nullable = true)
 |-- l3_one_hot: vector (nullable = true)
 |-- l2_one_hot: vector (nullable = true)
 |-- property_one_hot: vector (nullable = true)
 |-- encoded_start_month: vector (nullable = true)
 |-- encoded_end_month: vector (nullable = true)
 |-- encoded_start_day_of_week: vector (nullable = true)
 |-- encoded_end_day_of_week: vector (nullable = true)
 |-- geo_cluster: integer (nullable = true)
 |-- rooms_scaled: double (nullable = true)
 |-- bedrooms_scaled: double (nullable = true)
 |-- bathrooms_scaled: double (nullable = true)
 |-- surface_total_scaled: double (nullable = true)
 |-- scaled_final_features: vector (nullable = true)



In [7]:
sale_df.count()

210339

This DataFrame does not contain the textual descriptions of the properties, only their numeric encodings, which we produced in the Text Analytics file. Both datasets have the same rows in the same order, so in order to show the description of each property we recommend, we merge the two datasets.

In [8]:
df = spark.read.format("parquet").load(google_drive_path + "ar_properties_processed.parquet")

In [9]:
df.count()

259910

This is the dataset with the textual descriptions and all the attributes in their original, unencoded form.

In [10]:
df.show()

+----------+----------+--------------------+--------------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+----------------+--------------+--------------------+------------------+
|start_date|  end_date|                  l2|                  l3|rooms|bedrooms|bathrooms|surface_total|surface_covered|               title|         description|   property_type|operation_type|         coordinates|    amount_in_euro|
+----------+----------+--------------------+--------------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+----------------+--------------+--------------------+------------------+
|2019-12-29|2020-02-11|Bs.As. G.B.A. Zon...|               Pilar|    4|       3|        2|        140.0|          140.0|Susana Aravena Pr...|Excelente propied...|           House|          Rent|[-34.879692077636...|             533.8|
|2019-12-29|2020-04-04|Bs.As. G.B.A. Zon...|             Esc

We apply some preprocessing so that this dataset is consistent with `sale_df` (210,339 rows).

In [11]:
df = df.filter(col("operation_type") == "Sale")

In [12]:
df.count()

210341

In [13]:
df = df.filter((col("amount_in_euro") <= 9000000000) & (col("amount_in_euro") > 11))

In [14]:
df.count()

210339

We add a property `id` column so that we can then merge this dataset with `sale_df`.

In [15]:
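# add column called 'id' that contains row numbers from 1 to n
# note: ordering by a constant does not guarantee a deterministic row order; this relies on
# both DataFrames keeping the order in which they were read (Spark also moves all rows to one partition)
w = Window.orderBy(lit('A'))
df = df.withColumn('id', row_number().over(w))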

In [16]:
df.show()

+----------+----------+--------------------+--------------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+-------------+--------------+--------------------+------------------+---+
|start_date|  end_date|                  l2|                  l3|rooms|bedrooms|bathrooms|surface_total|surface_covered|               title|         description|property_type|operation_type|         coordinates|    amount_in_euro| id|
+----------+----------+--------------------+--------------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+-------------+--------------+--------------------+------------------+---+
|2019-12-29|2020-06-14|Bs.As. G.B.A. Zon...|              Moreno|    2|       2|        1|        300.0|          260.0|Azucena Villaflor...|Excelente oportun...|          Lot|          Sale|[-34.879692077636...| 7064.999999999999|  1|
|2019-12-29|2019-12-29|             Mendoza|           L

# Properties' Indexing

In [17]:
sale_df = sale_df.withColumn('id', row_number().over(w))

In [18]:
sale_df.show()

+------------+----------+--------+---------+-------------+--------------+----------------+-------------------+-----------------+-------------------------+-----------------------+-----------+-------------------+--------------------+-------------------+--------------------+---------------------+---+
|price_bucket|start_year|end_year|is_active|   l3_one_hot|    l2_one_hot|property_one_hot|encoded_start_month|encoded_end_month|encoded_start_day_of_week|encoded_end_day_of_week|geo_cluster|       rooms_scaled|     bedrooms_scaled|   bathrooms_scaled|surface_total_scaled|scaled_final_features| id|
+------------+----------+--------+---------+-------------+--------------+----------------+-------------------+-----------------+-------------------------+-----------------------+-----------+-------------------+--------------------+-------------------+--------------------+---------------------+---+
|         4.0|      2020|    2020|        0|(4,[0],[1.0])|(27,[2],[1.0])|   (9,[0],[1.0])|     (11,[7],

In [19]:
sale_df = sale_df.select("id", "scaled_final_features", "start_year", "end_year", "is_active", "l3_one_hot", "l2_one_hot", "property_one_hot",
               "encoded_start_month", "encoded_end_month", "encoded_start_day_of_week", "encoded_end_day_of_week", "geo_cluster", "rooms_scaled",
               "bedrooms_scaled", "bathrooms_scaled", "surface_total_scaled", "price_bucket")

In [20]:
sale_df.show()

+---+---------------------+----------+--------+---------+-------------+--------------+----------------+-------------------+-----------------+-------------------------+-----------------------+-----------+-------------------+--------------------+-------------------+--------------------+------------+
| id|scaled_final_features|start_year|end_year|is_active|   l3_one_hot|    l2_one_hot|property_one_hot|encoded_start_month|encoded_end_month|encoded_start_day_of_week|encoded_end_day_of_week|geo_cluster|       rooms_scaled|     bedrooms_scaled|   bathrooms_scaled|surface_total_scaled|price_bucket|
+---+---------------------+----------+--------+---------+-------------+--------------+----------------+-------------------+-----------------+-------------------------+-----------------------+-----------+-------------------+--------------------+-------------------+--------------------+------------+
|  1| (17000,[0,1,2,3,6...|      2020|    2020|        0|(4,[0],[1.0])|(27,[2],[1.0])|   (9,[0],[1.0])|

In [21]:
sale_df.count()

210339

# Feature Vector Construction

We need to combine all our features into a single feature vector for each property. This will be used for the similarity calculations in the recommendation model.
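Conceptually, `VectorAssembler` concatenates the input columns in the order listed in `inputCols`: a scalar column takes one slot and a vector column takes as many slots as its size. The pure-Python sketch below models only that concatenation; the `(size, {index: value})` representation and the `assemble` helper are hypothetical stand-ins for Spark's `SparseVector`, not Spark's implementation.

```python
# Hypothetical toy model of VectorAssembler: a sparse vector is a (size, {index: value}) pair.
def assemble(inputs):
    """Concatenate scalar features and sparse vectors, in order, into one sparse vector."""
    offset = 0      # start index of the current input inside the output vector
    entries = {}    # non-zero entries of the assembled vector
    for item in inputs:
        if isinstance(item, tuple):              # sparse vector input
            size, values = item
            for i, v in values.items():
                if v != 0.0:
                    entries[offset + i] = float(v)
            offset += size
        else:                                    # a scalar input occupies one slot
            if item != 0.0:
                entries[offset] = float(item)
            offset += 1
    return offset, entries


# price_bucket = 4.0, a 4-dim one-hot vector with index 0 set, and a scaled value of 0.5
size, entries = assemble([4.0, (4, {0: 1.0}), 0.5])
print(size, entries)  # 6 {0: 4.0, 1: 1.0, 5: 0.5}
```

In the notebook, the 17,000-dimensional `scaled_final_features` input accounts for most of the assembled vector's length.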

In [22]:
from pyspark.ml.feature import VectorAssembler

# define all the features to be included in the vector
feature_columns = [
    'scaled_final_features', 'price_bucket', 'l3_one_hot', 'l2_one_hot', 'property_one_hot',
    'encoded_start_month', 'encoded_end_month', 'encoded_start_day_of_week', 'encoded_end_day_of_week',
    'rooms_scaled', 'bedrooms_scaled', 'bathrooms_scaled', 'surface_total_scaled'
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
sale_df = assembler.transform(sale_df)

In [23]:
# keep a random sample of roughly 1% of the properties (seeded for reproducibility)
sale_df = sale_df.sample(fraction=0.01, seed=1234)

In [24]:
sale_df.select("id").show()

+----+
|  id|
+----+
| 239|
| 283|
| 313|
| 336|
| 376|
| 395|
| 442|
| 449|
| 501|
| 521|
| 638|
| 744|
| 781|
| 829|
| 931|
| 939|
|1017|
|1233|
|1361|
|1524|
+----+
only showing top 20 rows



We take the first property of the sample as an example.

In [25]:
property_id = sale_df.first()["id"]

In [26]:
property_id

239

We normalize the feature vectors to unit length (L2 norm). For two unit vectors A and B, the cosine similarity A·B / (‖A‖‖B‖) reduces to the plain dot product A·B, which simplifies the similarity computation.
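A small pure-Python check of this identity on two made-up vectors: after L2 normalization, the dot product equals the cosine similarity of the original vectors. It also shows a related identity that matters for the KMeans step: for unit vectors, ‖A − B‖² = 2 − 2·cos(A, B), so Euclidean distance on normalized vectors orders pairs the same way as cosine similarity. The helper names are illustrative and not part of the notebook.

```python
import math


def dot_product(a, b):
    # math.fsum instead of sum(): `from pyspark.sql.functions import *` shadows the built-in sum
    return math.fsum(x * y for x, y in zip(a, b))


def l2_normalize(v):
    norm = math.sqrt(dot_product(v, v))
    return [x / norm for x in v]


def cos_sim(a, b):
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))


a, b = [3.0, 4.0, 0.0], [1.0, 2.0, 2.0]
na, nb = l2_normalize(a), l2_normalize(b)

cos_ab = cos_sim(a, b)            # cosine similarity of the raw vectors: 11/15
dot_n = dot_product(na, nb)       # dot product of the normalized vectors: also 11/15
diff = [x - y for x, y in zip(na, nb)]
sq_dist = dot_product(diff, diff)  # squared Euclidean distance: 2 - 2 * 11/15 = 8/15
```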

In [27]:
from pyspark.ml.feature import Normalizer

# normalize feature vectors
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
sale_df = normalizer.transform(sale_df)


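We cache the DataFrame because several actions are run on it below. Since Spark evaluates lazily, `cache()` only marks the DataFrame for caching; the data is stored the first time an action runs. We then hash-partition the DataFrame into 50 partitions by the `id` column to spread the rows across partitions. Note that `repartition` returns a new DataFrame that is not itself cached, so its shuffle may be recomputed by later actions unless the repartitioned result is cached as well.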

In [28]:
sale_df.cache()

DataFrame[id: int, scaled_final_features: vector, start_year: int, end_year: int, is_active: int, l3_one_hot: vector, l2_one_hot: vector, property_one_hot: vector, encoded_start_month: vector, encoded_end_month: vector, encoded_start_day_of_week: vector, encoded_end_day_of_week: vector, geo_cluster: int, rooms_scaled: double, bedrooms_scaled: double, bathrooms_scaled: double, surface_total_scaled: double, price_bucket: double, features: vector, normFeatures: vector]

In [29]:
sale_df = sale_df.repartition(50, "id")

# KMeans Clustering

We apply KMeans clustering to group similar properties based on their normalized feature vectors, with k = 10 clusters and a fixed seed for reproducibility. The resulting cluster labels serve as the "ground truth" against which we later evaluate the recommendations.
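Spark's `KMeans` is a distributed implementation that, by default, chooses its initial centers with k-means|| and then runs Lloyd-style iterations. The sketch below shows only the basic Lloyd loop (assign each point to its nearest center, move each center to the mean of its points) on a toy 2-D dataset with hand-picked initial centers; it is an illustration, not Spark's code.

```python
import math


def squared_distance(p, q):
    # math.fsum instead of sum(): the notebook's `from pyspark.sql.functions import *` shadows sum
    return math.fsum((x - y) ** 2 for x, y in zip(p, q))


def nearest(p, centers):
    """Index of the center closest to point p (min() is avoided because the star import shadows it too)."""
    best_k, best_d = 0, squared_distance(p, centers[0])
    for k in range(1, len(centers)):
        d = squared_distance(p, centers[k])
        if d < best_d:
            best_k, best_d = k, d
    return best_k


def lloyd_kmeans(points, centers, max_iter=20):
    """Basic Lloyd iterations starting from the given initial centers."""
    centers = [list(c) for c in centers]
    for _ in range(max_iter):
        labels = [nearest(p, centers) for p in points]          # assignment step
        new_centers = []
        for k, c in enumerate(centers):                          # update step
            members = [p for p, lab in zip(points, labels) if lab == k]
            if members:
                new_centers.append([math.fsum(m[d] for m in members) / len(members) for d in range(len(c))])
            else:
                new_centers.append(c)                            # empty cluster: keep its center
        if new_centers == centers:                               # converged: centers stopped moving
            break
        centers = new_centers
    return [nearest(p, centers) for p in points], centers


points = [[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]]
labels, centers = lloyd_kmeans(points, centers=[[0.0, 0.0], [10.0, 10.0]])
print(labels, centers)  # [0, 0, 1, 1] [[0.0, 0.5], [10.0, 10.5]]
```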

In [30]:
# initialize the KMeans model with 10 clusters
kmeans = KMeans().setK(10).setSeed(1).setFeaturesCol("normFeatures").setPredictionCol("cluster")
model = kmeans.fit(sale_df)

# add cluster column to the df
sale_df = model.transform(sale_df)

In [31]:
sale_df.groupBy("cluster").count().show()

+-------+-----+
|cluster|count|
+-------+-----+
|      1|  485|
|      6|  172|
|      3|  286|
|      9|   26|
|      4|  233|
|      8|  188|
|      7|  117|
|      2|  212|
|      0|  435|
|      5|   16|
+-------+-----+



# Similarity-Based Recommender

Since we have no user interaction data (such as ratings or clicks), only item features, collaborative filtering techniques like ALS (Alternating Least Squares), which learn from user-item interactions, are not applicable. Instead, we use content-based filtering on the feature vectors we constructed.

In [32]:
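# cosine similarity between two vectors; scipy's `cosine` returns the cosine distance (1 - similarity)
def cosine_similarity(x, y):
    # Spark ML vectors (dense or sparse) are converted with toArray(); the literal array column arrives as a list
    x = x.toArray() if hasattr(x, "toArray") else np.asarray(x, dtype=float)
    y = y.toArray() if hasattr(y, "toArray") else np.asarray(y, dtype=float)
    return float(1 - cosine(x, y))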

In [33]:
# wrap the Python function as a Spark UDF that returns a float
cosine_similarity_udf = udf(cosine_similarity, FloatType())

Recommendation function based on cosine similarity

The recommendation function ranks properties by the cosine similarity between their feature vectors and the feature vector of the given property, excludes the property itself, and returns the top matches together with their cluster. The filter that would restrict candidates to the target's cluster is commented out: with it enabled, every recommendation would share the target's cluster by construction, and the cluster-based evaluation would be trivially 1.00.
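For intuition, here is a single-machine sketch of the same ranking logic on a few made-up vectors: score every other property by cosine similarity, optionally keep only the target's cluster, and return the top n. The `recommend` helper and its `same_cluster_only` flag are hypothetical; they only mirror the Spark function above.

```python
import heapq
import math


def cos_sim(a, b):
    # math.fsum instead of sum(), which `from pyspark.sql.functions import *` shadows
    dot = math.fsum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(math.fsum(x * x for x in a))
    norm_b = math.sqrt(math.fsum(y * y for y in b))
    return dot / (norm_a * norm_b)


def recommend(vectors, clusters, target_id, n=10, same_cluster_only=False):
    """Return up to n (id, similarity, cluster) tuples, most similar first."""
    target = vectors[target_id]
    candidates = []
    for pid, vec in vectors.items():
        if pid == target_id:
            continue                                   # never recommend the property itself
        if same_cluster_only and clusters[pid] != clusters[target_id]:
            continue                                   # optional restriction to the target's cluster
        candidates.append((pid, cos_sim(vec, target), clusters[pid]))
    return heapq.nlargest(n, candidates, key=lambda t: t[1])


vectors = {1: [1.0, 0.0], 2: [0.9, 0.1], 3: [0.0, 1.0], 4: [1.0, 1.0]}
clusters = {1: 0, 2: 0, 3: 1, 4: 1}
print([pid for pid, _, _ in recommend(vectors, clusters, target_id=1, n=2)])  # [2, 4]
print([pid for pid, _, _ in recommend(vectors, clusters, target_id=1, n=2, same_cluster_only=True)])  # [2]
```

`heapq.nlargest` avoids sorting every candidate when only the top n are needed, which plays the same role as `orderBy(...).limit(n)` in Spark.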

In [43]:
def recommend_properties(property_id, num_recommendations=10):
    try:
        # extract the feature vector and cluster for the specified property
        target_row = sale_df.filter(col("id") == property_id).select("features", "cluster").first()
        if target_row is None:
            return f"No property found with ID {property_id}"

        target_feature_vector = target_row["features"].toArray()
        target_cluster = target_row["cluster"]

        # compute the cosine similarity of every property to the target;
        # the target vector is passed to the UDF as a literal column
        similarities = sale_df.withColumn("similarity", cosine_similarity_udf(col("features"), lit(target_feature_vector)))

        # optional: restrict candidates to the target's cluster (disabled)
        # similarities = similarities.filter(col("cluster") == target_cluster)

        # exclude the property itself, then keep the most similar properties
        recommendations = (similarities.filter(col("id") != property_id)
                           .orderBy(col("similarity").desc())
                           .limit(num_recommendations))

        return recommendations.select("id", "similarity", "cluster")

    # handle exception
    except Exception as e:
        return f"An error occurred: {str(e)}"


In [44]:
recommended_properties = recommend_properties(property_id).cache()

In [45]:
recommended_properties.show()

+------+----------+-------+
|    id|similarity|cluster|
+------+----------+-------+
| 13935| 0.7240418|      1|
|104835| 0.7235273|      1|
|143138| 0.6786017|      1|
|149266|0.67008126|      1|
|187800| 0.6656894|      1|
|  5327|0.64989126|      1|
| 36650| 0.6481783|      1|
| 93877|0.64642775|      1|
|  2307| 0.6450274|      1|
|101316| 0.6442616|      1|
+------+----------+-------+



The similarity scores of the top 10 recommendations range from about 0.644 to 0.724, a narrow spread of roughly 0.08, so these properties are similarly close to the target property in feature space.
Whether these values count as "high" is relative: they should be compared against the distribution of similarities across all properties.

# Evaluate Recommendations

The function `evaluate_recommendations` computes the fraction of recommended properties that belong to the same KMeans cluster as the original property, and reports it as accuracy.

Since the cluster labels are a column we created rather than real labels, this accuracy measures how well the similarity ranking agrees with the clustering, not the true quality of the recommendations.
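Written as a formula, for a target property $t$ with recommendation list $R(t)$ and KMeans cluster label $c(\cdot)$, the accuracy is

$$
\text{accuracy}(t) = \frac{\left|\{\, r \in R(t) \,:\, c(r) = c(t) \,\}\right|}{|R(t)|},
$$

defined as 0 when $R(t)$ is empty. For property 239, all 10 recommendations share its cluster, so the accuracy is $10/10 = 1.00$.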

In [37]:
def evaluate_recommendations(property_id, recommendations):

    # recommendations = recommend_properties(property_id)

    # handle errors -> return the error
    if isinstance(recommendations, str):
        return recommendations

    # get the cluster id of the specified property
    original_cluster = sale_df.filter(col("id") == property_id).select("cluster").first()["cluster"]

    # calculate how many recommendations match the specified property's cluster
    matching_cluster_count = recommendations.filter(col("cluster") == original_cluster).count()
    total_recommendations = recommendations.count()

    # calculate accuracy
    accuracy = matching_cluster_count / total_recommendations if total_recommendations > 0 else 0
    return f"Accuracy of recommendations: {accuracy:.2f}"



In [38]:
accuracy_score = evaluate_recommendations(property_id, recommended_properties)
print(accuracy_score)


Accuracy of recommendations: 1.00


All 10 recommended properties belong to the same cluster as the original property (cluster 1), so the accuracy is 1.00: for this property, the cosine-similarity ranking agrees with the KMeans clustering.
This agreement is largely expected, because both the clusters and the similarities are computed from the same feature vectors (KMeans on the normalized vectors, cosine similarity on the raw ones, which is scale-invariant). It is also based on a single property, and the clusters/labels are a column we created rather than actual ground truth, so the result should not be read as evidence of reliable recommendations.


# Merge Datasets

We join the recommendations with the original dataset on `id` to get a readable, detailed description of each recommended property.

In [39]:
merged_df = recommended_properties.join(df, "id")

In [40]:
merged_df.show()

+------+----------+-------+----------+----------+--------------------+----------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+-------------+--------------+--------------------+--------------+
|    id|similarity|cluster|start_date|  end_date|                  l2|              l3|rooms|bedrooms|bathrooms|surface_total|surface_covered|               title|         description|property_type|operation_type|         coordinates|amount_in_euro|
+------+----------+-------+----------+----------+--------------------+----------------+-----+--------+---------+-------------+---------------+--------------------+--------------------+-------------+--------------+--------------------+--------------+
|  2307| 0.6450274|      1|2021-01-12|9999-12-31|     Capital Federal|Parque Chacabuco|   10|       5|        4|       438.17|         347.07|CASA 3 DORMI + DP...|<b>CASA 3 DORMI +...|        House|          Sale|[-34.879692077636...|      142800.0|
