In [1]:
# NOTES
# TODO: Join Geo Taxi data

# Setup

## Import Packages

In [2]:
# import packages
import os
import dotenv
from collections import Counter
import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
import plotly
from pyspark.sql import SparkSession
from pyspark.sql.types import \
    StructType, StructField, StringType, LongType, DoubleType, TimestampType, IntegerType
import pyspark.sql.functions as F
from hdfs import InsecureClient

## Load Environemnt Variables

In [3]:
# load environment variables
dotenv.load_dotenv()

SPARK_CUDF_JAR = os.getenv('SPARK_CUDF_JAR')
SPARK_RAPIDS_PLUGIN_JAR = os.getenv('SPARK_RAPIDS_PLUGIN_JAR')
SPARK_GPU_DISCOVERY_SCRIPT = os.getenv('SPARK_GPU_DISCOVERY_SCRIPT')
namenode_URI = os.getenv('namenode_URI')
hadoop_user = os.getenv('hadoop_user')
hdfs_path = os.getenv('hdfs_path')

## Configure Spark, Connection to HDFS

In [4]:
# configure environment & instantiate sparksession
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('nyc_taxi') \
    .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
    .config('spark.jars', f'{SPARK_CUDF_JAR},{SPARK_RAPIDS_PLUGIN_JAR}') \
    .config('spark.rapids.sql.enabled', 'true') \
    .config('spark.rapids.sql.incompatibleOps.enabled', 'true') \
    .config('sparl.executor.resource.gpu.amount', '1') \
    .config('sparl.task.resource.gpu.amount', '0.25') \
    .config('spark.driver.memory', '20g') \
    .config('spark.task.cpus', '4') \
    .config('spark.sql.shuffle.partitions', '20') \
    .config('spark.driver.maxResultSize', '10g') \
    .getOrCreate()


2021-10-28 17:52:05,813 WARN util.Utils: Your hostname, david-Z97X-Gaming-7 resolves to a loopback address: 127.0.1.1; using 192.168.0.159 instead (on interface enp42s0)
2021-10-28 17:52:05,813 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2021-10-28 17:52:05,991 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-10-28 17:52:10,640 WARN rapids.SQLExecPlugin: RAPIDS Accelerator 21.08.0 using cudf 21.08.2. To disable GPU support set `spark.rapids.sql.enabled` to false
2021-10-28 17:52:10,641 WARN udf.Plugin: Installing rapids UDF compiler extensions to Spark. The compiler is disabled by default. To enable it, set `spark.rapids.sql.udfCompiler.enabled` to true


In [5]:
# url for Spark UI for monitoring
spark.sparkContext.uiWebUrl

'http://192.168.0.159:4040'

In [6]:
# list files in hdfs directory, count files in directory
hdfs = InsecureClient(namenode_URI, user=hadoop_user)
fs = hdfs.list(hdfs_path)
print(fs)
print('number of files in dfs directory:', len(fs))

['yellow_taxi_2017-01.parquet', 'yellow_taxi_2017-02.parquet', 'yellow_taxi_2017-03.parquet', 'yellow_taxi_2017-04.parquet', 'yellow_taxi_2017-05.parquet', 'yellow_taxi_2017-06.parquet', 'yellow_taxi_2017-07.parquet', 'yellow_taxi_2017-08.parquet', 'yellow_taxi_2017-09.parquet', 'yellow_taxi_2017-10.parquet', 'yellow_taxi_2017-11.parquet', 'yellow_taxi_2017-12.parquet', 'yellow_taxi_2018-01.parquet', 'yellow_taxi_2018-02.parquet', 'yellow_taxi_2018-03.parquet', 'yellow_taxi_2018-04.parquet', 'yellow_taxi_2018-05.parquet', 'yellow_taxi_2018-06.parquet', 'yellow_taxi_2018-07.parquet', 'yellow_taxi_2018-08.parquet', 'yellow_taxi_2018-09.parquet', 'yellow_taxi_2018-10.parquet', 'yellow_taxi_2018-11.parquet', 'yellow_taxi_2018-12.parquet', 'yellow_taxi_2019-01.parquet', 'yellow_taxi_2019-02.parquet', 'yellow_taxi_2019-03.parquet', 'yellow_taxi_2019-04.parquet', 'yellow_taxi_2019-05.parquet', 'yellow_taxi_2019-06.parquet', 'yellow_taxi_2019-07.parquet', 'yellow_taxi_2019-08.parquet', 'yellow

# Data Ingestion

In [7]:
# Get all column names in data
columns = []
for prqt in hdfs.list(hdfs_path):
    path = os.path.join('hdfs://localhost:9000/', hdfs_path, prqt)
    cols = spark.read \
        .options(header='true', inferschema='true') \
        .parquet(path) \
        .limit(1) \
        .columns
    
    columns.extend(cols)

# see what columns are similar between
counter = Counter(columns)
print(counter)

# lets take all columns that are present within all datasets (48 maximum columns count)
columns = [i[0] for i in counter.items() if i[1] == max([i[1]for i in counter.items()])]


Counter({'VendorID': 48, 'tpep_pickup_datetime': 48, 'tpep_dropoff_datetime': 48, 'passenger_count': 48, 'trip_distance': 48, 'RatecodeID': 48, 'store_and_fwd_flag': 48, 'PULocationID': 48, 'DOLocationID': 48, 'payment_type': 48, 'fare_amount': 48, 'extra': 48, 'mta_tax': 48, 'tip_amount': 48, 'tolls_amount': 48, 'improvement_surcharge': 48, 'total_amount': 48, 'congestion_surcharge': 24})


In [8]:
# set the schema for data
customSchema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', TimestampType(), True),
    StructField('tpep_dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', LongType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)
])


In [9]:
# read in data
path = os.path.join('hdfs://localhost:9000/', hdfs_path, '*')
df = spark.read \
    .format('parquet') \
    .schema(customSchema) \
    .load(path) \

df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [10]:
df.rdd.getNumPartitions()

45

In [11]:
# preview of concatenated dataframe size
print('number of rows: ', round(df.count() / 1e6, 2), 'Million',
      '\nnumber of columns: ', len(df.columns),
      '\ntotal count of data: ', round(df.count() * len(df.columns) / 1e9, 2), 'Billion')

number of rows:  325.35 Million 
number of columns:  17 
total count of data:  5.53 Billion


In [12]:
# load location lookup table
lfp = './data/taxi_location_lookup_table.csv'
lookup_table = pd.read_csv(lfp)

columns = lookup_table.columns
schema = StructType([
    StructField(columns[0], IntegerType(), True),
    StructField(columns[1], StringType(), True),
    StructField(columns[2], StringType(), True),
    StructField(columns[3], StringType(), True),
])
lookup_table = spark.createDataFrame(lookup_table, schema)

In [34]:
# load geographic data (shapefile)
gfp = './data/geo_files/taxi_zones.shp'
gdf = gpd.read_file(gfp)
gdf = gdf[['LocationID', 'Shape_Leng', 'Shape_Area', 'zone', 'geometry']]

# Data Preparation

## Spark

In [14]:
partitions = df.rdd.getNumPartitions()

In [15]:
# check for null values in dataframe
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]) \
    .toPandas() \
    .transpose() \
    .rename(columns={0: 'count_na'})

# seems like there are issues with about 1M rows of the data missing VendorID, passenger_count, RatecodeID, payment_type
# remaining columns have no null values



Unnamed: 0,count_na
VendorID,1056169
tpep_pickup_datetime,0
tpep_dropoff_datetime,0
passenger_count,1056169
trip_distance,0
RatecodeID,1056169
store_and_fwd_flag,0
PULocationID,0
DOLocationID,0
payment_type,1056169


In [16]:
# check the dates of null value rows
print('dates with null values')
df.filter(F.col('VendorID').isNull()) \
    .groupBy(F.year('tpep_pickup_datetime'), F.month('tpep_pickup_datetime')) \
    .count().alias('count_na') \
    .toPandas().sort_values(by=['year(tpep_pickup_datetime)', 'month(tpep_pickup_datetime)'])

dates with null values




Unnamed: 0,year(tpep_pickup_datetime),month(tpep_pickup_datetime),count
3,2019,6,161
16,2019,7,33970
12,2019,8,33235
13,2019,9,34197
8,2019,10,46826
1,2019,11,47311
7,2019,12,51198
0,2020,1,65347
15,2020,2,48760
9,2020,3,37474


In [17]:
# remove rows with null values
df_drop = df.filter(~F.col('VendorID').isNull())

In [18]:
# we should only have data on years 2017 - 2020
print('unique years:', df.select(F.year(df.tpep_pickup_datetime)).distinct().collect())

# check how many rows of data are not within 2017-2020
years = ['2017', '2018', '2019', '2020']
print('rows with wrong years:', df.filter(~F.year(df.tpep_pickup_datetime).isin(years)).count())

# 113.5k relatively small amount of taxi rides compared to 325.3m, we can drop these rows
df = df.filter(F.year(df.tpep_pickup_datetime).isin(years))



unique years: [Row(year(tpep_pickup_datetime)=2009), Row(year(tpep_pickup_datetime)=2019), Row(year(tpep_pickup_datetime)=2058), Row(year(tpep_pickup_datetime)=2090), Row(year(tpep_pickup_datetime)=2008), Row(year(tpep_pickup_datetime)=2001), Row(year(tpep_pickup_datetime)=2017), Row(year(tpep_pickup_datetime)=2016), Row(year(tpep_pickup_datetime)=2084), Row(year(tpep_pickup_datetime)=2031), Row(year(tpep_pickup_datetime)=2053), Row(year(tpep_pickup_datetime)=2002), Row(year(tpep_pickup_datetime)=2088), Row(year(tpep_pickup_datetime)=2066), Row(year(tpep_pickup_datetime)=2003), Row(year(tpep_pickup_datetime)=2042), Row(year(tpep_pickup_datetime)=2029), Row(year(tpep_pickup_datetime)=2033), Row(year(tpep_pickup_datetime)=2041), Row(year(tpep_pickup_datetime)=2018), Row(year(tpep_pickup_datetime)=2026), Row(year(tpep_pickup_datetime)=2038), Row(year(tpep_pickup_datetime)=2037), Row(year(tpep_pickup_datetime)=2021), Row(year(tpep_pickup_datetime)=2000), Row(year(tpep_pickup_datetime)=2015



rows with wrong years: 113510




In [19]:
# check the months within the data
df.select(F.month(df.tpep_pickup_datetime)).distinct().collect()



[Row(month(tpep_pickup_datetime)=10),
 Row(month(tpep_pickup_datetime)=4),
 Row(month(tpep_pickup_datetime)=1),
 Row(month(tpep_pickup_datetime)=8),
 Row(month(tpep_pickup_datetime)=11),
 Row(month(tpep_pickup_datetime)=12),
 Row(month(tpep_pickup_datetime)=5),
 Row(month(tpep_pickup_datetime)=7),
 Row(month(tpep_pickup_datetime)=9),
 Row(month(tpep_pickup_datetime)=6),
 Row(month(tpep_pickup_datetime)=3),
 Row(month(tpep_pickup_datetime)=2)]

In [20]:
# check if the years are correct like the pickup datetime
print('unique years:', df.select(F.year(df.tpep_dropoff_datetime)).distinct().collect())

# there are extra rows with improper years, check the amount of bad data
print('rows with wrong years:', df.filter(~F.year(df.tpep_dropoff_datetime).isin(years)).count())

# only 75 rows, we can drop them
df = df.filter(F.year(df.tpep_dropoff_datetime).isin(years))



unique years: [Row(year(tpep_dropoff_datetime)=1926), Row(year(tpep_dropoff_datetime)=2019), Row(year(tpep_dropoff_datetime)=2016), Row(year(tpep_dropoff_datetime)=2017), Row(year(tpep_dropoff_datetime)=1998), Row(year(tpep_dropoff_datetime)=1997), Row(year(tpep_dropoff_datetime)=2018), Row(year(tpep_dropoff_datetime)=2021), Row(year(tpep_dropoff_datetime)=2020)]




rows with wrong years: 75




In [21]:
# check the months within the dates
df.select(F.month(df.tpep_dropoff_datetime)).distinct().collect()



[Row(month(tpep_dropoff_datetime)=4),
 Row(month(tpep_dropoff_datetime)=10),
 Row(month(tpep_dropoff_datetime)=1),
 Row(month(tpep_dropoff_datetime)=11),
 Row(month(tpep_dropoff_datetime)=8),
 Row(month(tpep_dropoff_datetime)=12),
 Row(month(tpep_dropoff_datetime)=5),
 Row(month(tpep_dropoff_datetime)=7),
 Row(month(tpep_dropoff_datetime)=6),
 Row(month(tpep_dropoff_datetime)=9),
 Row(month(tpep_dropoff_datetime)=3),
 Row(month(tpep_dropoff_datetime)=2)]

In [22]:
# get attributes of dates
df = df.withColumn('pickup_year', F.year('tpep_pickup_datetime')) \
       .withColumn('pickup_month', F.month('tpep_pickup_datetime')) \
       .withColumn('pickup_week', F.weekofyear('tpep_pickup_datetime')) \
       .withColumn('ride_duration', (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 60)

In [25]:
# get total fares and count of rides by pick up drop off and time location pairs
df = df.groupBy('PULocationID', 'DOLocationID', 'pickup_year', 'pickup_month', 'pickup_week') \
                        .agg(F.sum('fare_amount').alias('total_fare'),
                             F.count('fare_amount').alias('num_rides'),
                             F.mean('ride_duration').alias('avg_duration')) \
                         .toPandas()



## Joining Geographic Data

In [35]:
# Join Pickup and Drop off geodata
taxi_df = df.merge() 

gdf = gdf.merge(df, left_on='LocationID', right_on='PULocationID').drop('LocationID')
gdf = gdf.rename({'Shape_Leng':'pu_Shape_Leng',
                    'Shape_Area':'pu_Shape_Area',
                    'zone':'pu_zone',
                    'geometry':'pu_geometry'})

gdf = gdf.merge(df, left_on='LocationID', right_on='PULocationID')
gdf = gdf.rename({'Shape_Leng':'do_Shape_Leng',
                    'Shape_Area':'do_Shape_Area',
                    'zone':'do_zone',
                    'geometry':'do_geometry'})


Unnamed: 0,LocationID,Shape_Leng,Shape_Area,zone,geometry,PULocationID,DOLocationID,pickup_year,pickup_month,pickup_week,total_fare,num_rides,avg_duration
0,1,0.116357,0.000782,Newark Airport,"POLYGON ((933100.918 192536.086, 933091.011 19...",1,265,2017,2,5,4.5,1,1.366667
1,1,0.116357,0.000782,Newark Airport,"POLYGON ((933100.918 192536.086, 933091.011 19...",1,132,2017,3,9,113.5,1,87.55
2,1,0.116357,0.000782,Newark Airport,"POLYGON ((933100.918 192536.086, 933091.011 19...",1,132,2017,3,10,92.0,1,53.833333
3,1,0.116357,0.000782,Newark Airport,"POLYGON ((933100.918 192536.086, 933091.011 19...",1,161,2017,5,18,67.5,1,34.066667
4,1,0.116357,0.000782,Newark Airport,"POLYGON ((933100.918 192536.086, 933091.011 19...",1,50,2017,6,22,106.0,1,88.033333


# APPENDIX

```python
# RDD implementation of fares_by_locations (GPU not utilized)
fares_by_locations = df.select(['PULocationID', 'DOLocationID', 'fare_amount']).rdd \
                         .map(lambda x: ((x[0], x[1]), x[2])) \
                         .reduceByKey(lambda x, y: x + y) \
                         .toDF().toPandas()
time: 10m 53s
```

```python
# user defined function for finding difference between two timestamps in minutes
# best practices recommend staying away from using UDFs

time_difference_udf = F.udf(lambda pickup, dropoff: (F.unix_timestamp(dropoff) - F.unix_timestamp(pickup)) / 60, DoubleType())
```