# Data Preparation
Prepare data sets for training and inference.

In [0]:
import time
from helper_functions import haversine_miles, multipolygon_centroid
from math import radians, sin, cos, sqrt, atan2
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StructField, StructType
from pyspark.sql.functions import udf
from pyspark.sql.window import Window
import ast

In [0]:
# Destination table name (without the catalog/schema prefix) for each data set.
table_names = {
  "train": "prepared_training_sets",
  "test": "prepared_testing_sets"
}

# Optional WHERE clause appended to the base query for each preparation variant.
query_adjustments = {
    "unmodified": "",
    "manhattan_only": "where District = 'Manhattan'",
    "manhattan_only_no_marble_hill": "where District = 'Manhattan' and Neighborhood != 'Marble Hill'"
}

# Per data set: [table alias that supplies the Price column, extra JOIN clause].
test_set_query = {
  "test": ["s", "join workspace.rental_predictions.sample_submission as s on (s.ID = t.ID)"],
  "train": ["t", ""]
}

# Look up the defined widgets once instead of once per parameter.
_defined_widgets = dbutils.widgets.getAll()


def _widget_or_default(name, default):
    """Return the value of widget `name`, or `default` when no such widget is defined."""
    return dbutils.widgets.get(name) if name in _defined_widgets else default


def _parse_flag(raw_value):
    """Interpret a widget value as a boolean.

    Accepts any capitalisation of "true"/"false" in addition to the Python
    literals (e.g. "True", "False", "1", "0") that `ast.literal_eval` understands.
    Raises whatever `ast.literal_eval` raises for text that is not a literal.
    """
    if isinstance(raw_value, bool):
        return raw_value
    text = str(raw_value).strip()
    if text.lower() in ("true", "false"):
        return text.lower() == "true"
    return bool(ast.literal_eval(text))


data_set    = _widget_or_default("data_set", "train")
preparation = _widget_or_default("preparation", "unmodified")
# job_id / run_id are interpolated unquoted into the SQL text further down, so
# force them to integers here: this keeps the type consistent with the -1
# default and rejects anything that is not a plain number.
job_id      = int(_widget_or_default("job_id", -1))
run_id      = int(_widget_or_default("run_id", -1))
museum_and_parks = _parse_flag(_widget_or_default("museum_and_parks", False))

# Fail early with a readable message instead of a bare KeyError later on.
if data_set not in table_names:
    raise ValueError(f"Unknown data_set {data_set!r}; expected one of {sorted(table_names)}")
if preparation not in query_adjustments:
    raise ValueError(f"Unknown preparation {preparation!r}; expected one of {sorted(query_adjustments)}")

In [0]:
# Resolve the run-specific SQL fragments first so the template below only
# contains simple placeholders.
price_alias, join_clause = test_set_query[data_set]
where_clause = query_adjustments[preparation]

# Select the listing attributes, round bathrooms to the nearest 0.5 and
# bedrooms to the nearest whole number, and stamp every row with the
# preparation name, a snapshot timestamp and the job/run identifiers.
base_query = f"""
    select 
    t.ID,
    t.District,
    t.Neighborhood,
    t.PropertyType,
    t.CancellationPolicy,
    t.Accommodates,
    t.RoomType,
    round(t.Bathrooms/0.5)*0.5 as Bathrooms_rounded,
    round(t.Bedrooms) as Bedrooms_rounded,
    t.CleaningFee,
    t.Latitude,
    t.Longitude,
    t.ReviewRating,
    {price_alias}.Price,
    '{preparation}' as data_set_preparation,
    current_timestamp() as snapshot_timestamp,
    {job_id} as job_id,
    {run_id} as run_id
    from workspace.rental_predictions.{data_set} as t
    {join_clause}
    {where_clause}
"""
data = spark.sql(base_query)
display(data)

### Include Museum Data
This data is sourced from [data.gov](https://catalog.data.gov/dataset/museum-b306c) and was uploaded on 2024-02-24.

In [0]:
if museum_and_parks:
    # `the_geom` is matched as "POINT (<first> <second>)" text; the first
    # number is stored as the longitude and the second as the latitude.
    longitude_pattern = r"POINT \(([-\d\.]+) [-\d\.]+\)"
    latitude_pattern = r"POINT \([-\d\.]+ ([-\d\.]+)\)"

    museum_points = spark.sql("""select NAME, the_geom from workspace.rental_predictions.nyc_museums""")

    # Pull both coordinates out as doubles, then drop the raw geometry text.
    museum_data = (
        museum_points
        .withColumn("museum_Longitude", F.regexp_extract("the_geom", longitude_pattern, 1).cast("double"))
        .withColumn("museum_Latitude", F.regexp_extract("the_geom", latitude_pattern, 1).cast("double"))
        .drop("the_geom")
    )

    display(museum_data)

In [0]:
if museum_and_parks:
    # Expose the project helper as a Spark UDF that returns a double
    # (presumably miles, going by the helper's name — confirm in helper_functions).
    haversine_udf = udf(haversine_miles, DoubleType())

    # Rank museums per listing by distance.
    # - Spark's plain ascending sort places NULLs first, so a NULL distance
    #   would otherwise be picked as the "nearest" museum; sort NULLs last.
    # - The museum name is a tiebreaker so equal distances resolve the same
    #   way on every run.
    window = Window.partitionBy("ID").orderBy(
        F.col("distance_to_museum").asc_nulls_last(),
        F.col("NAME").asc(),
    )

    # A museum whose coordinates could not be extracted can never be a valid
    # nearest match, so leave it out of the cross join.
    museums_with_coordinates = museum_data.dropna(
        subset=["museum_Latitude", "museum_Longitude"]
    )

    data_with_museums = (
        data
        # Pair every listing with every museum, then keep only the closest one.
        .crossJoin(museums_with_coordinates)
        .withColumn(
            "distance_to_museum",
            haversine_udf(
                "Latitude",
                "Longitude",
                "museum_Latitude",
                "museum_Longitude"
            )
        )
        .withColumn(
            "rank", F.row_number().over(window)
        )
        .filter(F.col("rank") == 1)
        .drop("rank", "museum_Latitude", "museum_Longitude")
        .withColumnRenamed("NAME", "nearest_museum")
    )

    display(data_with_museums)

### Include NYC Parks Data
This data is sourced from [data.gov](https://catalog.data.gov/dataset/opendata-parksproperties) and was uploaded on 2024-04-26.

In [0]:
if museum_and_parks:
    # Load the named parks; rows without a sign name are excluded by the query.
    park_data = spark.sql("""select SIGNNAME as park_name, the_geom from workspace.rental_predictions.nyc_parks where SIGNNAME is not NULL""")

    # Convert the polygonal geometry to a centroid. Both struct fields are
    # declared nullable, so a centroid may be missing for some parks.
    centroid_schema = StructType([
        StructField("centroid_lon", DoubleType(), True),
        StructField("centroid_lat", DoubleType(), True)
    ])

    centroid_udf = udf(multipolygon_centroid, centroid_schema)

    park_data = (
        park_data
        .withColumn("centroid", centroid_udf("the_geom"))
        .withColumns({
            "park_lon": F.col("centroid.centroid_lon"),
            "park_lat": F.col("centroid.centroid_lat"),
        })
        .drop("centroid", "the_geom")
        # A park without a centroid can never be a valid nearest match.
        .dropna(subset=["park_lat", "park_lon"])
    )

    # Rank parks per listing by distance.
    # - Spark's plain ascending sort places NULLs first, so a NULL distance
    #   would otherwise be picked as the "nearest" park; sort NULLs last.
    # - The park name is a tiebreaker so equal distances resolve the same
    #   way on every run.
    window = Window.partitionBy("ID").orderBy(
        F.col("distance_to_park").asc_nulls_last(),
        F.col("park_name").asc(),
    )

    # Cross join with bookings to get park distance, keeping only the closest
    # park per listing. `haversine_udf` is defined in the museum cell above.
    data_with_museums_and_parks = (
        data_with_museums
        .crossJoin(park_data)
        .withColumn(
            "distance_to_park",
            haversine_udf(
                "Latitude",
                "Longitude",
                "park_lat",
                "park_lon"
            )
        )
        .withColumn(
            "rank", F.row_number().over(window)
        )
        .filter(F.col("rank") == 1)
        .drop("rank", "park_lat", "park_lon")
        .withColumnRenamed("park_name", "nearest_park")
    )

    display(data_with_museums_and_parks)

### Write to Table

In [0]:
# Choose the frame to persist and its destination table: the enriched frame
# goes to a table with the "_museum_and_parks" suffix, otherwise the base
# frame goes to the plain prepared table.
base_table_name = f"workspace.rental_predictions.{table_names[data_set]}"
if museum_and_parks:
    data = data_with_museums_and_parks
    full_table_name = base_table_name + "_museum_and_parks"
else:
    full_table_name = base_table_name

max_attempts = 10

# Append to the table, retrying on any exception with a 2-second pause between
# tries (presumably to ride out transient write failures — confirm). The
# exception from the final attempt is re-raised unchanged.
for attempt_number in range(1, max_attempts + 1):
    try:
        data.write.mode("append").saveAsTable(full_table_name)
        break
    except Exception:
        if attempt_number == max_attempts:
            raise
        time.sleep(2)