In [None]:
# Dataset reference:
# https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=pyspark

In [None]:
# Create a Spark session only when the environment has not already provided
# one under the name `spark`.
if "spark" not in globals():
    from pyspark.sql import SparkSession

    # Maven coordinates fetched at session start-up: Delta Lake support and
    # the Hadoop connector for Azure Blob Storage (wasbs://).
    # NOTE(review): hadoop-azure normally needs to match the Hadoop version
    # bundled with the Spark distribution - confirm 3.2.4 is right here.
    packages = [
        "io.delta:delta-spark_2.12:3.2.0",
        "org.apache.hadoop:hadoop-azure:3.2.4",
    ]

    # Settings are applied in insertion order, one .config() call each.
    session_options = {
        "spark.jars.packages": ",".join(packages),
        # Enable Delta Lake SQL extensions and catalog integration.
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        # Example resource configuration
        "spark.driver.memory": "16g",
        "spark.driver.maxResultSize": "8g",
        "spark.executor.memory": "16g",
    }

    session_builder = SparkSession.builder
    for option_key, option_value in session_options.items():
        session_builder = session_builder.config(option_key, option_value)
    spark = session_builder.getOrCreate()

In [None]:
# Example time zone setting, if required
# Session time zone used by Spark SQL; change or drop this cell as needed.
session_time_zone = "Asia/Singapore"
spark.sql(f"set time zone '{session_time_zone}'")

In [None]:
# Location of the dataset in Azure Blob Storage.
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
# Empty SAS token. The previous value "r" looks like a mangled `r""` literal
# and would be an invalid token if it were ever passed to Spark.
# NOTE(review): the token is not used anywhere in this notebook; the read
# relies on the container allowing anonymous access - confirm.
blob_sas_token = r""

# wasbs://<container>@<account>.blob.core.windows.net/<relative path>
wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
print(wasbs_path)

In [None]:
# Load the Parquet dataset at wasbs_path and show its schema.
df = spark.read.parquet(wasbs_path)
df.printSchema()

In [None]:
from rich.pretty import pprint

# Check the partitions and files: list the files backing the DataFrame in
# reverse lexicographic order; the display is abbreviated after 5 entries.
input_files_desc = sorted(df.inputFiles(), reverse=True)
pprint(input_files_desc, max_length=5)

In [None]:
# Import only the helper the notebook uses. A wildcard import of
# pyspark.sql.functions shadows Python builtins such as sum, min, max and
# round for every later cell.
from pyspark.sql.functions import col

# Inspect the records whose puYear column equals 2088. All matching rows are
# collected to the driver as a pandas DataFrame, converted to a list of
# dicts, and the display is abbreviated after 10 entries.
pprint(df.filter(col("puYear") == 2088).toPandas().to_dict("records"), max_length=10)

In [None]:
import tempfile

# Scratch location for the Delta table saved later in the notebook.
# mkdtemp() creates the directory and returns its path; nothing removes it
# automatically, so delete it by hand when it is no longer needed.
tmpdir: str = tempfile.mkdtemp()

In [None]:
# Display the path of the temporary directory as the cell output.
tmpdir

In [None]:
# Write the rows with puYear >= 2084 to tmpdir as a Delta table.
# mode("overwrite") keeps the cell idempotent: with the default save mode a
# second run fails because a Delta table already exists at tmpdir.
(
    df.filter(col("puYear") >= 2084)
    .write.format("delta")
    .mode("overwrite")
    .save(tmpdir)
)

In [None]:
# Read the Delta table back from tmpdir, print its schema, and show the row
# count as the cell output.
df_delta = spark.read.load(tmpdir, format="delta")
df_delta.printSchema()
df_delta.count()

In [None]:
# Pull at most 50 rows to the driver as a list of dicts and pretty-print
# them; the display is abbreviated after 5 entries.
sample_records = df_delta.limit(50).toPandas().to_dict("records")
pprint(sample_records, max_length=5)