# New York taxis trips

This homework is about New York taxi trips. Here is something from [Todd Schneider](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/):

> The New York City Taxi & Limousine Commission has released a  detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. 
Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. 
How bad is the rush hour traffic from Midtown to JFK? 
Where does the Bridge and Tunnel crowd hang out on Saturday nights?
What time do investment bankers get to work? How has Uber changed the landscape for taxis?
The dataset addresses all of these questions and many more.

The NY taxi trips dataset has been plowed by series of distinguished data scientists.
The dataset is available from on Amazon S3 (Amazon's cloud storage service).
The link for each file has the following form:

    https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv

There is one CSV file for each NY taxi service (`yellow`, `green`, `fhv`) and each calendar month (replacing `{year}` and `{month}` by the desired ones).
Each file is moderately large, a few gigabytes. 
The full dataset is relatively large if it has to be handled on a laptop (several hundred gigabytes).

You will focus on the `yellow` taxi service and a pair of months, from year 2015 and from year 2018. 
Between those two years, for hire vehicles services have taken off and carved a huge marketshare.

Whatever the framework you use, `CSV` files prove hard to handle. 
After downloading the appropriate files (this takes time, but this is routine), a first step will consist in converting the csv files into a more Spark friendly format such as `parquet`.

Saving into one of those formats require decisions about bucketing, partitioning and so on. Such decisions influence performance. It is your call.
Many people have been working on this dataset, to cite but a few:


- [1 billion trips with a vengeance](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/)
- [1 billion trips with R and SQL ](http://freerangestats.info/blog/2019/12/22/nyc-taxis-sql)
- [1 billion trips with redshift](https://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html)
- [nyc-taxi](https://github.com/fmaletski/nyc-taxi-map)

Depending on your internet connection, **download the files** corresponding to **"yellow" taxis** for the years 2015 and 2018. Download **at least one month** (the same) for 2015 and 2018, if you can download all of them.

**Hint.** The 12 csv for 2015 are about 23GB in total, but the corresponding parquet file, if you can create it for all 12 months, is only about 3GB.

You **might** need the following stuff in order to work with GPS coordinates and to plot things easily.

In [None]:
!pip install geojson geopandas plotly geopy ipyleaflet

In [None]:
!pip install pyshp

In [None]:
# import the usual suspects
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random as rand
import os
from pathlib import Path
import sys
import timeit
import shapefile
import urllib.request
import zipfile
import random
import itertools
import math
import geopandas as gpd
import seaborn as sns
from pathlib import Path
# spark
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import asc, desc
from pyspark.sql.functions import year
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import SQLContext

In [None]:
#telechargement du fichier de données de decembre 2015
data_2015_12_path = Path('yellow_tripdata_2015-12.csv')
if not data_2015_12_path.exists():
    urllib.request.urlretrieve("https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-12.csv", "yellow_tripdata_2015-12.csv")

In [None]:
#telechargement du fichier de données de decembre 2018
data_2018_12_path = Path('yellow_tripdata_2018-12.csv')
if not data_2018_12_path.exists():
    urllib.request.urlretrieve("https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-12.csv", "yellow_tripdata_2018-12.csv")

For this homework **we will let you decide on the tools to use** (expected for Spark) and to **find out information all by yourself** (but don't hesitate to ask questions on the `slack` channel).

In [None]:
conf = SparkConf().setAppName("Spark NY Taxis trips")
conf.set("spark.driver.memory", "4g")
sc = SparkContext(conf=conf)

spark = (SparkSession
    .builder
    .appName("Spark NY Taxis trips")
    .getOrCreate()
)

# Loading data as parquet files

We want to organize the data on a per year and per service basis. 
We want to end up with one `parquet` file for each year and each taxi service, since parquet is much better than CSV files.

**Hint.** Depending on your internet connection and your laptop, you can use only the "yellow" service and use one month of 2015 and 2018

CSV files can contain corrupted lines. You may have to work in order to perform ETL (Extract-Transform-Load) in order obtain a properly typed data frame.

You are invited to proceed as follows:

1. Try to read the CSV file without imposing a schema. 
1. Inspect the inferred schema. Do you agree with Spark's typing decision?
1. Eventually correct the schema and read again the data
1. Save the data into parquet files
1. In the rest of your work, **you will only use the parquet files you created**, not the csv files (don't forget to choose a partitioning column and a number of partitions when creating the parquet files).

**Hint.** Don't forget to ask `Spark` to use all the memory and ressources from your computer.

**Hint.** Don't foreget that you should specify a partitioning column and a number of partitions when creating the parquet files.

**Hint.** Note that the schemas of the 2015 and 2018 data are different...

**Hint.** When working on this, ask you and answer to the following questions:

1. What is the `StorageLevel` of the dataframe after reading the csv files?
1. What is the number of partitions of the dataframe? 
1. Is it possible to tune this number at loading time? 
1. Why would we want to modify the number of partitions when creating the parquet files?

## Try to read the CSV file without imposing a schema. 

In [None]:
def read_csv (input_file_name):
    df = spark.read.load(
    input_file_name,
    format="csv", 
    sep=",", 
    inferSchema="false", 
    header="true"
    )
    return df

In [None]:
#Lecture données de 2015
df_2015_12 = read_csv("yellow_tripdata_2015-12.csv")
df_2015_12.first()

In [None]:
#Lecture données de 2018
df_2018_12 = read_csv("yellow_tripdata_2018-12.csv")
df_2018_12.first()

### What is the StorageLevel of the dataframe after reading the csv files?

In [None]:
# 1. Le storageLevel par défaut est MEMORY_ONLY: Serialized 1x Replicated.
print(df_2018_12.rdd.getStorageLevel())

### What is the number of partitions of the dataframe? 
6 partitions pour 12-2018 et 14 pour 12-2015

In [None]:
print(df_2018_12.rdd.getNumPartitions())
print(df_2015_12.rdd.getNumPartitions())

### Is it possible to tune this number at loading time?
Oui en utilisant la methode .repartition(Nombre_De_Partition) lors du read

## Inspect the inferred schema. Do you agree with Spark's typing decision?
Reponse:
Pour ces deux dataframes, le schéma contient seulement des colonnes
de données de type string.
Avoir un type "général" pour les données est pratique pour les collecter mais pas pour les utiliser 
ensuite, par exemple pour utiliser les dates, les nombres.
Il faut donc proceder a une convertion de type pour mieux exploiter les données

In [None]:
#Inspection du schema de 2015
df_2015_12.printSchema()

In [None]:
#Inspection du schema de 2018
df_2018_12.printSchema()

On remarque :

Le schema de 2015 comporte 4 colonnes qui ne sont pas dans 2018
pickup_longitude, pickup_latitude, 
dropoff_longitude, dropoff_latitude

Le schema de 2018 comporte 2 colonnes qui ne sont pas dans 2015
PULocationID, DOLocationID

Apres plusieurs recherche on comprend que PULocationID et DOLocationID sont des id d'endroit precis dans NY. Grace a un fochier definnissant les lat et long en fonction des ID on va donc pouvoir ramener le schema de 2018 au meme que celui de 2015.
Ainsi PULocationID nous donnera les colonnes pickup_longitude, pickup_latitude.
Et DOLocationID nous donnera les colonnes dropoff_longitude, dropoff_latitude.

## Convert data_2018_12 from old schema to same schema as data_2015_12

In [None]:
#telechargement du fichier de données qui nous permettera
# d'avoir le meme schema pour les df de 2018 et 2015
shape_path = Path('shape')
if not shape_path.exists():
    urllib.request.urlretrieve("https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=Original", "taxi_zones.zip")
    with zipfile.ZipFile("taxi_zones.zip","r") as zip_ref:
        zip_ref.extractall("./shape")

Lecture du fichier shape

In [None]:
def get_lat_lon(sf):
    content = []
    for sr in sf.shapeRecords():
        shape = sr.shape
        rec = sr.record
        loc_id = rec[shp_dic['LocationID']]
        
        x = (shape.bbox[0]+shape.bbox[2])/2
        y = (shape.bbox[1]+shape.bbox[3])/2
        
        content.append((loc_id, x, y))
    return pd.DataFrame(content, columns=["LocationID", "longitude", "latitude"])

gpd.read_file('shape/taxi_zones.shp').to_crs(epsg=4326).to_file("shape/taxi_zones_4326.shp")
sf_4326 = shapefile.Reader("shape/taxi_zones_4326.shp")


fields_name = [field[0] for field in sf_4326.fields[1:]]
shp_dic = dict(zip(fields_name, list(range(len(fields_name)))))
attributes = sf_4326.records()
shp_attr = [dict(zip(fields_name, attr)) for attr in attributes]

pd_df_loc = pd.DataFrame(shp_attr).join(get_lat_lon(sf_4326)\
                                        .set_index("LocationID"), on="LocationID")
sp_df_loc = spark.createDataFrame(data=pd_df_loc)
sp_df_loc = sp_df_loc\
            .withColumn('LocationID', sp_df_loc['LocationID'].cast(IntegerType()))
sp_df_loc = sp_df_loc.groupBy("LocationID").agg({'longitude':'avg', 'latitude':'avg'})
sp_df_loc = sp_df_loc.withColumnRenamed("AVG(longitude)", "longitude")\
            .withColumnRenamed("AVG(latitude)", "latitude")
PULocationID = sp_df_loc.selectExpr('LocationID as PULocationID','longitude as pickup_longitude','latitude as pickup_latitude')
DOLocationID = sp_df_loc.selectExpr('LocationID as DOLocationID','longitude as dropoff_longitude','latitude as dropoff_latitude')

In [None]:
sp_df_loc.show(2)

In [None]:
PULocationID.printSchema()
DOLocationID.printSchema()

In [None]:
#Preparation de la jointure des deux df
df_2018_12 = df_2018_12\
            .withColumn('PULocationID', df_2018_12['PULocationID'].cast(IntegerType()))\
            .withColumn('DOLocationID', df_2018_12['DOLocationID'].cast(IntegerType()))

In [None]:
#Affichage du nombre de ligne pour garder un oeil sur la perte de donnée apres jointure
print(df_2018_12.count())

In [None]:
#jointure
df_2018_12 = df_2018_12.join(PULocationID, on=['PULocationID'], how='inner')
df_2018_12 = df_2018_12.join(DOLocationID, on=['DOLocationID'], how='inner')
df_2018_12 = df_2018_12.select('VendorID','tpep_pickup_datetime','tpep_dropoff_datetime','passenger_count','trip_distance','pickup_longitude','pickup_latitude','RateCodeID','store_and_fwd_flag','dropoff_longitude','dropoff_latitude','payment_type','fare_amount','extra','mta_tax','tip_amount','tolls_amount','improvement_surcharge','total_amount')

In [None]:
print(df_2018_12.count())

Conclusion:
On a donc perdu quelque données apres la jointure.
Apres recherche, on explique cela par le fait que dans le fichier data_2018 nous avons des PULocationID et DOLocationID qui ont pour valeur 264;265.
Ses ID ne sont pas present dans le fichier shape pris depuis le site NYC open data.
Par conséquent ses lignes la sont perdu lors de la jointure.

In [None]:
#Inspection du schema de 2018
df_2018_12.printSchema()

## Eventually correct the schema and read again the data
Nous avons donc a present deux fichier de données de schema identique dont nous allons maintenant changer les type afin de mieux pouvoir exploiter les données par la suite.

In [None]:
def type_transformer(df):
    df = df\
    .withColumn('VendorID', df['VendorID'].cast(IntegerType()))\
    .withColumn('tpep_pickup_datetime', df['tpep_pickup_datetime'].cast(TimestampType()))\
    .withColumn('tpep_dropoff_datetime', df['tpep_dropoff_datetime'].cast(TimestampType()))\
    .withColumn('passenger_count', df['passenger_count'].cast(IntegerType()))\
    .withColumn('trip_distance', df['trip_distance'].cast(DoubleType()))\
    .withColumn('pickup_longitude', df['pickup_longitude'].cast(DoubleType()))\
    .withColumn('pickup_latitude', df['pickup_latitude'].cast(DoubleType()))\
    .withColumn('RateCodeID', df['RateCodeID'].cast(IntegerType()))\
    .withColumn('dropoff_longitude', df['dropoff_longitude'].cast(DoubleType()))\
    .withColumn('dropoff_latitude', df['dropoff_latitude'].cast(DoubleType()))\
    .withColumn('payment_type', df['payment_type'].cast(IntegerType()))\
    .withColumn('fare_amount', df['fare_amount'].cast(DoubleType()))\
    .withColumn('extra', df['extra'].cast(DoubleType()))\
    .withColumn('mta_tax', df['mta_tax'].cast(DoubleType()))\
    .withColumn('tip_amount', df['tip_amount'].cast(DoubleType()))\
    .withColumn('tolls_amount', df['tolls_amount'].cast(DoubleType()))\
    .withColumn('improvement_surcharge', df['improvement_surcharge'].cast(DoubleType()))\
    .withColumn('total_amount', df['total_amount'].cast(DoubleType()))
    return df

def year_transformer(df):
    year_col = year(col('tpep_pickup_datetime'))
    df = df.withColumn('year', year_col)
    return df

In [None]:
transformers = [
    type_transformer,
    year_transformer
]

In [None]:
#transformation des types des données de 2015 et 2018
for transformer in transformers:
    df_2015_12 = transformer(df_2015_12)
    df_2018_12 = transformer(df_2018_12)

df_2015_12.printSchema()
df_2018_12.printSchema()

## Save the data into parquet files

In [None]:
df_records = df_2015_12.union(df_2018_12)

Nos données que nous allons etudier sont donc maintenant dans le dataframes df_records.
Nous allons donc maintenant creer notre fichier parquet avec un partitionby year afin de pouvoir exploiter ensuite les données selon l'année pour facilement.

### Why would we want to modify the number of partitions when creating the parquet files?
On voudrait modifier le nombre de partitions par défaut (la valeur par défaut de spark.sql.shuffle.partitions est 200) pour l'adapter au dataframe : si c'est un "petit" dataframe, 200 c'est trop et si c'est un grand dataframe

In [None]:
df_records.write.partitionBy("year").parquet('parquets/data_2015_2018_12.parquet')

# Investigate (at least) one month of data in 2015

From now on, you will be using **the parquet files you created for 2015**.

We shall visualize several features of taxi traffic during one calendar month
in 2015 and the same calendar month in 2018.

**Hint.** In order to build appealing graphics, you may stick to `matplotlib + seaborn`, you can use also
`plotly`, which is used a lot to build interactive graphics, but you can use whatever you want.

In [None]:
df = spark.read.parquet('parquets/data_2015_2018_12.parquet')
df2015 = spark.read.parquet("parquets/data_2015_2018_12.parquet/year=2015")
df2018 = spark.read.parquet("parquets/data_2015_2018_12.parquet/year=2018")

The following longitudes and lattitudes encompass Newark and JFK airports, Northern Manhattan and Verazzano bridge.

In [None]:
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90

1. Using these boundaries, **filter the 2015 data** (using pickup and dropoff longitude and latitude) and count the number of trips for each value of `passenger_count` and make a plot of that.

In [None]:
newdf15 = df2015.filter(df2015.pickup_longitude.between(long_min, long_max) \
                      & df2015.pickup_latitude.between(lat_min, lat_max) \
                      & df2015.dropoff_longitude.between(long_min, long_max) \
                      & df2015.dropoff_latitude.between(lat_min, lat_max))

In [None]:
data = newdf15.groupBy('passenger_count')\
              .agg(fn.count('passenger_count') \
              .alias('count_pc'))\
                .sort(asc('passenger_count'))
data.show()

In [None]:
import matplotlib.pyplot as plt

x_array = [row.passenger_count for row in data.select("passenger_count").collect()]
y_array = [row.count_pc for row in data.select("count_pc").collect()]
plt.bar(x_array, height=y_array)

Trips with $0$ or larger than $7$ passengers are pretty rare.
We suspect these to be outliers. 
We need to explore these trips further in order order to understand what might be wrong
with them

1. What's special with trips with zero passengers?
1. What's special with trips with more than $6$ passengers?
1. What is the largest distance travelled during this month? Is it the first taxi on the moon?
1. Plot the distribution of the `trip_distance` (using an histogram for instance) during year 2105. Focus on trips with non-zero trip distance and trip distance less than 30 miles.

Let's look at what Spark does for these computations

1. Use the `explain` method or have a look at the [Spark UI](http://localhost:4040/SQL/) to analyze the job. You should be able to assess 
    - Parsed Logical Plan
    - Analyzed Logical Plan
    - Optimized Logical Plan
    - Physical Plan
1. Do the Analyzed Logical Plan and Optimized Logical Plan differ? Spot the differences if any. How would a RDBMS proceed with such a query?
1. How does the physical plan differ from the Optimized Logical Plan? What are the keywords you would not expects in a RDBMS? What is their meaning? 
1. Inspect the stages on [Spark UI](http://localhost:4040/stages/stage). How many *stages* are necessary to complete the Spark job? What are the roles of `HashAggregate` and `Exchange hashpartitioning`?
1. Does the physical plan perform `shuffle` operations? If yes how many?
1. What are tasks with respect to stages (in Spark language)? How many tasks are your stages made of?

Now, compute the following and produce relevant plots:

1. Break down the trip distance distribution for each day of week
1. Count the number of distinct pickup location
1. Compute and display tips and profits as a function of the pickup location

# Investigate one month of trips data in 2015 and 2018

 Consider one month of trips data from `yellow` taxis for each year

1. Filter and cache/persist the result

## Assessing seasonalities and looking at time series

Compute and plot the following time series indexed by day of the week and hour of day:

1. The number of pickups
1. The average fare
1. The average trip duration
1. Plot the average number of ongoing trips

## Rides to the airports

In order to find the longitude and lattitude of JFK and Newark airport as well as the longitude and magnitudes 
of Manhattan, you can use a service like [geojson.io](http://geojson.io/).
Plot the following time series, indexed the day of the week and hour of the day

1. Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport
1. Median taxi duration of trip leaving from JFK Airport to Midtown (Southern Manhattan)

## Geographic information

For this, you will need to find tools to display maps and to build choropeth maps.
We let you look and find relevant tools to do this.

1. Build a heatmap where color is a function of
    1. number of `pickups`
    2. number of `dropoffs`
    3. number of `pickups` with dropoff at some airport (JFK, LaGuardia, Newark)
2. Build a choropeth map where color is a function of
    1. number of pickups in the area
    1. ratio of number of payments by card/number of cash payments for pickups in the area
    2. ratio of total fare/trip duration for dropoff in the area
3. Build an interactive chorophet with a slider allowing the user to select an `hour of day` and where the color is a function of
    1. average number of dropoffs in the area during that hour the day
    2. average ratio of tip over total fare amount for pickups in the area at given hour of the day