In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
import pandas as pd
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import haversine_distances
from math import radians
from sklearn.preprocessing import MinMaxScaler

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4fe359a2f4e57dd7c26b01617906b23d86c657c83a6eb8e2633aba307e13c862
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().set("spark.ui.port","4050")
sc = pyspark.SparkContext(conf=conf)

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Geotagged Photo Analysis") \
    .getOrCreate()


# **Data Preprocessing**
1. filter on the basis of london bounding box
2. convert longtiude latitudes to radians
3. drop where location is null
4. extract season and part of day using "taken" attribute of photos

In [None]:
# Load data
df = spark.read.format("csv").option("header", "true").load("/content/london_20k.csv")
bbox = (-0.5282454933, 51.2798389504, 0.2921446401, 51.6900253946)
filtered_df = df.filter(
    (df['lat'] >= bbox[1]) & (df['lon'] >= bbox[0]) &
    (df['lat'] <= bbox[3]) & (df['lon'] <= bbox[2])
)

# Remove rows where latitude or longitude is missing
filtered_df = filtered_df.dropna(subset=['lat', 'lon'])


In [None]:
df = df.withColumn("lat_rad", radians("lat"))
df = df.withColumn("lon_rad", radians("lon"))

In [None]:
from pyspark.sql.functions import col, to_timestamp

# Converting 'taken' to datetime format
filtered_df = filtered_df.withColumn("taken", to_timestamp(col("taken")))


In [None]:
from pyspark.sql.functions import month, when, hour

# Helper function to determine the season
def get_season(month):
    return when((month >= 3) & (month <= 5), "Spring") \
           .when((month >= 6) & (month <= 8), "Summer") \
           .when((month >= 9) & (month <= 11), "Fall") \
           .otherwise("Winter")

# Adding "season" based on month of 'taken'
df = df.withColumn("month", month("taken"))
df = df.withColumn("season", get_season(col("month")))

# Helper function to determine the time of the day
def get_part_of_day(hour):
    return when((hour >= 6) & (hour < 12), "Morning") \
           .when((hour >= 12) & (hour < 18), "Afternoon") \
           .otherwise("Night")  # Accounts for hours between 18:00 and 6:00

# Adding "time of the day" based on hour of 'taken'
df = df.withColumn("hour", hour("taken"))
df = df.withColumn("part_of_day", get_part_of_day(col("hour")))


In [None]:
df.show(5)


+-----------+------------+------+-----------+--------------------+--------------------+--------------------+-----+-----------+-----------+------+---------+-------------------+------------------+--------------------+-----+------+----+-----------+
|   photo_id|       owner|gender| occupation|               title|         description|                tags|faves|        lat|        lon|u_city|u_country|              taken|           lat_rad|             lon_rad|month|season|hour|part_of_day|
+-----------+------------+------+-----------+--------------------+--------------------+--------------------+-----+-----------+-----------+------+---------+-------------------+------------------+--------------------+-----+------+----+-----------+
|12056545693|78191777@N00|     1|Producer/DJ|                près|"<i>Let us draw n...|clapham, london, ...|    0|51.46516300|-0.12908500| 30307|      USA|2014-01-20 15:19:44|0.8982365444255623|-0.00225295826493...|    1|Winter|  15|  Afternoon|
|12453639663|410

# **Clustering**
1. step of preprocessing
2. clustering on the basis of location and temporal context
3. KMeans

In [None]:
# Ensure there are no nulls in the columns used by VectorAssembler
df = df.na.drop(subset=["lat_rad", "lon_rad", "season", "part_of_day"])

# Indexing categorical columns to convert to numeric form
indexer_season = StringIndexer(inputCol="season", outputCol="season_idx")
indexer_part_of_day = StringIndexer(inputCol="part_of_day", outputCol="part_of_day_idx")

# Assemble all features into one vector column
assembler = VectorAssembler(
    inputCols=["lat_rad", "lon_rad", "season_idx", "part_of_day_idx"],
    outputCol="features",
    handleInvalid="skip"
)

# Define the KMeans model
kmeans = KMeans(k=5, seed=1)

# Build the pipeline
pipeline = Pipeline(stages=[indexer_season, indexer_part_of_day, assembler, kmeans])

# Fit the pipeline on the data
model = pipeline.fit(df)

# Transform the data
df_clustered = model.transform(df)

# Show cluster centers and predicted clusters
centers = model.stages[-1].clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

df_clustered.select("photo_id", "lat", "lon", "season", "part_of_day", "prediction").show(5)


Cluster Centers: 
[ 8.98876359e-01 -2.62555558e-03  3.00000000e+00  0.00000000e+00]
[ 0.89894011 -0.00235188  1.38314945  0.        ]
[ 0.89896228 -0.00300653  0.          0.20269216]
[0.89274044 0.00719341 2.33877454 1.3087296 ]
[ 0.89898836 -0.00216751  0.66467554  1.56910128]
+-----------+-----------+-----------+------+-----------+----------+
|   photo_id|        lat|        lon|season|part_of_day|prediction|
+-----------+-----------+-----------+------+-----------+----------+
|12056545693|51.46516300|-0.12908500|Winter|  Afternoon|         0|
|12453639663|51.52767200|-0.08364800|Winter|      Night|         3|
|13185773995|51.52767200|-0.08364800|Spring|      Night|         4|
|13295046445|51.51304200|-0.08922100|Spring|    Morning|         4|
|13357656115|51.52767200|-0.08364800|Spring|      Night|         4|
+-----------+-----------+-----------+------+-----------+----------+
only showing top 5 rows



In [None]:
df_clustered.show(5)

+-----------+------------+------+-----------+--------------------+--------------------+--------------------+-----+-----------+-----------+------+---------+-------------------+------------------+--------------------+-----+------+----+-----------+----------+---------------+--------------------+----------+
|   photo_id|       owner|gender| occupation|               title|         description|                tags|faves|        lat|        lon|u_city|u_country|              taken|           lat_rad|             lon_rad|month|season|hour|part_of_day|season_idx|part_of_day_idx|            features|prediction|
+-----------+------------+------+-----------+--------------------+--------------------+--------------------+-----+-----------+-----------+------+---------+-------------------+------------------+--------------------+-----+------+----+-----------+----------+---------------+--------------------+----------+
|12056545693|78191777@N00|     1|Producer/DJ|                près|"<i>Let us draw n..

**kmeans by finding optimal values of k**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline


In [None]:
df = df.na.drop(subset=["lat_rad", "lon_rad", "season", "part_of_day"])  # Ensure no nulls

# Indexing categorical columns
indexer_season = StringIndexer(inputCol="season", outputCol="season_idx")
indexer_part_of_day = StringIndexer(inputCol="part_of_day", outputCol="part_of_day_idx")

# Assemble all features into one vector column
assembler = VectorAssembler(inputCols=["lat_rad", "lon_rad", "season_idx", "part_of_day_idx"], outputCol="features")
stages = [indexer_season, indexer_part_of_day, assembler]

# Define a range of k to evaluate
k_range = range(9, 20)
silhouette_scores = []
best_k = 2
best_score = -1

for k in k_range:

    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features").setPredictionCol("prediction")
    pipeline = Pipeline(stages=stages + [kmeans])
    model = pipeline.fit(df)
    predictions = model.transform(df)
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    silhouette_scores.append(silhouette)

    print(f"Silhouette score for k={k}: {silhouette}")

    if silhouette > best_score:
        best_k = k
        best_score = silhouette


print(f"Best k by Silhouette score: {best_k}")
final_kmeans = KMeans().setK(best_k).setSeed(1).setFeaturesCol("features").setPredictionCol("prediction")
final_pipeline = Pipeline(stages=stages + [final_kmeans])
final_model = final_pipeline.fit(df)
df_clustered = final_model.transform(df)

# Show results
df_clustered.select("photo_id", "lat", "lon", "season", "part_of_day", "prediction").show(5)


Silhouette score for k=9: 0.8509387733207695
Silhouette score for k=10: 0.8678546835663999
Silhouette score for k=11: 0.9632128108464526
Silhouette score for k=12: 0.9958849285059251
Silhouette score for k=13: 0.9868955892971455
Silhouette score for k=14: 0.8925083212036053
Silhouette score for k=15: 0.8792812477325019
Silhouette score for k=16: 0.8531197800347217
Silhouette score for k=17: 0.8718570378185633
Silhouette score for k=18: 0.876198805874861
Silhouette score for k=19: 0.8803832516186971
Best k by Silhouette score: 12
+-----------+-----------+-----------+------+-----------+----------+
|   photo_id|        lat|        lon|season|part_of_day|prediction|
+-----------+-----------+-----------+------+-----------+----------+
|12056545693|51.46516300|-0.12908500|Winter|  Afternoon|         6|
|12453639663|51.52767200|-0.08364800|Winter|      Night|        11|
|13185773995|51.52767200|-0.08364800|Spring|      Night|         9|
|13295046445|51.51304200|-0.08922100|Spring|    Morning| 

# **Location time database**
1. Filter out photos taken within 3 hours by the same user at the same location
2. Filter out users with fewer than three distinct visits
3. include most common season and part of day for visits

In [None]:

# Calculate time difference and flag duplicates
df_clustered = df_clustered.withColumn("timestamp_unix", unix_timestamp("taken"))
window_spec = Window.partitionBy("owner", "prediction").orderBy("timestamp_unix")
df_clustered = df_clustered.withColumn("time_diff", abs(col("timestamp_unix") - lag("timestamp_unix", 1).over(window_spec)))
df_clustered = df_clustered.filter((col("time_diff") > 10800) | col("time_diff").isNull())

# Count distinct clusters per user
user_visit_counts = df_clustered.groupBy("owner").agg(countDistinct("prediction").alias("distinct_clusters"))
eligible_users = user_visit_counts.filter(col("distinct_clusters") >= 3)

# Join eligible users back to the clustered data
df_final = df_clustered.join(eligible_users, "owner", "inner")

# Group by cluster to get centroids and count visits
cluster_details = df_final.groupBy("prediction").agg(
    mean(col("lat")).alias("cent_lat"),
    mean(col("lon")).alias("cent_lon"),
    count("photo_id").alias("visit_count")
)

# Calculate the most common 'season' and 'part_of_day' for each cluster
common_time_details = df_final.groupBy("prediction", "season", "part_of_day").agg(
    count("photo_id").alias("count")
).withColumn("rank_season", rank().over(Window.partitionBy("prediction", "season").orderBy(col("count").desc())))
most_common_season = common_time_details.filter(col("rank_season") == 1).drop("count", "rank_season", "part_of_day")

common_time_details = common_time_details.withColumn("rank_part_of_day", rank().over(Window.partitionBy("prediction", "part_of_day").orderBy(col("count").desc())))
most_common_partofday = common_time_details.filter(col("rank_part_of_day") == 1).drop("count", "rank_part_of_day")


location_time_database = df_final.join(cluster_details, "prediction")
location_time_database= location_time_database.join(most_common_season, ["prediction", "season"])
most_common_partofday = most_common_partofday.drop("season")
location_time_database= location_time_database.join(most_common_partofday, ["prediction", "part_of_day"]).select(
    col("prediction").alias("location_id"),
    "photo_id",
    "owner",
    "lat",
    "lon",
    "taken",
    "cent_lat",
    "cent_lon",
    "season",
    "part_of_day"
)
location_time_database.printSchema()


root
 |-- location_id: integer (nullable = false)
 |-- photo_id: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- taken: string (nullable = true)
 |-- cent_lat: double (nullable = true)
 |-- cent_lon: double (nullable = true)
 |-- season: string (nullable = false)
 |-- part_of_day: string (nullable = false)



# **Transformation**

In [None]:
from pyspark.sql.functions import col, max, min, lit
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS


# Step 1: Calculate visit frequencies
visit_counts = location_time_database.groupBy("owner", "location_id", "season", "part_of_day").count().withColumnRenamed("count", "frequency")

# Step 2: Normalize frequencies to a 1-5 scale
# First, find global max and min frequencies to scale ratings between 1 and 5
freq_stats = visit_counts.select(max("frequency").alias("max_freq"), min("frequency").alias("min_freq")).collect()
max_freq = freq_stats[0]["max_freq"]
min_freq = freq_stats[0]["min_freq"]

# Apply Min-Max normalization
visit_counts = visit_counts.withColumn("ratings",
                                       ((col("frequency") - min_freq) / (max_freq - min_freq) * 4 + 1))

#making owner attribute as type int
user_indexer = StringIndexer(inputCol="owner", outputCol="user_id_indexed")
indexer_model = user_indexer.fit(visit_counts)
visit_counts_indexed = indexer_model.transform(visit_counts)
als_input = visit_counts_indexed.select(
    col("user_id_indexed").alias("user_id"),
    col("location_id"),
    col("ratings").cast("float")
)


# Split the data into training and testing sets
(training_data, test_data) = als_input.randomSplit([0.8, 0.2], seed=42)

# Now we can proceed to use the ALS model as in previous steps
als = ALS(
    userCol="user_id",
    itemCol="location_id",
    ratingCol="ratings",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    rank=10,
    maxIter=10
)

# Fit the ALS model to the training data
model = als.fit(training_data)


# **Evaluation**

**Root Mean Square Error**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratings",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Show example predictions
predictions.show(5)


Root-mean-square error = 0.4163377056277574
+-------+-----------+-------+----------+
|user_id|location_id|ratings|prediction|
+-------+-----------+-------+----------+
|    0.0|          2|    1.2| 2.6525476|
|    0.0|          6|    2.0| 2.5043044|
|    0.0|          8|    1.0| 2.2437298|
|    1.0|          1|    2.8| 2.3076637|
|    1.0|          7|    1.8| 1.9100902|
+-------+-----------+-------+----------+
only showing top 5 rows



**Mean Absolute Error**

In [None]:
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="ratings", predictionCol="prediction")
mae = mae_evaluator.evaluate(predictions)
print("Mean Absolute Error = " + str(mae))

Mean Absolute Error = 0.27722900972238035


**Approximate Mean Average Precision at n (MAP@n)**
1. Generating top-n recommendations for each user.
2. Determining if these recommendations are in the user's test set.
3. Calculating precision at n for each user.
4. Averaging these precisions.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, expr, explode, rank, desc
from pyspark.sql.window import Window

# Generate predictions for test data
predictions = model.transform(test_data)
predictions.show(5)

# Evaluate the model by computing the RMSE on the test data
rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratings", predictionCol="prediction")
rmse = rmse_evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Evaluate the model by computing the MAE on the test data
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="ratings", predictionCol="prediction")
mae = mae_evaluator.evaluate(predictions)
print("Mean Absolute Error = " + str(mae))

# Generate top 10 recommendations for all users
user_recs = model.recommendForAllUsers(10)
user_recs.show(5)

# Explode the recommendations to flat format for easier processing
recommendations = user_recs.select(
    "user_id",
    explode("recommendations").alias("recommendation")
).select(
    "user_id",
    col("recommendation.location_id").alias("location_id"),
    col("recommendation.rating").alias("rating")
)

# Join exploded recommendations with actual ratings for MAP calculation
# Assume `predictions` has columns: user_id, location_id, prediction, and ratings
true_positives = predictions.join(
    recommendations,
    ["user_id", "location_id"],
    "inner"
).select("user_id", "location_id")

# To calculate true positives within top k
precision_at_k = {}
for k in [5, 10, 15]:
    top_k_predictions = predictions.withColumn("rank", rank().over(Window.partitionBy("user_id").orderBy(desc("prediction"))))
    filtered_predictions = top_k_predictions.filter(col("rank") <= k)
    tp_at_k = filtered_predictions.join(true_positives, ["user_id", "location_id"], "inner").groupBy("user_id").agg(count("location_id").alias("tp_at_k"))
    relevant = filtered_predictions.groupBy("user_id").agg(count("location_id").alias("total_relevant"))
    precision_k = tp_at_k.join(relevant, "user_id").selectExpr("user_id", "tp_at_k / total_relevant as precision_at_k")
    average_precision = precision_k.select(mean("precision_at_k")).collect()[0][0]
    precision_at_k[k] = average_precision

# Print MAP@n results
for k, precision in precision_at_k.items():
    print(f"Mean Average Precision at {k}: {precision}")


+-------+-----------+-------+----------+
|user_id|location_id|ratings|prediction|
+-------+-----------+-------+----------+
|    0.0|          2|    1.2| 2.6525476|
|    0.0|          6|    2.0| 2.5043044|
|    0.0|          8|    1.0| 2.2437298|
|    1.0|          1|    2.8| 2.3076637|
|    1.0|          7|    1.8| 1.9100902|
+-------+-----------+-------+----------+
only showing top 5 rows

Root-mean-square error = 0.4163377056277574
Mean Absolute Error = 0.27722900972238035
+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      0|[{0, 3.8025756}, ...|
|      1|[{6, 2.4007404}, ...|
|      2|[{1, 2.4980018}, ...|
|      3|[{1, 1.4435956}, ...|
|      4|[{5, 1.2446896}, ...|
+-------+--------------------+
only showing top 5 rows

Mean Average Precision at 5: 0.981505376344086
Mean Average Precision at 10: 0.981505376344086
Mean Average Precision at 15: 0.981505376344086
