In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
# Create (or reuse) a SparkSession for this notebook.
spark = (
    SparkSession.builder
    .master("local[*]")  # local mode, one worker thread per available core
    .appName("batch")
    # NOTE(review): in local mode the executors run inside the driver JVM, so
    # the driver memory setting is presumably the one that matters — confirm.
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()  # returns the already-active session if there is one
)

In [5]:
# Parquet files carry their own schema (column names and types) in the file
# metadata, so the CSV-only 'header' option has no effect and is dropped.
df = spark.read.parquet('fhvhv_tripdata_2025-01.parquet')

                                                                                

In [6]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampNTZType(), True), StructField('on_scene_datetime', TimestampNTZType(), True), StructField('pickup_datetime', TimestampNTZType(), True), StructField('dropoff_datetime', TimestampNTZType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('sha

In [7]:
import pandas as pd

In [8]:
# Shuffle the rows into 24 partitions so that a subsequent write is split
# across multiple part files instead of mirroring the input file layout.
df = df.repartition(24)

In [10]:
# Persist the repartitioned data, replacing any previous output in the folder.
df.write.mode('overwrite').parquet('fhvhv/2025/01/')

                                                                                

In [11]:
# Check how many partitions the DataFrame currently has.
df.rdd.getNumPartitions()




24

In [12]:
# Load the dataset from the 'fhvhv/2025/01/' Parquet directory.
df = spark.read.parquet('fhvhv/2025/01/')

                                                                                

In [13]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

In [15]:
df.head()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+------------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|cbd_congestion_fee|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+-----

In [30]:
# Count the trips recorded under licence number HV0003.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .count()
)

15356455

In [39]:
from pyspark.sql import functions as F
from pyspark.sql import types

In [43]:
# Derive calendar dates from the pickup/dropoff timestamps, compute base_id
# through the UDF, and preview the resulting columns.
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .withColumn('base_id', crazy_stuff_udf('dispatching_base_num'))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

[Stage 36:>                                                         (0 + 1) / 1]

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/d4c| 2025-01-02|  2025-01-02|         167|         168|
|  e/d4e| 2025-01-02|  2025-01-02|         219|         130|
|  e/d4e| 2025-01-01|  2025-01-01|          36|         162|
|  e/d4e| 2025-01-02|  2025-01-02|         212|          87|
|  e/d4e| 2025-01-01|  2025-01-01|         134|         138|
|  e/d4c| 2025-01-02|  2025-01-02|         159|         168|
|  e/d4e| 2025-01-01|  2025-01-01|         236|          88|
|  e/d4c| 2025-01-01|  2025-01-01|         158|         232|
|  e/d4c| 2025-01-01|  2025-01-01|         213|         119|
|  e/d4c| 2025-01-01|  2025-01-01|          79|         256|
|  e/d4c| 2025-01-02|  2025-01-02|         158|         234|
|  e/d4e| 2025-01-01|  2025-01-01|         118|         187|
|  e/d4c| 2025-01-01|  2025-01-01|         160|          82|
|  e/d4c| 2025-01-02|  2

                                                                                

In [40]:
def crazy_stuff(base_num):
    """Map a base number such as ``'B02764'`` to a short categorical code.

    The first character is dropped and the remainder is parsed as a decimal
    integer ``num``. The result is a one-letter prefix, a slash, and ``num``
    rendered in lowercase hexadecimal, zero-padded to at least three digits:

    * ``'s/'`` when ``num`` is divisible by 7,
    * ``'a/'`` when ``num`` is divisible by 3 (but not by 7),
    * ``'e/'`` otherwise.

    Args:
        base_num: Base number string, or ``None``.

    Returns:
        The code string, or ``None`` when ``base_num`` is ``None`` or the
        part after its first character is not a valid integer.
    """
    # When used as a Spark UDF on a nullable column, nulls arrive as None;
    # propagate them instead of raising and failing the whole job.
    if base_num is None:
        return None

    try:
        num = int(base_num[1:])
    except (TypeError, ValueError):
        # Empty, non-numeric or non-string input: treat as "no code".
        return None

    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

# Wrap the Python function as a Spark UDF that yields a string column.
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())