# Big Data Management Project 1:
## Analyzing New York City Taxi Data

In [1]:
!pip install shapely



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, unix_timestamp
from pyspark.sql.types import IntegerType

import json

from shapely.geometry import shape, Point

In [3]:
# Create the notebook's Spark session, or reuse one that already exists.
# Hive support gives the session a persistent Hive metastore.
spark = (
    SparkSession
    .builder
    .appName('BDM_Project1')
    .enableHiveSupport()
    .getOrCreate()
)

### NYC Borough Data

In [4]:
# Load the NYC borough boundaries. JSON text is UTF-8 by specification, so
# decode explicitly instead of relying on the platform's locale encoding.
with open('input/nyc-boroughs.geojson', encoding='utf-8') as f:
    geo_data = json.load(f)

# Broadcasting data to workers
# NOTE(review): get_borough reads the driver-side `polygons` dict built below
# (it is serialized together with the UDF), not this broadcast variable; the
# broadcast is kept only so that any later cell relying on it still works.
broadcast_geo_data = spark.sparkContext.broadcast(geo_data)

# Build lookup tables keyed by borough code:
#   polygons: code -> list of shapely geometries making up that borough
#   b_names:  code -> borough name
# A DataFrame could be created from these features instead, but a Python UDF
# cannot query a DataFrame on the executors, so plain dicts are used.
polygons = {}
b_names = {}  # borough names by code

for feature in geo_data['features']:
    props = feature['properties']
    code = props['boroughCode']
    name = props['borough']

    b_names.setdefault(code, name)  # keep the first name seen for each code
    polygons.setdefault(code, []).append(shape(feature['geometry']))

# Within each borough, check the largest polygons first (heuristic: a point is
# presumably more likely to fall inside a large polygon, so lookups in
# get_borough can stop earlier). list.sort is stable, like sorted().
for borough_polygons in polygons.values():
    borough_polygons.sort(key=lambda poly: poly.area, reverse=True)


In [5]:
# UDF: longitude, latitude -> borough
def get_borough(long, lat):
    """Return the borough code whose polygons contain the point (long, lat).

    Intended to be wrapped as a Spark UDF (``get_borough_udf``) and applied to
    pickup/dropoff coordinates.

    Args:
        long: Longitude (x coordinate); None for a missing value.
        lat: Latitude (y coordinate); None for a missing value.

    Returns:
        The ``boroughCode`` of the first borough (in ``polygons`` iteration
        order) having a polygon that contains the point. Returns None when a
        coordinate is missing or when the point lies in no borough polygon
        (e.g. outside NYC, or exactly on a boundary, since ``contains``
        excludes the boundary).
    """
    # Null CSV fields arrive as None. Depending on the shapely version, a
    # Point built from None either raises (failing the Spark task) or becomes
    # a NaN/empty point, so short-circuit explicitly.
    if long is None or lat is None:
        return None

    point = Point(long, lat)

    for code, pols in polygons.items():
        for polygon in pols:
            if polygon.contains(point):
                return code

    return None

# NOTE(review): assumes the GeoJSON boroughCode values are ints (the sample
# output shows 1); a Python UDF yields null for values not matching IntegerType.
get_borough_udf = udf(get_borough, IntegerType())

### NYC Taxi Data

In [6]:
# Format of the pickup/dropoff datetime strings in the CSV (e.g. "01-01-13 15:11").
TAXI_DATETIME_FORMAT = "dd-MM-yy HH:mm"
# Rides longer than this many seconds are treated as invalid records.
MAX_RIDE_DURATION_S = 4 * 60 * 60  # 4 hours

taxi_df = (spark.read
             .option("sep", ",")
             .option("header", True)
             .option("inferSchema", True)
             .csv("input/Sample NYC Data.csv")
            )

# Selecting only necessary columns
taxi_df = taxi_df.select(
    "hack_license",
    "pickup_latitude",
    "pickup_longitude",
    "pickup_datetime",
    "dropoff_latitude",
    "dropoff_longitude",
    "dropoff_datetime"
)

# Converting datetime strings to unix timestamps (seconds)
taxi_df = (taxi_df
    .withColumn("pickup_ts", unix_timestamp("pickup_datetime", TAXI_DATETIME_FORMAT))
    .withColumn("dropoff_ts", unix_timestamp("dropoff_datetime", TAXI_DATETIME_FORMAT))
)

# Calculating ride duration (seconds)
taxi_df = taxi_df.withColumn(
    "duration", taxi_df["dropoff_ts"] - taxi_df["pickup_ts"]
)

# Keep only rides with a strictly positive duration of at most
# MAX_RIDE_DURATION_S. This drops zero and negative durations, over-long rides,
# and rows with a null duration (a null comparison never passes a filter).
taxi_df = taxi_df.filter(
    (taxi_df["duration"] > 0) & (taxi_df["duration"] <= MAX_RIDE_DURATION_S)
)

# Add pick up and drop off boroughs to taxi data (done after filtering so the
# Python UDF only runs on rides that are kept).
taxi_df = (taxi_df
    .withColumn("pickup_borough", get_borough_udf("pickup_longitude", "pickup_latitude"))
    .withColumn("dropoff_borough", get_borough_udf("dropoff_longitude", "dropoff_latitude"))
)
taxi_df.show(3, truncate=False)

+--------------------------------+---------------+----------------+---------------+----------------+-----------------+----------------+----------+----------+--------+--------------+---------------+
|hack_license                    |pickup_latitude|pickup_longitude|pickup_datetime|dropoff_latitude|dropoff_longitude|dropoff_datetime|pickup_ts |dropoff_ts|duration|pickup_borough|dropoff_borough|
+--------------------------------+---------------+----------------+---------------+----------------+-----------------+----------------+----------+----------+--------+--------------+---------------+
|BA96DE419E711691B9445D6A6307C170|40.757977      |-73.978165      |01-01-13 15:11 |40.751171       |-73.989838       |01-01-13 15:18  |1357053060|1357053480|420     |1             |1              |
|9FD8F69F0804BDB5549F40E9DA1BE472|40.731781      |-74.006683      |06-01-13 00:18 |40.75066        |-73.994499       |06-01-13 00:22  |1357431480|1357431720|240     |1             |1              |
|9FD8F69F0

### Query 1
Utilization: idle time per taxi

### Query 2
The average time it takes for a taxi to find its next fare (trip), per destination borough (i.e. the borough where the previous trip ended)

### Query 3
The number of trips that started and ended within the same borough

### Query 4
The number of trips that started in one borough and ended in another one