In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session named 'test'; getOrCreate() returns the already-running
# session instead of starting a new one if this cell is executed again.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

In [3]:
# Taxi zone lookup table; the first CSV row is used as the column header.
df = spark.read.csv('taxi+_zone_lookup.csv', header=True)

In [4]:
# Preview the zone lookup table loaded from the CSV above.
df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [5]:
# FHVHV trip records for 2021-01 (per the file name); the schema comes from
# the Parquet file's own metadata. Note this rebinds `df`, replacing the
# zone lookup table.
df = spark.read.format('parquet').load('fhvhv_tripdata_2021-01.parquet')

In [6]:
# Inspect the schema read from the Parquet metadata; the location ID
# columns (PULocationID / DOLocationID) are stored as LongType.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampNTZType(), True), StructField('on_scene_datetime', TimestampNTZType(), True), StructField('pickup_datetime', TimestampNTZType(), True), StructField('dropoff_datetime', TimestampNTZType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_re

In [7]:
from pyspark.sql import types

In [8]:
# Explicit schema for the FHVHV trip file. Every field is nullable.
# Unlike the file's own metadata (LongType), PULocationID and DOLocationID
# are declared as IntegerType here.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('originating_base_num', types.StringType()),
        ('request_datetime', types.TimestampNTZType()),
        ('on_scene_datetime', types.TimestampNTZType()),
        ('pickup_datetime', types.TimestampNTZType()),
        ('dropoff_datetime', types.TimestampNTZType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('trip_miles', types.DoubleType()),
        ('trip_time', types.LongType()),
        ('base_passenger_fare', types.DoubleType()),
        ('tolls', types.DoubleType()),
        ('bcf', types.DoubleType()),
        ('sales_tax', types.DoubleType()),
        ('congestion_surcharge', types.DoubleType()),
        ('airport_fee', types.DoubleType()),
        ('tips', types.DoubleType()),
        ('driver_pay', types.DoubleType()),
        ('shared_request_flag', types.StringType()),
        ('shared_match_flag', types.StringType()),
        ('access_a_ride_flag', types.StringType()),
        ('wav_request_flag', types.StringType()),
        ('wav_match_flag', types.StringType()),
    ]
])

In [9]:
# Session-wide Parquet settings: they apply to every Parquet read/write
# executed in this Spark session after this cell runs.
# NOTE(review): presumably the vectorized reader is disabled so that the
# IntegerType location-ID columns in `schema` can be read from a file that
# stores them as LongType — confirm. This also switches all later Parquet
# reads in the session to the non-vectorized path.
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
# NOTE(review): legacy write format affects how Parquet files are written;
# unclear whether it is needed for this dataset — confirm intent.
spark.conf.set("spark.sql.parquet.writeLegacyFormat","true")

In [11]:
# Re-read the trip file, enforcing the explicit `schema` instead of the
# schema stored in the Parquet metadata.
df = spark.read.load('fhvhv_tripdata_2021-01.parquet', format='parquet', schema=schema)

In [39]:
# Confirm the explicit schema was applied: location IDs are now IntegerType.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampNTZType(), True), StructField('on_scene_datetime', TimestampNTZType(), True), StructField('pickup_datetime', TimestampNTZType(), True), StructField('dropoff_datetime', TimestampNTZType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('sha

In [10]:
# Number of partitions for the trip data; the Parquet write that follows
# produces (at most) one output file per partition.
NUM_PARTITIONS = 24
df = df.repartition(NUM_PARTITIONS)

In [16]:
# mode='overwrite' keeps this cell re-runnable: under the default save mode
# ('errorifexists') any second run fails because the directory already exists.
df.write.parquet('fhvhv/2021/01/', mode='overwrite')

In [11]:
# Load the repartitioned copy of the trip data from its output directory.
df = spark.read.format('parquet').load('fhvhv/2021/01/')

In [12]:
# NOTE(review): PULocationID / DOLocationID read back as bigint, not the
# IntegerType declared in `schema` — the data written to fhvhv/2021/01/
# apparently did not carry the explicit schema; check cell execution order.
df

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, originating_base_num: string, request_datetime: timestamp_ntz, on_scene_datetime: timestamp_ntz, pickup_datetime: timestamp_ntz, dropoff_datetime: timestamp_ntz, PULocationID: bigint, DOLocationID: bigint, trip_miles: double, trip_time: bigint, base_passenger_fare: double, tolls: double, bcf: double, sales_tax: double, congestion_surcharge: double, airport_fee: double, tips: double, driver_pay: double, shared_request_flag: string, shared_match_flag: string, access_a_ride_flag: string, wav_request_flag: string, wav_match_flag: string]

In [13]:
# Preview trip rows read back from fhvhv/2021/01/.
df.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [14]:
# Print the schema as a tree, including each column's nullability.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_f

In [15]:
# Trips whose license number is 'HV0003', restricted to times, locations and license.
(
    df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID', 'hvfhs_license_num')
      .where(df['hvfhs_license_num'] == 'HV0003')
      .show()
)

+-------------------+-------------------+------------+------------+-----------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|hvfhs_license_num|
+-------------------+-------------------+------------+------------+-----------------+
|2021-01-05 12:42:09|2021-01-05 13:04:44|          22|         261|           HV0003|
|2021-01-02 14:16:06|2021-01-02 14:22:35|         189|         181|           HV0003|
|2021-01-06 08:26:45|2021-01-06 08:53:00|          56|          75|           HV0003|
|2021-01-28 01:07:33|2021-01-28 01:17:39|         169|          69|           HV0003|
|2021-01-17 09:00:11|2021-01-17 09:13:05|         169|          69|           HV0003|
|2021-01-14 22:58:21|2021-01-14 23:12:56|         164|         145|           HV0003|
|2021-01-12 14:34:31|2021-01-12 15:01:36|         169|         250|           HV0003|
|2021-01-06 14:26:33|2021-01-06 14:44:08|          74|         152|           HV0003|
|2021-01-22 03:33:31|2021-01-22 03:42:22|          37|

In [16]:
from pyspark.sql import functions as F

In [23]:
def crazy_stuff(base_num):
    """Map a dispatching base number such as 'B02764' to a short bucketed id.

    The first character is dropped and the rest is parsed as an integer.
    Numbers divisible by 7 get the 's/' prefix, all others 'e/'; the number
    is then rendered as lowercase hex, zero-padded to at least 3 digits.

    Args:
        base_num: base-number string, e.g. 'B02764'. May be None, because
            `dispatching_base_num` is a nullable column in this dataset.

    Returns:
        The bucketed id (e.g. 'e/acc'), or None when `base_num` is None, so
        the Spark UDF yields a null instead of failing the whole job.

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    # Nullable column: propagate nulls rather than crash on None[1:].
    if base_num is None:
        return None
    num = int(base_num[1:])
    prefix = 's' if num % 7 == 0 else 'e'
    return f'{prefix}/{num:03x}'

In [30]:
# Wrap crazy_stuff as a Spark UDF whose result column is a string.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [32]:
# Derive calendar dates from the pickup/dropoff timestamps and a bucketed
# base id (via the UDF), then preview the selected columns.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID', 'hvfhs_license_num', 'base_id')
    .show()
)

+-----------+------------+------------+------------+-----------------+-------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|hvfhs_license_num|base_id|
+-----------+------------+------------+------------+-----------------+-------+
| 2021-01-05|  2021-01-05|          22|         261|           HV0003|  e/acc|
| 2021-01-02|  2021-01-02|         189|         181|           HV0003|  e/b33|
| 2021-01-06|  2021-01-06|          56|          75|           HV0003|  s/acd|
| 2021-01-28|  2021-01-28|         169|          69|           HV0003|  s/acd|
| 2021-01-17|  2021-01-17|         169|          69|           HV0003|  e/b3b|
| 2021-01-29|  2021-01-29|         198|          37|           HV0005|  e/9ce|
| 2021-01-14|  2021-01-14|         164|         145|           HV0003|  e/acc|
| 2021-01-10|  2021-01-10|          58|         213|           HV0005|  e/9ce|
| 2021-01-21|  2021-01-21|          51|         174|           HV0005|  e/9ce|
| 2021-01-12|  2021-01-12|         169|         250|