# 1. Overview

Below are the variables that the data set contains:
* **vendor_id**: A code indicating the TPEP provider that provided the record: 1 = Creative Mobile Technologies, LLC; 2 = VeriFone Inc.
* **pickup_datetime**: The date and time when the meter was engaged.
* **dropoff_datetime**: The date and time when the meter was disengaged.
* **passenger_count**: The number of passengers in the vehicle. This is a driver-entered value.
* **trip_distance**: The elapsed trip distance in miles reported by the taximeter.
* **rate_code**: The final rate code in effect at the end of the trip: 1 = Standard rate; 2 = JFK; 3 = Newark; 4 = Nassau or Westchester; 5 = Negotiated fare; 6 = Group ride.
* **store_and_fwd_flag**: Indicates whether the trip record was held in vehicle memory before being sent to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server: Y = store and forward trip; N = not a store and forward trip.
* **payment_type**: A numeric code signifying how the passenger paid for the trip: 1 = Credit card; 2 = Cash; 3 = No charge; 4 = Dispute; 5 = Unknown; 6 = Voided trip.
* **fare_amount**: The time-and-distance fare calculated by the meter.
* **extra**: Miscellaneous extras and surcharges. Currently, this only includes the \$0.50 and \$1 rush hour and overnight charges.
* **mta_tax**: The \$0.50 MTA tax that is automatically triggered based on the metered rate in use.
* **tip_amount**: Tip amount. This field is automatically populated for credit card tips; cash tips are not included.
* **tolls_amount**: Total amount of all tolls paid in the trip.
* **imp_surcharge**: The \$0.30 improvement surcharge assessed on trips at the flag drop. The improvement surcharge began being levied in 2015.
* **total_amount**: The total amount charged to passengers. Does not include cash tips.
* **pickup_location_id**: The TLC Taxi Zone in which the taximeter was engaged.
* **dropoff_location_id**: The TLC Taxi Zone in which the taximeter was disengaged.
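The coded fields above (`vendor_id`, `rate_code`, `payment_type`) are easier to read once mapped to labels, which is what the payment-method query later in this notebook does with `CASE WHEN`. A minimal plain-Python sketch of such lookup tables, transcribed from the data dictionary; `VENDOR_ID`, `RATE_CODE`, `PAYMENT_TYPE` and `decode` are hypothetical helper names, not part of the dataset or of PySpark.

```python
# Code-to-label lookups transcribed from the data dictionary (hypothetical helper names).
VENDOR_ID = {1: "Creative Mobile Technologies, LLC", 2: "VeriFone Inc"}
RATE_CODE = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}
PAYMENT_TYPE = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}


def decode(code, mapping):
    """Return the label for a numeric code, or None if it is missing or unlisted.

    Values read from CSV arrive as strings (e.g. "1"), so they are converted to int first.
    """
    try:
        return mapping.get(int(code))
    except (TypeError, ValueError):
        return None


print(decode("2", PAYMENT_TYPE))  # Cash
print(decode(4, RATE_CODE))       # Nassau or Westchester
print(decode(None, VENDOR_ID))    # None
```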

# 2. Getting Started

In [1]:
# Imports 
import numpy as np
import pandas as pd

from pyspark.sql import SQLContext
from pyspark.sql import types
from pyspark.sql.functions import isnan, when, count, col

In [2]:
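# Initialize Spark. PySpark notebook kernels usually predefine `spark`;
# getOrCreate() returns that session, or builds a new one if none exists.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)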

In [116]:
# Get data from BigQuery and store it into Cloud Storage as GZIP compressed .csv files
!bq --location=US extract --compression GZIP 'bigquery-public-data:new_york_taxi_trips.tlc_yellow_trips_2018' gs://ajk-portfolio/data/nyc_taxi_trips/nyc_taxi_2018-*.csv.gz
!bq --location=US extract --compression GZIP 'bigquery-public-data:new_york_taxi_trips.taxi_zone_geom' gs://ajk-portfolio/data/nyc_taxi_trips/nyc_taxi_zones-*.csv.gz

Waiting on bqjob_r5ae5d0098852883d_00000169b23015d0_1 ... (85s) Current status: DONE   


In [117]:
# Load data into spark DataFrame
trips_df = spark.read.option("header", "true").csv("gs://ajk-portfolio/data/nyc_taxi_trips/nyc_taxi_2018-*.csv.gz")
loc_df = spark.read.option("header", "true").csv("gs://ajk-portfolio/data/nyc_taxi_trips/nyc_taxi_zones-*.csv.gz")

In [118]:
# Adjust column types to float
float_columns = ["total_amount",
                 "trip_distance",
                 "passenger_count",
                 "fare_amount",
                 "extra",
                 "tip_amount",
                 "imp_surcharge"]

for f in float_columns:
    trips_df = trips_df.withColumn(f, trips_df[f].cast(types.FloatType()))

In [119]:
# Adjust column types to timestamp
ts_columns = ["pickup_datetime",
              "dropoff_datetime"]

for t in ts_columns:
    trips_df = trips_df.withColumn(t, trips_df[t].cast(types.TimestampType()))

In [120]:
# Columns that we want to keep from the two tables
cols = [trips_df[c] for c in trips_df.columns]
cols = cols + [loc_df[c] for c in loc_df.columns if c not in ("zone_id", "zone_geom")]

# Join loc_df so we can get location info for each pickup_location_id
trips_df = trips_df.join(loc_df, trips_df["pickup_location_id"] == loc_df["zone_id"], how="left").select(*cols)

# Rename columns to be more informative
trips_df = trips_df.withColumnRenamed("zone_name", "pickup_zone")
trips_df = trips_df.withColumnRenamed("borough", "pickup_borough")


In [121]:
# Columns that we want to keep from the two tables
cols = [trips_df[c] for c in trips_df.columns]
cols = cols + [loc_df[c] for c in loc_df.columns if c not in ("zone_id", "zone_geom")]

# Join the loc_df again, but this time to get the dropoff location info
trips_df = trips_df.join(loc_df, trips_df["dropoff_location_id"] == loc_df["zone_id"], how="left").select(*cols)

# Rename columns to be more informative
trips_df = trips_df.withColumnRenamed("zone_name", "dropoff_zone")
trips_df = trips_df.withColumnRenamed("borough", "dropoff_borough")
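The two join cells above perform the same zone lookup twice, once per location-id column, and use `how="left"` so that a trip whose id has no matching zone is kept with null zone fields. A plain-Python sketch of that logic on toy data; `zones`, `trips` and `add_zone_info` are hypothetical names, and the zone and borough names are placeholders rather than real TLC zones.

```python
# Toy zone table keyed by zone_id (placeholder values, not real TLC zones).
zones = {
    "1": ("Zone A", "Borough X"),
    "2": ("Zone B", "Borough Y"),
}

trips = [
    {"pickup_location_id": "1", "dropoff_location_id": "2"},
    {"pickup_location_id": "2", "dropoff_location_id": "999"},  # unknown zone id
]


def add_zone_info(trip, zones):
    """Attach pickup/dropoff zone and borough, mimicking two LEFT joins on zone_id.

    An id with no match keeps the trip and fills the new fields with None,
    just as a left join keeps the row with NULL columns.
    """
    out = dict(trip)
    for side in ("pickup", "dropoff"):
        zone_name, borough = zones.get(trip[f"{side}_location_id"], (None, None))
        out[f"{side}_zone"] = zone_name
        out[f"{side}_borough"] = borough
    return out


enriched = [add_zone_info(t, zones) for t in trips]
for row in enriched:
    print(row["pickup_zone"], "->", row["dropoff_zone"])
# Zone A -> Zone B
# Zone B -> None
```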


# 3. Data Quality Check

For the sake of brevity, I am not including the individual data quality checks that were run before the analysis. However, the following is a summary of the most important findings:
* There are a handful of records where the `total_amount` paid for the trip is less than 0 or exceeds \$100,000. These values do not make sense for taxi fares and were most likely caused by erroneous data entry.
* There are a few records where the `trip_distance` is either 0 or greater than 100,000 miles. Trips of 0 miles may be cancelled trips, while trips of over 100,000 miles are likely erroneous entries.
* There are records where `passenger_count` has odd values: 0 or greater than 100. Since this value is entered by taxi drivers, the 0s may be cases where the driver did not enter a value, and the values over 100 are likely erroneous entries.
* The data set also seems to be incomplete, as it mainly contains trip data from January to June 2018.

Records with out-of-range values are removed before any subsequent analysis in this notebook. The filter in the next cell keeps trips with `total_amount` strictly between \$0 and \$10,000, `trip_distance` under 1,000 miles and `passenger_count` under 100. It does not drop trips with a `trip_distance` or `passenger_count` of 0, which is why both columns still show a minimum of 0 in the summaries below. In total, 43K records were removed out of the 63 million+ that the dataset contains.


In [159]:
# Register a temporary view so the DataFrame can be queried with SQL
# (createOrReplaceTempView replaces the deprecated registerTempTable)
trips_df.createOrReplaceTempView("trips")

In [175]:
# Filter out problematic/erroneous records
query = """
SELECT
    *
FROM
    trips t
WHERE 
    t.total_amount < 10000
    AND t.total_amount > 0
    AND t.trip_distance < 1000
    AND t.passenger_count < 100
"""

trips_df = spark.sql(query)
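To see exactly which records survive the filter, the WHERE clause above can be restated as a plain-Python predicate. `keep_trip` is a hypothetical helper for illustration, not part of the notebook. It makes two things explicit: the thresholds are \$10,000 and 1,000 miles, and trips with a `trip_distance` or `passenger_count` of 0 are kept. `None` stands in for a SQL NULL, which fails every comparison and is therefore dropped.

```python
def keep_trip(total_amount, trip_distance, passenger_count):
    """Mirror the WHERE clause above for a single record (hypothetical helper).

    SQL comparisons against NULL are never true, so the WHERE clause drops a
    record with any of these fields missing; None is treated the same way here.
    """
    if None in (total_amount, trip_distance, passenger_count):
        return False
    return (
        0 < total_amount < 10000
        and trip_distance < 1000
        and passenger_count < 100
    )


print(keep_trip(12.5, 3.1, 1))     # True
print(keep_trip(12.5, 0.0, 0))     # True: zero distance and zero passengers pass
print(keep_trip(-3.0, 1.2, 1))     # False: non-positive total
print(keep_trip(20.0, 1500.0, 1))  # False: distance too large
print(keep_trip(20.0, None, 1))    # False: NULL fails the comparison
```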

# 4. Exploratory Data Analysis

As noted in the data quality check, some `total_amount` and `trip_distance` values are implausible (for example, totals above \$100,000 or distances above 100,000 miles); the analysis below runs on the filtered `trips` table.

In [181]:
# Re-register the filtered DataFrame under the same view name so later SQL uses it
trips_df.createOrReplaceTempView("trips")

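### How many passengers do trips typically have?
On average, trips carry about 1.6 passengers (standard deviation about 1.25), and after filtering the values range from 0 to 9.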

In [176]:
# get summary statistics for passenger_count
trips_df.describe(['passenger_count']).show()

+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          63331706|
|   mean|1.5999719319103767|
| stddev|1.2476925848383433|
|    min|               0.0|
|    max|               9.0|
+-------+------------------+



### How far do taxi trips typically go?
On average, each trip is about 2.9 miles (standard deviation about 3.7 miles), and the longest remaining trip after filtering is 943.5 miles.

In [177]:
# get summary statistics for trip_distance
trips_df.describe(['trip_distance']).show()

+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|          63331706|
|   mean|2.8955117587555947|
| stddev|3.7460191815527777|
|    min|               0.0|
|    max|             943.5|
+-------+------------------+
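`describe()` reports five statistics per column: count, mean, standard deviation, min and max. A plain-Python sketch of the same five numbers, useful for sanity-checking a small local sample; `summarize` is a hypothetical helper, not the Spark method, and the sample values are made up. It assumes, as Spark does, that nulls are skipped and that the standard deviation is the sample version (n - 1 in the denominator).

```python
import statistics


def summarize(values):
    """Return count, mean, stddev, min and max, the five rows describe() prints.

    Missing values (None) are skipped, as Spark skips nulls. The standard
    deviation is the sample version (n - 1 denominator), matching Spark's
    stddev; it is undefined for fewer than two values, so None is returned then.
    """
    xs = [v for v in values if v is not None]
    return {
        "count": len(xs),
        "mean": statistics.fmean(xs),
        "stddev": statistics.stdev(xs) if len(xs) > 1 else None,
        "min": min(xs),
        "max": max(xs),
    }


# Hypothetical sample of trip distances in miles, including a missing value.
sample = [0.0, 1.0, 2.5, 3.5, None]
for name, value in summarize(sample).items():
    print(f"{name:>6}: {value}")
```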



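### What payment methods are popular?
Credit cards account for about 70% of trips and cash for about 30%; no-charge and disputed trips together make up less than 1%. Credit card trips also have a higher average fare (\$13.22 vs. \$11.70) and a longer average distance (3.03 vs. 2.57 miles) than cash trips.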

In [227]:
query = """
SELECT DISTINCT
    CASE WHEN t.payment_type = 1 THEN "Credit Card"
         WHEN t.payment_type = 2 THEN "Cash"
         WHEN t.payment_type = 3 THEN "No Charge"
         WHEN t.payment_type = 4 THEN "Dispute"
         WHEN t.payment_type = 5 THEN "Unknown"
         WHEN t.payment_type = 6 THEN "Voided Trip" 
         END AS payment_type,
    COUNT(*) OVER(PARTITION BY t.payment_type) AS trip_count,
    ROUND((COUNT(*) OVER(PARTITION BY t.payment_type)/COUNT(*) OVER())*100, 2) AS pct_total,
    ROUND(AVG(t.fare_amount) OVER(PARTITION BY t.payment_type), 2) AS avg_fare_amount,
    ROUND(AVG(t.trip_distance) OVER(PARTITION BY t.payment_type), 2) AS avg_trip_distance,
    ROUND(AVG(t.passenger_count) OVER(PARTITION BY t.payment_type), 2) AS avg_passenger_count
FROM
    trips t
ORDER BY
    trip_count DESC
"""

spark.sql(query).toPandas()

|   | payment_type | trip_count | pct_total | avg_fare_amount | avg_trip_distance | avg_passenger_count |
|---|---|---|---|---|---|---|
| 0 | Credit Card | 44132242 | 69.68 | 13.22 | 3.03 | 1.59 |
| 1 | Cash | 18807044 | 29.7 | 11.7 | 2.57 | 1.63 |
| 2 | No Charge | 311721 | 0.49 | 13.36 | 2.93 | 1.2 |
| 3 | Dispute | 80699 | 0.13 | 14.16 | 3.19 | 1.2 |
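The query above computes one row per payment type by combining `SELECT DISTINCT` with several window functions. The same breakdown can be written as a single `GROUP BY`, with `SUM(COUNT(*)) OVER ()` supplying the grand total for the percentage. A small, self-contained sketch of that pattern using Python's built-in `sqlite3` as a stand-in for Spark SQL, on made-up toy rows; the pattern is standard SQL and should carry over to Spark SQL, but that is not tested here.

```python
import sqlite3

# Toy rows (payment_type, fare_amount, trip_distance, passenger_count),
# standing in for the real `trips` table.
rows = [
    (1, 10.0, 2.0, 1), (1, 14.0, 3.0, 2), (1, 12.0, 2.5, 1),
    (2, 8.0, 1.5, 1),
]

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE trips (payment_type INTEGER, fare_amount REAL, "
    "trip_distance REAL, passenger_count REAL)"
)
con.executemany("INSERT INTO trips VALUES (?, ?, ?, ?)", rows)

# One GROUP BY pass; SUM(COUNT(*)) OVER () is the total across all groups.
query = """
SELECT
    CASE trips.payment_type WHEN 1 THEN 'Credit Card' WHEN 2 THEN 'Cash'
         WHEN 3 THEN 'No Charge' WHEN 4 THEN 'Dispute'
         WHEN 5 THEN 'Unknown' WHEN 6 THEN 'Voided Trip' END AS payment_type,
    COUNT(*) AS trip_count,
    ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct_total,
    ROUND(AVG(trips.fare_amount), 2) AS avg_fare_amount
FROM trips
GROUP BY trips.payment_type
ORDER BY trip_count DESC
"""
for row in con.execute(query):
    print(row)
# ('Credit Card', 3, 75.0, 12.0)
# ('Cash', 1, 25.0, 8.0)
```

Grouping once avoids computing the same partition count several times, which is what the `DISTINCT` form relies on to collapse duplicate rows.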


### What are the top trip destinations, and where do those trips come from?
The query below returns the top 3 destinations along with the top 5 origins for each of them; the rows shown here cover the first two destinations, Upper East Side North and Midtown Center. A few observations on the results:
* Taxis in NYC appear to be used mainly for short trips: most of these top routes are about 1 mile or less, with average fares of roughly \$5 to \$10.
* In addition, all of the destinations and origins shown are within Manhattan, which is the most densely populated borough in NYC.


In [228]:
query = """
WITH top_destination_origin AS 
    (SELECT
        t.dropoff_borough,
        t.dropoff_zone,
        t.pickup_borough,
        t.pickup_zone,
        COUNT(*) trip_count,
        AVG(t.trip_distance) AS avg_trip_distance,
        AVG(t.passenger_count) AS avg_passenger_count,
        AVG(t.fare_amount) AS avg_fare_amount,
        DENSE_RANK() OVER(PARTITION BY t.dropoff_borough, t.dropoff_zone ORDER BY COUNT(*) DESC) AS trip_count_rank
    FROM
        trips t
    GROUP BY
        t.pickup_borough,
        t.pickup_zone,
        t.dropoff_borough,
        t.dropoff_zone),
top_destination AS 
    (SELECT
        t.dropoff_borough,
        t.dropoff_zone,
        DENSE_RANK() OVER(ORDER BY COUNT(*) DESC) AS trip_count_rank,
        COUNT(*) AS trip_count
    FROM 
        trips t
    GROUP BY
        t.dropoff_borough,
        t.dropoff_zone)
SELECT
    tdo.dropoff_borough,
    tdo.dropoff_zone,
    tdo.pickup_borough,
    tdo.pickup_zone,
    tdo.trip_count,
    tdo.avg_trip_distance,
    tdo.avg_passenger_count,
    tdo.avg_fare_amount
FROM 
    top_destination td
INNER JOIN 
    top_destination_origin tdo
    ON 
        tdo.dropoff_borough = td.dropoff_borough
        AND tdo.dropoff_zone = td.dropoff_zone
WHERE
    td.trip_count_rank IN (1, 2, 3) 
    AND tdo.trip_count_rank IN (1, 2, 3, 4, 5)
ORDER BY
    td.trip_count_rank,
    tdo.trip_count_rank
"""

spark.sql(query).toPandas()

|   | dropoff_borough | dropoff_zone | pickup_borough | pickup_zone | trip_count | avg_trip_distance | avg_passenger_count | avg_fare_amount |
|---|---|---|---|---|---|---|---|---|
| 0 | Manhattan | Upper East Side North | Manhattan | Upper East Side South | 371390 | 1.040266 | 1.572988 | 6.284425 |
| 1 | Manhattan | Upper East Side North | Manhattan | Upper East Side North | 322442 | 0.614161 | 1.572159 | 5.141404 |
| 2 | Manhattan | Upper East Side North | Manhattan | Lenox Hill West | 147460 | 1.081685 | 1.608924 | 6.341164 |
| 3 | Manhattan | Upper East Side North | Manhattan | Yorkville West | 134968 | 0.67325 | 1.601728 | 5.222863 |
| 4 | Manhattan | Upper East Side North | Manhattan | Midtown Center | 112407 | 1.991236 | 1.580329 | 10.214633 |
| 5 | Manhattan | Midtown Center | Manhattan | Upper East Side South | 155187 | 1.08786 | 1.545664 | 7.888813 |
| 6 | Manhattan | Midtown Center | Manhattan | Penn Station/Madison Sq West | 138413 | 1.307035 | 1.568465 | 9.091393 |
| 7 | Manhattan | Midtown Center | Manhattan | Midtown Center | 125097 | 0.619652 | 1.563123 | 6.858843 |
| 8 | Manhattan | Midtown Center | Manhattan | Garment District | 106950 | 0.964256 | 1.549406 | 7.055802 |
| 9 | Manhattan | Midtown Center | Manhattan | Murray Hill | 104060 | 0.933673 | 1.555747 | 7.38877 |
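The query ranks twice: once over dropoff zones, and once over pickup zones within each dropoff zone, keeping rows whose ranks fall within the cutoffs. Because it uses `DENSE_RANK`, tied zones share a rank, so a cutoff of 3 can return more than three destinations. A plain-Python sketch of that two-level ranking on made-up toy data; `dense_rank` and `top_destinations_with_origins` are hypothetical helpers, and zones are reduced to a single name (no borough) for brevity.

```python
from collections import Counter

# Toy (pickup_zone, dropoff_zone) pairs standing in for the trips table.
trips = [
    ("A", "X"), ("A", "X"), ("B", "X"), ("C", "X"),
    ("A", "Y"), ("B", "Y"),
    ("C", "Z"),
]


def dense_rank(counts):
    """Map each key to its DENSE_RANK by descending count: ties share a rank, no gaps."""
    distinct = sorted(set(counts.values()), reverse=True)
    rank_of = {c: i + 1 for i, c in enumerate(distinct)}
    return {key: rank_of[c] for key, c in counts.items()}


def top_destinations_with_origins(trips, max_dest_rank=3, max_origin_rank=5):
    """Return (dropoff, pickup, trip_count) rows for the top-ranked dropoff zones and,
    within each, the top-ranked pickup zones, ordered by destination then origin rank."""
    dest_rank = dense_rank(Counter(dropoff for _, dropoff in trips))
    route_counts = Counter(trips)  # (pickup, dropoff) -> number of trips
    rows = []
    for dest, d_rank in dest_rank.items():
        if d_rank > max_dest_rank:
            continue
        origin_counts = {p: n for (p, d), n in route_counts.items() if d == dest}
        for pickup, o_rank in dense_rank(origin_counts).items():
            if o_rank <= max_origin_rank:
                rows.append((d_rank, o_rank, dest, pickup, origin_counts[pickup]))
    rows.sort()  # rank order; ties fall back to zone names so the output is stable
    return [(dest, pickup, n) for _, _, dest, pickup, n in rows]


for row in top_destinations_with_origins(trips, max_dest_rank=2, max_origin_rank=1):
    print(row)
# ('X', 'A', 2)
# ('Y', 'A', 1)
# ('Y', 'B', 1)
```

Here destination Y has two pickup zones tied at rank 1, so both are kept even though the origin cutoff is 1; the `trip_count_rank IN (...)` filters in the query behave the same way under `DENSE_RANK`.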


### How does taxi usage vary by month?
There is an oddity in these results: the number of trips in March 2018 is roughly double that of any other month, with ~19 million trips in March alone compared with roughly 8.5 to 9.3 million in each of the other months. I do not currently have a good explanation for such a stark difference in March 2018, and it needs to be explored further.


In [239]:
query = """
SELECT
    TRUNC(t.pickup_datetime, 'MM') AS date_month,
    COUNT(*) AS trip_count
FROM
    trips t
WHERE 
    t.pickup_datetime >= '2018-01-01'
    AND t.pickup_datetime < '2018-07-01'
GROUP BY
    date_month
ORDER BY
    date_month
"""

spark.sql(query).toPandas()

|   | date_month | trip_count |
|---|---|---|
| 0 | 2018-01-01 | 8757334 |
| 1 | 2018-02-01 | 8490206 |
| 2 | 2018-03-01 | 18850979 |
| 3 | 2018-04-01 | 9302068 |
| 4 | 2018-05-01 | 9220545 |
| 5 | 2018-06-01 | 8709358 |
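`TRUNC(t.pickup_datetime, 'MM')` buckets each pickup by the first day of its month. A small plain-Python sketch of that truncation, plus a quick way to quantify the March spike using the counts in the table above: compare each month with the median month. The helper name `month_start` and the 1.5x threshold are assumptions chosen for illustration.

```python
from datetime import date, datetime
from statistics import median


def month_start(ts: datetime) -> date:
    """Truncate a timestamp to the first day of its month, like TRUNC(ts, 'MM')."""
    return ts.date().replace(day=1)


# Monthly trip counts copied from the query output above.
monthly = {
    "2018-01": 8757334,
    "2018-02": 8490206,
    "2018-03": 18850979,
    "2018-04": 9302068,
    "2018-05": 9220545,
    "2018-06": 8709358,
}

typical = median(monthly.values())
# Assumed threshold: flag any month with more than 1.5x the median count.
flagged = [m for m, n in monthly.items() if n / typical > 1.5]

for month, n in monthly.items():
    print(f"{month}: {n:>12,} trips ({n / typical:.2f}x median)")
print("flagged:", flagged)
```

With these counts the median month has about 8.99 million trips, and only 2018-03 is flagged, at roughly 2.1 times the median.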


# 5. Models

In [None]:
# predicting travel times
# predicting total tip
# pickup dropoff location web graph

# 6. Conclusion