## Problem Statement:

The Taxi and Limousine Commission (TLC) of New York City collects trip record data from licensed taxis and for-hire vehicles (FHVs) and provides it to the public. The data includes details such as pick-up and drop-off times, locations, passenger counts, and payment information for each trip. As a data engineer, your task is to build a batch data processing pipeline using PySpark to process and analyze this data to gain insights into taxi and FHV trips in New York City.

### Goals:

Data ingestion: Download the trip record data from the NYC TLC website and ingest it into the pipeline for further processing.

Data cleaning and validation: Perform data quality checks and validation to ensure that the data is clean and consistent. Identify and remove duplicates, null values, and other data quality issues that may impact downstream analysis.

Data transformation: Transform the raw trip record data into a format that is optimized for analysis. This may include aggregating the data by time periods, geographical regions, and other factors of interest.

Data analysis: Use PySpark to perform statistical analysis, data exploration, and data visualization to gain insights into taxi and FHV trips in New York City. This may include identifying popular pick-up and drop-off locations, peak trip times, and other patterns and trends in the data.

Data storage: Store the processed and analyzed data in a suitable data storage system for future use.

---

The overall goal of the project is to build a batch data processing pipeline using PySpark to extract insights from the NYC TLC trip record data. The pipeline should be scalable, efficient, and automated to enable easy data processing and analysis.

### Import Libraries and Initiate Spark session

In [1]:
# !pip install geospark
# !pip install pyspark

In [2]:
import configparser

import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

In [3]:
spark = SparkSession.builder.appName("nyc_batch_pipeline").getOrCreate()

In [4]:
spark

### Data Ingestion

In [5]:
# Parsing from config file
conf = configparser.ConfigParser()
conf.read("config")
data_source_path = conf.get("DATA PATH", "DataSource_PATH")

# Schema
schema = StructType([
    StructField('vendor_name', StringType(), True), 
    StructField('Trip_Pickup_DateTime', StringType(), True), 
    StructField('Trip_Dropoff_DateTime', StringType(), True), 
    StructField('Passenger_Count', LongType(), True), 
    StructField('Trip_Distance', DoubleType(), True), 
    StructField('Rate_Code', DoubleType(), True), 
    StructField('store_and_forward', DoubleType(), True), 
    StructField('Payment_Type', StringType(), True), 
    StructField('Fare_Amt', DoubleType(), True), 
    StructField('surcharge', DoubleType(), True), 
    StructField('mta_tax', DoubleType(), True), 
    StructField('Tip_Amt', DoubleType(), True), 
    StructField('Tolls_Amt', DoubleType(), True), 
    StructField('Total_Amt', DoubleType(), True)]
)

# Read the Parquet source; the schema is applied via .schema(), so no extra option is needed
df_full = spark.read.schema(schema).parquet(data_source_path)
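The `config` file itself is not part of the notebook. A minimal sketch of the INI layout this cell appears to expect, assuming a `[DATA PATH]` section holding the two keys read in this notebook; the paths are hypothetical placeholders:

```python
import configparser

# Hypothetical contents of the "config" file; the real paths are not shown in the notebook.
SAMPLE_CONFIG = """
[DATA PATH]
DataSource_PATH = /data/nyc_tlc/raw/
DataTarget_PATH = /data/nyc_tlc/processed/
"""

conf = configparser.ConfigParser()
conf.read_string(SAMPLE_CONFIG)

# Section names are case-sensitive, but option names are lower-cased by default,
# so "DataSource_PATH" and "datasource_path" refer to the same key.
data_source_path = conf.get("DATA PATH", "DataSource_PATH")
data_target_path = conf.get("DATA PATH", "DataTarget_PATH")
print(data_source_path, data_target_path)
```

Two practical notes: `ConfigParser.read("config")` silently skips a file it cannot open, so a wrong path only surfaces later as `NoSectionError`; and because the output paths later in the notebook are built by string concatenation, `DataTarget_PATH` needs a trailing slash.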

In [6]:
print(f"No of Partitions: {df_full.rdd.getNumPartitions()}")

No of Partitions: 4


In [7]:
frac = 0.01
df = df_full.sample(fraction=frac, seed=123)
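`DataFrame.sample(fraction=frac, seed=123)` without replacement keeps each row independently with probability `frac`, so the sample is only approximately 1% of the input, and the exact rows can change if the partitioning changes. A plain-Python illustration of that Bernoulli idea (this is not Spark's actual implementation):

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))
sample = bernoulli_sample(rows, fraction=0.01, seed=123)
print(len(sample))  # close to 1,000; the exact count depends on the seed
```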

In [8]:
# Schema
df.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Rate_Code: double (nullable = true)
 |-- store_and_forward: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



### Data Cleaning And Validation

#### Missing Values

In [9]:
no_of_row = df.count()
print(f"No of Rows: {no_of_row}")
print(f"No of cols: {len(df.columns)}")

No of Rows: 140868
No of cols: 14

In [10]:
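# Count nulls (and NaNs) per column in a single pass.
# F.when(...) without otherwise() yields null when the condition is false,
# and F.count() ignores nulls, so only the matching rows are counted.
null_count = (
    df.select(
        [F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]
    )
    .collect()[0]
    .asDict()
)
null_col_list = [c for c in null_count if null_count[c] > 0]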

In [11]:
null_count

{'vendor_name': 0,
 'Trip_Pickup_DateTime': 0,
 'Trip_Dropoff_DateTime': 0,
 'Passenger_Count': 0,
 'Trip_Distance': 0,
 'Rate_Code': 140868,
 'store_and_forward': 140862,
 'Payment_Type': 0,
 'Fare_Amt': 0,
 'surcharge': 0,
 'mta_tax': 140868,
 'Tip_Amt': 0,
 'Tolls_Amt': 0,
 'Total_Amt': 0}

In [12]:
# Three columns are almost entirely null:
# Rate_Code and mta_tax are null in every row,
# and store_and_forward has only a few non-null values
null_col_list = ["Rate_Code", "store_and_forward", "mta_tax"]
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in null_col_list]).show()

+---------+-----------------+-------+
|Rate_Code|store_and_forward|mta_tax|
+---------+-----------------+-------+
|   140868|           140862| 140868|
+---------+-----------------+-------+



In [13]:
# Count distinct non-null values in the mostly-null columns (countDistinct ignores nulls)
df.select([F.countDistinct(F.col(c)).alias(c) for c in null_col_list]).show()

+---------+-----------------+-------+
|Rate_Code|store_and_forward|mta_tax|
+---------+-----------------+-------+
|        0|                1|      0|
+---------+-----------------+-------+



In [14]:
# Value counts for store_and_forward: the non-null values are negligible compared to the total row count
df.select(F.col("store_and_forward")).groupBy('store_and_forward').count().show()

+-----------------+------+
|store_and_forward| count|
+-----------------+------+
|              0.0|     6|
|             null|140862|
+-----------------+------+



In [15]:
# These columns are (almost) entirely null, so they are dropped
df_not_null = df.drop(*null_col_list)
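`null_col_list` is hard-coded above, although `null_count` already holds the per-column counts. A sketch that derives the columns to drop from a null-fraction threshold instead; the helper name and the 0.95 threshold are assumptions, not something the notebook specifies:

```python
def columns_to_drop(null_count, total_rows, threshold=0.95):
    """Return columns whose share of null/NaN values exceeds `threshold`."""
    return [c for c, n in null_count.items() if n / total_rows > threshold]

# Abridged copy of the null_count output above
null_count = {
    "vendor_name": 0,
    "Rate_Code": 140868,
    "store_and_forward": 140862,
    "Payment_Type": 0,
    "mta_tax": 140868,
}
print(columns_to_drop(null_count, total_rows=140868))
# ['Rate_Code', 'store_and_forward', 'mta_tax']
```

In the notebook this would read `df.drop(*columns_to_drop(null_count, no_of_row))`, which keeps working if a future month of data has a different set of empty columns.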

In [16]:
df_not_null.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



#### Data Transformation and DateTime

In [17]:
# The DateTime columns are currently strings; inspect their format first
date_col = ["Trip_Pickup_DateTime", "Trip_Dropoff_DateTime"]
df_not_null.select(date_col).show(3)

+--------------------+---------------------+
|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|
+--------------------+---------------------+
| 2009-01-03 11:05:27|  2009-01-03 11:10:55|
| 2009-01-23 00:27:00|  2009-01-23 00:35:00|
| 2009-01-18 16:49:45|  2009-01-18 16:56:34|
+--------------------+---------------------+
only showing top 3 rows



In [18]:
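# The values follow the pattern yyyy-MM-dd HH:mm:ss (MM = month, mm = minute)
# The LEGACY policy uses the pre-Spark-3.0 (SimpleDateFormat-based) parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
date_dict_map = {date_c: F.to_timestamp(F.col(date_c), "yyyy-MM-dd HH:mm:ss") for date_c in date_col}
# DataFrame.withColumns requires Spark 3.3+
df_time_parsed = df_not_null.withColumns(date_dict_map)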
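Spark datetime patterns and Python `strptime` directives use different letters for the same fields, which makes it easy to mix up month and minute. A stdlib check on the sample values shown above, where Spark's `yyyy-MM-dd HH:mm:ss` corresponds to Python's `%Y-%m-%d %H:%M:%S`:

```python
from datetime import datetime

# Spark "yyyy-MM-dd HH:mm:ss"  <->  Python "%Y-%m-%d %H:%M:%S"
# (Spark: MM = month, mm = minute; Python: %m = month, %M = minute)
PY_FORMAT = "%Y-%m-%d %H:%M:%S"

samples = ["2009-01-03 11:05:27", "2009-01-23 00:27:00", "2009-01-18 16:49:45"]
parsed = [datetime.strptime(s, PY_FORMAT) for s in samples]
for p in parsed:
    print(p.isoformat())
```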

In [19]:
df_time_parsed.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: timestamp (nullable = true)
 |-- Trip_Dropoff_DateTime: timestamp (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



In [20]:
# Add trip duration in seconds (a timestamp cast to long gives seconds since the Unix epoch)
df_date_parsed = df_time_parsed.withColumn(
    "duration", 
    F.col("Trip_Dropoff_DateTime").cast("long") - F.col("Trip_Pickup_DateTime").cast("long")
)
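Because both timestamps are cast to epoch seconds, their difference is the trip duration in seconds. The same arithmetic on the three sample rows shown earlier, in plain Python:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
trips = [
    ("2009-01-03 11:05:27", "2009-01-03 11:10:55"),
    ("2009-01-23 00:27:00", "2009-01-23 00:35:00"),
    ("2009-01-18 16:49:45", "2009-01-18 16:56:34"),
]

def duration_seconds(pickup, dropoff):
    """Dropoff minus pickup in whole seconds (mirrors the long-cast subtraction)."""
    start = datetime.strptime(pickup, FMT)
    end = datetime.strptime(dropoff, FMT)
    return int((end - start).total_seconds())

durations = [duration_seconds(p, d) for p, d in trips]
print(durations)  # [328, 480, 409]
```

A negative duration would mean a dropoff recorded before its pickup, which could serve as an extra validation rule; the notebook does not apply one.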

In [21]:
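# Fare and total amount per unit of trip distance.
# With ANSI mode off (the Spark 3.x default), division by zero yields null,
# so zero-distance trips get null ratios.
df_trans = df_date_parsed\
.withColumn("fare_per_dist", F.col("Fare_Amt")/F.col("Trip_Distance"))\
.withColumn("tamt_per_dist", F.col("Total_Amt")/F.col("Trip_Distance"))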

#### Distinct Values

In [22]:
# Find low-cardinality string columns (fewer than 50 distinct values)
string_cols = [f.name for f in df_trans.schema.fields if isinstance(f.dataType, StringType)]
distinct_count = df_trans.select([F.countDistinct(F.col(c)).alias(c) for c in string_cols]).collect()[0].asDict()
dist_cols = [c for c, n in distinct_count.items() if n < 50]

In [23]:
df_trans.select([F.countDistinct(F.col(c)).alias(c) for c in dist_cols]).show()

+-----------+------------+
|vendor_name|Payment_Type|
+-----------+------------+
|          3|           6|
+-----------+------------+



In [24]:
# View Distinct Values
for c in dist_cols:
    df_trans.select(c).distinct().show()

+-----------+
|vendor_name|
+-----------+
|        CMT|
|        VTS|
|        DDS|
+-----------+

+------------+
|Payment_Type|
+------------+
|   No Charge|
|        CASH|
|      Credit|
|        Cash|
|     Dispute|
|      CREDIT|
+------------+



In [28]:
# Payment_Type contains the same categories in different letter case (e.g. CASH/Cash, CREDIT/Credit),
# so upper-case the values to merge them into one category each
df_transformed = df_trans.withColumn("Payment_Type", F.upper(F.col("Payment_Type")))
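`F.upper` collapses the six raw `Payment_Type` values listed above into four categories. The same mapping in plain Python:

```python
raw_payment_types = ["No Charge", "CASH", "Credit", "Cash", "Dispute", "CREDIT"]

# Upper-casing merges values that differ only in letter case
normalized = sorted({p.upper() for p in raw_payment_types})
print(normalized)  # ['CASH', 'CREDIT', 'DISPUTE', 'NO CHARGE']
```

If values could also carry stray whitespace, wrapping the column in `F.trim` before `F.upper` would merge those as well; the distinct values shown here do not need it.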

#### Duplicates

In [29]:
# Both distinct() and a direct dropDuplicates() consumed a lot of memory in this environment.
# Instead, all original columns are concatenated into a single key and duplicates are dropped on that key,
# which gave the same result as a direct dropDuplicates() with lower memory use.
df_drop_by_concat = df_transformed\
.withColumn("concat_cols", F.concat_ws("||", *df_date_parsed.columns))\
.dropDuplicates(["concat_cols"])\
.drop("concat_cols")
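One caveat with the concatenated key: Spark's `concat_ws` skips null inputs, so rows that differ only in which column is null can produce the same key and be dropped as duplicates. Values that themselves contain the `||` separator can collide the same way. In this sample, the remaining columns had no nulls, so this is a caution rather than an observed problem. A plain-Python imitation of the null-skipping behavior (the helper name is hypothetical):

```python
def concat_ws_like(sep, *values):
    """Imitate Spark's concat_ws: None inputs are skipped, others are joined as strings."""
    return sep.join(str(v) for v in values if v is not None)

row_a = ("CMT", None, "CASH")
row_b = ("CMT", "CASH", None)

key_a = concat_ws_like("||", *row_a)
key_b = concat_ws_like("||", *row_b)
print(key_a, key_b, key_a == key_b)  # CMT||CASH CMT||CASH True
```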

In [30]:
# Count rows after dropping duplicates
cnt_after_drop = df_drop_by_concat.count()

# Number of duplicates dropped
no_of_row - cnt_after_drop

4

In [31]:
df_cleaned = df_drop_by_concat

### Data Validation

#### Schema Validation

In [32]:
# Defining Schema for Validation
validate_schema = StructType(
    [
        StructField('vendor_name', StringType(), True), 
        StructField('Trip_Pickup_DateTime', TimestampType(), True), 
        StructField('Trip_Dropoff_DateTime', TimestampType(), True), 
        StructField('Passenger_Count', LongType(), True), 
        StructField('Trip_Distance', DoubleType(), True), 
        StructField('Payment_Type', StringType(), True), 
        StructField('Fare_Amt', DoubleType(), True), 
        StructField('surcharge', DoubleType(), True), 
        StructField('Tip_Amt', DoubleType(), True), 
        StructField('Tolls_Amt', DoubleType(), True),
        StructField('Total_Amt', DoubleType(), True),
        StructField('duration', LongType(), True),
        StructField('fare_per_dist', DoubleType(), True), 
        StructField('tamt_per_dist', DoubleType(), True)
    ]
)

In [33]:
# Validate Schema
assert validate_schema == df_cleaned.schema, "schema is not valid"
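A bare `assert` only reports that the schemas differ. A small sketch that lists which fields differ; it works on `(name, type)` pairs, which for a PySpark schema could be built with `[(f.name, f.dataType.simpleString()) for f in schema.fields]`. The helper is hypothetical, and unlike `StructType` equality it ignores nullability and column order:

```python
def schema_diff(expected, actual):
    """Compare two lists of (name, type) pairs and describe the mismatches."""
    expected_map, actual_map = dict(expected), dict(actual)
    problems = []
    for name, typ in expected:
        if name not in actual_map:
            problems.append(f"missing column: {name}")
        elif actual_map[name] != typ:
            problems.append(f"type mismatch for {name}: expected {typ}, got {actual_map[name]}")
    for name, _ in actual:
        if name not in expected_map:
            problems.append(f"unexpected column: {name}")
    return problems

# LongType's simpleString() is "bigint"
expected = [("vendor_name", "string"), ("Trip_Pickup_DateTime", "timestamp"), ("duration", "bigint")]
actual = [("vendor_name", "string"), ("Trip_Pickup_DateTime", "string"), ("fare_per_dist", "double")]
for problem in schema_diff(expected, actual):
    print(problem)
```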

#### Null Value Validation

In [34]:
# This single-pass check used more memory in this environment, so it is commented out; the column-by-column check below is used instead
# is_null_values = df_cleaned.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_cleaned.columns]).collect()[0].asDict()
# [col for col in is_null_values if is_null_values[col] > 0]

In [35]:
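# Column-by-column null check.
# A for loop's else clause runs whenever the loop ends without `break`,
# so the offending columns are collected in a list instead of relying on for/else.
# null_cols = []
# for c in df_cleaned.columns:
#     cnt = df_cleaned.where(F.col(c).isNull()).select(c).count()
#     if cnt > 0:
#         print(c, cnt)
#         null_cols.append(c)
# if not null_cols:
#     print("There are no null columns")

There are no null columns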

### Data Analysis

#### Data Aggregations

In [36]:
df_cleaned.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: timestamp (nullable = true)
 |-- Trip_Dropoff_DateTime: timestamp (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)
 |-- duration: long (nullable = true)
 |-- fare_per_dist: double (nullable = true)
 |-- tamt_per_dist: double (nullable = true)



In [37]:
# Total and average values per vendor
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_vendor = df_cleaned.select("vendor_name",*numeric_col)\
.groupBy(F.col("vendor_name"))\
.agg(
    F.count("vendor_name").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)
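Each aggregation cell computes a row count plus a `_sum` and an `_avg` per numeric column for every group. `F.count("vendor_name")` counts non-null values of that column, while the later cells use `F.count("*")`, which counts all rows; the two agree here only because `vendor_name` has no nulls. A plain-Python sketch of the same count/sum/mean pattern over a few made-up rows (the helper and data are illustrative, not from the TLC files):

```python
from collections import defaultdict

def aggregate_by(rows, key, numeric_cols):
    """Per-group row count plus <col>_sum and <col>_avg, mirroring the agg() calls above."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result = {}
    for group, members in groups.items():
        stats = {"count": len(members)}
        for c in numeric_cols:
            values = [m[c] for m in members if m[c] is not None]  # Spark's sum/avg skip nulls
            stats[c + "_sum"] = sum(values) if values else None
            stats[c + "_avg"] = sum(values) / len(values) if values else None
        result[group] = stats
    return result

# Made-up rows for illustration only
rows = [
    {"vendor_name": "CMT", "Trip_Distance": 1.5, "Fare_Amt": 6.0},
    {"vendor_name": "CMT", "Trip_Distance": 2.5, "Fare_Amt": 10.0},
    {"vendor_name": "VTS", "Trip_Distance": 4.0, "Fare_Amt": None},
]
summary = aggregate_by(rows, "vendor_name", ["Trip_Distance", "Fare_Amt"])
print(summary["CMT"])
```

Since the hourly, daily, weekly and other cells below differ only in the grouping expression, they could share one such helper in the notebook as well.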

In [38]:
# Parsing from config file
conf = configparser.ConfigParser()
conf.read("config")
data_target_path = conf.get("DATA PATH", "DataTarget_PATH")

In [39]:
# Total and average values per pickup hour
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_hour = df_cleaned.select("Trip_Pickup_DateTime", *numeric_col)\
.groupBy(F.hour(F.col("Trip_Pickup_DateTime")).alias("Hour"))\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)

In [40]:
# Total and average values per pickup date
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_date = df_cleaned.select(
    F.to_date("Trip_Pickup_DateTime").alias("date"), 
    *numeric_col
)\
.groupBy("date")\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)

In [41]:
# Total and average values per week of year
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_week = df_cleaned.select("Trip_Pickup_DateTime", *numeric_col)\
.groupBy(F.weekofyear(F.col("Trip_Pickup_DateTime")).alias("WeekOfYear"))\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)

In [42]:
# Total and average values per day of month
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_dayofmonth = df_cleaned.select("Trip_Pickup_DateTime", *numeric_col)\
.groupBy(F.dayofmonth(F.col("Trip_Pickup_DateTime")).alias("DayOfMonth"))\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)

In [43]:
# Total and average values per day of week (1 = Sunday, ..., 7 = Saturday)
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_dayofweek = df_cleaned.select("Trip_Pickup_DateTime", *numeric_col)\
.groupBy(F.dayofweek(F.col("Trip_Pickup_DateTime")).alias("dayofweek"))\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)
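`F.dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), and `F.weekofyear` returns the ISO 8601 week number, in which weeks start on Monday. Python's `isoweekday()` uses Monday = 1, so a conversion is needed when cross-checking results. A stdlib sketch using the sample pickup dates shown earlier:

```python
from datetime import date

def spark_dayofweek(d):
    """Map Python's isoweekday (Mon=1..Sun=7) to Spark's dayofweek (Sun=1..Sat=7)."""
    return d.isoweekday() % 7 + 1

def spark_weekofyear(d):
    """ISO 8601 week number, matching Spark's weekofyear."""
    return d.isocalendar()[1]

for d in [date(2009, 1, 3), date(2009, 1, 18), date(2009, 1, 23)]:
    print(d, spark_dayofweek(d), spark_weekofyear(d))
```

Because only the week number is kept, dates from different years can land in the same group; for example, 2010-01-01 belongs to ISO week 53 of 2009.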

In [44]:
# Total and average values per payment type
numeric_col = ["Passenger_Count", "Trip_Distance", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt", "duration"]
df_per_payment_type = df_cleaned.select("Payment_Type",*numeric_col)\
.groupBy(F.col("Payment_Type"))\
.agg(
    F.count("*").alias("count"),
    *[F.sum(F.col(c)).alias(c+"_sum") for c in numeric_col], 
    *[F.mean(F.col(c)).alias(c+"_avg") for c in numeric_col]
)

In [45]:
# Trip count per vendor and payment type
df_cnt_payment_cnt_per_vendor = df_cleaned\
.select("vendor_name", "Payment_Type")\
.groupBy("vendor_name", "Payment_Type")\
.count()

### Data Load

The aggregated DataFrames below are written as Parquet files to the target location for visualization.
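Each write below builds its path by string concatenation and uses `mode('append')`, which adds new files on every run, so re-running the notebook duplicates the aggregates in the target directories; `mode('overwrite')` replaces the directory contents instead. A sketch of deriving every path from a single output name so that two outputs cannot end up in the same directory by mistake (the helper, its duplicate check and the target root are assumptions, not part of the notebook):

```python
import posixpath

def build_output_paths(target_root, names):
    """Map each output name to <target_root>/<name>, refusing duplicate names."""
    if len(set(names)) != len(names):
        raise ValueError(f"duplicate output names: {names}")
    return {name: posixpath.join(target_root, name) for name in names}

names = ["df_per_vendor", "df_per_hour", "df_per_date", "df_per_dayofmonth",
         "df_per_week", "df_per_dayofweek", "df_per_payment_type",
         "df_cnt_payment_cnt_per_vendor"]
paths = build_output_paths("/data/nyc_tlc/processed", names)  # hypothetical target root
print(paths["df_per_dayofmonth"])

# In the notebook this could drive all writes, e.g.:
# outputs = {"df_per_vendor": df_per_vendor, "df_per_hour": df_per_hour, ...}
# for name, frame in outputs.items():
#     frame.write.mode("overwrite").parquet(paths[name])
```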

In [46]:
df_per_vendor.write.mode('append').parquet(data_target_path+"df_per_vendor")

In [47]:
df_per_hour.write.mode('append').parquet(data_target_path+"df_per_hour")

In [48]:
df_per_date.write.mode('append').parquet(data_target_path+"df_per_date")

In [49]:
df_per_dayofmonth.write.mode('append').parquet(data_target_path+"df_per_dayofmonth")

In [50]:
df_per_week.write.mode('append').parquet(data_target_path+"df_per_week")

In [51]:
df_per_dayofweek.write.mode('append').parquet(data_target_path+"df_per_dayofweek")

In [52]:
df_per_payment_type.write.mode('append').parquet(data_target_path+"df_per_payment_type")

In [53]:
df_cnt_payment_cnt_per_vendor.write.mode('append').parquet(data_target_path+"df_cnt_payment_cnt_per_vendor")