Make sure the date range extends through today so we can ensure there is data for today. The cell below sets the end date from the current month and day.
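The cell below builds the end date by pasting today's month and day into a 2018 date string. One edge case worth noting: on February 29 of a leap year, that string is not a valid 2018 date. The following is a minimal sketch of one way to guard against this using only the standard library. The helper name `end_of_day_in_year` is hypothetical and not part of the notebook.

```python
import calendar
from datetime import date, datetime


def end_of_day_in_year(today: date, year: int = 2018) -> datetime:
    """Map today's month/day onto `year` at 23:59:59.

    Hypothetical helper: the day is clamped to the last valid day of the
    month, so Feb 29 becomes Feb 28 when `year` is not a leap year.
    """
    last_day = calendar.monthrange(year, today.month)[1]
    day = min(today.day, last_day)
    return datetime(year, today.month, day, 23, 59, 59)


# Equivalent in spirit to the string-built end_date in the notebook cell.
end_date = end_of_day_in_year(date.today())
```

The result could be passed as `end_date` in place of the parsed string, assuming a plain `datetime` is acceptable wherever the parsed value is used.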

In [1]:
# Import NYC yellow cab data from Azure Open Datasets
from azureml.opendatasets import NycTlcYellow

from datetime import datetime
from dateutil import parser

from pyspark.sql.functions import add_months, col, dayofmonth, to_date

# Get the current month and day so we can build out a dataset for partitioning through today
sqldf = spark.sql("select month(current_date()) as month, dayofmonth(current_date()) as day")

for datecol in sqldf.collect():
    currentmonth = datecol["month"]
    currentday = datecol["day"]

# 2018 is the last year in the yellow taxi dataset
end_date_val = "2018-" + str(currentmonth) + "-" + str(currentday) + " 23:59:59"
end_date = parser.parse(end_date_val)
start_date = parser.parse('2018-01-01 00:00:00')

nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_df = nyc_tlc.to_spark_dataframe()

We're using the NYC Yellow Taxi dataset as our baseline dataset. The public sample only covers data up to 2018, so we need to manipulate the data for our purposes:
1. We add 4 years to the pickup and drop-off timestamps (tpepPickupDateTime and tpepDropoffDateTime) to make the data appear current.
2. We add 4 years to the puYear column so it corresponds to the shifted dates.
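To make the two steps above concrete, here is a plain-Python sketch of the same month arithmetic on a single timestamp. It is an illustration only, not Spark's implementation: the helper `shift_months` is hypothetical, and unlike Spark's `add_months` (which returns a date and so drops the time of day), this sketch keeps the time component.

```python
import calendar
from datetime import datetime


def shift_months(ts: datetime, months: int) -> datetime:
    """Shift `ts` by whole months, clamping the day to the end of the month."""
    total = ts.year * 12 + (ts.month - 1) + months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    day = min(ts.day, calendar.monthrange(year, month)[1])
    return ts.replace(year=year, month=month, day=day)


pickup = datetime(2018, 3, 15, 8, 30, 0)
shifted = shift_months(pickup, 48)  # 48 months = 4 years

# The shifted timestamp's year equals the original year + 4, which is why
# puYear has to be incremented by 4 as well.
print(shifted, shifted.year == pickup.year + 4)
```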


In [None]:
##  Shift the dates from 2018 -> 2022, and shift puYear to match.
##  This assumes you're running the code in 2022; in later years you may need to adjust the offset.
##  Note: add_months returns a date, so the time of day is dropped from both columns.
nyc_tlc_df = nyc_tlc_df.withColumn('tpepPickupDateTime',add_months(col('tpepPickupDateTime'),48))
nyc_tlc_df = nyc_tlc_df.withColumn('tpepDropoffDateTime',add_months(col('tpepDropoffDateTime'),48))
nyc_tlc_df = nyc_tlc_df.withColumn('puYear',col('puYear')+4)

nyc_tlc_df.write.mode("overwrite").partitionBy("puYear","puMonth").parquet("/output/nycyellow-ym")

A common approach to partitioning is to partition on year/month/day, where day is the day of the month. Here we extract the day from tpepDropoffDateTime (the comment in the cell explains why) to fabricate the day field, then persist the results so we can demonstrate how this layout affects query performance.
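As a rough illustration of what this layout looks like on disk, Spark writes one Hive-style `column=value` directory per partition column. The sketch below builds that path in plain Python for a single timestamp. The helper `ymd_partition_path` is hypothetical, and the root path is simply the output folder used in the cell below.

```python
from datetime import datetime


def ymd_partition_path(ts: datetime, root: str = "/output/nycyellow-ymd") -> str:
    """Return the year/month/day partition directory for `ts`."""
    return f"{root}/puYear={ts.year}/puMonth={ts.month}/puDay={ts.day}"


print(ymd_partition_path(datetime(2022, 3, 15, 8, 30)))

# A given puDay value is not unique on its own: day 15 occurs once per month,
# so a filter on puDay alone still touches 12 directories per year.
day_15_dirs = [ymd_partition_path(datetime(2022, m, 15)) for m in range(1, 13)]
print(len(day_15_dirs))
```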

In [None]:
# This may seem counterintuitive: I'm grabbing the day from the drop-off, not the pickup. However, I discovered that the pickup
# year and month matched the drop-off date, not the pickup date.
nyc_df_ymd = nyc_tlc_df.withColumn('puDay', dayofmonth(col('tpepDropoffDateTime')))
nyc_df_ymd.write.mode("overwrite").partitionBy("puYear","puMonth","puDay").parquet("/output/nycyellow-ymd")

While we do have the attributes for year, month, and day in our partitioning structure, to illustrate a secondary concept for partitioning we're going to fabricate a full date field. This will be used to persist the partitions in year/month/date format, where date represents the full date instead of just the calendar day of the month.
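One way to see the difference is to list the directories a date range maps to. With a full-date partition value, a range that crosses a month boundary is still a single contiguous range on one column. The sketch below is plain Python with a hypothetical helper, `ymdate_partitions`; the root path is the output folder used in the cell below.

```python
from datetime import date, timedelta


def ymdate_partitions(start: date, end: date, root: str = "/output/nycyellow-ymdate"):
    """Yield the year/month/date partition directory for each day in [start, end]."""
    current = start
    while current <= end:
        yield (
            f"{root}/puYear={current.year}/puMonth={current.month}"
            f"/puDate={current.isoformat()}"
        )
        current += timedelta(days=1)


# Four consecutive days spanning the January/February boundary.
paths = list(ymdate_partitions(date(2022, 1, 30), date(2022, 2, 2)))
for p in paths:
    print(p)
```

With the year/month/day layout, the same range would need a combined condition on puMonth and puDay (days 30 and 31 of month 1, days 1 and 2 of month 2), whereas here it is a simple range on puDate.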

In [None]:
nyc_tlc_df = nyc_tlc_df.withColumn("puDate",to_date(col('tpepDropoffDateTime')))
nyc_tlc_df.write.mode("overwrite").partitionBy("puYear","puMonth","puDate").parquet("/output/nycyellow-ymdate")

Finally, we'll persist the same dataset in Delta format instead of Parquet to see how this impacts the Serverless SQL endpoint.

In [None]:
nyc_tlc_df.write.mode("overwrite").partitionBy("puYear","puMonth","puDate").format("delta").save("/output/nycyellow-ymdate-delta")
