
## Project overview:

##### This is the capstone project for the Data Engineering Nanodegree, in which each student defines the scope and data of a project of their own design.

##### The chosen project is to design a data lake for the New York City yellow taxi trip records, which were collected and provided to the [**NYC Taxi and Limousine Commission (TLC)**](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP).

##### This data lake was designed to provide a star schema optimized for queries that analyze yellow taxi trip records.


### Importing the necessary libraries:

In [1]:
import configparser
from datetime import datetime
from os import walk
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, monotonically_increasing_id

# importing types
from pyspark.sql.types import StructType as R,\
     StructField as Fld,\
     LongType as Lon, \
     DoubleType as Dbl, \
     StringType as Str,\
     IntegerType as Int, \
     FloatType as Flo, \
     DataType as Dt,\
     DecimalType as Dc,\
     TimestampType as Ts


## Step 1: Scope the Project and Gather Data

##### The task was to build an ETL pipeline that extracts the recorded trips, processes them using Spark, and loads the data into S3 as a set of dimensional tables. This makes it possible to keep finding insights into how trips are distributed across the city's zones and to analyse trip durations with respect to fare amounts.

### Recorded trips data dictionary:

| Field Name 	| Description 	|
|---	|---	|
| VendorID 	| A code indicating the TPEP provider that provided the record. 	|
| tpep_pickup_datetime 	| The date and time when the meter was engaged. 	|
| tpep_dropoff_datetime 	| The date and time when the meter was disengaged. 	|
| Passenger_count 	| The number of passengers in the vehicle. 	|
| Trip_distance 	| The elapsed trip distance in miles reported by the taximeter 	|
| PULocationID 	| TLC Taxi Zone in which the taximeter was engaged 	|
| DOLocationID 	| TLC Taxi Zone in which the taximeter was disengaged 	|
| RateCodeID 	| The final rate code in effect at the end of the trip. 	|
| Store_and_fwd_flag 	| This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server. 	|
| Payment_type 	| A numeric code signifying how the passenger paid for the trip. 	|
| Fare_amount 	| The time-and-distance fare calculated by the meter. 	|
| Extra 	| Miscellaneous extras and surcharges. 	|
| MTA_tax 	| $0.50 MTA tax that is automatically triggered based on the metered rate in use. 	|
| Improvement_surcharge 	| $0.30 improvement surcharge assessed on trips at the flag drop. 	|
| Tip_amount 	| This field is automatically populated for credit card tips. Cash tips are not included. 	|
| Tolls_amount 	| Total amount of all tolls paid in trip. 	|
| Total_amount 	| The total amount charged to passengers. Does not include cash tips. 	|

### Taxi zones:

![NYC taxi zones map](assets/NY_zones_maps.png "Taxi zones")

In [2]:
# creating Spark session
spark = SparkSession \
    .builder \
    .appName("NY_yellow_taxi") \
    .getOrCreate()

![alt text](assets/schema1.png "Schema")

### Defining the data schemas:

In [3]:
# DecimalType(precision, scale): 8 digits in total, 4 of them after the decimal point
precision = 8
scale = 4
trips_schema = R([
    Fld('VendorID', Int()),
    Fld('tpep_pickup_datetime', Ts(),nullable=False),
    Fld('tpep_dropoff_datetime', Ts(),nullable=False),
    Fld('passenger_count', Int(),nullable=False),
    Fld('trip_distance', Dbl(),nullable=False),
    Fld('RatecodeID', Int(),nullable=False),
    Fld('store_and_fwd_flag', Str()),   
    Fld('PULocationID', Int(),nullable=False),   
    Fld('DOLocationID', Int(),nullable=False),   
    Fld('payment_type', Int(),nullable=False),   
    Fld('fare_amount', Dc(precision, scale),nullable=False),   
    Fld('extra', Dc(precision, scale)),   
    Fld('mta_tax', Dc(precision, scale)),   
    Fld('tip_amount', Dc(precision, scale)),   
    Fld('tolls_amount', Dc(precision, scale)),   
    Fld('improvement_surcharge', Dc(precision, scale)), 
    Fld('total_amount', Dc(precision, scale),nullable=False), 
    Fld('congestion_surcharge', Dc(precision, scale)), 
])
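With `precision = 8` and `scale = 4`, each amount column can hold at most 8 digits, 4 of them after the decimal point, so the largest magnitude it can represent is 9999.9999. The sketch below uses Python's standard `decimal` module rather than Spark, and `decimal_max` and `fits_decimal` are hypothetical helper names; it computes that bound and checks a couple of amounts against it.

```python
from decimal import Decimal


def decimal_max(precision: int, scale: int) -> Decimal:
    """Largest magnitude representable by DECIMAL(precision, scale)."""
    # (precision - scale) digits before the point and `scale` digits after it:
    # 10**(precision - scale) minus one unit in the last place (10**-scale)
    return Decimal(10) ** (precision - scale) - Decimal(1).scaleb(-scale)


def fits_decimal(value: str, precision: int = 8, scale: int = 4) -> bool:
    """True if the value's magnitude lies within the DECIMAL(precision, scale) range."""
    return abs(Decimal(value)) <= decimal_max(precision, scale)


print(decimal_max(8, 4))        # 9999.9999
print(fits_decimal("51.95"))    # True
print(fits_decimal("12345.5"))  # False
```

Amounts with more than four integer digits would therefore not fit these columns; a larger `precision` would widen the range at a small storage cost.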

In [4]:
zones_schema = R([
    Fld('LocationID', Int(),nullable=False),
    Fld('Borough', Str(),nullable=False),
    Fld('Zone', Str(),nullable=False),
    Fld('service_zone', Str(),nullable=False),
])

In [5]:
vendors_schema = R([
    Fld('vendor_id', Int(),nullable=False),
    Fld('vendor_name', Str(),nullable=False),
])

In [6]:
rate_codes_schema = R([
    Fld('rate_code_id', Int(),nullable=False),
    Fld('rate_name', Str(),nullable=False),
])

In [7]:
payment_types_schema = R([
    Fld('payment_type_id', Int(),nullable=False),
    Fld('payment_type', Str(),nullable=False),
])

In [8]:
# Define the data and output paths:
data_path = 'data/'

output_data = 'output/'

### Loading data into Spark Session:

In [9]:

trip_data_path = data_path + "trip_data/"

filenames = [trip_data_path + file for file in next(walk(trip_data_path), (None, None, []))[2]]

trips_df = spark.read.csv(filenames, schema=trips_schema, header=True)
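The file listing above relies on a compact `os.walk` idiom. The sketch below wraps it in a hypothetical `top_level_files` helper (not part of the original notebook) and sorts the names so the order is deterministic, which the original line does not do. As an alternative, `spark.read.csv` also accepts a directory path, e.g. `spark.read.csv(trip_data_path, schema=trips_schema, header=True)`, and then reads the files inside it.

```python
import os
from os import walk
from typing import List


def top_level_files(directory: str) -> List[str]:
    """Paths of the non-directory entries directly inside `directory` (subfolders are skipped)."""
    # walk() yields (dirpath, dirnames, filenames) tuples, starting with `directory` itself;
    # next() keeps only that first tuple. If the directory does not exist, walk() yields
    # nothing, so the default tuple turns the result into an empty list instead of StopIteration.
    _, _, filenames = next(walk(directory), (None, None, []))
    return [os.path.join(directory, name) for name in sorted(filenames)]


# usage in the notebook: filenames = top_level_files(trip_data_path)
```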

In [10]:
zones_data_path = data_path + "zone_lookup.csv"

zones_df = spark.read.csv(zones_data_path, schema=zones_schema, header=True)

In [11]:
payment_types_data_path = data_path + "payment_types.csv"

payment_types_df = spark.read.csv(payment_types_data_path, schema=payment_types_schema, header=True)

In [12]:
rate_codes_data_path = data_path + "rate_codes.csv"

rate_code_df = spark.read.csv(rate_codes_data_path, schema=rate_codes_schema, header=True)

In [13]:
vendors_data_path = data_path + "vendors.csv"

vendors_df = spark.read.csv(vendors_data_path, schema=vendors_schema, header=True)

#### Check the schema:

In [14]:
# creating an id for each trip
trips_df= trips_df.withColumn('trip_id', F.sha2(
        F.concat(*(
            F.col(col).cast("string")
            for col 
            in trips_df.columns
        )),
        256
    ))

print('trips df schema')
trips_df.printSchema()

print('trips df header')
trips_df.limit(5).toPandas()


trips df schema
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: decimal(8,4) (nullable = true)
 |-- extra: decimal(8,4) (nullable = true)
 |-- mta_tax: decimal(8,4) (nullable = true)
 |-- tip_amount: decimal(8,4) (nullable = true)
 |-- tolls_amount: decimal(8,4) (nullable = true)
 |-- improvement_surcharge: decimal(8,4) (nullable = true)
 |-- total_amount: decimal(8,4) (nullable = true)
 |-- congestion_surcharge: decimal(8,4) (nullable = true)
 |-- trip_id: string (nullable = true)

trips df header


|   | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | trip_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1 | 2.1 | 1 | N | 142 | 43 | 2 | 8.0 | 3.0 | 0.5 | 0.0 | 0.0 | 0.3 | 11.8 | 2.5 | 212f85f7df25d3b9b4c0bb7d91df026a1257f49418ef90... |
| 1 | 1 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1 | 0.2 | 1 | N | 238 | 151 | 2 | 3.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.3 | 0.0 | fcdd7dbef9c42486c865a210866f9349c1906e3ef34ace... |
| 2 | 1 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1 | 14.7 | 1 | N | 132 | 165 | 1 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 | 0693adaeb76f679f64828c33a174f8c794f955f5b8b2b1... |
| 3 | 1 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0 | 10.6 | 1 | N | 138 | 132 | 1 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 | 33ebd7844d4f9f8d5b5596f928eaf8cc7840d1d66d8d92... |
| 4 | 2 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1 | 4.94 | 1 | N | 68 | 33 | 1 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 | c0bb0e896dcd8312e0d5eeea44b3adea480bcd18f74140... |
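The `trip_id` above is a SHA-256 hash of all column values joined by `F.concat`, which uses no delimiter and returns null when any input is null (this is why the rows with missing fields examined in Step 2 have an empty `trip_id`). Two consequences are worth knowing: different rows can in principle collapse into the same concatenated string, and exact duplicate rows receive identical ids. The plain-Python sketch below (using `hashlib`; `row_id` is a hypothetical helper that mirrors the structure of the Spark expression, not Spark's exact string casts) shows the delimiter issue.

```python
import hashlib
from typing import Optional, Sequence


def row_id(values: Sequence[Optional[object]], sep: str = "") -> Optional[str]:
    """SHA-256 hex digest of the values joined by `sep`, or None if any value is None."""
    if any(v is None for v in values):
        # mirrors F.concat: a single null input makes the whole concatenation null
        return None
    joined = sep.join(str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Without a separator, two different rows produce the same string "123"
print(row_id(["1", "23"]) == row_id(["12", "3"]))            # True
print(row_id(["1", "23"], "|") == row_id(["12", "3"], "|"))  # False
print(row_id(["1", None]))                                    # None
```

In Spark, a delimiter could be added with `F.concat_ws('|', ...)`, but note that `concat_ws` skips null inputs instead of returning null. Duplicate ids can be detected by comparing `trips_df.count()` with `trips_df.select('trip_id').distinct().count()`.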


In [15]:
print('zones df schema')
zones_df.printSchema()

print('zones df header')
zones_df.limit(5).toPandas()



zones df schema
root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

zones df header


|   | LocationID | Borough | Zone | service_zone |
|---|---|---|---|---|
| 0 | 1 | EWR | Newark Airport | EWR |
| 1 | 2 | Queens | Jamaica Bay | Boro Zone |
| 2 | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 3 | 4 | Manhattan | Alphabet City | Yellow Zone |
| 4 | 5 | Staten Island | Arden Heights | Boro Zone |


In [16]:
print('payment types df schema')
payment_types_df.printSchema()

print('payment types df header')
payment_types_df.limit(5).toPandas()



payment types df schema
root
 |-- payment_type_id: integer (nullable = true)
 |-- payment_type: string (nullable = true)

payment types df header


|   | payment_type_id | payment_type |
|---|---|---|
| 0 | 1 | Credit card |
| 1 | 2 | Cash |
| 2 | 3 | No charge |
| 3 | 4 | Dispute |
| 4 | 5 | Unknown |


In [17]:
print('rate codes df schema')
rate_code_df.printSchema()

print('rate codes df header')
rate_code_df.limit(5).toPandas()



rate codes df schema
root
 |-- rate_code_id: integer (nullable = true)
 |-- rate_name: string (nullable = true)

rate codes df header


|   | rate_code_id | rate_name |
|---|---|---|
| 0 | 1 | Standard rate |
| 1 | 2 | JFK |
| 2 | 3 | Newark |
| 3 | 4 | Nassau or Westchester |
| 4 | 5 | Negotiated fare |


In [18]:
print('vendors df schema')
vendors_df.printSchema()


print('vendors df header')
vendors_df.limit(5).toPandas()



vendors df schema
root
 |-- vendor_id: integer (nullable = true)
 |-- vendor_name: string (nullable = true)

vendors df header


|   | vendor_id | vendor_name |
|---|---|---|
| 0 | 1 | Creative Mobile Technologies |
| 1 | 2 | VeriFone Inc. |


## Step 2: Explore and Assess the Data

In [19]:
trips_df.groupBy('passenger_count').count().toPandas()


|   | passenger_count | count |
|---|---|---|
| 0 | 1.0 | 1931999 |
| 1 | 6.0 | 48519 |
| 2 | 3.0 | 85731 |
| 3 | 5.0 | 64850 |
| 4 | 4.0 | 32120 |
| 5 | 8.0 | 2 |
| 6 | 2.0 | 328602 |
| 7 | 0.0 | 53042 |
| 8 | 7.0 | 7 |
| 9 | 9.0 | 3 |


When grouping the data by passenger count, we see that some rows have no passenger count recorded (null values).

In [20]:
no_passengers = trips_df.where(F.col('passenger_count').isNull())

no_passengers.limit(5).toPandas()

|   | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | trip_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 |  | 2021-02-01 10:02:04 | 2021-02-01 11:02:07 |  | 2.67 |  |  | 265 | 215 |  | 35.2 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 36.0 | 0.0 |  |
| 1 |  | 2021-02-01 10:02:13 | 2021-02-01 10:02:08 |  | 1.47 |  |  | 265 | 215 |  | 35.2 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 36.0 | 0.0 |  |
| 2 |  | 2021-02-01 10:26:00 | 2021-02-01 10:48:00 |  | 2.28 |  |  | 68 | 137 |  | 16.12 | 2.75 | 0.5 | 0.0 | 0.0 | 0.3 | 19.67 | 0.0 |  |
| 3 |  | 2021-02-01 10:56:00 | 2021-02-01 12:04:00 |  | 23.95 |  |  | 37 | 212 |  | 43.84 | 2.75 | 0.5 | 0.0 | 6.12 | 0.3 | 53.51 | 0.0 |  |
| 4 |  | 2021-02-01 10:15:00 | 2021-02-01 11:14:00 |  | 15.98 |  |  | 10 | 230 |  | 53.1 | 2.75 | 0.5 | 0.0 | 6.12 | 0.3 | 62.77 | 0.0 |  |


After assessing the null values, we see that these rows are missing other fields as well (VendorID, RatecodeID, store_and_fwd_flag and payment_type).

Even the fare on some of these records doesn't make sense given the trip distance and the pickup/dropoff duration.
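To see which columns are affected without eyeballing rows, one common Spark idiom counts nulls per column in a single pass: `trips_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in trips_df.columns])`, which works because `F.count` ignores nulls and `F.when` without `otherwise` yields null for non-matching rows. The plain-Python sketch below performs the same computation on a few columns of the first two rows shown above; `null_counts` is a hypothetical helper.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[object]]


def null_counts(rows: List[Row]) -> Dict[str, int]:
    """Number of None values per column across the given rows."""
    counts: Dict[str, int] = {}
    for row in rows:
        for column, value in row.items():
            # a bool adds as 0 or 1
            counts[column] = counts.get(column, 0) + (value is None)
    return counts


# A subset of columns from the first two null-passenger rows displayed above
rows = [
    {"VendorID": None, "passenger_count": None, "trip_distance": 2.67, "payment_type": None, "fare_amount": 35.2},
    {"VendorID": None, "passenger_count": None, "trip_distance": 1.47, "payment_type": None, "fare_amount": 35.2},
]
print(null_counts(rows))
```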

In [38]:
nullrow = no_passengers\
  .filter(no_passengers.tpep_pickup_datetime == '2021-02-01 10:02:13')
nullrow.toPandas()

|   | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | trip_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 |  | 2021-02-01 10:02:13 | 2021-02-01 10:02:08 |  | 1.47 |  |  | 265 | 215 |  | 35.2 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 36.0 | 0.0 |  |


For example, in the record above the dropoff datetime is earlier than the pickup datetime.
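Dropping null rows removes this particular record, but it does not by itself guarantee that the dropoff comes after the pickup for rows whose fields are all populated. In Spark that rule could be enforced with a filter such as `trips_df.where(F.col('tpep_dropoff_datetime') > F.col('tpep_pickup_datetime'))`; the notebook does not apply such a filter, so treat this as a suggestion. The plain-Python sketch below states the same rule; `is_valid_trip` is a hypothetical helper, and the timestamp format is assumed from the values displayed above.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed from the timestamps displayed above


def is_valid_trip(pickup: str, dropoff: str) -> bool:
    """A trip is plausible only if the meter was disengaged after it was engaged."""
    return datetime.strptime(dropoff, TS_FORMAT) > datetime.strptime(pickup, TS_FORMAT)


print(is_valid_trip("2021-02-01 10:02:13", "2021-02-01 10:02:08"))  # False
print(is_valid_trip("2021-02-01 10:02:04", "2021-02-01 11:02:07"))  # True
```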

### For these reasons we will remove all the records with null values from our dataframe to have more accurate data for our analysis.


In [39]:
# Dropping rows that contain any null value
trips_df = trips_df.na.drop('any')
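`na.drop` decides per row whether to keep it: with `how='any'` a row is dropped if any value is null, with `how='all'` only if every value is null, and a `thresh` argument (minimum number of non-null values) overrides `how`. The sketch below models that row test in plain Python for illustration; `keep_row` is a hypothetical name, and the `subset` parameter of the real API is left out.

```python
from typing import Dict, Optional


def keep_row(row: Dict[str, Optional[object]], how: str = "any", thresh: Optional[int] = None) -> bool:
    """Plain-Python model of the row test behind DataFrame.na.drop(how, thresh)."""
    non_null = sum(value is not None for value in row.values())
    if thresh is not None:
        # thresh overrides how: keep rows with at least `thresh` non-null values
        return non_null >= thresh
    if how == "any":
        # drop the row if any value is null
        return non_null == len(row)
    if how == "all":
        # drop the row only if every value is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")


row = {"VendorID": None, "trip_distance": 1.47, "fare_amount": 35.2}
print(keep_row(row, "any"))     # False
print(keep_row(row, "all"))     # True
print(keep_row(row, thresh=2))  # True
```

Because every column of the trip records is needed downstream, the notebook's choice of `'any'` is the strictest option.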

In [40]:
trips_df.groupBy('passenger_count').count().toPandas()

|   | passenger_count | count |
|---|---|---|
| 0 | 1 | 1931999 |
| 1 | 6 | 48519 |
| 2 | 3 | 85731 |
| 3 | 5 | 64850 |
| 4 | 4 | 32120 |
| 5 | 8 | 2 |
| 6 | 2 | 328602 |
| 7 | 0 | 53042 |
| 8 | 7 | 7 |
| 9 | 9 | 3 |


In [43]:
nulls = trips_df.select('tpep_pickup_datetime').where(F.isnull(trips_df.tpep_pickup_datetime))

nulls.collect()

[]

## Step 3: Define the Data Model

Our data model is a dimensional model with a star schema: one or more fact tables referencing any number of dimension tables (see the image below).

The star schema model is:
- Easy to understand
- Fast for analytical queries
- Well suited to Online Analytical Processing (OLAP)
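The point of the star schema is that analytical queries join the fact table to small dimension tables on their ids and then aggregate the measures. The plain-Python sketch below illustrates that pattern with dictionaries standing in for the `payment_types` and `vendors` dimension tables (values from the lookup CSVs loaded above) and five fact rows taken from the first trips displayed earlier; the helper names are hypothetical. In Spark SQL the first aggregation would correspond to something like `SELECT p.payment_type, COUNT(*) FROM trips t JOIN payment_types p ON t.payment_type_id = p.payment_type_id GROUP BY p.payment_type`, with `trips` and `payment_types` as assumed view names.

```python
from collections import Counter, defaultdict
from typing import Dict, List

# Dimension tables: id -> descriptive attribute (values from the lookup CSVs above)
payment_types: Dict[int, str] = {1: "Credit card", 2: "Cash", 3: "No charge", 4: "Dispute", 5: "Unknown"}
vendors: Dict[int, str] = {1: "Creative Mobile Technologies", 2: "VeriFone Inc."}

# Fact rows: foreign keys plus numeric measures (first five trips displayed earlier)
trips_fact: List[dict] = [
    {"vendor_id": 1, "payment_type_id": 2, "total_amount": 11.8},
    {"vendor_id": 1, "payment_type_id": 2, "total_amount": 4.3},
    {"vendor_id": 1, "payment_type_id": 1, "total_amount": 51.95},
    {"vendor_id": 1, "payment_type_id": 1, "total_amount": 36.35},
    {"vendor_id": 2, "payment_type_id": 1, "total_amount": 24.36},
]


def trips_by_payment_type(facts: List[dict]) -> Counter:
    # Join each fact to the payment_types dimension on its key, then count per label
    return Counter(payment_types[f["payment_type_id"]] for f in facts)


def revenue_by_vendor(facts: List[dict]) -> Dict[str, float]:
    # Join to the vendors dimension and sum the total_amount measure per vendor name
    totals: Dict[str, float] = defaultdict(float)
    for f in facts:
        totals[vendors[f["vendor_id"]]] += f["total_amount"]
    return {name: round(total, 2) for name, total in totals.items()}


print(trips_by_payment_type(trips_fact))  # Counter({'Credit card': 3, 'Cash': 2})
print(revenue_by_vendor(trips_fact))
```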

![alt text](assets/schema1.png "Schema")


## Step 4: Run ETL to Model the Data

## Writing dimension tables to drive:

In [None]:
# writing zones dimension table to drive
zones_df.write.mode('overwrite')\
    .parquet(output_data + 'zones/zones_table.parquet')

# writing payment_types dimension table to drive
payment_types_df.write.mode('overwrite')\
    .parquet(output_data + 'payment_types/payment_types_table.parquet')

# writing rate_code dimension table to drive
rate_code_df.write.mode('overwrite')\
    .parquet(output_data + 'rate_codes/rate_codes_table.parquet')

# writing vendors dimension table to drive
vendors_df.write.mode('overwrite')\
    .parquet(output_data + 'vendors/vendors_table.parquet')

## Creating trips fact table:

In [19]:
# select the fact table columns, renaming source columns to snake_case
trips_fact_table = trips_df.selectExpr(
    'trip_id',
    'VendorID as vendor_id',
    'RatecodeID as rate_code_id',
    'PULocationID as pu_location_id',
    'DOLocationID as do_location_id',
    'payment_type as payment_type_id',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'tpep_pickup_datetime as pickup_datetime',
    'tpep_dropoff_datetime as dropoff_datetime',
    'passenger_count',
    'trip_distance as distance_miles',
)


In [20]:
# trip duration in minutes, rounded to 2 decimal places
duration_fun_min = udf(
    lambda dropoff_datetime, pickup_datetime: round((dropoff_datetime - pickup_datetime).total_seconds() / 60, 2),
    Dbl(),
)

In [21]:
# adding durations column
trips_fact_table = trips_fact_table\
  .withColumn('duration_min', duration_fun_min(trips_fact_table.dropoff_datetime, trips_fact_table.pickup_datetime))

In [22]:
# adding month and year column for partitioning
trips_fact_table = trips_fact_table\
  .withColumn('month', F.month(trips_fact_table.pickup_datetime))\
  .withColumn('year', F.year(trips_fact_table.pickup_datetime))

In [23]:
trips_fact_table.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- rate_code_id: integer (nullable = true)
 |-- pu_location_id: integer (nullable = true)
 |-- do_location_id: integer (nullable = true)
 |-- payment_type_id: integer (nullable = true)
 |-- fare_amount: decimal(8,4) (nullable = true)
 |-- extra: decimal(8,4) (nullable = true)
 |-- mta_tax: decimal(8,4) (nullable = true)
 |-- tip_amount: decimal(8,4) (nullable = true)
 |-- tolls_amount: decimal(8,4) (nullable = true)
 |-- improvement_surcharge: decimal(8,4) (nullable = true)
 |-- total_amount: decimal(8,4) (nullable = true)
 |-- congestion_surcharge: decimal(8,4) (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- distance_miles: double (nullable = true)
 |-- duration_min: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [24]:
trips_fact_table.limit(5).toPandas()

|   | trip_id | vendor_id | rate_code_id | pu_location_id | do_location_id | payment_type_id | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | pickup_datetime | dropoff_datetime | passenger_count | distance_miles | duration_min | month | year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 212f85f7df25d3b9b4c0bb7d91df026a1257f49418ef90... | 1 | 1 | 142 | 43 | 2 | 8.0 | 3.0 | 0.5 | 0.0 | 0.0 | 0.3 | 11.8 | 2.5 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1 | 2.1 | 6.03 | 1 | 2021 |
| 1 | fcdd7dbef9c42486c865a210866f9349c1906e3ef34ace... | 1 | 1 | 238 | 151 | 2 | 3.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.3 | 0.0 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1 | 0.2 | 0.98 | 1 | 2021 |
| 2 | 0693adaeb76f679f64828c33a174f8c794f955f5b8b2b1... | 1 | 1 | 132 | 165 | 1 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1 | 14.7 | 27.6 | 1 | 2021 |
| 3 | 33ebd7844d4f9f8d5b5596f928eaf8cc7840d1d66d8d92... | 1 | 1 | 138 | 132 | 1 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0 | 10.6 | 15.22 | 1 | 2021 |
| 4 | c0bb0e896dcd8312e0d5eeea44b3adea480bcd18f74140... | 2 | 1 | 68 | 33 | 1 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1 | 4.94 | 16.53 | 1 | 2021 |


In [26]:
trips_fact_table.agg({'duration_min':'avg'}).collect()

[Row(avg(duration_min)=14.39911415505466)]
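The `duration_min` values in the fact table can be checked by hand: the UDF takes the seconds between the two timestamps, divides by 60 and rounds to two decimals. The sketch below reproduces that arithmetic in plain Python for the first and third trips displayed above, using the same argument order as `duration_fun_min`. Because a Python UDF ships every row to a Python worker, a built-in expression such as `F.round((F.unix_timestamp('dropoff_datetime') - F.unix_timestamp('pickup_datetime')) / 60, 2)` is a possible alternative that Spark can optimize; it should agree for these one-second-resolution timestamps, though rounding of exact ties may differ between Python and Spark.

```python
from datetime import datetime


def duration_min(dropoff_datetime: datetime, pickup_datetime: datetime) -> float:
    """Same arithmetic as duration_fun_min: elapsed seconds / 60, rounded to 2 decimals."""
    return round((dropoff_datetime - pickup_datetime).total_seconds() / 60, 2)


# First trip: 00:30:10 -> 00:36:12 is 362 seconds
print(duration_min(datetime(2021, 1, 1, 0, 36, 12), datetime(2021, 1, 1, 0, 30, 10)))  # 6.03
# Third trip crosses the hour: 00:43:30 -> 01:11:06 is 1656 seconds
print(duration_min(datetime(2021, 1, 1, 1, 11, 6), datetime(2021, 1, 1, 0, 43, 30)))   # 27.6
```

A record whose dropoff precedes its pickup would produce a negative duration here, which is one more reason to validate the timestamp order before computing averages.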

### Writing fact table to drive:

In [28]:

# write trips fact table to parquet files partitioned by year and month

trips_fact_table.write.partitionBy('year', 'month').mode('overwrite')\
    .parquet(output_data + 'trips/trips_fact_table.parquet')
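`partitionBy('year', 'month')` writes one Hive-style sub-directory per distinct (year, month) value in the data, named `column=value`; the partition columns are encoded in those directory names rather than stored inside the Parquet files. A reader that filters on them, e.g. `spark.read.parquet(output_data + 'trips/trips_fact_table.parquet').where('year = 2021 AND month = 2')`, lets Spark skip the other directories. The plain-Python sketch below shows the resulting path layout; `partition_dir` and `parse_partition` are hypothetical helpers for illustration.

```python
import posixpath
from typing import Dict


def partition_dir(table_path: str, year: int, month: int) -> str:
    """Hive-style directory used by partitionBy('year', 'month') for one partition."""
    return posixpath.join(table_path, f"year={year}", f"month={month}")


def parse_partition(path: str) -> Dict[str, int]:
    """Recover the partition values from the key=value segments of a partition path."""
    values: Dict[str, int] = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key in ("year", "month"):
            values[key] = int(value)
    return values


path = partition_dir("output/trips/trips_fact_table.parquet", 2021, 1)
print(path)                   # output/trips/trips_fact_table.parquet/year=2021/month=1
print(parse_partition(path))  # {'year': 2021, 'month': 1}
```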

## Step 5: Complete Project Write Up

We chose a Spark-based data lake because of the volume of data we expect to process.
The current dataset covers only two months of trips (about 2.5 million records after dropping nulls), and Spark handles data of this size well while leaving room to grow.

The data can be updated every month to analyze the monthly trip activity around the city and compare it to previous months.

If the data increased by 100x, we could store the trip data in an AWS S3 bucket and run an AWS EMR (Elastic MapReduce) cluster whose nodes load and process the data in parallel for fast, advanced analysis, and then write the resulting data model back to S3.

If the pipeline had to run daily by 7am, we would set up an ELT pipeline in Airflow, scheduling a DAG that loads the data into S3, runs the EMR cluster to perform the modeling steps, and finally writes the processed data to S3 so that it is available to the analysis team.

If the database needed to be accessed by 100+ people, we could create AWS users for them and provide them with access to the data by setting AWS IAM roles for 