# Introduction to Big Data Technologies - Project

**by Itay Cohen (211896261) and Tal Skopas (322593070) and Shalev Mahadav (322642752)**

## 3. Loading the data

In [0]:
import math
import ast
import re
import json
from datetime import datetime
from collections import defaultdict
from itertools import accumulate

import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import (
    StringType, DoubleType, LongType,
    ArrayType, StructType, StructField, IntegerType
)

from pyspark.sql.functions import (
    col, dayofmonth, month, year, dayofweek, date_format,
    when, lag, udf, collect_list, slice, current_timestamp,
    sqrt, pow, unix_timestamp, lit, round as spark_round,
    from_json, element_at, get_json_object, split,
    trim, row_number, broadcast, avg
)

def _read_csv_table(path):
    """Load a CSV that has a header row, letting Spark infer the column types."""
    return spark.read.format('csv').options(header='true', inferSchema='true').load(path)


df_checkins = _read_csv_table('/FileStore/tables/gowalla_checkins.csv')
# Spots subset 1: remove rows that are exact duplicates across every column.
df_spots1 = _read_csv_table('/FileStore/tables/gowalla_spots_subset1.csv').dropDuplicates()
# Spots subset 2: keep one row per city_state value (this code does not control which row survives).
df_spots2 = _read_csv_table('/FileStore/tables/gowalla_spots_subset2.csv').dropDuplicates(["city_state"])
# The friendship and user-info tables are currently not loaded:
# df_friendship = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/gowalla_friendship.csv')
# df_userinfo = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/gowalla_userinfo.csv')

## 3.4 Clean the data.

**For Example:** הסרת רשומות כפולות של אותו המשתמש על אותו מקום בתאריך זהה
<div>(3 points)</div>

In [0]:
from pyspark.sql.functions import col, current_timestamp

# A check-in is kept only if it has a timestamp that is not in the future and a
# positive, non-null place id.
_valid_checkin = (
    col("datetime").isNotNull()
    & col("placeid").isNotNull()
    & (col("placeid") > 0)
    & (col("datetime") < current_timestamp())
)

# Drop repeated (user, place, timestamp) records after filtering.
df_checkins = df_checkins.filter(_valid_checkin).dropDuplicates(["userid", "placeid", "datetime"])

# Coordinates must be present and inside the valid latitude/longitude ranges.
# The predicate refers to the columns by name, so it is reused for both spot tables.
_coords_in_range = (
    col("lat").isNotNull()
    & col("lng").isNotNull()
    & col("lat").between(-90, 90)
    & col("lng").between(-180, 180)
)

# Spots subset 1: rename id -> placeid, cast coordinates to double, keep the spot attributes.
df_spots1 = df_spots1.select(
    col("id").alias("placeid"),
    "created_at",
    col("lng").cast("double").alias("lng"),
    col("lat").cast("double").alias("lat"),
    "checkins_count",
    "spot_categories",
    "items_count",
).where(_coords_in_range)

# Spots subset 2: rename id -> placeid, cast coordinates to double, keep name and city.
df_spots2 = df_spots2.select(
    col("id").alias("placeid"),
    "name",
    "city_state",
    col("lng").cast("double").alias("lng"),
    col("lat").cast("double").alias("lat"),
).where(_coords_in_range)

display(df_spots1)
display(df_spots2)


**Creating a new DataFrame that links each spot in spots1 to its nearest city**

האלגוריתם מחלק את המפה לריבועים קטנים בגודל של 0.02 מעלות , משייך כל נקודה לריבוע שלה ו24 ריבועים סמוכים, ואז בודק רק את הערים שנמצאות באותם ריבועים בלבד. לאחר מכן הוא מחשב מרחק ישיר בין כל נקודה לכל עיר בסביבה ומסנן החוצה את אלה שמעבר לסף. לבסוף, מתוך כל הערים שעברו את המסנן הוא בוחר לכל נקודה את העיר עם המרחק הקטן ביותר, וכך מוצא עבור כל נקודה את העיר הקרובה מבלי להשוות אותו לעשרות אלפי ערים בכל פעם.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor, sqrt, pow, broadcast, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# City side (df_spots2): keep the id and city label, expose the coordinates as
# lat2/lng2 (double), and discard rows whose coordinates are missing or out of range.
df_spots2_clean = (
    df_spots2
    .select(
        col("placeid").alias("city_id"),
        "city_state",
        col("lat").cast("double").alias("lat2"),
        col("lng").cast("double").alias("lng2"),
    )
    .where(
        col("lat2").isNotNull()
        & col("lng2").isNotNull()
        & col("lat2").between(-90, 90)
        & col("lng2").between(-180, 180)
    )
)

# Spot side (df_spots1): same treatment, with coordinates exposed as lat1/lng1
# and the descriptive spot columns carried along.
df_spots1_clean = (
    df_spots1
    .select(
        col("placeid").alias("spot_id"),
        "created_at",
        col("lat").cast("double").alias("lat1"),
        col("lng").cast("double").alias("lng1"),
        "checkins_count",
        "items_count",
        "spot_categories",
    )
    .where(
        col("lat1").isNotNull()
        & col("lng1").isNotNull()
        & col("lat1").between(-90, 90)
        & col("lng1").between(-180, 180)
    )
)

# Grid binning: every point gets an integer (lat_bin, lng_bin) cell id.
# Multiplying by 50 and flooring gives cells of 0.02 degrees per side, so only
# points whose cells match need to be compared later.

# Spots: bins derived from lat1/lng1, appended after the existing columns.
df_spots1_binned = df_spots1_clean.select(
    "*",
    floor(col("lat1") * 50).cast("int").alias("lat_bin"),
    floor(col("lng1") * 50).cast("int").alias("lng_bin"),
)

# Cities: bins derived from lat2/lng2, appended after the existing columns.
df_spots2_binned = df_spots2_clean.select(
    "*",
    floor(col("lat2") * 50).cast("int").alias("lat_bin"),
    floor(col("lng2") * 50).cast("int").alias("lng_bin"),
)

# All (dlat, dlng) offsets of the 5x5 block of cells centred on a spot's own cell.
# NOTE(review): +/-2 cells of 0.02 degrees guarantees coverage of only 0.04 degrees
# per axis, while the distance filter applied afterwards accepts up to 0.045
# degrees; a city between those two distances can sit in a cell that is never
# searched. Consider widening to +/-3 cells if such matches matter.
_cell_offsets = range(-2, 3)
deltas = [(dlat, dlng) for dlat in _cell_offsets for dlng in _cell_offsets]
deltas_df = spark.createDataFrame(deltas, ["dlat", "dlng"])

# Replicate every spot once per offset and compute the cell it should probe.
spots1_candidates = df_spots1_binned.crossJoin(deltas_df).select(
    "spot_id", "created_at", "lat1", "lng1",
    "checkins_count", "items_count", "spot_categories",
    (col("lat_bin") + col("dlat")).alias("candidate_lat_bin"),
    (col("lng_bin") + col("dlng")).alias("candidate_lng_bin"),
)

# Pair each probed cell with the cities located in exactly that cell.
# The city table is broadcast to every executor for the join.
_same_cell = (
    (spots1_candidates.candidate_lat_bin == df_spots2_binned.lat_bin)
    & (spots1_candidates.candidate_lng_bin == df_spots2_binned.lng_bin)
)
joined_candidates = (
    spots1_candidates
    .join(broadcast(df_spots2_binned), _same_cell, how="inner")
    .select(
        "spot_id", "created_at", "lat1", "lng1",
        "checkins_count", "items_count", "spot_categories",
        "city_id", "city_state", "lat2", "lng2",
    )
)

# Straight-line distance between spot and candidate city, measured in degrees
# (plain Euclidean distance on lat/lng, not a geodesic distance).
_degree_distance = sqrt(
    pow(col("lat1") - col("lat2"), 2) +
    pow(col("lng1") - col("lng2"), 2)
)

# Keep only pairs at most 0.045 degrees apart (the original comment describes
# this threshold as "within 5 km").
distance_filtered = (
    joined_candidates
    .withColumn("dist_to_city", _degree_distance)
    .where(col("dist_to_city") <= 0.045)
)

# For each spot, keep the single candidate city with the smallest dist_to_city.
# city_id is a secondary sort key so that two cities at exactly the same distance
# always resolve to the same winner; ordering by distance alone leaves
# row_number() free to pick either one, making re-runs non-reproducible.
w = Window.partitionBy("spot_id").orderBy(col("dist_to_city"), col("city_id"))
closest_city_per_spot = (
    distance_filtered
    .withColumn("rank", row_number().over(w))
    .filter(col("rank") == 1)
    .drop("rank")
)

# Final DataFrame: one row per matched spot, with its city label and the
# distance (in degrees) to that city. Spots with no city inside the threshold
# are absent from this frame.
df_spots_matched = closest_city_per_spot.select(
    "spot_id",
    "created_at",
    "lat1",
    "lng1",
    "checkins_count",
    "items_count",
    "spot_categories",
    "city_state",
    "dist_to_city"
)

display(df_spots_matched)

**The function extract_categories helps us extract the gencats from the gowalla_category_structure.json file.**

For example: the gencat of categories/1 is Food

In [0]:
def extract_categories(node, gencat=None):
    """Flatten a nested category structure into a list of category records.

    The structure is walked depth-first without recursion. Every dict that has
    both a truthy 'url' and a truthy 'name' yields one record
    {'url', 'name', 'gencat'}. When no general category has been inherited yet,
    such a dict becomes the general category for itself and everything nested
    inside it; otherwise the inherited one is kept.

    Args:
        node: a dict, a list, or any other value (other values are ignored).
        gencat: general category to assume for the root, or None.

    Returns:
        List of dicts with keys 'url', 'name' and 'gencat'.
    """
    flattened = []
    pending = [(node, gencat)]

    while pending:
        item, inherited = pending.pop()

        if isinstance(item, list):
            # Push in reverse so the first list element is popped first.
            pending.extend((child, inherited) for child in reversed(item))
            continue

        if not isinstance(item, dict):
            continue

        url, name = item.get('url'), item.get('name')
        effective = inherited
        if url and name:
            if inherited is None:
                # A named node with no ancestor category starts a new one.
                effective = name
            flattened.append({'url': url, 'name': name, 'gencat': effective})

        # Nested containers inherit whatever general category applies here.
        pending.extend(
            (child, effective)
            for child in item.values()
            if isinstance(child, (dict, list))
        )

    return flattened


In [0]:
# Preprocessing step: attach category information (category name and general
# category) to every matched spot.
# NOTE(review): this cell overwrites df_spots_matched with its own output, so it
# is not idempotent — re-run the city-matching cell before re-running this one.

# Read the category tree and flatten it into (url, name, gencat) records.
categories = (
    spark.read
    .option("multiline", "true")
    .format("json")
    .load("/FileStore/tables/gowalla_category_structure.json")
)
json_cat = json.loads(categories.toJSON().first())
results = extract_categories(json_cat["spot_categories"])

schema = StructType(
    [StructField(field, StringType(), True) for field in ("url", "name", "gencat")]
)
categories_df = spark.createDataFrame(results, schema=schema)

# spot_categories is parsed as a JSON array of {url, name} objects.
category_schema = ArrayType(
    StructType(
        [StructField(field, StringType(), True) for field in ("url", "name")]
    )
)
spots_parsed = df_spots_matched.withColumn(
    "parsed_categories",
    from_json(col("spot_categories"), category_schema),
)

# Only the first listed category of each spot is used (element_at is 1-based).
spots_with_url = spots_parsed.withColumn(
    "category_url",
    element_at(col("parsed_categories.url"), 1),
)

# Inner join: spots whose first category url is not in the category tree are dropped.
df_spots_matched = spots_with_url.join(
    categories_df,
    spots_with_url.category_url == categories_df.url,
    how="inner",
)

display(df_spots_matched)

In [0]:
# Spot attributes to attach to each check-in, keyed by placeid.
_spot_attributes = df_spots_matched.select(
    col("spot_id").alias("placeid"),
    "lat1",
    "lng1",
    "gencat",
    col("created_at").cast("timestamp").alias("place_created_at"),
    "city_state",
    "name",
)

# Left join: check-ins at places without a matched spot are kept, with nulls
# in the spot columns.
df_checkins_with_spots = df_checkins.join(_spot_attributes, on="placeid", how="left")

## 3.1. Divide datetime into 5 columns - Day, Month, Year, Weekday, DayofTheWeek.

**Note:** assume Saturday and Sunday are weekends. Validate the values of the new columns.


<div>(3 points)</div>

In [0]:
# dayofweek() numbers days 1 (Sunday) .. 7 (Saturday), so 1 and 7 are the weekend days.
_weekend_flag = when(col("DayOfWeek").isin(1, 7), "כן").otherwise("לא")

# Split the check-in timestamp into calendar parts and flag weekend check-ins.
df_checkins_with_spots = (
    df_checkins_with_spots
    .withColumn("Day", dayofmonth(col("datetime")))
    .withColumn("Month", month(col("datetime")))
    .withColumn("Year", year(col("datetime")))
    .withColumn("DayOfWeek", dayofweek(col("datetime")))
    .withColumn("Weekend", _weekend_flag)
)

display(df_checkins_with_spots)

## 3.2. Add a last_place_visited column that holds the date of each user's previous visit. If the user has no previous visit, return None.

**For example:** אם יש משתמש שביקר בשלושה מקומות שונים
בתאריכים 2010-06-28 , 2010-06-24 , 2010-05-01 אז ערכי העמודה יהיו 2010-06-24 , 2010-05-01 , ו -
None בהתאמה .


<div style="color:blue">(3 points)</div>

In [0]:
# Per-user window in chronological order; also used further down for prev_datetime.
time_ordered_window = Window.partitionBy("userid").orderBy("datetime")

# prev_place: id of the place the user checked into immediately before this row.
# last_place_visited: timestamp of that previous check-in. lag() returns null
# (None) for a user's first check-in, which is what section 3.2 asks for —
# the column must hold the previous visit date, not a place id, and must not
# fall back to the current row when there is no earlier visit.
df_checkins_with_spots = (
    df_checkins_with_spots
    .withColumn("prev_place", lag("placeid").over(time_ordered_window))
    .withColumn("last_place_visited", lag("datetime").over(time_ordered_window))
)

display(df_checkins_with_spots)

## 3.3 Add a last_x_place_visited column that computes, for every row, a measure x of the change since the user's previous visit.

מרחק שעבר מהביקור הקודם, האם המשתמש שינה קטגוריה או עיר מאז הביקור הקודם וכו'. מטרת סעיף זה
לייצג את השינוי מאז הביקור הקודם כדי לתמוך בקבלת החלטות של חלוקת המשתמשים לקבוצות על בסיס
דפוסי הביקורים שלהם באופן דינמי במערכת.
<div>(8 points)</div>

In [0]:
# Chosen "x": the distance between the current place and the user's previous place.

# Per-user chronological window.
w = Window.partitionBy("userid").orderBy("datetime")

# Euclidean distance in degrees between this check-in's spot and the previous one.
# It is null for a user's first check-in and whenever either spot has no coordinates.
_step_distance = sqrt(
    pow(col("lat1") - col("last_lat"), 2) + pow(col("lng1") - col("last_lng"), 2)
)

df_checkins_with_spots = (
    df_checkins_with_spots
    .withColumn("last_lat", lag("lat1").over(w))
    .withColumn("last_lng", lag("lng1").over(w))
    .withColumn("distance_from_last", _step_distance)
)

display(df_checkins_with_spots)

In [0]:
# Second "x": hours elapsed since the user's previous check-in.

# Difference between the two timestamps in hours, rounded to 2 decimals.
_hours_since_prev = spark_round(
    (unix_timestamp("datetime") - unix_timestamp("prev_datetime")) / 3600.0, 2
)

df_checkins_with_spots = (
    df_checkins_with_spots
    .withColumn("prev_datetime", lag("datetime").over(time_ordered_window))
    # A user's first check-in has no predecessor and is reported as 0 hours.
    .withColumn(
        "time_since_last_visit_hours",
        when(col("prev_datetime").isNull(), lit(0)).otherwise(_hours_since_prev),
    )
)

display(df_checkins_with_spots)

## 4. Return top-k
Given the gencat, year, and k, return the top-k popular places with their popularity measurement
<div>(8 points)</div>

In [0]:
def top_k_places(checkins_rdd, gencat, year_to_check, k):
    """Return the k most popular places of a general category in a given year.

    Popularity is the number of check-ins at the place during that year.

    Args:
        checkins_rdd: RDD of rows exposing `gencat`, `Year` and `placeid`.
        gencat: general category to keep (e.g. "Food").
        year_to_check: calendar year to keep.
        k: number of places to return.

    Returns:
        A list of at most k (placeid, checkin_count) pairs, most popular first;
        ties are broken by ascending placeid so the result is reproducible.
    """
    return (
        checkins_rdd
        .filter(lambda row: row.gencat == gencat and row.Year == year_to_check)
        .map(lambda row: (row.placeid, 1))
        .reduceByKey(lambda a, b: a + b)
        .takeOrdered(k, key=lambda pair: (-pair[1], pair[0]))
    )


rdd = df_checkins_with_spots.select("Year", "placeid", "gencat", "userid").rdd

category = "Food"
year_to_check = 2010
k = 3

# The year filter must use year_to_check: comparing against the bare name `year`
# compares with the imported pyspark function and never matches.
top_k = top_k_places(rdd, category, year_to_check, k)

is_empty = rdd.isEmpty()
print("Is RDD empty?", is_empty)
print(top_k)

## 5. Is there long-tail
You need to establish if there is a long-tail phenomenon in the food gencat in each of the top 2 cities.          
נגדיר תופעת לונג-טייל בעיר ושנה מסוימת,
בה מעל 70% מהביקורים בשנה זו נמצאים ב20% מהמקומות הקיימים בעיר זו.
<div>(20 points)</div>

In [0]:
# Long-tail check for the Food gencat in the two cities with the most Food check-ins.
# The city of every spot comes from df_spots_matched, i.e. the nearest city found
# by the grid-based matching earlier in this notebook (cells of 0.02 degrees,
# neighbouring cells searched, closest city within the distance threshold kept).
# NOTE(review): "year" below is the year of the spot's created_at and
# checkins_count is the per-spot counter from the spots table, not check-ins
# counted per calendar year — confirm this matches the intended definition.

def _spots_needed_for_share(sorted_counts, share):
    """Return how many of the largest spots are needed to exceed `share` of all check-ins.

    Args:
        sorted_counts: per-spot check-in counts, sorted in descending order,
            with a positive total.
        share: fraction of the total that must be exceeded (e.g. 0.7).
    """
    total = sum(sorted_counts)
    running = 0
    for n_spots, count in enumerate(sorted_counts, start=1):
        running += count
        if running / total > share:
            return n_spots
    return len(sorted_counts)


df_with_year = df_spots_matched.withColumn("year", year(col("created_at")))

filtered_df = (
    df_with_year
    .select("year", "city_state", "spot_id", "checkins_count", "gencat")
    .withColumn("checkins_count", col("checkins_count").cast(IntegerType()))
    .where(
        (col("year").isNotNull()) &
        (col("city_state").isNotNull()) &
        (col("city_state") != "none") &
        (col("spot_id").isNotNull()) &
        (col("checkins_count").isNotNull()) &
        (col("gencat") == "Food")
    )
)

# Top 2 cities by total Food check-ins (rows with nulls were removed above).
city_checkins_rdd = filtered_df.rdd.map(lambda row: (row["city_state"], row["checkins_count"]))
top_2_cities = city_checkins_rdd.reduceByKey(lambda a, b: a + b) \
                                .takeOrdered(2, key=lambda x: -x[1])
top_city_names = set([city for city, _ in top_2_cities])

# Keep only the top cities and total the check-ins per (year, city, spot),
# then regroup as ((year, city), [(spot_id, checkins), ...]).
spot_rdd = (
    filtered_df.rdd
    .filter(lambda row: row["city_state"] in top_city_names)
    .map(lambda row: ((row["year"], row["city_state"], row["spot_id"]), row["checkins_count"]))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: ((x[0][0], x[0][1]), (x[0][2], x[1])))
    .groupByKey()
)

# Long-tail definition: more than 70% of the check-ins fall in at most 20% of the spots.
VISIT_SHARE = 0.7
PLACE_SHARE = 0.2

# The loop variable is deliberately NOT called `year`: that name is the imported
# pyspark function, and overwriting it breaks any cell that is run afterwards.
# Results are sorted by (city, year) so the report order is reproducible.
for (spot_year, city), spot_data in sorted(spot_rdd.collect(), key=lambda kv: (kv[0][1], kv[0][0])):
    counts = sorted((count for _, count in spot_data), reverse=True)
    total_checkins = sum(counts)

    if total_checkins == 0 or len(counts) == 0:
        continue

    top_count = _spots_needed_for_share(counts, VISIT_SHARE)
    total_spots = len(counts)
    long_tail = (top_count / total_spots) <= PLACE_SHARE

    print(f"\n=== Year: {spot_year}, City: {city} ===")
    print(f"Total check-ins: {total_checkins}")
    print(f"Total spots: {total_spots}")
    print(f"Spots for 70% of check-ins: {top_count} ({(top_count / total_spots) * 100:.2f}%)")
    print(f"Long-tail pattern detected: {long_tail}")


## 6.1 Create ETL/ELT that creates new user-places structure
עליכם להחליט כיצד לייצג באופן יעיל את נתוני המקומות של
המשתמשים (טיפ: היעזרו בסעיף 3 ג'). שימו לב ששיקולי אתיקה ופרטיות חשובים ביותר לחברה.
<div>(6 points)</div>

In [0]:
# Drop the exact coordinates, the general category and the place creation time.
q6_df = df_checkins_with_spots.drop("gencat", "place_created_at", "lng1", "lat1")

# Require only the columns the user history is built from. Dropping on *every*
# column would also discard each user's first check-in, because the lag-based
# columns (prev_place, last_lat, prev_datetime, ...) are null there.
q6_df = q6_df.na.drop(how="any", subset=["userid", "datetime", "name", "city_state"])

# (userid, (timestamp, category name, city)) pairs.
user_checkins = q6_df.rdd.map(lambda row: (
    row.userid,
    (row.datetime, row.name, row.city_state)
))

# One chronologically ordered list of check-ins per user.
user_sorted_checkins = user_checkins.groupByKey().mapValues(
    lambda vals: sorted(list(vals), key=lambda x: x[0])
)

def build_user_history(checkins):
    """Turn one user's ordered check-ins into cumulative profile snapshots.

    Args:
        checkins: iterable of (timestamp, category, city) tuples, in the order
            they should be accumulated.

    Returns:
        One tuple per check-in: (timestamp, category_counts, city, visits_so_far),
        where category_counts is a list of (category, count) pairs covering all
        check-ins up to and including this one, sorted by descending count and
        then by category.
    """
    tally = defaultdict(int)
    snapshots = []
    for visits_so_far, (stamp, category, city) in enumerate(checkins, start=1):
        tally[category] += 1
        # sorted() builds a fresh list, so every snapshot owns its own ranking.
        ranking = sorted(tally.items(), key=lambda pair: (-pair[1], pair[0]))
        snapshots.append((stamp, ranking, city, visits_so_far))
    return snapshots

# Expand each user's check-in list into one (userid, snapshot) pair per check-in.
user_full_history = user_sorted_checkins.flatMapValues(build_user_history)

# Flatten to (userid, timestamp, category_counts, city, visits_so_far).
rdd_result = user_full_history.map(lambda pair: (pair[0], *pair[1]))

# print(rdd_result.take(10))


## 6.5 Implement 6.2 and 6.3
<div>(15 points)</div>

In [0]:
#6.2

def parse_time(dt):
    """Return dt as a datetime.

    A datetime instance is returned unchanged; anything else is parsed as a
    'YYYY-MM-DD HH:MM:SS' string.
    """
    return dt if isinstance(dt, datetime) else datetime.strptime(dt, '%Y-%m-%d %H:%M:%S')

def find_nearest_entry(entries, target_timestamp):
    """Return the latest entry whose timestamp is not after target_timestamp.

    Args:
        entries: iterable of records whose element at index 1 is a timestamp
            (a datetime or a 'YYYY-MM-DD HH:MM:SS' string).
        target_timestamp: datetime upper bound (inclusive).

    Returns:
        The qualifying entry with the greatest timestamp, or None if there is none.
    """
    not_after_target = (
        entry for entry in entries
        if parse_time(entry[1]) <= target_timestamp
    )
    return max(not_after_target, key=lambda entry: parse_time(entry[1]), default=None)

def compute_similarity(target_cats, target_city, other_cats, other_city):
    """Score how similar two user profiles are.

    The score is the Jaccard index of the two users' category sets (the counts
    are ignored), plus 0.5 when both users are in the same city.

    Args:
        target_cats, other_cats: iterables of (category, count) pairs.
        target_city, other_city: city labels to compare.

    Returns:
        Jaccard index (0 when both category sets are empty) plus the city bonus.
    """
    mine = set(cat for cat, _ in target_cats)
    theirs = set(cat for cat, _ in other_cats)
    combined = mine | theirs

    if len(combined) != 0:
        jaccard = len(mine & theirs) / len(combined)
    else:
        jaccard = 0

    if target_city == other_city:
        location_bonus = 0.5
    else:
        location_bonus = 0

    return jaccard + location_bonus

def find_similar_users(rdd, target_user_id, target_timestamp_str, k=5):
    """Find the k users whose profile is most similar to the target user's.

    Each user is represented by their latest history snapshot at or before the
    given time; snapshots are compared with compute_similarity.

    Args:
        rdd: RDD of (userid, timestamp, category_counts, city, visits) records.
        target_user_id: user to compare everyone else against.
        target_timestamp_str: point in time, as a datetime or a
            'YYYY-MM-DD HH:MM:SS' string.
        k: number of users to return.

    Returns:
        List of up to k (userid, score) pairs, highest score first; an empty
        list when the target user has no snapshot at or before that time.
    """
    target_timestamp = parse_time(target_timestamp_str)

    # The target user's own history is small enough to inspect on the driver.
    own_history = rdd.filter(lambda rec: rec[0] == target_user_id).collect()
    target_entry = find_nearest_entry(own_history, target_timestamp)
    if not target_entry:
        return []

    _, _, target_cats, target_city, _ = target_entry

    def score_against_target(pair):
        """Map (userid, snapshot) to (userid, similarity to the target snapshot)."""
        other_id, (_, _, other_cats, other_city, _) = pair
        return (other_id, compute_similarity(target_cats, target_city, other_cats, other_city))

    # Latest qualifying snapshot of every other user; users without one are dropped.
    latest_per_user = (
        rdd.filter(lambda rec: rec[0] != target_user_id)
        .groupBy(lambda rec: rec[0])
        .mapValues(lambda recs: find_nearest_entry(list(recs), target_timestamp))
        .filter(lambda pair: pair[1] is not None)
    )

    return latest_per_user.map(score_against_target).top(k, key=lambda pair: pair[1])

# Example run: the 3 users most similar to user 34240 as of 2010-12-25 18:45:00.
result = find_similar_users(rdd_result, 34240, '2010-12-25 18:45:00', k=3)
# print(result)


In [0]:
# 6.3

def parse_time(dt):
    """Return dt as a datetime, parsing 'YYYY-MM-DD HH:MM:SS' strings.

    NOTE(review): this repeats the parse_time defined in the 6.2 cell; it is
    kept so that this cell still works when run without that one.
    """
    if not isinstance(dt, datetime):
        dt = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S')
    return dt

def get_place_info(place_id):
    """Look up the category name and city of a place.

    Args:
        place_id: id of the place (the spot_id column of df_spots_matched).

    Returns:
        Tuple (name, city_state) of the matching row.

    Raises:
        ValueError: if no row with that id exists.

    NOTE(review): the previous version read from a DataFrame called `shalev`
    that is not defined anywhere in this notebook. df_spots_matched is used
    here because its `name` column is the value the user histories are built
    from — confirm this is the intended source.
    """
    place_row = (
        df_spots_matched
        .filter(col("spot_id") == place_id)
        .select("name", "city_state")
        .first()
    )
    if place_row is None:
        # first() returns None on an empty result; fail with a clear message
        # instead of an AttributeError further down.
        raise ValueError(f"Place {place_id} was not found in df_spots_matched")
    return (place_row["name"], place_row["city_state"])

def calculate_user_score(user_entry, target_gencat, target_city):
    """Score how well a user's profile fits a target category and city.

    Args:
        user_entry: tuple (categories, last_city, total_visits), where
            categories is an iterable of (category, count) pairs.
        target_gencat: category to look for in the user's categories.
        target_city: city to compare with the user's last city.

    Returns:
        50 if the cities match, plus 10 per visit to the target category, plus
        the percentage of the user's visits that went to that category
        (skipped when total_visits is not positive).
    """
    categories, last_city, total_visits = user_entry

    # Count for the first entry matching the target category; 0 if there is none.
    target_count = 0
    for category, count in categories:
        if category == target_gencat:
            target_count = count
            break

    score = 50 if last_city == target_city else 0
    score += target_count * 10
    if total_visits > 0:
        score += (target_count / total_visits) * 100

    return score

def top_k_users_for_place(rdd, place_id, timestamp_str, k):
    """Rank users by how well their profile fits a place at a given time.

    Args:
        rdd: RDD of (userid, timestamp, category_counts, city, visits) records.
        place_id: place to score users against; its category and city are
            obtained from get_place_info.
        timestamp_str: point in time, as a datetime or a
            'YYYY-MM-DD HH:MM:SS' string.
        k: number of users to return.

    Returns:
        List of up to k (userid, score) pairs, highest score first. Users with
        no snapshot at or before the given time are left out.
    """
    target_gencat, target_city = get_place_info(place_id)
    target_timestamp = parse_time(timestamp_str)

    def latest_snapshot(snapshots):
        """Pick the most recent snapshot not later than the target time, or None."""
        eligible = [snap for snap in snapshots if parse_time(snap[0]) <= target_timestamp]
        return max(eligible, key=lambda snap: parse_time(snap[0]), default=None)

    def score_user(pair):
        """Map (userid, snapshot) to (userid, score); snapshot[1:4] is (categories, city, visits)."""
        user_id, snapshot = pair
        return (user_id, calculate_user_score(snapshot[1:4], target_gencat, target_city))

    return (
        rdd.map(lambda rec: (rec[0], rec[1:]))
        .groupByKey()
        .mapValues(latest_snapshot)
        .filter(lambda pair: pair[1] is not None)
        .map(score_user)
        .top(k, key=lambda pair: pair[1])
    )

# Example run: the 5 best-fitting users for place 12696 as of 2011-06-01 12:00:00.
result = top_k_users_for_place(rdd_result, 12696, '2011-06-01 12:00:00', k=5)
# print(result)