In [1]:
import pyspark
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Start (or reuse) a local Spark session; 'local[*]' runs on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/20 20:28:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Download the data (uncomment the next line to fetch the file)
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet

In [4]:
# Count newline bytes in the raw file.
# NOTE: parquet is a binary format, so this figure is NOT the number of
# observations; call df.count() once the data is loaded for the real row count.
!wc -l fhvhv_tripdata_2021-01.parquet

1006794 fhvhv_tripdata_2021-01.parquet


In [5]:
# Load the raw trip data. Parquet files carry their own schema, so the
# CSV-only 'header' / 'inferSchema' reader options are not needed here.
df = spark.read.parquet('fhvhv_tripdata_2021-01.parquet')

                                                                                

In [6]:
# Preview the first two rows as a formatted text table.
df.show(n=2)

                                                                                

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [7]:
# Fetch the first two rows to the driver as a list of Row objects.
df.head(n=2)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime=datetime.datetime(2021, 1, 1, 0, 28, 9), on_scene_datetime=datetime.datetime(2021, 1, 1, 0, 31, 42), pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, trip_miles=5.26, trip_time=923, base_passenger_fare=22.28, tolls=0.0, bcf=0.67, sales_tax=1.98, congestion_surcharge=2.75, airport_fee=None, tips=0.0, driver_pay=14.99, shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime=datetime.datetime(2021, 1, 1, 0, 45, 56), on_scene_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DO

In [8]:
# Display the DataFrame's schema as a StructType object.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True), StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag',

In [9]:
# Alternative (disabled): read the CSV version of the data with an explicit custom schema
# from pyspark.sql import types
# schema = types.StructType(
#     [
#         types.StructField('hvfhs_license_num', types.StringType(), True),
#         types.StructField('dispatching_base_num', types.StringType(), True),
#         types.StructField('pickup_datetime', types.TimestampType(), True),
#         types.StructField('dropoff_datetime', types.TimestampType(), True),
#         types.StructField('PULocationID', types.IntegerType(), True),
#         types.StructField('DOLocationID', types.IntegerType(), True),
#         types.StructField('SR_Flag', types.StringType(), True)
#     ]
# )
# df = spark.read \
#     .option("header", "true") \
#     .schema(schema) \
#     .csv('fhvhv_tripdata_2021-01.csv')

In [10]:
# Redistribute the rows across 24 partitions.
df = df.repartition(numPartitions=24)

In [11]:
# Save the data in parquet format; the 24 partitions produce 24 part files.
# mode('overwrite') keeps this cell re-runnable: the default save mode raises
# an error when the target directory already exists.
df.write.mode('overwrite').parquet('fhvhv/2021/01/')

                                                                                

In [12]:
# Load the partitioned parquet output back into a single DataFrame.
partitioned_path = 'fhvhv/2021/01/'
df = spark.read.parquet(partitioned_path)

In [13]:
# Print the schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [14]:
# Keep only the trips whose license number is 'HV0003', then project the
# pickup / drop-off columns. The filter runs first because
# hvfhs_license_num is not among the selected columns.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show()
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-11 18:40:22|2021-01-11 19:15:49|         262|         231|
|2021-01-05 15:13:22|2021-01-05 15:27:50|          61|         181|
|2021-01-31 18:42:09|2021-01-31 18:59:52|         232|           4|
|2021-01-27 22:24:36|2021-01-27 22:26:43|          68|          68|
|2021-01-30 08:35:46|2021-01-30 08:39:42|         256|         255|
|2021-01-16 02:25:35|2021-01-16 02:34:21|          89|          91|
|2021-01-11 11:58:23|2021-01-11 12:14:19|          97|          61|
|2021-01-03 07:44:58|2021-01-03 08:04:45|          26|         178|
|2021-01-14 18:52:00|2021-01-14 19:19:00|         181|         198|
|2021-01-08 20:35:35|2021-01-08 21:06:33|          76|          91|
|2021-01-15 13:49:48|2021-01-15 14:35:23|         246|          16|
|2021-01-27 10:37:56|2021-01-27 10:53:35|       

In [19]:
# Pyspark built-in functions
from pyspark.sql import functions as F
from pyspark.sql import types

In [16]:
# Derive date-only columns from the pickup / drop-off timestamps and preview them.
trips_with_dates = (
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
)
trips_with_dates.select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID').show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-11|  2021-01-11|         262|         231|
| 2021-01-05|  2021-01-05|          61|         181|
| 2021-01-02|  2021-01-02|         100|           1|
| 2021-01-31|  2021-01-31|         232|           4|
| 2021-01-05|  2021-01-05|         162|           1|
| 2021-01-27|  2021-01-27|          68|          68|
| 2021-01-18|  2021-01-18|         205|         205|
| 2021-01-30|  2021-01-30|         256|         255|
| 2021-01-16|  2021-01-16|          89|          91|
| 2021-01-05|  2021-01-05|         132|         102|
| 2021-01-11|  2021-01-11|          97|          61|
| 2021-01-22|  2021-01-22|          79|          37|
| 2021-01-03|  2021-01-03|          26|         178|
| 2021-01-14|  2021-01-14|         181|         198|
| 2021-01-08|  2021-01-08|          76|          91|
| 2021-01-15|  2021-01-15|         246|       

In [20]:
# Custom function to be wrapped as a UDF
def crazy_stuff(base_num):
    """Turn a base number such as 'B02682' into a prefixed hex code.

    The first character is dropped and the remainder is parsed as an integer.
    The result is '<prefix>/<hex>', where the hex part is lowercase and
    zero-padded to at least three digits, and the prefix is:

    * 's' when the number is divisible by 7,
    * 'a' when it is divisible by 3 (and not by 7),
    * 'e' otherwise.

    Args:
        base_num: The base number string, or None.

    Returns:
        The formatted code, or None when ``base_num`` is None or empty, so a
        null value in the input column yields a null instead of failing.

    Raises:
        ValueError: If the text after the first character is not an integer.
    """
    # A nullable column passes None to the function; propagate it as null.
    if not base_num:
        return None

    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

# Wrap crazy_stuff as a Spark UDF that returns a string column.
crazy_stuff_udf = F.udf(f=crazy_stuff, returnType=types.StringType())

In [22]:
# Add the date columns plus the UDF-derived base_id, then preview the selection.
preview_columns = ['base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID']
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select(*preview_columns)
    .show()
)

[Stage 12:>                                                         (0 + 1) / 1]

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/acc| 2021-01-11|  2021-01-11|         262|         231|
|  e/a39| 2021-01-05|  2021-01-05|          61|         181|
|  e/9ce| 2021-01-02|  2021-01-02|         100|           1|
|  e/b42| 2021-01-31|  2021-01-31|         232|           4|
|  s/af0| 2021-01-05|  2021-01-05|         162|           1|
|  a/b43| 2021-01-27|  2021-01-27|          68|          68|
|  e/9ce| 2021-01-18|  2021-01-18|         205|         205|
|  e/b35| 2021-01-30|  2021-01-30|         256|         255|
|  e/b3b| 2021-01-16|  2021-01-16|          89|          91|
|  e/9ce| 2021-01-05|  2021-01-05|         132|         102|
|  e/acc| 2021-01-11|  2021-01-11|          97|          61|
|  e/9ce| 2021-01-22|  2021-01-22|          79|          37|
|  e/b32| 2021-01-03|  2021-01-03|          26|         178|
|  a/b49| 2021-01-14|  2

                                                                                