In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
# !wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.parquet

--2022-06-15 04:06:25--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.parquet
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.84.220
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.84.220|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 308924937 (295M) [application/x-www-form-urlencoded]
Saving to: ‘fhvhv_tripdata_2021-01.parquet.1’


2022-06-15 04:08:39 (2.22 MB/s) - ‘fhvhv_tripdata_2021-01.parquet.1’ saved [308924937/308924937]



In [4]:
!wc -l fhvhv_tripdata_2021-01.parquet

 1006794 fhvhv_tripdata_2021-01.parquet


In [5]:
df = spark.read \
    .option("header", "true") \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [6]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

In [7]:
# !head -n 1001 fhvhv_tripdata_2021-01.parquet > head.csv

In [8]:
import pandas as pd
import pyarrow.parquet as pq

In [9]:
# df_pandas = pd.read_parquet('head.parquet')
# Read data in chunks of size 100_000 and create a dataframe iterator
filename = "fhvhv_tripdata_2021-01.parquet"
pfile = pq.ParquetFile(filename)
table_iter = pfile.iter_batches(batch_size=1001)
table = next(table_iter)
df_pandas = table.to_pandas()

In [10]:
df_pandas.dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

In [11]:
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(originating_base_num,StringType,true),StructField(request_datetime,TimestampType,true),StructField(on_scene_datetime,TimestampType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(trip_miles,DoubleType,true),StructField(trip_time,LongType,true),StructField(base_passenger_fare,DoubleType,true),StructField(tolls,DoubleType,true),StructField(bcf,DoubleType,true),StructField(sales_tax,DoubleType,true),StructField(congestion_surcharge,DoubleType,true),StructField(airport_fee,DoubleType,true),StructField(tips,DoubleType,true),StructField(driver_pay,DoubleType,true),StructField(shared_request_flag,StringType,true),StructField(shared_match_flag,StringType,true),StructField(access_a_ride_flag,StringType,true),StructField(wav_req

Integer - 4 bytes
Long - 8 bytes

In [12]:
from pyspark.sql import types

In [13]:
schema = types.StructType([
        types.StructField("hvfhs_license_num", types.StringType(), True),
        types.StructField("dispatching_base_num", types.StringType(), True),
        types.StructField("originating_base_num", types.StringType(), True),
        types.StructField("request_datetime", types.TimestampType(), True),
        types.StructField("on_scene_datetime", types.TimestampType(), True),
        types.StructField("pickup_datetime", types.TimestampType(), True),
        types.StructField("dropoff_datetime", types.TimestampType(), True),
        types.StructField("PULocationID", types.IntegerType(), True),
        types.StructField("DOLocationID", types.IntegerType(), True),
        types.StructField("trip_miles", types.FloatType(), True),
        types.StructField("trip_time", types.IntegerType(), True),
        types.StructField("base_passenger_fare", types.FloatType(), True),
        types.StructField("tolls", types.FloatType(), True),
        types.StructField("bcf", types.FloatType(), True),
        types.StructField("sales_tax", types.FloatType(), True),
        types.StructField("congestion_surcharge", types.FloatType(), True),
        types.StructField("airport_fee", types.FloatType(), True),
        types.StructField("tips", types.FloatType(), True),
        types.StructField("driver_pay", types.FloatType(), True),
        types.StructField("shared_request_flag", types.StringType(), True),
        types.StructField("shared_match_flag", types.StringType(), True),
        types.StructField("access_a_ride_flag", types.StringType(), True),
        types.StructField("wav_request_flag", types.StringType(), True),
        types.StructField("wav_match_flag", types.StringType(), True),
])

# df = spark.read \
#     .option("header", "true") \
#     .schema(schema) \
#     .csv('fhvhv_tripdata_2021-01.csv')

In [14]:
df = spark.read \
    .option("header", "true") \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [15]:
df = df.repartition(24)

In [17]:
df.write.parquet('fhvhv/2021/01/')

In [18]:
df = spark.read.parquet('fhvhv/2021/01/')

In [19]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

SELECT * FROM df WHERE hvfhs_license_num =  HV0003

In [20]:
from pyspark.sql import functions as F

In [21]:
df.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [22]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [23]:
crazy_stuff('B02884')

's/b44'

In [24]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [25]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/acc| 2021-01-11|  2021-01-11|         262|         231|
|  e/a39| 2021-01-05|  2021-01-05|          61|         181|
|  e/9ce| 2021-01-02|  2021-01-02|         100|           1|
|  e/b42| 2021-01-31|  2021-01-31|         232|           4|
|  s/af0| 2021-01-05|  2021-01-05|         162|           1|
|  a/b43| 2021-01-28|  2021-01-28|          68|          68|
|  e/9ce| 2021-01-18|  2021-01-18|         205|         205|
|  e/b35| 2021-01-30|  2021-01-30|         256|         255|
|  e/b3b| 2021-01-16|  2021-01-16|          89|          91|
|  e/9ce| 2021-01-05|  2021-01-05|         132|         102|
|  e/acc| 2021-01-11|  2021-01-11|          97|          61|
|  e/9ce| 2021-01-22|  2021-01-22|          79|          37|
|  e/b32| 2021-01-03|  2021-01-03|          26|         178|
|  a/b49| 2021-01-14|  2

In [28]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003').head(10)


[Row(pickup_datetime=datetime.datetime(2021, 1, 11, 21, 40, 22), dropoff_datetime=datetime.datetime(2021, 1, 11, 22, 15, 49), PULocationID=262, DOLocationID=231),
 Row(pickup_datetime=datetime.datetime(2021, 1, 5, 18, 13, 22), dropoff_datetime=datetime.datetime(2021, 1, 5, 18, 27, 50), PULocationID=61, DOLocationID=181),
 Row(pickup_datetime=datetime.datetime(2021, 1, 31, 21, 42, 9), dropoff_datetime=datetime.datetime(2021, 1, 31, 21, 59, 52), PULocationID=232, DOLocationID=4),
 Row(pickup_datetime=datetime.datetime(2021, 1, 28, 1, 24, 36), dropoff_datetime=datetime.datetime(2021, 1, 28, 1, 26, 43), PULocationID=68, DOLocationID=68),
 Row(pickup_datetime=datetime.datetime(2021, 1, 30, 11, 35, 46), dropoff_datetime=datetime.datetime(2021, 1, 30, 11, 39, 42), PULocationID=256, DOLocationID=255),
 Row(pickup_datetime=datetime.datetime(2021, 1, 16, 5, 25, 35), dropoff_datetime=datetime.datetime(2021, 1, 16, 5, 34, 21), PULocationID=89, DOLocationID=91),
 Row(pickup_datetime=datetime.dateti

In [50]:
# !head -n 10 head.csv

hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,
HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,
HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,
HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,
HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,
HV0005,B02510,2021-01-01 00:06:59,2021-01-01 00:43:01,88,42,
HV0005,B02510,2021-01-01 00:50:00,2021-01-01 01:04:57,42,151,
HV0003,B02764,2021-01-01 00:14:30,2021-01-01 00:50:27,71,226,
HV0003,B02875,2021-01-01 00:22:54,2021-01-01 00:30:20,112,255,
