In [1]:
import sys
print(sys.version)

3.11.3 (main, Apr  7 2023, 19:29:16) [Clang 14.0.0 (clang-1400.0.29.202)]


In [3]:
import pyspark
from pyspark.sql import SparkSession 
from pyspark.sql import types

import pandas as pd
from fastparquet import write, ParquetFile

## Download and Prepare the Data  
You probably want to uncomment some lines in order to execute code :)

**File size:**  
.parquet ~309 MB  
.csv ~2 GB

In [None]:
# Download Parquet Data

# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet

In [None]:
# Convert parquet to csv (for tutorial purpose)
# I had to do this, because NYC site with data removed .csv file :(

# df_pandas = pd.read_parquet('fhvhv_tripdata_2021-01.parquet')
# df_pandas.to_csv('fhvhv_tripdata_2021-01.csv')

In [None]:
# Remove first column of csv file using pandas (it's just an index without header-title):

# df_cut = pd.read_csv('fhvhv_tripdata_2021-01.csv')
# df_cut.head()

In [None]:
# Delete first and rewrite the .csv file

# df_cut = df_cut.drop(['Unnamed: 0'], axis=1)
#df_cut.to_csv('fhvhv_tripdata_2021-01.csv', index=False)

## Start Spark Session and Read Data

In [4]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Watson") \
    .getOrCreate()

23/05/04 21:07:35 WARN Utils: Your hostname, Nikitas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.119 instead (on interface en0)
23/05/04 21:07:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/04 21:07:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read Parquet file
df_from_parquet = spark.read \
    .option("header", "true") \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [6]:
# Read CSV file
df_from_csv = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')

In [7]:
# Show Data
df_from_csv.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [8]:
# Return first five elements
df_from_csv.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:28:09', on_scene_datetime='2021-01-01 00:31:42', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', trip_miles='5.26', trip_time='923', base_passenger_fare='22.28', tolls='0.0', bcf='0.67', sales_tax='1.98', congestion_surcharge='2.75', airport_fee=None, tips='0.0', driver_pay='14.99', shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:45:56', on_scene_datetime='2021-01-01 00:55:19', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', trip_miles='3.65', trip_time='1382', base_passenger_fare='18.36', tolls='0.0', bcf='0.55', sales_tax='1.63',

In [9]:
# Here we can see that all the types from .csv are StringType
df_from_csv.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', StringType(), True), StructField('on_scene_datetime', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('trip_miles', StringType(), True), StructField('trip_time', StringType(), True), StructField('base_passenger_fare', StringType(), True), StructField('tolls', StringType(), True), StructField('bcf', StringType(), True), StructField('sales_tax', StringType(), True), StructField('congestion_surcharge', StringType(), True), StructField('airport_fee', StringType(), True), StructField('tips', StringType(), True), StructField('driver_pay', StringType(), True), StructField('shared_request_flag', Strin

## Creating a schema with Pandas

In [10]:
# Let's use only 100 rows of data from .csv to fix the schema using pandas
# Save 101 row to a new .csv file (1 is for header)
!head -n 101 fhvhv_tripdata_2021-01.csv > head.csv

In [11]:
# Show number of rows
!wc -l head.csv

     101 head.csv


In [12]:
# Make a pandas dataframe from the new .csv file
df_pandas = pd.read_csv('head.csv')

In [13]:
# Show data types of the attributes
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
originating_base_num     object
request_datetime         object
on_scene_datetime        object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
trip_miles              float64
trip_time                 int64
base_passenger_fare     float64
tolls                   float64
bcf                     float64
sales_tax               float64
congestion_surcharge    float64
airport_fee             float64
tips                    float64
driver_pay              float64
shared_request_flag      object
shared_match_flag        object
access_a_ride_flag       object
wav_request_flag         object
wav_match_flag           object
dtype: object

In [14]:
# Create a spark dataframe from pandas and show the schema
# We can see that the types were fixed
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', StringType(), True), StructField('on_scene_datetime', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag', StringType(

In [15]:
# Let's change LongType to IntegerType and create our custom schema
schema = types.StructType([ 
    types.StructField('hvfhs_license_num', types.StringType(), True), 
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('originating_base_num', types.StringType(), True), 
    types.StructField('request_datetime', types.StringType(), True),
    types.StructField('on_scene_datetime', types.StringType(), True),
    types.StructField('pickup_datetime', types.StringType(), True), 
    types.StructField('dropoff_datetime', types.StringType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('trip_miles', types.DoubleType(), True), 
    types.StructField('trip_time', types.IntegerType(), True), 
    types.StructField('base_passenger_fare', types.DoubleType(), True), 
    types.StructField('tolls', types.DoubleType(), True), 
    types.StructField('bcf', types.DoubleType(), True), 
    types.StructField('sales_tax', types.DoubleType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True), 
    types.StructField('airport_fee', types.DoubleType(), True), 
    types.StructField('tips', types.DoubleType(), True), 
    types.StructField('driver_pay', types.DoubleType(), True), 
    types.StructField('shared_request_flag', types.StringType(), True), 
    types.StructField('shared_match_flag', types.StringType(), True), 
    types.StructField('access_a_ride_flag', types.StringType(), True), 
    types.StructField('wav_request_flag', types.StringType(), True), 
    types.StructField('wav_match_flag', types.StringType(), True)
])

In [16]:
# Create spark df with our custom schema, and load the full .csv dataset
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [17]:
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:28:09', on_scene_datetime='2021-01-01 00:31:42', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID=230, DOLocationID=166, trip_miles=5.26, trip_time=923, base_passenger_fare=22.28, tolls=0.0, bcf=0.67, sales_tax=1.98, congestion_surcharge=2.75, airport_fee=None, tips=0.0, driver_pay=14.99, shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:45:56', on_scene_datetime='2021-01-01 00:55:19', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID=152, DOLocationID=167, trip_miles=3.65, trip_time=1382, base_passenger_fare=18.36, tolls=0.0, bcf=0.55, sales_tax=1.63, congestion_surcharge=0.0, airport_fee

23/05/04 21:07:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Partitions and saving to parquet

In [18]:
# Create partitions (Lazy command)
df = df.repartition(24)

In [19]:
# We cans et mode='overwrite' if we want to overwrite.
df.write.parquet('fhvhv/2021/01/', mode='overwrite')

23/05/04 21:08:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/05/04 21:08:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/05/04 21:08:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/05/04 21:08:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/05/04 21:08:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Spark DataFrames

In [20]:
# Reading parquet files
df = spark.read.parquet('fhvhv/2021/01/')

In [21]:
df

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, originating_base_num: string, request_datetime: string, on_scene_datetime: string, pickup_datetime: string, dropoff_datetime: string, PULocationID: int, DOLocationID: int, trip_miles: double, trip_time: int, base_passenger_fare: double, tolls: double, bcf: double, sales_tax: double, congestion_surcharge: double, airport_fee: double, tips: double, driver_pay: double, shared_request_flag: string, shared_match_flag: string, access_a_ride_flag: string, wav_request_flag: string, wav_match_flag: string]

In [22]:
# Print schema, but in a nice way :)
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: string (nullable = true)
 |-- on_scene_datetime: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: integer (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nullab

### Transofrmations (Lazy):
Spark creates a sequence of transformations.
* select
* filter
* join
* group by

### Action (executed immediately):
When we do an action, the sequence of transformations is executed.
* show
* take
* head
* write

SELECT * FROM df WHERE hvfhs_license_num = 'HV0003'

In [23]:
from pyspark.sql import functions as F

In [24]:
# withColumn - transoformation, adds a new column
# if we give the name of existing column - it overwrites it!
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show(5)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-02|  2021-01-02|         164|         255|
| 2021-01-02|  2021-01-02|         167|          60|
| 2021-01-01|  2021-01-01|         125|          25|
| 2021-01-02|  2021-01-02|          75|         213|
| 2021-01-01|  2021-01-01|          49|          49|
+-----------+------------+------------+------------+
only showing top 5 rows



In [25]:
# Select a few columns, filtering.
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003') \
    .head(5)

[Row(pickup_datetime='2021-01-02 13:23:22', dropoff_datetime='2021-01-02 13:28:21', PULocationID=167, DOLocationID=60),
 Row(pickup_datetime='2021-01-01 06:03:49', dropoff_datetime='2021-01-01 06:20:40', PULocationID=125, DOLocationID=25),
 Row(pickup_datetime='2021-01-02 21:18:36', dropoff_datetime='2021-01-02 21:36:20', PULocationID=75, DOLocationID=213),
 Row(pickup_datetime='2021-01-01 15:38:34', dropoff_datetime='2021-01-01 15:43:07', PULocationID=49, DOLocationID=49),
 Row(pickup_datetime='2021-01-02 08:54:18', dropoff_datetime='2021-01-02 09:16:41', PULocationID=235, DOLocationID=141)]

## User Defined Functions

In [26]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f'e/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [27]:
# check our new function
crazy_stuff('B02884')

'e/b44'

In [28]:
# Set our UDF
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [29]:
# Use our UDF
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show(10)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-02|  2021-01-02|         164|         255|
|  e/b48| 2021-01-02|  2021-01-02|         167|          60|
|  e/b3b| 2021-01-01|  2021-01-01|         125|          25|
|  e/a39| 2021-01-02|  2021-01-02|          75|         213|
|  e/acc| 2021-01-01|  2021-01-01|          49|          49|
|  e/b3b| 2021-01-02|  2021-01-02|         235|         141|
|  e/b38| 2021-01-01|  2021-01-01|         141|         151|
|  e/acc| 2021-01-03|  2021-01-03|         126|         248|
|  e/b32| 2021-01-01|  2021-01-01|          76|          61|
|  e/b35| 2021-01-02|  2021-01-02|          95|          28|
+-------+-----------+------------+------------+------------+
only showing top 10 rows



[Stage 11:>                                                         (0 + 1) / 1]                                                                                

## Spark SQL

In [42]:
df_green = spark.read.parquet('data/pq/green/*/*')

In [43]:
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [44]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [45]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [47]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [48]:
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [51]:
# To save the same order in common columns
common_columns = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)

In [52]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [49]:
# Find the intersection of columns
set(df_green.columns) & set(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [59]:
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

In [60]:
df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

In [61]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [63]:
df_trips_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|24648499|
+------------+--------+





In [66]:
# Register TempTable in order to use in spark.sql
df_trips_data.registerTempTable('trips_data')



In [69]:
spark.sql('''
SELECT service_type, count(*)
FROM trips_data
GROUP BY service_type
''').show(5)



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|24648499|
+------------+--------+





In [70]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [None]:
# coalesce - to reduce the number of partitions
df_result.coalesce(1).write.parquet('data/report/revenue/', mode = 'overwrite')



In [None]:
spark.stop()