# New York taxi trips

This homework is about New York taxi trips. Here is how [Todd Schneider](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/) describes the dataset:

> The New York City Taxi & Limousine Commission has released a  detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. 
Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. 
How bad is the rush hour traffic from Midtown to JFK? 
Where does the Bridge and Tunnel crowd hang out on Saturday nights?
What time do investment bankers get to work? How has Uber changed the landscape for taxis?
The dataset addresses all of these questions and many more.

The NY taxi trips dataset has been explored by a series of distinguished data scientists.
The dataset is available on Amazon S3 (Amazon's cloud storage service).
The link for each file has the following form:

    https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv

There is one CSV file for each NY taxi service (`yellow`, `green`, `fhv`) and each calendar month (replacing `{year}` and `{month}` by the desired ones).
Each file is moderately large, a few gigabytes. 
The full dataset is relatively large if it has to be handled on a laptop (several hundred gigabytes).
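
As a small illustration of the link pattern above, the helper below builds the link for a given service, year and month. The name `trip_data_url` is ours, and we assume that the `green` and `fhv` files follow the same naming scheme as the `yellow` template.

```python
BASE_URL = "https://s3.amazonaws.com/nyc-tlc/trip+data"

def trip_data_url(service, year, month):
    """Return the CSV link for one taxi service and one calendar month."""
    if service not in ("yellow", "green", "fhv"):
        raise ValueError(f"unknown service: {service!r}")
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    # months are zero-padded to two digits, e.g. 2015-07
    return f"{BASE_URL}/{service}_tripdata_{year}-{month:02d}.csv"

# the same calendar month for the two years studied in this homework
urls = [trip_data_url("yellow", year, 7) for year in (2015, 2018)]
```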

You will focus on the `yellow` taxi service and a pair of months, one from 2015 and one from 2018. 
Between those two years, for-hire vehicle services took off and carved out a huge market share.

Whatever framework you use, `CSV` files prove hard to handle. 
After downloading the appropriate files (this takes time, but it is routine), the first step is to convert the CSV files into a more Spark-friendly format such as `parquet`.

Saving into such a format requires decisions about bucketing, partitioning and so on. Such decisions influence performance. It is your call.
Many people have been working on this dataset, to cite but a few:


- [1 billion trips with a vengeance](https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/)
- [1 billion trips with R and SQL ](http://freerangestats.info/blog/2019/12/22/nyc-taxis-sql)
- [1 billion trips with redshift](https://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html)
- [nyc-taxi](https://github.com/fmaletski/nyc-taxi-map)

Depending on your internet connection, **download the files** corresponding to **"yellow" taxis** for the years 2015 and 2018. Download **at least one month** (the same one) for 2015 and 2018; if you can, download all of them.

**Hint.** The 12 CSV files for 2015 are about 23GB in total, but the corresponding parquet file, if you can create it for all 12 months, is only about 3GB.

You **might** need the following packages in order to work with GPS coordinates and to plot things easily.

In [None]:
!pip install geojson geopandas plotly geopy

In [None]:
!pip install ipyleaflet

For this homework **we will let you decide on the tools to use** (except for Spark, which is expected) and **find out information by yourself** (but don't hesitate to ask questions on the `slack` channel).

# Loading data as parquet files

We want to organize the data on a per year and per service basis. 
We want to end up with one `parquet` file for each year and each taxi service, since parquet is a typed, columnar and compressed format that Spark reads much faster than CSV.

**Hint.** Depending on your internet connection and your laptop, you can use only the "yellow" service and one month of 2015 and 2018.

CSV files can contain corrupted lines. You may have to perform some ETL (Extract-Transform-Load) work in order to obtain a properly typed data frame.

You are invited to proceed as follows:

1. Try to read the CSV file without imposing a schema. 
1. Inspect the inferred schema. Do you agree with Spark's typing decision?
1. If necessary, correct the schema and read the data again
1. Save the data into parquet files
1. In the rest of your work, **you will only use the parquet files you created**, not the csv files (don't forget to choose a partitioning column and a number of partitions when creating the parquet files).

**Hint.** Don't forget to ask `Spark` to use all the memory and resources of your computer.

**Hint.** Don't forget that you should specify a partitioning column and a number of partitions when creating the parquet files.

**Hint.** Note that the schemas of the 2015 and 2018 data are different...

**Hint.** When working on this, ask yourself and answer the following questions:

1. What is the `StorageLevel` of the dataframe after reading the csv files?
1. What is the number of partitions of the dataframe? 
1. Is it possible to tune this number at loading time? 
1. Why would we want to modify the number of partitions when creating the parquet files?

In [None]:
!pip install pyshp

In [None]:
!pip install descartes

In [None]:
# import the usual suspects
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import urllib.request
import os
from pathlib import Path
import sys
import timeit
import plotly.graph_objects as go
import plotly.express as px
import zipfile
import shapefile
from shapely.geometry import Polygon, Point
from descartes.patch import PolygonPatch
import geopandas
import shapely.geometry


%matplotlib inline
import seaborn as sns

sns.set_context("notebook", font_scale=1.2)

In [None]:
# spark
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
import pyspark.sql.functions as fn
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, LongType, BooleanType

In [None]:
# Start the SparkSession
conf = SparkConf()
conf.set('spark.executor.memory', '16g')
conf.set('spark.driver.memory', '8g')
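conf.set("spark.driver.cores","4")
# the property that sets the number of executors is spark.executor.instances
conf.set("spark.executor.instances","10")
conf.set("spark.executor.cores","4")
# the default parallelism is read when the SparkContext starts, so it is set here
conf.set("spark.default.parallelism", "150")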

spark = (SparkSession
    .builder
    .config(conf=conf)
    .appName("New York taxis trips")
    .getOrCreate()
)

In [None]:
# set the number of shuffle partitions (a runtime SQL setting);
# spark.default.parallelism is only read when the SparkContext starts, so it belongs in the SparkConf
spark.conf.set("spark.sql.shuffle.partitions", 150)

In [None]:
spark.conf.set("spark.sql.auto.repartition", "true")

## Try to read the CSV file without imposing a schema.

In [None]:
import requests
from pathlib import Path

# download the data of 2015
path2015 = Path('yellow_tripdata_2015-07.csv')
if not path2015.exists():
    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-07.csv"
    # stream to disk in chunks: the file is too large to hold in memory at once
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(path2015, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)

In [None]:
# download the data of 2018
path2018 = Path('yellow_tripdata_2018-07.csv')
if not path2018.exists():
    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-07.csv"
    # stream to disk in chunks: the file is too large to hold in memory at once
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(path2018, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)

In [None]:
# Load data from a csv file
df_sp2015 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2015-07.csv")

df_sp2015.printSchema()

In [None]:
df_sp2018 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2018-07.csv")

df_sp2018.printSchema()

## Inspect the inferred schema. Do you agree with Spark's typing decision?

No: the columns `tpep_pickup_datetime` and `tpep_dropoff_datetime` should be of a date/timestamp type, not of type string.

## If necessary, correct the schema and read the data again

In [None]:
df_sp2015 = df_sp2015\
            .withColumn("tpep_pickup_datetime",(fn.to_timestamp(col("tpep_pickup_datetime"))))\
            .withColumn("tpep_dropoff_datetime",(fn.to_timestamp(col("tpep_dropoff_datetime"))))

df_sp2015.printSchema()

In [None]:
df_sp2018 = df_sp2018\
            .withColumn("tpep_pickup_datetime",(fn.to_timestamp(col("tpep_pickup_datetime"))))\
            .withColumn("tpep_dropoff_datetime",(fn.to_timestamp(col("tpep_dropoff_datetime"))))

df_sp2018.printSchema()

## Save the data into parquet files

In [None]:
df_sp2015.write.mode('overwrite').partitionBy('payment_type').parquet("yellow_tripdata_2015-07.parquet")
df_sp2015.rdd.getNumPartitions()

In [None]:
df_sp2018.write.mode('overwrite').partitionBy('payment_type').parquet("yellow_tripdata_2018-07.parquet")
df_sp2018.rdd.getNumPartitions()

In [None]:
df2015 = spark.read.parquet("yellow_tripdata_2015-07.parquet")

In [None]:
df2018 = spark.read.parquet("yellow_tripdata_2018-07.parquet")

In [None]:
# Cleaning the data
def cleanData(df, year, month):
    df = df\
        .withColumn('year', fn.year('tpep_pickup_datetime'))\
        .withColumn('month', fn.month('tpep_pickup_datetime'))\
        .where(col('year')==year)\
        .where(col('month')==month)\
        .where(col('tpep_pickup_datetime')<=col('tpep_dropoff_datetime'))
    return df

# year and month are integer columns, so pass integers rather than strings
df2018 = cleanData(df2018, 2018, 7)
df2015 = cleanData(df2015, 2015, 7)
df2018.cache()
df2015.cache()

In [None]:
df2015.count()

In [None]:
df2018.count()

The schema differs between 2015 and 2018. We pre-emptively add to the 2015 dataset some columns that are needed later, such as `location_id`, `zone`, etc. Because of the large volume of data and the long running time, the preprocessing code lives in the file `data_op.ipynb`, which generates a new CSV file; here we directly read the parquet file converted from that CSV file.


In [None]:
df2015_add = spark.read.parquet("yellow_tripdata_2015-07_add.parquet")
df2015_add = cleanData(df2015_add, '2015','7')
df2015_add.cache()

In [None]:
def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of 'df' as a local 'pandas.DataFrame' in a speedy fashion. The DataFrame is
    repartitioned if 'n_partitions' is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

# Investigate (at least) one month of data in 2015

From now on, you will be using **the parquet files you created for 2015**.

We shall visualize several features of taxi traffic during one calendar month
in 2015 and the same calendar month in 2018.

**Hint.** In order to build appealing graphics, you may stick to `matplotlib + seaborn`. You can also use
`plotly`, which is widely used to build interactive graphics, or whatever else you prefer.

The following longitudes and latitudes encompass Newark and JFK airports, Northern Manhattan and the Verrazzano Bridge.

In [None]:
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90

1. Using these boundaries, **filter the 2015 data** (using pickup and dropoff longitude and latitude) and count the number of trips for each value of `passenger_count` and make a plot of that.

In [None]:
# keep trips whose pickup and dropoff both fall inside the bounding box
in_bbox = (col("pickup_longitude").between(long_min, long_max)
           & col("pickup_latitude").between(lat_min, lat_max)
           & col("dropoff_longitude").between(long_min, long_max)
           & col("dropoff_latitude").between(lat_min, lat_max))
df2015_f = df2015.filter(in_bbox)
df2015_add_f = df2015_add.filter(in_bbox)
# the 2018 data has no GPS columns (only PULocationID / DOLocationID), so it is not filtered here

num_trips = df2015_f\
    .groupBy('passenger_count')\
    .count()\
    .orderBy("passenger_count")
    
num_trips.show()
num_trips = num_trips.toPandas()

In [None]:
import plotly.express as px

fig = px.bar(num_trips, 
             x="passenger_count", 
             y="count", 
             title="number of trips for each value of passenger_count for July 2015")
fig.show()

Trips with $0$ or more than $6$ passengers are pretty rare.
We suspect these to be outliers. 
We need to explore these trips further in order to understand what might be wrong
with them.

1. What's special with trips with zero passengers?
1. What's special with trips with more than $6$ passengers?
1. What is the largest distance travelled during this month? Is it the first taxi on the moon?
1. Plot the distribution of the `trip_distance` (using a histogram for instance) during year 2015. Focus on trips with non-zero trip distance and trip distance less than 30 miles.
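
One way to read the last item: keep the trips with a distance strictly between 0 and 30 miles, then count them in one-mile bins. The plain-Python sketch below does this on a handful of made-up distances; the sample values and the name `bin_distances` are ours and only illustrate the binning rule, not the Spark job itself.

```python
from collections import Counter
from math import ceil

def bin_distances(distances, max_miles=30):
    """Count trips per one-mile bin, keeping 0 < distance < max_miles."""
    kept = (d for d in distances if 0 < d < max_miles)
    # ceil maps a distance in (k-1, k] to bin k
    return Counter(ceil(d) for d in kept)

# made-up distances in miles, including a zero and two outliers
sample = [0.0, 0.4, 1.0, 1.2, 2.9, 3.0, 29.5, 30.0, 10083357.0]
hist = bin_distances(sample)
for mile, n in sorted(hist.items()):
    print(mile, n)
```

With this rule, a trip of exactly 3.0 miles lands in bin 3, while trips of 0 miles or of 30 miles and more are dropped.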

In [None]:
from pyspark.sql.functions import dayofweek

df2015_f = df2015_f.withColumn('dayofweek',dayofweek(df2015_f['tpep_pickup_datetime']))
df2015_f.show()

In [None]:
#What's special with trips with zero passengers?
zero_passengers = df2015_f.filter(col("passenger_count") == 0 ).select('*').toPandas()
zero_passengers.describe()

In [None]:
zero_passengers.head()

In [None]:
#What's special with trips with more than  6  passengers?
sixplus_passengers = df2015_f.filter(col("passenger_count") > 6 ).select('*').toPandas()
sixplus_passengers.describe()

In [None]:
sixplus_passengers.head()

In [None]:
#What is the largest distance travelled during this month? Is it the first taxi on the moon?
#The maximum distance is 1.0083357E7 miles, recorded on 2015-07-31 10:35:58.
#That is far more than the Earth-Moon distance (about 239,000 miles), so it is a recording error rather than a real trip.

df_distance = df2015_f.select('trip_distance','tpep_pickup_datetime').orderBy(df2015_f['trip_distance'].desc())
df_distance.show()

In [None]:
trip_distance = df2015_f\
                .where('trip_distance > 0 and trip_distance < 30')\
                .select('trip_distance','tpep_pickup_datetime',
                        'pickup_longitude','pickup_latitude',
                        'dayofweek','tip_amount','total_amount')

In [None]:
trip_distance.count()

In [None]:
trip_distance.show()

In [None]:
from pyspark.sql.functions import ceil

trip_distance_f = trip_distance.withColumn('trip_distance_int',ceil(trip_distance['trip_distance']))
trip_distance_f.show()

In [None]:
distance_histogram = trip_distance_f.groupBy('trip_distance_int').count()
distance_histogram.show()

In [None]:
distance_histogram = distance_histogram.orderBy(distance_histogram['trip_distance_int']).select('*').toPandas()

fig = px.bar(distance_histogram, 
             x="trip_distance_int", 
             y="count", 
             title="the distribution of the trip_distance")
fig.show()

Let's look at what Spark does for these computations

1. Use the `explain` method or have a look at the [Spark UI](http://localhost:4040/SQL/) to analyze the job. You should be able to assess 
    - Parsed Logical Plan
    - Analyzed Logical Plan
    - Optimized Logical Plan
    - Physical Plan
1. Do the Analyzed Logical Plan and Optimized Logical Plan differ? Spot the differences if any. How would a RDBMS proceed with such a query?

The Analyzed Logical Plan and the Optimized Logical Plan are two successive phases of Spark's query planning.
In the Analyzed Logical Plan, Spark resolves the names used in the query: it checks that the referenced tables and columns exist and attaches a data type to each of them. The plan still mirrors the query as written, with the filter conditions applied to obtain the result set.
The Optimized Logical Plan is the output of the optimizer applied to the analyzed plan, and the two do differ: the optimizer rewrites the filter conditions (for instance it adds explicit `isnotnull` checks on the filtered columns), merges adjacent filters and drops the columns that are never used. A RDBMS proceeds in a similar way: it parses the query, resolves names, rewrites the query with its own optimizer and then chooses a physical plan.


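1. How does the physical plan differ from the Optimized Logical Plan? What are the keywords you would not expect in a RDBMS? What is their meaning?

The physical plan is derived from the optimized logical plan: each logical operator is replaced by a concrete implementation that can run on the cluster (for instance a scan of the parquet files, or `HashAggregate` for a `groupBy`). The keyword with no direct counterpart in a single-node RDBMS is `Exchange`, which marks a shuffle, that is, a redistribution of rows between partitions.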


1. Inspect the stages on [Spark UI](http://localhost:4040/stages/stage). How many *stages* are necessary to complete the Spark job? What are the roles of `HashAggregate` and `Exchange hashpartitioning`?

`HashAggregate` performs the aggregation with a hash table: for each row, Spark hashes the value of the `groupBy` columns and updates, in memory, the aggregation buffer kept for that key. If the query computes n aggregate functions, each buffer holds n running values. `HashAggregate` usually appears twice in the plan: once to compute partial results inside each partition, and once to merge them.

`Exchange hashpartitioning` is the shuffle between those two steps: rows are redistributed so that all rows with the same hash of the grouping key end up in the same partition.
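
To make the roles of `HashAggregate` and `Exchange hashpartitioning` more concrete, here is a toy model in plain Python (not Spark code) of a `groupBy(...).count()`: a partial count inside each partition, a redistribution of the partial results by hash of the key, then a final merge. The function names and the sample values are ours.

```python
from collections import Counter

def partial_aggregate(partition):
    """First HashAggregate: count keys inside one partition, no data movement."""
    return Counter(partition)

def exchange(partials, num_partitions):
    """Exchange hashpartitioning: route each (key, partial count) by hash(key)."""
    shuffled = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, count in partial.items():
            shuffled[hash(key) % num_partitions].append((key, count))
    return shuffled

def final_aggregate(partition):
    """Second HashAggregate: merge the partial counts that share a key."""
    totals = Counter()
    for key, count in partition:
        totals[key] += count
    return totals

# toy passenger_count values spread over three input partitions
partitions = [[1, 1, 2, 5], [1, 2, 2, 6], [1, 5, 5, 1]]
partials = [partial_aggregate(p) for p in partitions]
shuffled = exchange(partials, num_partitions=2)

# each key lives in exactly one shuffled partition, so the results can simply be combined
result = Counter()
for part in shuffled:
    result.update(final_aggregate(part))
```

Only the small partial counts cross the exchange, not the raw rows, which is why the aggregation is split in two.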

1. Does the physical plan perform `shuffle` operations? If yes how many?

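Yes. Each `Exchange` node in the physical plan is a shuffle: any operation that has to bring together rows from different partitions, such as `groupBy` or `orderBy`, requires one.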

1. What are tasks with respect to stages (in Spark language)? How many tasks are your stages made of?

Now, compute the following and produce relevant plots:

1. Break down the trip distance distribution for each day of week
1. Count the number of distinct pickup locations
1. Compute and display tips and profits as a function of the pickup location

In [None]:
#1. Break down the trip distance distribution for each day of week
trip_distance_weekofday = trip_distance_f.groupby('dayofweek','trip_distance_int').count()
trip_distance_weekofday.show()

In [None]:
trip_distance = trip_distance_weekofday.orderBy(trip_distance_weekofday['trip_distance_int'],
                                                trip_distance_weekofday['dayofweek']).select('*').toPandas()

In [None]:
fig = px.histogram(trip_distance, 
                   x="trip_distance_int", y="count", 
                   color="dayofweek",nbins=30,
                   title="the trip distance distribution of day of week")
fig.show()

In [None]:
#2.Count the number of distinct pickup location
trip_distance_f.select('pickup_longitude','pickup_latitude').distinct().count()

In [None]:
#3.Compute and display tips and profits as a function of the pickup location

token = 'pk.eyJ1Ijoid2VuaHVhbjA0MjEiLCJhIjoiY2wwZW54cmlwMGl0ZTNrazlobmx6eWl4ZSJ9.Fs0GrFewZMDTY7qAvaYmhA'

In [None]:
trip_distance = df2015_f.where('trip_distance > 0 and trip_distance < 30').select('trip_distance','tpep_pickup_datetime','pickup_longitude','pickup_latitude','dayofweek','tip_amount','total_amount')

# select (not where) the columns needed for the maps
pickup_df = df2015_add_f.select('pickup_longitude','pickup_latitude','tip_amount','mta_tax','tolls_amount','total_amount','pickup_zone')
pickup_df = pickup_df.sample(False,0.0001)
# profits = total amount minus taxes and tolls
pickup_df = pickup_df.withColumn("profits", col("total_amount") - col("mta_tax") - col("tolls_amount"))
# plotly needs a local pandas DataFrame
pickup_df = pickup_df.toPandas()
pickup_df.head()


In [None]:
fig = px.scatter_mapbox(pickup_df,
                       lon = 'pickup_longitude',
                       lat = 'pickup_latitude',
                       size = 'tip_amount',
                       title = "Tips as a function of pickup location",
                       hover_data = ['pickup_zone'],
                       size_max = 30,
                       color_continuous_scale = px.colors.carto.Temps)

fig.update_layout( mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
fig.show()

In [None]:
fig = px.scatter_mapbox(pickup_df,
                       lon = 'pickup_longitude',
                       lat = 'pickup_latitude',
                       size = 'profits',
                       title = "Profits as a function of pickup location",
                       hover_data = ['pickup_zone'],
                       size_max = 30,
                       color_continuous_scale = px.colors.carto.Temps)

fig.update_layout( mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
fig.show()

# Investigate one month of trip data in 2015 and 2018

Consider one month of trip data from `yellow` taxis for each year.

1. Filter and cache/persist the result

## Assessing seasonalities and looking at time series

Compute and plot the following time series indexed by day of the week and hour of day:

1. The number of pickups
1. The average fare
1. The average trip duration
1. The average number of ongoing trips
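
To fix ideas about what "indexed by day of the week and hour of day" means, the sketch below groups a few made-up trips by `(weekday, hour)` in plain Python and computes the first three quantities. The trips and variable names are ours. Note that Python's `weekday()` counts from 0 for Monday, whereas Spark's `dayofweek` counts from 1 for Sunday.

```python
from collections import defaultdict
from datetime import datetime

# made-up trips: (pickup, dropoff, fare_amount)
trips = [
    (datetime(2015, 7, 6, 8, 5), datetime(2015, 7, 6, 8, 25), 12.0),
    (datetime(2015, 7, 6, 8, 40), datetime(2015, 7, 6, 8, 50), 8.0),
    (datetime(2015, 7, 13, 8, 10), datetime(2015, 7, 13, 8, 40), 16.0),
    (datetime(2015, 7, 11, 23, 30), datetime(2015, 7, 11, 23, 45), 10.0),
]

groups = defaultdict(list)
for pickup, dropoff, fare in trips:
    minutes = (dropoff - pickup).total_seconds() / 60
    # key = (day of week, hour of day) of the pickup
    groups[(pickup.weekday(), pickup.hour)].append((fare, minutes))

series = {
    key: {
        "pickups": len(rows),
        "avg_fare": sum(f for f, _ in rows) / len(rows),
        "avg_minutes": sum(m for _, m in rows) / len(rows),
    }
    for key, rows in groups.items()
}
```

Trips from different weeks that share a weekday and an hour fall in the same group, which is what makes the weekly pattern visible.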

In [None]:
# data for 4.1
time_series2015 = df2015.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'fare_amount').cache()
time_series2018 = df2018.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'fare_amount').cache()

In [None]:
time_series2015.count()
time_series2015.show(1)

In [None]:
time_series2018.count()
time_series2018.show(1)

In [None]:
# add columns for 4.1
def set_date(df):
    df = df\
        .withColumn('weekofyear', fn.weekofyear('tpep_pickup_datetime'))\
        .withColumn('dayofweek', fn.date_format(col('tpep_pickup_datetime'), "EEEE"))\
        .withColumn('date', fn.date_format(col('tpep_pickup_datetime'), "dd"))\
        .withColumn('hour', fn.hour('tpep_pickup_datetime'))\
        .withColumn('tripDurationInMinutes', fn.round((col('tpep_dropoff_datetime').cast('long') - col('tpep_pickup_datetime').cast('long'))/60, 2))
    return df

def addDuration(df):
    df = df.withColumn('tripDurationInMinutes', fn.round((col('tpep_dropoff_datetime').cast('long') - col('tpep_pickup_datetime').cast('long'))/60, 2))
    return df
    
# The aggregation functions for 4.1
func_num_pickups = fn.count(col('tpep_pickup_datetime'))
func_average_fare = fn.round(fn.avg(col('fare_amount')), 2)
func_average_duration = fn.round(fn.avg(col('tripDurationInMinutes')), 2)

# The function of time series
def time_series(df, typeTimeSerie, function):
    if (typeTimeSerie == 'day_of_week'):
        argGroupBy1 = 'weekofyear'
        argGroupBy2 = 'dayofweek'
    elif (typeTimeSerie == 'hour_of_day'):
        argGroupBy1 = 'date'
        argGroupBy2 = 'hour'
    else:
        print("type time serie error")
        return
    df_ts = df\
            .groupBy(argGroupBy1, argGroupBy2)\
            .agg(function.alias('num_time_series'))\
            .orderBy(argGroupBy1, argGroupBy2)
    return df_ts

In [None]:
time_series2015 = set_date(time_series2015).cache()
time_series2015.count()
time_series2015.show(1)

In [None]:
time_series2018 = set_date(time_series2018).cache()
time_series2018.count()
time_series2018.show(1)

In [None]:
num_pickups_dw_2015 = time_series(time_series2015, 'day_of_week', func_num_pickups).cache()
num_pickups_hd_2015 = time_series(time_series2015, 'hour_of_day', func_num_pickups).cache()
avg_fare_dw_2015 = time_series(time_series2015, 'day_of_week', func_average_fare).cache()
avg_fare_hd_2015 = time_series(time_series2015, 'hour_of_day', func_average_fare).cache()
avg_trip_dur_dw_2015 = time_series(time_series2015, 'day_of_week', func_average_duration).cache()
avg_trip_dur_hd_2015 = time_series(time_series2015, 'hour_of_day', func_average_duration).cache()

num_pickups_dw_2018 = time_series(time_series2018, 'day_of_week', func_num_pickups).cache()
num_pickups_hd_2018 = time_series(time_series2018, 'hour_of_day', func_num_pickups).cache()
avg_fare_dw_2018 = time_series(time_series2018, 'day_of_week', func_average_fare).cache()
avg_fare_hd_2018 = time_series(time_series2018, 'hour_of_day', func_average_fare).cache()
avg_trip_dur_dw_2018 = time_series(time_series2018, 'day_of_week', func_average_duration).cache()
avg_trip_dur_hd_2018 = time_series(time_series2018, 'hour_of_day', func_average_duration).cache()

In [None]:
# plot
def new_update_layout(fig,my_title,x,y,my_legend_title=None) :
    fig.update_layout(
        title={'text' : my_title,
               'x':0.5,
               'xanchor': 'center'},
        legend_title = my_legend_title,
        xaxis_title = x,
        yaxis_title = y          
    )
    return

def plot_time_series(df, typeTimeSerie, title, xaxis_title, yaxis_title):
    if (typeTimeSerie == 'day_of_week'):
        xaxis = 'weekofyear'
        group = 'dayofweek'
    elif (typeTimeSerie == 'hour_of_day'):
        xaxis = 'date'
        group = 'hour'
    else:
        print("type time serie error")
        return
    yaxis = 'num_time_series'
    plot_pd = toPandas(df)
    fig = px.line(plot_pd, 
                  x=xaxis, 
                  y=yaxis, 
                  color=group, 
                  line_group=group, 
                  hover_name=group)
    new_update_layout(fig, title, x=xaxis_title, y=yaxis_title)
    fig.show()

### The number of pickups

In [None]:
plot_time_series(num_pickups_dw_2015, 'day_of_week',
                 "The number of pickups in 2015 by day of week",
                 "the number of week in 2015", "the number of pickups"
                )

plot_time_series(num_pickups_hd_2015, 'hour_of_day',
                 "The number of pickups in 2015 by hour of day",
                 "date", "the number of pickups"
                )

plot_time_series(num_pickups_dw_2018, 'day_of_week',
                 "The number of pickups in 2018 by day of week",
                 "the number of week in 2018", "the number of pickups"
                )

plot_time_series(num_pickups_hd_2018, 'hour_of_day',
                 "The number of pickups in 2018 by hour of day",
                 "date", "the number of pickups"
                )

### The average fare

In [None]:
plot_time_series(avg_fare_dw_2015, 'day_of_week',
                 "The average fare in 2015 by day of week",
                 "the number of week in 2015", "the average fare"
                )

plot_time_series(avg_fare_hd_2015, 'hour_of_day',
                 "The average fare in 2015 by hour of day",
                 "date", "the average fare"
                )

plot_time_series(avg_fare_dw_2018, 'day_of_week',
                 "The average fare in 2018 by day of week",
                 "the number of week in 2018", "the average fare"
                )

plot_time_series(avg_fare_hd_2018, 'hour_of_day',
                 "The average fare in 2018 by hour of day",
                 "date", "the average fare"
                )

### The average trip duration

In [None]:
plot_time_series(avg_trip_dur_dw_2015, 'day_of_week',
                 "The average trip duration in 2015 by day of week",
                 "the number of week in 2015", "the average trip duration"
                )

plot_time_series(avg_trip_dur_hd_2015, 'hour_of_day',
                 "The average trip duration in 2015 by hour of day",
                 "date", "the average trip duration"
                )

plot_time_series(avg_trip_dur_dw_2018, 'day_of_week',
                 "The average trip duration in 2018 by day of week",
                 "the number of week in 2018", "the average trip duration"
                )

plot_time_series(avg_trip_dur_hd_2018, 'hour_of_day',
                 "The average trip duration in 2018 by hour of day",
                 "date", "the average trip duration"
                )

## Rides to the airports

In order to find the longitude and latitude of JFK and Newark airports as well as the longitude and latitude of Manhattan, you can use a service like geojson.io. Plot the following time series, indexed by day of the week and hour of the day:

1. Median duration of taxi trips leaving Midtown (Southern Manhattan) headed for JFK Airport
1. Median duration of taxi trips leaving JFK Airport headed for Midtown (Southern Manhattan)
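
Deciding whether a pickup or dropoff belongs to an area boils down to a point-in-polygon test. The sketch below shows the classical ray-casting idea in plain Python, assuming a simple (non self-intersecting) polygon given as a list of `(longitude, latitude)` pairs; a geometry library handles boundary points and other corner cases more carefully. The function name and the rectangle are ours and only illustrate the principle.

```python
def point_in_polygon(lon, lat, ring):
    """Ray casting: count how many edges a ray going east from the point crosses."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # does the edge straddle the horizontal line through the point?
        if (y1 > lat) != (y2 > lat):
            # longitude at which the edge crosses that line
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside

# made-up rectangle roughly around JFK, for illustration only
jfk_box = [(-73.83, 40.62), (-73.74, 40.62), (-73.74, 40.67), (-73.83, 40.67)]

print(point_in_polygon(-73.78, 40.64, jfk_box))
print(point_in_polygon(-73.9855, 40.7580, jfk_box))
```

An odd number of crossings means the point is inside, an even number means it is outside.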

In [None]:
!pip install shapely

In [None]:
# Coordinates of the specific areas
# Southern Manhattan
South_Manhattan = {"type": "Polygon",
                   "coordinates": [[
                                [-73.99755477905273,40.77313187935118],
                                [-74.01420593261717,40.751418432997454],
                                [-74.02210235595702,40.70549780669077],
                                [-74.01540756225586,40.69847032728747],
                                [-73.97592544555664,40.70992213555912],
                                [-73.96905899047852,40.73177921058233],
                                [-73.9712905883789,40.74127439314326],
                                [-73.95824432373047,40.758700379161006],
                                [-73.99703979492188,40.774041868909734],
                                [-73.99755477905273,40.77313187935118]]]
                  }
South_Manhattan_id = [12,88,87,261,13,209,231,45,232,148,144,211,125,114,158,249,113,79,4,224,107,234,90,68,246,186,164,170,137,233,229,162,161,230,48,50,163,100]

# JFK Airport
JFK = {"type": "Polygon",
       "coordinates": [[
                       [-73.78778457641602,40.666251560504264],
                       [-73.8226318359375,40.66475414828327],
                       [-73.82649421691895,40.6546620153016],
                       [-73.8226318359375,40.64469860601899],
                       [-73.78469467163085,40.61864344909241],
                       [-73.77053260803221,40.61949040153005],
                       [-73.74701499938965,40.634994248282894],
                       [-73.74881744384766,40.64632671574881],
                       [-73.78778457641602,40.666251560504264]]]
      }
JFK_id = [132]

# LaGuardia Airport
LaGuardia = {"type": "Polygon",
             "coordinates": [[
                                [-73.88777732849121,40.76656658538413],
                                [-73.8758897781372,40.77144186567577],
                                [-73.86966705322266,40.7709543537425],
                                [-73.86297225952148,40.76630656038832],
                                [-73.85541915893555,40.76737915693862],
                                [-73.85404586791992,40.77313187935118],
                                [-73.87056827545166,40.787040358887566],
                                [-73.89009475708008,40.77842914365316],
                                [-73.88777732849121,40.76656658538413]]]
                 }
LaGuardia_id = [138]

# Newark Airport
Newark = {"type": "Polygon",
          "coordinates": [[
                          [-74.15797233581543,40.708295578231315],
                          [-74.18097496032715,40.70823051511181],
                          [-74.19479370117188,40.684738575525],
                          [-74.19118881225586,40.67523532779746],
                          [-74.17745590209961,40.66911608150882],
                          [-74.15119171142578,40.70634365699408],
                          [-74.15797233581543,40.708295578231315]]]
         }

Newark_id = [1]

In [None]:
# data for 4.2
airports2015 = df2015.select('tpep_pickup_datetime', 'tpep_dropoff_datetime','pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude').cache()
airports2018 = df2018.select('tpep_pickup_datetime', 'tpep_dropoff_datetime','PULocationID', 'DOLocationID').cache()
airports2015.count()
airports2018.count()

In [None]:
# check whether a point lies inside a polygon
import shapely.geometry

# build each polygon once; `shape` replaces `asShape`, which was removed in Shapely 2.0
poly_Midtown = shapely.geometry.shape(South_Manhattan)
poly_JFK = shapely.geometry.shape(JFK)
poly_LaGuardia = shapely.geometry.shape(LaGuardia)
poly_Newark = shapely.geometry.shape(Newark)

def in_area_Midtown(longitude, latitude):
    return poly_Midtown.intersects(shapely.geometry.Point(longitude, latitude))

def in_area_JFK(longitude, latitude):
    return poly_JFK.intersects(shapely.geometry.Point(longitude, latitude))

def in_area_airport(longitude, latitude):
    point = shapely.geometry.Point(longitude, latitude)
    return (poly_JFK.intersects(point) or poly_LaGuardia.intersects(point) or poly_Newark.intersects(point))

udf_in_area_Midtown = udf(lambda x,y : in_area_Midtown(x,y), BooleanType())
udf_in_area_JFK = udf(lambda x,y : in_area_JFK(x,y), BooleanType())
udf_in_area_airport = udf(lambda x,y : in_area_airport(x,y), BooleanType())



# add boolean columns telling whether a location falls inside a given area
def add_area_2015(df):
    df = df\
        .withColumn('pickup_Midtown', udf_in_area_Midtown(col('pickup_longitude'), col('pickup_latitude')))\
        .withColumn('dropoff_JFK', udf_in_area_JFK(col('dropoff_longitude'), col('dropoff_latitude')))\
        .withColumn('pickup_JFK', udf_in_area_JFK(col('pickup_longitude'), col('pickup_latitude')))\
        .withColumn('dropoff_Midtown', udf_in_area_Midtown(col('dropoff_longitude'), col('dropoff_latitude')))
    return df

def midtown_to_JFK2015(df):
    df = df\
        .withColumn('pickup_Midtown', udf_in_area_Midtown(col('pickup_longitude'), col('pickup_latitude')))\
        .withColumn('dropoff_JFK', udf_in_area_JFK(col('dropoff_longitude'), col('dropoff_latitude')))
    return df

def JFK_to_midtown2015(df):
    df = df\
        .withColumn('pickup_JFK', udf_in_area_JFK(col('pickup_longitude'), col('pickup_latitude')))\
        .withColumn('dropoff_Midtown', udf_in_area_Midtown(col('dropoff_longitude'), col('dropoff_latitude')))
    return df

def in_id_Midtown(ID):
    return ID in South_Manhattan_id

def in_id_JFK(ID):
    return ID in JFK_id
    
udf_in_id_Midtown = udf(lambda x : in_id_Midtown(x), BooleanType())
udf_in_id_JFK = udf(lambda x : in_id_JFK(x), BooleanType())

def add_area_2018(df):
    df = df\
        .withColumn('pickup_Midtown',udf_in_id_Midtown(col('PULocationID')))\
        .withColumn('dropoff_JFK',udf_in_id_JFK(col('DOLocationID')))\
        .withColumn('pickup_JFK',udf_in_id_JFK(col('PULocationID')))\
        .withColumn('dropoff_Midtown',udf_in_id_Midtown(col('DOLocationID')))
    return df

def midtown_to_JFK2018(df):
    df = df\
        .withColumn('pickup_Midtown', udf_in_id_Midtown(col('PULocationID')))\
        .withColumn('dropoff_JFK', udf_in_id_JFK(col('DOLocationID')))
    return df

def JFK_to_midtown2018(df):
    df = df\
        .withColumn('pickup_JFK', udf_in_id_JFK(col('PULocationID')))\
        .withColumn('dropoff_Midtown', udf_in_id_Midtown(col('DOLocationID')))
    return df


In [None]:
airports_dur2015 = set_date(airports2015).cache()
airports_dur2018 = set_date(airports2018).cache()
airports_dur2015.count()
airports_dur2018.count()
airports_dur2015.show(1)

In [None]:
airports_dur2018.show(1)

In [None]:
area_airports_dur2018 = add_area_2018(airports_dur2018)
area_airports_dur2018.cache()
area_airports_dur2018.count()
area_airports_dur2018.show(1)

In [None]:
# too slow!
area_airports_dur2015 = airports_dur2015.repartition(30)
area_airports_dur2015 = add_area_2015(area_airports_dur2015)
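
One likely reason the 2015 step is slow is that every row goes through a Python UDF that rebuilds the polygon before testing the point. The sketch below is not the notebook's method: it is a plain-Python illustration, with no Shapely or Spark, of a cheaper test that precomputes a bounding box once and only runs the exact point-in-polygon check (ray casting) for points inside that box. The names `bounding_box`, `point_in_ring`, `make_area_test` and `in_laguardia` are hypothetical; the ring is the LaGuardia polygon defined earlier.

```python
# LaGuardia ring copied from the notebook: [longitude, latitude] pairs.
LAGUARDIA_RING = [
    [-73.88777732849121, 40.76656658538413],
    [-73.8758897781372, 40.77144186567577],
    [-73.86966705322266, 40.7709543537425],
    [-73.86297225952148, 40.76630656038832],
    [-73.85541915893555, 40.76737915693862],
    [-73.85404586791992, 40.77313187935118],
    [-73.87056827545166, 40.787040358887566],
    [-73.89009475708008, 40.77842914365316],
    [-73.88777732849121, 40.76656658538413],
]


def bounding_box(ring):
    """Return (min_lon, min_lat, max_lon, max_lat) of a closed ring."""
    lons = [p[0] for p in ring]
    lats = [p[1] for p in ring]
    return min(lons), min(lats), max(lons), max(lats)


def point_in_ring(lon, lat, ring):
    """Ray casting: count how many edges a ray going east from the point crosses."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        # The edge straddles the horizontal line through the point
        if (y1 > lat) != (y2 > lat):
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


def make_area_test(ring):
    """Precompute the bounding box once and return a (lon, lat) -> bool test."""
    min_lon, min_lat, max_lon, max_lat = bounding_box(ring)

    def in_area(lon, lat):
        if lon is None or lat is None:
            return False
        # Cheap rejection: most trips are nowhere near the airport
        if not (min_lon <= lon <= max_lon and min_lat <= lat <= max_lat):
            return False
        return point_in_ring(lon, lat, ring)

    return in_area


in_laguardia = make_area_test(LAGUARDIA_RING)
```

The same bounding-box idea could also be expressed as a native Spark filter on the longitude and latitude columns before any UDF is called, which would keep most rows out of Python altogether.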

In [None]:
def MedianDuration(df, typeTimeSerie, depart, arrival, orderby):
    if (typeTimeSerie == 'day_of_week'):
        arg1 = 'weekofyear'
        arg2 = 'dayofweek'
    elif (typeTimeSerie == 'hour_of_day'):
        arg1 = 'date'
        arg2 = 'hour'
    else:
        raise ValueError("typeTimeSerie must be 'day_of_week' or 'hour_of_day'")
    window = Window.partitionBy(arg1, arg2)
    med = fn.expr('percentile_approx(tripDurationInMinutes, 0.5)').over(window)
    df = df.where(col(depart)).where(col(arrival))
    df = df.withColumn('num_time_series', med)
    df = df.select(arg1, arg2, 'num_time_series')
    df = df.dropDuplicates([arg1, arg2])
    df = df.orderBy(orderby)
    return df
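
To make the logic of `MedianDuration` easier to check, here is a minimal plain-Python sketch of the same idea: group trips by a pair of keys and keep one median per group. It is an illustration only, run on made-up toy rows, and `median_by_group` is a hypothetical name. `median_low` is used because, as far as the Spark documentation describes it, `percentile_approx` returns a value taken from the column rather than an interpolated one.

```python
from collections import defaultdict
from statistics import median_low


def median_by_group(rows, key_fields, value_field):
    """Group rows (dicts) by key_fields and return {key tuple: median of value_field}."""
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        groups[key].append(row[value_field])
    return {key: median_low(values) for key, values in sorted(groups.items())}


# Toy rows, invented for the example (not taken from the dataset)
trips = [
    {"weekofyear": 27, "dayofweek": 2, "tripDurationInMinutes": 40.0},
    {"weekofyear": 27, "dayofweek": 2, "tripDurationInMinutes": 55.0},
    {"weekofyear": 27, "dayofweek": 2, "tripDurationInMinutes": 48.0},
    {"weekofyear": 27, "dayofweek": 3, "tripDurationInMinutes": 62.0},
]

medians = median_by_group(trips, ("weekofyear", "dayofweek"), "tripDurationInMinutes")
```

In Spark, the window followed by `dropDuplicates` yields one row per group, which is what a `groupBy(...).agg(...)` with the same percentile expression would also give.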

In [None]:
med_JFK_to_Mid_dw_2018 = MedianDuration(area_airports_dur2018, 'day_of_week', 'pickup_JFK', 'dropoff_Midtown', 'weekofyear')
med_JFK_to_Mid_dw_2018.cache()
med_JFK_to_Mid_dw_2018.count()
med_JFK_to_Mid_dw_2018.show(1)

In [None]:
med_JFK_to_Mid_hd_2018 = MedianDuration(area_airports_dur2018, 'hour_of_day', 'pickup_JFK', 'dropoff_Midtown', 'date')
med_JFK_to_Mid_hd_2018.cache()
med_JFK_to_Mid_hd_2018.count()
med_JFK_to_Mid_hd_2018.show(1)

In [None]:
med_mid_to_JFK_dw_2018 = MedianDuration(area_airports_dur2018, 'day_of_week', 'pickup_Midtown', 'dropoff_JFK', 'weekofyear')
med_mid_to_JFK_dw_2018.cache()
med_mid_to_JFK_dw_2018.count()
med_mid_to_JFK_dw_2018.show(1)

In [None]:
med_mid_to_JFK_hd_2018 = MedianDuration(area_airports_dur2018, 'hour_of_day', 'pickup_Midtown', 'dropoff_JFK', 'date')
med_mid_to_JFK_hd_2018.cache()
med_mid_to_JFK_hd_2018.count()
med_mid_to_JFK_hd_2018.show(1)

In [None]:
# too slow!
# MedianDuration takes an orderby argument; 'weekofyear' matches the 2018 calls
med_JFK_to_mid_dw_2015 = MedianDuration(area_airports_dur2015, 'day_of_week', 'pickup_JFK', 'dropoff_Midtown', 'weekofyear')
med_JFK_to_mid_dw_2015.show(10)

### Median duration of taxi trips leaving Midtown (Southern Manhattan) headed for JFK Airport

In [None]:
plot_time_series(med_mid_to_JFK_dw_2018, 
                 'day_of_week',
                 "Median duration of taxi trip leaving Midtown headed for JFK Airport in 2018 by day of week",
                 "week of year", 
                 "the median trip duration"
                )

plot_time_series(med_mid_to_JFK_hd_2018, 
                 'hour_of_day',
                 "Median duration of taxi trip leaving Midtown headed for JFK Airport in 2018 by hour of day",
                 "date", 
                 "the median trip duration"
                )

### Median duration of taxi trips leaving JFK Airport headed for Midtown (Southern Manhattan)

In [None]:
plot_time_series(med_JFK_to_Mid_dw_2018, 
                 'day_of_week',
                 "Median duration of taxi trip leaving from JFK Airport to Midtown in 2018 by day of week",
                 "week of year", 
                 "the median trip duration"
                )

plot_time_series(med_JFK_to_Mid_hd_2018, 
                 'hour_of_day',
                 "Median duration of taxi trip leaving from JFK Airport to Midtown in 2018 by hour of day",
                 "date", 
                 "the median trip duration"
                )

## Geographic information

For this, you will need to find tools to display maps and to build choropleth maps.
It is up to you to find suitable tools.

### Build a heatmap where color is a function of
    1. number of `pickups`
    2. number of `dropoffs`
    3. number of `pickups` with dropoff at some airport (JFK, LaGuardia, Newark)
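
Whatever plotting tool is chosen, a point heatmap boils down to counting points per cell of a regular grid and mapping the counts to colors. The sketch below shows that counting step in plain Python; it is a simplified stand-in for what a rasterizing library does, not its actual implementation. The function name `bin_points` and the sample points are invented for the example, and the longitude/latitude bounds are the ones used in the filtering step of this notebook.

```python
def bin_points(points, lon_range, lat_range, n_cols, n_rows):
    """Count (lon, lat) points per cell of a regular grid.

    Points outside the given ranges are dropped. grid[row][col] holds the count,
    with row 0 at the southern edge and col 0 at the western edge.
    """
    lon_min, lon_max = lon_range
    lat_min, lat_max = lat_range
    grid = [[0] * n_cols for _ in range(n_rows)]
    for lon, lat in points:
        if not (lon_min <= lon < lon_max and lat_min <= lat < lat_max):
            continue
        col = int((lon - lon_min) / (lon_max - lon_min) * n_cols)
        row = int((lat - lat_min) / (lat_max - lat_min) * n_rows)
        grid[row][col] += 1
    return grid


# Invented sample points: two in the same cell, one elsewhere, one out of bounds
sample_points = [(-73.98, 40.75), (-73.98, 40.75), (-73.78, 40.64), (-75.0, 40.7)]
grid = bin_points(sample_points, (-74.10, -73.70), (40.58, 40.90), n_cols=4, n_rows=4)
```

The three requested heatmaps differ only in which points are fed to the counting step: all pickups, all dropoffs, or pickups restricted to trips whose dropoff is at an airport.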

In [None]:
! pip install datashader

In [None]:
# data for 4.3.1
pickup_dropoff2015 = df2015.select('pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude')
pickup_dropoff2015 = pickup_dropoff2015.cache()
pickup_dropoff2015.count()
pd_pickup_dropoff2015 = toPandas(pickup_dropoff2015, 100)

In [None]:
pickup_dropoff2018 = df2018.select('PULocationID','DOLocationID')
pickup_dropoff2018.cache()
pickup_dropoff2018.count()

In [None]:
dropoff2018 = df2018.select('DOLocationID').groupBy('DOLocationID').agg(fn.count('DOLocationID').alias('numdropoff'))
dropoff2018.cache()
dropoff2018.count()
pd_dropoff2018 = toPandas(dropoff2018)
pickup2018 = df2018.select('PULocationID').groupBy('PULocationID').agg(fn.count('PULocationID').alias('numpickup'))
pickup2018.cache()
pickup2018.count()
pd_pickup2018 = toPandas(pickup2018)

In [None]:
# data of the NYC taxi zone
urllib.request.urlretrieve("https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=Shapefile", "NYC Taxi Zones.zip")
with zipfile.ZipFile("NYC Taxi Zones.zip","r") as zip_ref:
    zip_ref.extractall("./shape")
shp_path = os.path.join('shape/geo_export_6b025b64-5fe2-42d7-8e38-5a8d46c76b0d.shp')
gdf_zones = geopandas.GeoDataFrame.from_file(shp_path)
gdf_zones.to_crs(epsg=4326,inplace=True)
gdf_zones['point'] = gdf_zones.representative_point()
gdf_zones['location_i'] = gdf_zones["location_i"].astype("int")
gdf_zones["lon"] = gdf_zones['point'].x
gdf_zones["lat"] = gdf_zones['point'].y
loc_zones = gdf_zones[['location_i','lon','lat']]

In [None]:
pd_pickup2018 = pd.merge(pd_pickup2018, loc_zones, left_on='PULocationID', right_on='location_i')

In [None]:
pd_pickup2018.head(1)

In [None]:
pd_dropoff2018 = pd.merge(pd_dropoff2018, loc_zones, left_on='DOLocationID', right_on='location_i')

In [None]:
pd_dropoff2018.head(1)

In [None]:
import datashader as ds
from colorcet import fire
import datashader.transfer_functions as tf

def plot_heatmap(df, up_off):
    if (up_off == 'pickup'):
        Lat = 'pickup_latitude'
        Lon = 'pickup_longitude'
        dff = df\
            .query('pickup_latitude < 40.90')\
            .query('pickup_latitude > 40.58')\
            .query('pickup_longitude > -74.10')\
            .query('pickup_longitude < -73.70')
    if (up_off == 'dropoff'):
        Lat = 'dropoff_latitude'
        Lon = 'dropoff_longitude'
        dff = df\
            .query('dropoff_latitude < 40.90')\
            .query('dropoff_latitude > 40.58')\
            .query('dropoff_longitude > -74.10')\
            .query('dropoff_longitude < -73.70')

    cvs = ds.Canvas(plot_width=1000, plot_height=1000)
    agg = cvs.points(dff, x=Lon, y=Lat)

    coords_lat, coords_lon = agg.coords[Lat].values, agg.coords[Lon].values

    coordinates = [[coords_lon[0], coords_lat[0]],
                   [coords_lon[-1], coords_lat[0]],
                   [coords_lon[-1], coords_lat[-1]],
                   [coords_lon[0], coords_lat[-1]]]

    img = tf.shade(agg, cmap=fire)[::-1].to_pil()

    fig = px.scatter_mapbox(dff[:1], lat=Lat, lon=Lon, zoom=10)
    fig.update_layout(mapbox_style="carto-darkmatter",
                     mapbox_layers = [
                    {
                        "sourcetype": "image",
                        "source": img,
                        "coordinates": coordinates
                    }]
    )
    fig.show()

In [None]:
def plot_heatmap18(df, num, title):    
    fig = px.density_mapbox(
        df, lat='lat', lon='lon', z=num, radius=10,
        center=dict(lat=40.74, lon=-73.96), zoom=10,
        color_continuous_scale="Viridis",
        mapbox_style="carto-positron",
        title = title,   
    )
    fig.show()

#### number of pickups

In [None]:
plot_heatmap(pd_pickup_dropoff2015, 'pickup')

In [None]:
plot_heatmap18(pd_pickup2018, 'numpickup', 'Number of pickups in July 2018')

#### number of dropoffs

In [None]:
plot_heatmap(pd_pickup_dropoff2015, 'dropoff')

In [None]:
plot_heatmap18(pd_dropoff2018, 'numdropoff', 'Number of dropoffs in July 2018')

#### number of pickups with dropoff at some airport (JFK, LaGuardia, Newark)

In [None]:
to_airport2018 = pickup_dropoff2018.where("DOLocationID == 1 or DOLocationID == 132 or DOLocationID == 138")
numPickup_to_airport2018 = to_airport2018.groupBy('PULocationID').agg(fn.count('PULocationID').alias('numPickups'))
numPickup_to_airport2018.cache()
numPickup_to_airport2018.count()
pd_Pickup_to_airport2018 = toPandas(numPickup_to_airport2018)

In [None]:
pd_Pickup_to_airport2018 = pd.merge(pd_Pickup_to_airport2018, loc_zones, 
                                    left_on='PULocationID', right_on='location_i')

In [None]:
pd_Pickup_to_airport2018.head(1)

In [None]:
plot_heatmap18(pd_Pickup_to_airport2018, 'numPickups', 
               'Number of pickups with dropoff at some airport in July 2018')

In [None]:
# too slow!
pickup_dropoffAirport2015 = pickup_dropoff2015.repartition(100)
pickup_dropoffAirport2015 = pickup_dropoffAirport2015.withColumn('dropoff_in_airport', udf_in_area_airport(col('dropoff_longitude'), col('dropoff_latitude')))
pickup_dropoffAirport2015 = pickup_dropoffAirport2015.cache()
pickup_dropoffAirport2015.count()

In [None]:
# too slow!
numpick_dropAirport2015 = pickup_dropoffAirport2015.select('pickup_longitude', 'pickup_latitude').where(col('dropoff_in_airport'))
numpick_dropAirport2015.persist()
numpick_dropAirport2015.count()

In [None]:
# too slow!
pd_numPick_Airport2015 = toPandas(numpick_dropAirport2015, 150)

In [None]:
# too slow!
# the pandas frame built above holds the pickup coordinates of trips ending at an airport
plot_heatmap(pd_numPick_Airport2015, 'pickup')

### Build a choropleth map where color is a function of
    1. number of pickups in the area
    2. ratio of number of payments by card/number of cash payments for pickups in the area
    3. ratio of total fare/trip duration for dropoff in the area
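
For the second item, the quantity per zone is simply the number of card payments divided by the number of cash payments. Below is a minimal plain-Python sketch of that computation, under the assumption (taken from the filters used later in this notebook) that `payment_type` 1 means card and 2 means cash. The name `card_to_cash_ratio` and the sample trips are hypothetical. Unlike an inner join of the two counts, the sketch keeps zones that have no cash payment and reports `None` for them rather than dividing by zero.

```python
from collections import Counter

CARD, CASH = 1, 2  # payment_type codes assumed from the notebook's filters


def card_to_cash_ratio(trips):
    """trips: iterable of (zone_id, payment_type). Returns {zone_id: ratio or None}."""
    card = Counter()
    cash = Counter()
    for zone, payment_type in trips:
        if payment_type == CARD:
            card[zone] += 1
        elif payment_type == CASH:
            cash[zone] += 1
    zones = sorted(set(card) | set(cash))
    # None marks zones where the ratio is undefined (no cash payment at all)
    return {z: (card[z] / cash[z] if cash[z] else None) for z in zones}


# Invented sample: zone 132 has 2 card + 1 cash, zone 138 card only, zone 1 cash only
sample_trips = [(132, 1), (132, 1), (132, 2), (138, 1), (1, 2)]
ratios = card_to_cash_ratio(sample_trips)
```

Whether undefined ratios should be dropped, shown as missing, or capped is a presentation choice for the map.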

In [None]:
# download the data of NYC Taxi Zones
from urllib.request import urlopen
import json
with urlopen('https://raw.githubusercontent.com/PetitPoissonL/NYC_Taxi_Zones/main/NYC_Taxi_Zones.json') as response:
    taxi_zone = json.load(response)

In [None]:
def choropleth_map(df, col, label, loc, t):
    fig = px.choropleth_mapbox(
        df, geojson=taxi_zone, locations=loc, color=col,
        featureidkey="properties.location_id",
        color_continuous_scale="Viridis",
        mapbox_style="carto-positron",
        zoom=9.5, center = {"lat": 40.74, "lon": -73.96},
        opacity=0.5,
        title = t,
        labels={col:label}
    )
    fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
    fig.show()

#### number of pickups in the area

In [None]:
# data for 4.3.2.1
pickup2018 = df2018.groupBy('PULocationID').agg(fn.count('tpep_pickup_datetime').alias('numPickup'))
pd_pickup2018 = toPandas(pickup2018, 50)

In [None]:
choropleth_map(pd_pickup2018, 'numPickup', 
               'number of pickups', 'PULocationID', 
               "Number of pickups in 2018")

In [None]:
#number of pickups in the area 2015
pickup_zone = df2015_add.groupby('pickup_zone').count()
pickup_zone = pickup_zone.toPandas()
pickup_zone.head()

In [None]:
chor_pickup = go.Figure(
    go.Choroplethmapbox(
        geojson = taxi_zone,
        featureidkey="properties.zone",
        locations = pickup_zone['pickup_zone'],
        z = pickup_zone['count'],
        zauto=True,
        colorscale='ylorrd',
        marker={'opacity':0.8,'line_width':0.5},
        hovertext = pickup_zone['pickup_zone'],
        hoverinfo='text + z',
        showscale=True,
    )
)

chor_pickup.update_layout( mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
chor_pickup.show()

#### ratio of number of payments by card/number of cash payments for pickups in the area

In [None]:
# data for 4.3.2.2
ratioPayment2018 = df2018.select('tpep_pickup_datetime', 'PULocationID', 'payment_type').cache()
ratioPayment2018.count()

In [None]:
def payment(df, payment_code, nameCol):
    # count trips per pickup zone for one payment type (1 = card, 2 = cash below)
    df = df\
         .where(col('payment_type')==payment_code)\
         .groupBy(col('PULocationID'))\
         .agg(fn.count(col('tpep_pickup_datetime')).alias(nameCol))
    return df
    
ratioPayment_byCard = payment(ratioPayment2018, 1, 'num_by_card')
ratioPayment_byCash = payment(ratioPayment2018, 2, 'num_by_cash')
    
df_ratioPayment = ratioPayment_byCash.join(ratioPayment_byCard, on="PULocationID")

df_ratioPayment = df_ratioPayment\
                  .withColumn('ratio',fn.round(fn.col('num_by_card')/fn.col('num_by_cash'),2))
df_ratioPayment.cache()
df_ratioPayment.count()

In [None]:
pd_ratioPayment = toPandas(df_ratioPayment)

In [None]:
pd_ratioPayment.head(2)

In [None]:
choropleth_map(pd_ratioPayment, 'ratio', 
               'card/cash payment ratio', 'PULocationID', 
               'Ratio of number of payments by card/number of cash payments for pickups in 2018')

In [None]:
#ratio 2015
ratiopay_zone = df2015_add.groupby('pickup_zone','payment_type').count().toPandas()
ratiopay_zone.head()

In [None]:
card_payment = ratiopay_zone[ratiopay_zone['payment_type'] == 1]

cash_payment = ratiopay_zone[ratiopay_zone['payment_type'] == 2]

payment_df = pd.merge(card_payment,cash_payment,how='inner',on='pickup_zone')
payment_df.head()

In [None]:
payment_df['ratio'] = payment_df['count_x']/payment_df['count_y']
payment_df.head()

In [None]:
payment_pickup = go.Figure(
    go.Choroplethmapbox(
        geojson = taxi_zone,
        featureidkey="properties.zone",
        locations = payment_df['pickup_zone'],
        z = payment_df['ratio'],
        zauto=True,
        colorscale='ylorrd',
        marker={'opacity':0.8,'line_width':0.5},
        hovertext = payment_df['pickup_zone'],
        hoverinfo='text + z',
        showscale=True,
    )
)


payment_pickup.update_layout( mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
payment_pickup.show()

#### ratio of total fare/trip duration for dropoff in the area

In [None]:
ratio_fare_dur2018 = df2018.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'DOLocationID', 'total_amount')
ratio_fare_dur2018 = addDuration(ratio_fare_dur2018).cache()
ratio_fare_dur2018.count()

In [None]:
df_ratio_fare_dur2018 = ratio_fare_dur2018\
                        .groupBy('DOLocationID')\
                        .agg(fn.round(fn.sum('total_amount'),2).alias('total_fare'),
                             fn.round(fn.sum('tripDurationInMinutes'),2).alias('trip_duration'))
df_ratio_fare_dur2018 = df_ratio_fare_dur2018\
                        .withColumn('ratio',fn.round(fn.col('total_fare')/fn.col('trip_duration'),2))
pd_ratio_fare_dur2018 = toPandas(df_ratio_fare_dur2018)

In [None]:
pd_ratio_fare_dur2018.head(2)

In [None]:
choropleth_map(pd_ratio_fare_dur2018, 'ratio', 
               'total fare/trip duration in 2018', 'DOLocationID', 
               'Ratio of total fare/trip duration for dropoff in 2018')

In [None]:
#2015
from pyspark.sql.functions import dayofweek,hour,date_format,unix_timestamp,sum

ratiofd_zone = df2015_add.withColumn('trip_duration',(unix_timestamp(df2015_add['tpep_dropoff_datetime']) - unix_timestamp(df2015_add['tpep_pickup_datetime'])))

ratiofd_zone = ratiofd_zone.groupby('dropoff_zone').agg(sum(ratiofd_zone['trip_duration']),sum(ratiofd_zone['fare_amount'])).toPandas()
ratiofd_zone.head()

In [None]:
ratiofd_zone['ratio'] = ratiofd_zone['sum(fare_amount)']/ratiofd_zone['sum(trip_duration)']
ratiofd_zone.head()

In [None]:
ratiofd_dropoff = go.Figure(
    go.Choroplethmapbox(
        geojson = taxi_zone,
        featureidkey="properties.zone",
        locations = ratiofd_zone['dropoff_zone'],
        z = ratiofd_zone['ratio'],
        zauto=True,
        colorscale='ylorrd',
        marker={'opacity':0.8,'line_width':0.5},
        hovertext = ratiofd_zone['dropoff_zone'],
        hoverinfo='text + z',
        showscale=True,
    )
)

ratiofd_dropoff.update_layout( mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
ratiofd_dropoff.show()

### Build an interactive choropleth with a slider allowing the user to select an `hour of day` and where the color is a function of
    1. average number of dropoffs in the area during that hour of the day
    2. average ratio of tip over total fare amount for pickups in the area at given hour of the day
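
The first item involves two aggregation stages: count dropoffs per (date, hour, zone), then average those daily counts per (hour, zone). Here is a minimal plain-Python sketch of that two-stage computation on invented tuples; `average_hourly_dropoffs` is a hypothetical name. Note the assumption built into this approach: a day with no dropoff in a zone during a given hour contributes nothing, so the average is taken over days with at least one dropoff.

```python
from collections import Counter, defaultdict


def average_hourly_dropoffs(dropoffs):
    """dropoffs: iterable of (date, hour, zone_id), one tuple per trip.

    Returns {(hour, zone_id): average number of dropoffs per observed day}.
    """
    per_day = Counter(dropoffs)  # stage 1: trips per (date, hour, zone)
    totals = defaultdict(int)
    days = defaultdict(int)
    for (date, hour, zone), n in per_day.items():  # stage 2: average over dates
        totals[(hour, zone)] += n
        days[(hour, zone)] += 1
    return {key: totals[key] / days[key] for key in totals}


# Invented sample: zone 132 at 8h gets 3 dropoffs one day and 1 the next
sample = (
    [("2018-07-01", 8, 132)] * 3
    + [("2018-07-02", 8, 132)]
    + [("2018-07-01", 9, 132)] * 2
)
averages = average_hourly_dropoffs(sample)
```

If days without any dropoff should count as zero, the divisor would instead be the total number of days in the month.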

In [None]:
interactive2018 = df2018.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'tip_amount', 'total_amount')
interactive2018 = set_date(interactive2018)
interactive2018 = interactive2018.persist()
interactive2018.count()

In [None]:
interactive2018.show(1)

In [None]:
from pyspark.sql.functions import hour,date_format

df2015_add = df2015_add.withColumn('hour_pickup',hour(df2015_add['tpep_pickup_datetime']))
df2015_add = df2015_add.withColumn('hour_dropoff',hour(df2015_add['tpep_dropoff_datetime']))
df2015_add = df2015_add.withColumn('date_pickup',date_format(df2015_add['tpep_pickup_datetime'],'yyyy-MM-dd'))
df2015_add = df2015_add.withColumn('date_dropoff',date_format(df2015_add['tpep_dropoff_datetime'],'yyyy-MM-dd'))
df2015_add.show(1)

#### average number of dropoffs in the area during that hour of the day

In [None]:
dropoff_interactive2018 = interactive2018\
                          .groupBy('date', 'hour', 'DOLocationID')\
                          .agg(fn.count(col('tpep_dropoff_datetime')).alias('numDropoff'))\
                          .orderBy('date', 'hour')
avgdropoff2018 = dropoff_interactive2018\
                          .groupBy('hour', 'DOLocationID')\
                          .agg(fn.round(fn.avg(col('numDropoff')),2).alias('avgDropoff'))\
                          .orderBy('hour')

In [None]:
pd_avgdropoff2018 = toPandas(avgdropoff2018)

In [None]:
pd_avgdropoff2018.head(2)

In [None]:
fig = px.choropleth_mapbox(
    pd_avgdropoff2018, 
    geojson=taxi_zone, locations='DOLocationID', color='avgDropoff',
    featureidkey="properties.location_id",
    color_continuous_scale="Viridis",
    mapbox_style="carto-positron",
    zoom=9.5, center = {"lat": 40.74, "lon": -73.96},
    animation_frame="hour",
    opacity=0.5,
    labels={'avgDropoff':'average number of dropoffs'},
    title = 'average number of dropoffs in 2018'
)

fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()

In [None]:
#2015
avg_dropoffs = df2015_add.groupby('dropoff_zone','hour_dropoff','date_dropoff').count()
avg_dropoffs = avg_dropoffs.groupby('dropoff_zone','hour_dropoff').agg({'count':'mean'}).toPandas()
avg_dropoffs.head()

In [None]:
steps = []
datas = []

for hour in range(24):
    t = avg_dropoffs[avg_dropoffs['hour_dropoff'] == hour]
    data = go.Choroplethmapbox(
        geojson = taxi_zone,
        featureidkey="properties.zone",
        locations = t['dropoff_zone'],
        z = t['avg(count)'],
        zauto=False,
        colorscale='ylorrd',
        marker={'opacity':0.8,'line_width':0.5},
        hovertext = t['dropoff_zone'],
        hoverinfo='text + z',
        showscale=True
    )
    
    datas.append(data)
    
    visibles = [False]*24
    visibles[hour] = True
    
    step = {
        "args": [{"visible":visibles}],
        "label": hour+1,
        "method": "restyle"
    }
    
    steps.append(step)
    
sliders = [{"x":0,"y":0.15,"pad":{"t":24},"steps":steps,"active":1,"currentvalue":{"prefix":"hour_dropoff"}}]
    

avg_dropoffs_fig = go.Figure(
    data = datas
)

avg_dropoffs_fig.update_layout(
    sliders = sliders,
    mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
avg_dropoffs_fig.show()
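
The slider above works by holding one trace per hour and toggling which trace is visible. The sketch below isolates that bookkeeping in plain Python, without importing Plotly, so the visibility masks can be inspected. `build_slider_steps` and `initial_visibility` are hypothetical helper names, and labeling the steps from hour 0 is an assumption made here because the hour column ranges over 0 to 23.

```python
def build_slider_steps(n_frames, first_label=0):
    """One 'restyle' step per frame; step i shows trace i and hides the others."""
    steps = []
    for i in range(n_frames):
        visible = [j == i for j in range(n_frames)]
        steps.append({
            "method": "restyle",
            "args": [{"visible": visible}],
            "label": str(first_label + i),
        })
    return steps


def initial_visibility(n_frames, active=0):
    """Visibility flags to apply to the traces so the figure opens on step `active`."""
    return [j == active for j in range(n_frames)]


steps = build_slider_steps(24)
start_mask = initial_visibility(24, active=0)
```

Applying `start_mask` to the traces when they are created, and using the same index for the slider's `active` entry, should keep the initial view consistent with the slider position.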

#### average ratio of tip over total fare amount for pickups in the area at given hour of the day

In [None]:
sumFare2018 = interactive2018\
            .groupBy('date', 'hour','PULocationID')\
            .agg(fn.round(fn.sum(col('total_amount')),2).alias('sum_fare'), 
                fn.round(fn.sum(col('tip_amount')),2).alias('sum_tip'))\
            .orderBy('date','hour')

sumFare2018 = sumFare2018\
            .groupBy('hour','PULocationID')\
            .agg(fn.round(fn.avg(col('sum_fare')),2).alias('avg_fare'), 
                 fn.round(fn.avg(col('sum_tip')),2).alias('avg_tip'))\
            .orderBy('hour')
                
ratio2018 = sumFare2018\
            .withColumn('ratio',fn.round(fn.col('avg_tip')/fn.col('avg_fare'),2))\
            .cache()
ratio2018.count()
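
The phrase "average ratio of tip over total fare" can be read in two ways, and the two readings generally give different numbers: the ratio of summed tips to summed fares, or the mean of per-trip ratios. The sketch below contrasts them on two invented trips. The function names are hypothetical, and `math.fsum` is used instead of the built-in `sum` because this notebook imports a Spark function of the same name.

```python
from math import fsum


def ratio_of_sums(trips):
    """trips: list of (tip_amount, total_amount). Total tips over total fares."""
    total_tip = fsum(tip for tip, _ in trips)
    total_fare = fsum(total for _, total in trips)
    return total_tip / total_fare if total_fare else None


def mean_of_ratios(trips):
    """Mean of per-trip tip/total ratios, skipping trips with a non-positive total."""
    ratios = [tip / total for tip, total in trips if total > 0]
    return fsum(ratios) / len(ratios) if ratios else None


# Invented sample: a small tip on a cheap ride, a large tip on a more expensive one
sample_trips = [(1.0, 10.0), (10.0, 20.0)]
by_sums = ratio_of_sums(sample_trips)    # 11 / 30
by_trips = mean_of_ratios(sample_trips)  # (0.1 + 0.5) / 2
```

The ratio of sums weights expensive trips more heavily, while the mean of ratios treats every trip equally; the cell above corresponds to the first reading, applied to averaged hourly sums.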

In [None]:
pd_ratio2018 = toPandas(ratio2018)

In [None]:
pd_ratio2018.head(2)

In [None]:
fig = px.choropleth_mapbox(
    pd_ratio2018, 
    geojson=taxi_zone, locations='PULocationID', color='ratio',
    featureidkey="properties.location_id",
    color_continuous_scale="Viridis",
    mapbox_style="carto-positron",
    zoom=9.5, center = {"lat": 40.74, "lon": -73.96},
    animation_frame="hour",
    opacity=0.5,
    labels={'ratio':'average ratio of tip over total fare'},
    title = 'average ratio of tip over total fare amount for pickups in 2018'
)

fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()

In [None]:
#2015
ratio_pickup_fare = df2015_add.groupby('pickup_zone','hour_pickup').agg(sum(df2015_add['tip_amount']),sum(df2015_add['total_amount']))
ratio_pickup_fare.show(1)

In [None]:
avg_ratio_pickup_fare = ratio_pickup_fare.withColumn('ratio',ratio_pickup_fare['sum(tip_amount)']/ratio_pickup_fare['sum(total_amount)']).select('pickup_zone','hour_pickup','ratio')
avg_ratio_pickup_fare.show(1)

In [None]:
avg_ratio_pickup_fare = avg_ratio_pickup_fare.toPandas()

steps = []
datas = []

for hour in range(24):
    t = avg_ratio_pickup_fare[avg_ratio_pickup_fare['hour_pickup'] == hour]
    data = go.Choroplethmapbox(
        geojson = taxi_zone,
        featureidkey="properties.zone",
        locations = t['pickup_zone'],
        z = t['ratio'],
        zauto=False,
        colorscale='ylorrd',
        marker={'opacity':0.8,'line_width':0.5},
        hovertext = t['pickup_zone'],
        hoverinfo='text + z',
        showscale=True
    )
    
    datas.append(data)
    
    visibles = [False]*24
    visibles[hour] = True
    
    step = {
        "args": [{"visible":visibles}],
        "label": hour+1,
        "method": "restyle"
    }
    
    steps.append(step)


sliders = [{"x":0,"y":0.15,"pad":{"t":24},"steps":steps,"active":1,"currentvalue":{"prefix":"hour_pickup"}}]
    

avg_pickup_fig = go.Figure(
    data = datas
)

avg_pickup_fig.update_layout(
    sliders = sliders,
    mapbox = {'accesstoken':token,'center':{'lon':-73.965691,'lat':40.97},'zoom':5},margin = {'l':1,'r':1,'t':1,'b':1})
avg_pickup_fig.show()

In [None]:
spark.stop()