# New York taxis trips

This homework is about New York taxi trips. Here is something from [Todd Schneider](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/):

> The New York City Taxi & Limousine Commission has released a  detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. 
Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. 
How bad is the rush hour traffic from Midtown to JFK? 
Where does the Bridge and Tunnel crowd hang out on Saturday nights?
What time do investment bankers get to work? How has Uber changed the landscape for taxis?
The dataset addresses all of these questions and many more.

The NY taxi trips dataset has been plowed by series of distinguished data scientists.
The dataset is available from on Amazon S3 (Amazon's cloud storage service).
The link for each file has the following form:

    https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv

There is one CSV file for each NY taxi service (`yellow`, `green`, `fhv`) and each calendar month (replacing `{year}` and `{month}` by the desired ones).
Each file is moderately large, a few gigabytes. 
The full dataset is relatively large if it has to be handled on a laptop (several hundred gigabytes).

You will focus on the `yellow` taxi service and a pair of months, from year 2015 and from year 2018. 
Between those two years, for hire vehicles services have taken off and carved a huge marketshare.

Whatever the framework you use, `CSV` files prove hard to handle. 
After downloading the appropriate files (this takes time, but this is routine), a first step will consist in converting the csv files into a more Spark friendly format such as `parquet`.

Saving into one of those formats require decisions about bucketing, partitioning and so on. Such decisions influence performance. It is your call.
Many people have been working on this dataset, to cite but a few:


- [1 billion trips with a vengeance](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/)
- [1 billion trips with R and SQL ](http://freerangestats.info/blog/2019/12/22/nyc-taxis-sql)
- [1 billion trips with redshift](https://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html)
- [nyc-taxi](https://github.com/fmaletski/nyc-taxi-map)

Depending on your internet connection, **download the files** corresponding to **"yellow" taxis** for the years 2015 and 2018. Download **at least one month** (the same) for 2015 and 2018, if you can download all of them.

**Hint.** The 12 csv for 2015 are about 23GB in total, but the corresponding parquet file, if you can create it for all 12 months, is only about 3GB.

You **might** need the following stuff in order to work with GPS coordinates and to plot things easily.

In [None]:
!pip install geojson geojsonio geopandas plotly geopy cffi

In [None]:
!pip install descartes shapely folium

For this homework **we will let you decide on the tools to use** (expected for Spark) and to **find out information all by yourself** (but don't hesitate to ask questions on the `slack` channel).

# Loading data as parquet files

We want to organize the data on a per year and per service basis. 
We want to end up with one `parquet` file for each year and each taxi service, since parquet is much better than CSV files.

**Hint.** Depending on your internet connection and your laptop, you can use only the "yellow" service and use one month of 2015 and 2018

CSV files can contain corrupted lines. You may have to work in order to perform ETL (Extract-Transform-Load) in order obtain a properly typed data frame.

You are invited to proceed as follows:

## Try to read the CSV file without imposing a schema. 

In [None]:
# import os
# import sys
# import time
import numpy as np
import pandas as pd
import folium
from folium import plugins
from folium.plugins import HeatMap, TimeSliderChoropleth
import plotly.express as px
import calendar
import geopandas as gpd
from shapely.geometry import *
import json

# spark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as fn
from pyspark.ml.feature import Bucketizer
# from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, BooleanType

conf = SparkConf().setAll([ ('spark.app.name','Spark SQL'),
                            ('spark.executor.memory', '8g'), 
                            ('spark.executor.cores', '4'),
                            ('spark.driver.cores', '4'),
                            ('spark.cores.max', '4'), 
                            ('spark.driver.memory','8g')])

sc = SparkContext(conf=conf)

spark = SparkSession.builder\
                    .config(conf = conf)\
                    .getOrCreate()\

In [None]:
# df = []

# periods = ['2015-01', '2015-02', '2018-01', '2018-02']

# for time in periods:
#     path = f'yellow_tripdata_{time}.csv'
#     print(f'Loading {path} ...')
#     df.append(spark.read\
#              .format('csv')\
#              .option("header", "true")\
#              .option("mode", "FAILFAST")\
#              .option("inferSchema", "true")\
#              .option("sep", ",")\
#              .load(path))
# print("Done.")

## Inspect the inferred schema. Do you agree with Spark's typing decision?

We load the data and see that the timestamp columns are not of type spark Timestamp.

In [None]:
# for i in range(len(df)):
#     df[i].printSchema()

## Eventually correct the schema and read again the data

We cast the timestamp column into spark Timestamp type.

In [None]:
# df = []

# for time in periods:
#     path = f'yellow_tripdata_{time}.csv'
#     print(f'Loading {path}, forcing timestamp format...')
#     df.append(spark.read\
#              .format('csv')\
#              .option("header", "true")\
#              .option("mode", "FAILFAST")\
#              .option("inferSchema", "true")\
#              .option("timestampFormat", "yyyy-MM-dd HH:mm")\
#              .option("sep", ",")\
#              .load(path))
    
# print("Done.")

We see that the schemas of 2015 and 2018 differ by how was recorded the pickup/dropoff location.

## Save the data into parquet files

Before saving into parquet files, we bind by year, and add month, weekday, hour, minute and duration (in seconds) columns that will be usefull for our future calculations.

In [None]:
# for i in range(4):
    
#     print(f'Adding columns to {periods[i]} data...')
    
#     df[i] = df[i].withColumn("Month", fn.month("tpep_pickup_datetime"))\
#                  .withColumn("Weekday", fn.dayofweek("tpep_pickup_datetime"))\
#                  .withColumn("Hour", fn.hour("tpep_pickup_datetime"))\
#                  .withColumn("Minute", fn.minute("tpep_pickup_datetime"))\
#                  .withColumn("Duration", fn.unix_timestamp("tpep_dropoff_datetime") -
#                                          fn.unix_timestamp("tpep_pickup_datetime"))

# print("Done.")

In [None]:
# print("Joining data by year")

# df_by_year = [df[0].union(df[1]), df[2].union(df[3])]

# years = ['2015', '2018']

# for i in range(2):
#     parquet_path = f'yellow_{years[i]}.parquet'
    
#     print(f"Writing {years[i]} data into parquet...")
    
#     df_by_year[i].repartition(2 * 7 * 24, "Month", "Weekday", "Hour")\
#                  .write\
#                  .mode("overwrite")\
#                  .partitionBy("Month", "Weekday", "Hour")\
#                  .option("maxrecordsFile", 50000)\
#                  .parquet(parquet_path)

# print("Done.")

## In the rest of your work, **you will only use the parquet files you created**, not the csv files (don't forget to choose a partitioning column and a number of partitions when creating the parquet files).

**Hint.** Don't forget to ask `Spark` to use all the memory and ressources from your computer.

**Hint.** Don't foreget that you should specify a partitioning column and a number of partitions when creating the parquet files.

**Hint.** Note that the schemas of the 2015 and 2018 data are different...

**Hint.** When working on this, ask you and answer to the following questions:

In [None]:
df = []

years = ['2015', '2018']
center = (40.735, -73.971)

for i in range(2):
        parquet_path = f'yellow_{years[i]}.parquet'
        print(f'Reading {parquet_path}...')
        df.append(spark.read\
                       .parquet(parquet_path)\
                       .repartition(2 * 7 * 24, "Month", "Weekday", "Hour"))
        
df2015, df2018 = df

## What is the `StorageLevel` of the dataframe after reading the csv files?

In [None]:
for i in range(len(df)):
    print(df[i].storageLevel)

## What is the number of partitions of the dataframe? 

In [None]:
for i in range(len(df)):
    print(df[i].rdd.getNumPartitions())

## Is it possible to tune this number at loading time? 

When loading our dataframes, spark gives a partition of cardinal ~15.
When are not convinced it is possible to tune the number at loading time so we used `repartition`.

## Why would we want to modify the number of partitions when creating the parquet files?

We might want to modify the number of partitions when creating a parquet file so spark can retrieve it more easily when loading it.

# Investigate (at least) one month of data in 2015

From now on, you will be using **the parquet files you created for 2015**.

We shall visualize several features of taxi traffic during one calendar month
in 2015 and the same calendar month in 2018.

**Hint.** In order to build appealing graphics, you may stick to `matplotlib + seaborn`, you can use also
`plotly`, which is used a lot to build interactive graphics, but you can use whatever you want.

The following longitudes and latitudes encompass Newark and JFK airports, Northern Manhattan and Verazzano bridge.

In [None]:
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90

query31 = f'''pickup_longitude BETWEEN {long_min} AND {long_max}
              AND pickup_latitude BETWEEN {lat_min} AND {lat_max}
              AND dropoff_longitude BETWEEN {long_min} AND {long_max}
              AND dropoff_latitude BETWEEN {lat_min} AND {lat_max}'''

## Using these boundaries, count the number of trips for each count of passenger and make a plot of that.

**Hint.** Filter the 2015 data using pickup and dropoff longitude and latitude.

In [None]:
query31 = f'''pickup_longitude BETWEEN {long_min} AND {long_max}
              AND pickup_latitude BETWEEN {lat_min} AND {lat_max}
              AND dropoff_longitude BETWEEN {long_min} AND {long_max}
              AND dropoff_latitude BETWEEN {lat_min} AND {lat_max}'''

df31 = df2015.where(query31)\
             .groupBy("Month", "passenger_count")\
             .count()\
             .toPandas()

df31["Month"] = df31["Month"].apply(lambda i : calendar.month_name[i])

fig31 = px.bar(df31,
              x = "passenger_count",
              y = "count",
              color = "Month",
              barmode = "group")

fig31.update_layout(title = "Number of trips by passenger count, 2015",
                    xaxis = dict(title = "Number of passengers",
                                 tickvals = list(range(10))),
                    yaxis = dict(title = "Number of trips"),
                    barmode = "group")

fig31.show()

Trips with $0$ or larger than $7$ passengers are pretty rare.
We suspect these to be outliers. 
We need to explore these trips further in order order to understand what might be wrong
with them

## What's special with trips with zero passengers?

We think that the 0 passengers trips can be explained by either human error when entering the number of passenger, or a missing value that was replaced by 0 as there are no NA values in the table.
Another explanation could be that some of these taxis drivers were hired to transport items rather than persons.

In [None]:
df2015.filter(col("passenger_count")==0)[["trip_distance"]].show()

## What's special with trips with more than $6$ passengers?

We suspect that most of these trips, especially those of distance 0 to be errors. The only explanation we could find for trips with more passengers than legally allowed (5) is that children younger than 7 years old are allowed to sir on the lap of someone, which means that technically the maximum number of passenger is 10.

In [None]:
df2015.filter(col("passenger_count") > 6)[["trip_distance"]].show()

## What is the largest distance travelled during this month? Is it the first taxi on the moon?

Once again, we suspect human error.

In [None]:
df2015.agg({"trip_distance" :"max"}).collect()[0][0]

## Plot the distribution of the trip distances (using an histogram for instance) during year 2015. Focus on trips with non-zero trip distance and trip distance less than 30 miles.

In [None]:
bucketizer = Bucketizer(splits = range(31),
                        inputCol = "trip_distance",
                        outputCol = "bucket_distance")

w35 = Window.partitionBy("Month")

df35 = bucketizer.transform(df2015.filter((col("trip_distance") > 0) & (col("trip_distance") < 30)))\
                                  .groupBy("Month", "bucket_distance")\
                                  .count()\
                                  .withColumn("freq", col("count") / fn.sum(col("count")).over(w35))\
                                  .toPandas()\
                                  .sort_values(["Month", "bucket_distance"])

df35["Month"] = df35["Month"].apply(lambda i : calendar.month_name[i])

fig35 = px.bar(df35,
                x = "bucket_distance",
                y = "freq",
                color = "Month",
                barmode = "group")

fig35.update_layout(title = "Distribution of trip distances (<30 miles), 2015",
                    xaxis = dict(title = "Distance (miles)",
                                 tickvals = list(range(0, 31, 2))),
                    yaxis = dict(title = "Frequence",
                                 tickvals = [i/20 for i in range(8)],
                                 ticktext = [f'{5*i}' + '%' for i in range(8)]))

fig35.show()

Let's look at what Spark does for these computations

## Use the explain method or have a look at the [Spark UI](http://localhost:4040/SQL/) to analyze the job. You should be able to assess 
    - Parsed Logical Plan
    - Analyzed Logical Plan
    - Optimized Logical Plan
    - Physical Plan

In [None]:
df2015.explain()

## Do the Analyzed Logical Plan and Optimized Logical Plan differ? Spot the differences if any. How would a RDBMS proceed with such a query?

## How does the physical plan differ from the Optimized Logical Plan? What are the keywords you would not expects in a RDBMS? What is their meaning? 

## Inspect the stages on [Spark UI](http://localhost:4040/stages/stage). How many *stages* are necessary to complete the Spark job? What are the roles of HashAggregate and Exchange hashpartitioning ?

## Does the physical plan perform shuffle operations? If yes how many?

## What are tasks with respect to stages (in Spark language)? How many tasks are your stages made of?

Now, compute the following and produce relevant plots:

## Break down the trip distance distribution for each day of week

In [None]:
w312 = Window.partitionBy("Month", "Weekday")

df312 = bucketizer.transform(df2015.filter((col("trip_distance") > 0) & (col("trip_distance")<30)) 
                                                )\
                                     .groupBy("Month", "Weekday", "bucket_distance")\
                                     .count()\
                                     .withColumn("freq", 100 * col("count") /
                                                                 fn.sum(col("count")).over(w312))\
                                     .toPandas()\
                                     .sort_values(["Month", "Weekday", "bucket_distance"])

df312["Weekday"] = df312["Weekday"].apply(lambda i : calendar.day_name[i-1])
df312["Month"] = df312["Month"].apply(lambda i : calendar.month_name[i])

fig312 = px.line(df312,
                 x = "bucket_distance",
                 y = "freq",
                 color = "Weekday",
                 facet_row = "Month")

fig312.update_layout(title = "Distribution of trip distances (<30 miles), Jan. 2015",
                     xaxis_title = "Distance (in miles)")

fig312.update_xaxes(dtick = 2)
fig312.update_yaxes(title = "Frequence",
                    dtick = 5,
                    ticksuffix = "%")

fig312.show()

## Count the number of distinct pickup location

We compute the number of distinct pickup location in 2018 as locations in 2015 are too accurate.

In [None]:
df2018.select("PULocationID").distinct().count()

In [None]:
districts = gpd.read_file('./districts.geojson')
districts.info()

## Compute and display tips and profits as a function of the pickup location

We use 2018 data with location ID, compute the right columns and then use a spatial join with the "official" geojson we found on https://data.cityofnewyork.us/Transportation/NYC-Taxi-Zones/d3c5-ddgc

In [None]:
df314 = df2018.withColumn("profit", col("fare_amount") + col("tip_amount"))\
              .select("tip_amount", "profit", "PULocationID")\
              .groupBy("PULocationID")\
              .agg(fn.avg("tip_amount").alias("average_tip"),
                   fn.log(fn.count(fn.lit(1))).alias("count"),
                   fn.avg("profit").alias("average_profit"))\
              .toPandas()\
              .rename(columns={"PULocationID" : "objectid"})

districts = gpd.read_file('./districts.geojson')
districts["objectid"] = districts["objectid"].astype("int32")

df314 = districts.merge(df314, on = "objectid", how = "left")

df314 = df314[["count", "objectid", "average_tip", "average_profit"]]

df314 = df314.rename(columns={"objectid" : "id"})\
             .sort_values("id")

with open('./districts.geojson') as f:
    districts = json.load(f)
    
for dico in districts["features"]:
    dico["id"] = int(dico["properties"]["objectid"])

In [None]:
m = folium.Map(location = [40.7, -74],
               zoom_start = 10,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

folium.Choropleth(
    geo_data=districts,
    name='choropleth',
    data=df314,
    columns=['id', 'average_tip'],
    key_on='id',
    fill_color = 'RdPu',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Average tip ($)').add_to(m)

folium.LayerControl().add_to(m)

m

In [None]:
m2 = folium.Map(location = [40.7, -74],
               zoom_start = 10,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

folium.Choropleth(
    geo_data=districts,
    name='choropleth',
    data=df314,
    columns=['id', 'average_profit'],
    key_on='id',
    fill_color = 'YlOrRd',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Average profit ($)').add_to(m2)

folium.LayerControl().add_to(m2)

m2

# Investigate one month of trips data in 2015 and 2018

 Consider one month of trips data from `yellow` taxis for each year

## Filter and cache/persist the result

## Assessing seasonalities and looking at time series

Compute and plot the following time series indexed by day of the week and hour of day:

### The number of pickups

In [None]:
df421 = [df2015.filter(df2015["Month"] == 1)\
               .filter(df2015["Duration"] < 2000),
         df2018.filter(df2018["Month"] == 1)]

for i, y in enumerate(years):
    df421[i] = df421[i].withColumn("Duration", col("Duration") * (1/60))\
                       .groupBy("Weekday", "Hour")\
                       .agg(fn.count(fn.lit(1)).alias("trips"),
                            fn.avg("fare_amount").alias("average_fare"),
                            fn.avg("Duration").alias("average_duration"))\
                       .orderBy("Weekday", "Hour")\
                       .toPandas()
    
    df421[i]["Weekday"] = df421[i]["Weekday"].apply(lambda i : calendar.day_name[i-1])
    df421[i]["Year"] = int(y)
    
df421 = df421[0].append(df421[1])

fig421 = px.line(df421,
                 x = "Hour",
                 y = "trips",
                 color = "Weekday",
                 facet_row = "Year")

fig421.update_layout(title = "Cumulated number of pickups during each weekday in January",
                     xaxis_title = "Time of the day")
fig421.update_xaxes(dtick = 1,
                    matches = None)
fig421.update_yaxes(title = "Cumulated pickups over Jan.",
                    dtick = 30000,
                    matches = None)

### The average fare

In [None]:
fig422 = px.line(df421,
                 x = "Hour",
                 y = "average_fare",
                 color = "Weekday",
                 facet_row = "Year")

fig422.update_layout(title = "Average fare during each weekday in January",
                     xaxis_title = "Time of the day")
fig422.update_xaxes(dtick = 1,
                    matches = None)
fig422.update_yaxes(title = "Fare (in $)",
                    matches = None)

### The average trip duration

In [None]:
fig423 = px.line(df421,
                 x = "Hour",
                 y = "average_duration",
                 color = "Weekday",
                 facet_row = "Year")

fig423.update_layout(title = "Average trip duration during each weekday in January",
                     xaxis_title = "Time of the day")
fig423.update_xaxes(dtick = 1,
                    matches = None)
fig423.update_yaxes(title = "Duration (in minutes)",
                    matches = None)

### Plot the average number of ongoing trips

We decided to compute the number of ongoing trips each minute the following way:
    For each minute n, we count the pickups from minute 0 to n (included), and then subtract the dropoffs from minute 0 to n-1 (included).

In [None]:
w1 = Window.orderBy("date")\
            .rowsBetween(Window.unboundedPreceding, 0)

w2 = Window.orderBy("date")\
            .rowsBetween(Window.unboundedPreceding, -1)

df424 = [df2015.filter(df2015["Month"] == 1)\
               .filter(df2015["Duration"] < 2000),
         df2018.filter(df2018["Month"] == 1)]

for i, y in enumerate(years):
    
    pickup = df424[i].withColumn("date", fn.date_trunc("minute", "tpep_pickup_datetime"))\
                 .groupBy("date")\
                 .count()\
                 .withColumn("cum_pickup", fn.sum("count").over(w1))\
                 .drop("count")

    dropoff = df424[i].withColumn("date", fn.date_trunc("minute", "tpep_dropoff_datetime"))\
                     .groupBy("date")\
                     .count()\
                     .withColumn("cum_dropoff", fn.sum("count").over(w2))\
                     .drop("count")
    
    df424[i] = pickup.join(dropoff, "date", "inner")\
                    .withColumn("time", fn.split("date", " ")[1])\
                    .withColumn("date", fn.split("date", " ")[0])\
                    .withColumn("diff", col("cum_pickup") - col("cum_dropoff"))\
                    .na.fill(0)\
                    .drop("cum_pickup", "cum_dropoff")\
                    .groupBy(fn.dayofweek("date").alias("Weekday"), "time")\
                    .agg(fn.avg("diff").alias("avg_ong"))\
                    .orderBy("Weekday", "time")\
                    .toPandas()
    
    df424[i]["Weekday"] = df424[i]["Weekday"].apply(lambda i : calendar.day_name[i-1])
    df424[i]["Year"] = int(y)
    
df424 = df424[0].append(df424[1])

fig424 = px.line(df424,
                 x = "time",
                 y = "avg_ong",
                 color = "Weekday",
                 facet_row = "Year")

fig424.update_layout(title = "Average number of ongoing trips during each weakday in January",
                     xaxis_title = "Time of the day")

fig424.update_xaxes(dtick = 60)
fig424.update_yaxes(title = "Number of ongoing trips",
                    dtick = 1000,
                    matches = None)

fig424.show()

## Rides to the airports

In order to find the longitude and lattitude of JFK and Newark airport as well as the longitude and magnitudes 
of Manhattan, you can use a service like [geojson.io](http://geojson.io/).
Plot the following time series, indexed by the day of the week and hour of the day

### Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport

We first filter the data using a rough estimate of the districts (with a box large enough to contain them), the data is then small enough to be handle with (geo)pandas dataframes where we can apply an exact filter, using (multi)polygons from a geojson.

In [None]:
midtown_box = [-74.0091, -73.9587, 40.7269, 40.7729]
jfk_box = [-73.8196, -73.7704, 40.6389, 40.6652]

midtown = gpd.read_file('./midtown.geojson').geometry[0]
jfk = gpd.read_file('./jfk.geojson').geometry[0]
m_to_j2015 = udf(lambda a, b, c, d : Point(a, b).within(midtown) and Point(c, d).within(jfk))
j_to_m2015 = udf(lambda a, b, c, d : Point(a, b).within(jfk) and Point(c, d).within(midtown))

midtown_by_id = [224, 107, 234, 90, 68, 246, 137, 170, 164, 186, 233, 100,229, 162, 161, 230, 48, 50, 163]
m_to_j2018 = udf(lambda i, j : (i.isin(midtown_by_id)) and (j == 132))

def query2015(a,b):
    query = f'''pickup_longitude BETWEEN {a[0]} AND {a[1]}
            AND pickup_latitude BETWEEN {a[2]} AND {a[3]}
            AND dropoff_longitude BETWEEN {b[0]} AND {b[1]}
            AND dropoff_latitude BETWEEN {b[2]} AND {b[3]}'''
    return query

In [None]:
df431 = [df2015.filter(df2015["Month"] == 1),
         df2018.filter(df2018["Month"] == 1)]

df431[0] = df431[0].where(query2015(midtown_box, jfk_box))\
                         .withColumn("to_keep", m_to_j2015("pickup_longitude",
                                                           "pickup_latitude",
                                                           "dropoff_longitude",
                                                           "dropoff_latitude"))
df431[0] = df431[0].filter(col("to_keep") == True)

df431[1] = df431[1].filter((col("PULocationID").isin(midtown_by_id)) & (col("DOLocationID") == 132))

for i, y in enumerate(years):
    
    df431[i] = df431[i].withColumn("Duration", col("Duration") * (1/60))\
                       .groupBy("Weekday", "Hour")\
                       .agg(fn.expr('percentile_approx(Duration, 0.5)').alias("Median"))\
                               .orderBy("Weekday", "Hour")\
                               .toPandas()
    
    df431[i]["Weekday"] = df431[i]["Weekday"].apply(lambda i : calendar.day_name[i-1])
    df431[i]["Year"] = int(y)
    
df431 = df431[0].append(df431[1])

fig431 = px.line(df431,
                 x = "Hour",
                 y = "Median",
                 color = "Weekday",
                 facet_row = "Year")

fig431.update_layout(title = "Median trip duration from Midtown to JFK during each weekday in January",
                     xaxis_title = "Time of the day")

fig431.update_xaxes(dtick = 1)
fig431.update_yaxes(title = "Duration (in minutes)",
                    matches = None)

fig431.show()

### Median taxi duration of trip leaving from JFK Airport to Midtown (Southern Manhattan)

We replicate the previous process.

In [None]:
df432 = [df2015.filter(df2015["Month"] == 1),
         df2018.filter(df2018["Month"] == 1)]

df432[0] = df432[0].where(query2015(jfk_box, midtown_box))\
                         .withColumn("to_keep", j_to_m2015("pickup_longitude",
                                                           "pickup_latitude",
                                                           "dropoff_longitude",
                                                           "dropoff_latitude"))
df432[0] = df432[0].filter(col("to_keep") == True)

df432[1] = df432[1].filter((col("DOLocationID").isin(midtown_by_id)) & (col("PULocationID") == 132))

for i, y in enumerate(years):
    
    df432[i] = df432[i].withColumn("Duration", col("Duration") * (1/60))\
                       .groupBy("Weekday", "Hour")\
                       .agg(fn.expr('percentile_approx(Duration, 0.5)').alias("Median"))\
                       .orderBy("Weekday", "Hour")\
                       .toPandas()
    
    df432[i]["Weekday"] = df432[i]["Weekday"].apply(lambda i : calendar.day_name[i-1])
    df432[i]["Year"] = int(y)
    
df432 = df432[0].append(df432[1])

fig432 = px.line(df432,
                 x = "Hour",
                 y = "Median",
                 color = "Weekday",
                 facet_row = "Year")

fig432.update_layout(title = "Median trip duration from JFK to Midtown during each weekday in January",
                     xaxis_title = "Time of the day")

fig432.update_xaxes(dtick = 1)
fig432.update_yaxes(title = "Duration (in minutes)",
                    matches = None)

fig432.show()

## Geographic information

For this, you will need to find tools to display maps and to build choropleth maps.
We let you look and find relevant tools to do this.

### Build a heatmap where color is a function of
    1. number of `pickups`
    2. number of `dropoffs`
    3. number of `pickups` with dropoff at some airport (JFK, LaGuardia, Newark)

Same as before, a rough filter to work with data of reasonable size, as folium needs a list of coordinates to build a heat map.

In [None]:
long_min = -74.3
long_max = -73.7
lat_min = 40.5
lat_max = 40.9

loc = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]

query441 = f'''{loc[0]} BETWEEN {long_min} AND {long_max}
               AND {loc[1]} BETWEEN {lat_min} AND {lat_max}
               AND {loc[2]} BETWEEN {long_min} AND {long_max}
               AND {loc[3]} BETWEEN {lat_min} AND {lat_max}'''

df441 = df2015.where(query441)\
              .select(*loc)

for loc in loc:
    df441 = df441.withColumn(loc, fn.round(col(loc), 4))
    
df4411 = df441.select("pickup_latitude", "pickup_longitude")\
              .groupBy("pickup_latitude", "pickup_longitude")\
              .count()

df4412 = df441.select("dropoff_latitude", "dropoff_longitude")\
              .groupBy("dropoff_latitude", "dropoff_longitude")\
              .count()

df4411 = [list(row) for row in df4411.collect()]
df4412 = [list(row) for row in df4412.collect()]

In [None]:
m = folium.Map(location = list(center),
               zoom_start = 11,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

HeatMap(df4411,
        radius = 1.95,
        blur = 2).add_to(m)

m

In [None]:
m = folium.Map(location = list(center),
               zoom_start = 11,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

HeatMap(df4412,
        radius = 2,
        blur = 2).add_to(m)

m

Same process, rough filter by emboxing airports then using geojson polygons.

In [None]:
nwbox = [-74.20, -74.14, 40.67, 40.71]
jfkbox = [-73.819, -73.770, 40.638, 40.665]
lgbox = [-73.89, -73.85, 40.76, 40.78]

airports_box = [nwbox, jfkbox, lgbox]
query = [f'''dropoff_longitude BETWEEN {box[0]} AND {box[1]}
            AND dropoff_latitude BETWEEN {box[2]} AND {box[3]}''' for box in airports_box]

districts = gpd.read_file('./districts.geojson')
airports_geom2 = list(districts[districts["objectid"].isin(['1', '132', '138'])]["geometry"])
# airports_geom = [gpd.read_file(f'./{x}.geojson').geometry[0] for x in ['nw', 'jfk', 'lg']]

df = []

for i in range(3):
    temp = df441.where(query[i])\
                .withColumn("to_keep",
                            udf(lambda a, b : Point(a,b).within(airports_geom2[i]), BooleanType())("dropoff_longitude", "dropoff_latitude"))\
                .filter(col("to_keep") == True)\
                .select("pickup_latitude", "pickup_longitude")
    df.append([list(row) for row in temp.collect()])
    
df4413 = df[0] + df[1] + df[2]

m = folium.Map(location = list(center),
               zoom_start = 11,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

HeatMap(df4413,
        radius = 2,
        blur = 2).add_to(m)

m

### Build a choropleth map where color is a function of
    1. number of pickups in the area
    2. ratio of number of payments by card/number of cash payments for pickups in the area
    3. ratio of total fare/trip duration for dropoff in the area

We already computed the column needed here, in question 3.14, thus we use df314.

In [None]:
m = folium.Map(location = [40.7, -74],
               zoom_start = 10,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

folium.Choropleth(
    geo_data=districts,
    name='choropleth',
    data=df314,
    columns=['id', 'count'],
    key_on='id',
    fill_color = 'RdPu',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Number of pickups').add_to(m)

folium.LayerControl().add_to(m)

m

We couldn't find on time which payment the numbers referred to, thus we are not able to produce the choropleth map for card/cash ratio.

In [None]:
df4423 = df2018.withColumn("Duration", col("Duration") * (1/60))\
               .withColumn("minute_cost", col("total_amount") / col("Duration"))\
               .groupBy("DOLocationID")\
               .agg(fn.avg("minute_cost").alias("average_minute_cost"))\
               .toPandas()\
               .rename(columns={"DOLocationID" : "objectid"})   

districts = gpd.read_file('./districts.geojson')
districts["objectid"] = districts["objectid"].astype("int32")

df4423 = districts.merge(df4423, on = "objectid", how = "left")

df4423 = df4423[["objectid", "average_minute_cost"]]

df4423 = df4423.rename(columns={"objectid" : "id"})\
               .sort_values("id")

with open('./districts.geojson') as f:
    districts = json.load(f)
    
for dico in districts["features"]:
    dico["id"] = int(dico["properties"]["objectid"])

In [None]:
m = folium.Map(location = [40.7, -74],
               zoom_start = 10,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

folium.Choropleth(
    geo_data=districts,
    name='choropleth',
    data=df4423,
    columns=['id', 'average_minute_cost'],
    key_on='id',
    fill_color = 'RdPu',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Average minute cost (in $)').add_to(m)

folium.LayerControl().add_to(m)

m

### Build an interactive chorophlet with a slider allowing the user to select an hour of day and where the color is a function of
    1. average number of dropoffs in the area during that hour the day
    2. average ratio of tip over total fare amount for pickups in the area at given hour of the day

In [None]:
df4431 = df2018.groupBy("Month", 
                        fn.dayofmonth("tpep_pickup_datetime"), 
                        "Hour", 
                        "DOLocationID")\
               .count()\
               .groupBy("Hour", "DOLocationID")\
               .agg(fn.avg("count").alias("average_dropoffs"))\
               .toPandas()\
               .rename(columns={"DOLocationID" : "objectid"})

districts = gpd.read_file('./districts.geojson')
districts["objectid"] = districts["objectid"].astype("int32")

df4431 = districts.merge(df4431, on = "objectid", how = "left")

df4431 = df4431.rename(columns={"objectid" : "id"})\
               .sort_values("id")

with open('./districts.geojson') as f:
    districts = json.load(f)
    
for dico in districts["features"]:
    dico["id"] = int(dico["properties"]["objectid"])

In [None]:
m = folium.Map(location = [40.7, -74],
               zoom_start = 10,
               tiles = 'https://tiles.stadiamaps.com/tiles/alidade_smooth_dark/{z}/{x}/{y}{r}.png',
               attr = '&copy; <a href="https://stadiamaps.com/">Stadia Maps</a>, &copy; <a href="https://openmaptiles.org/">OpenMapTiles</a> &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors')

folium.Choropleth(
    geo_data=districts,
    name='choropleth',
    data=df4431,
    columns=['id', 'average_dropoffs'],
    key_on='id',
    fill_color = 'RdPu',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Average minute cost (in $)').add_to(m)

folium.LayerControl().add_to(m)

m

We couldn't understand on time how to use the TimeSliderChoropleth plugin of folium, thus not producing the asked map.