In [0]:
import os

# The storage account key must already be present in the environment, e.g.:
# os.environ["AZURE_STORAGE_KEY"] = <key>

# TODO: switch to Unity Catalog instead of account-key authentication
# (attempted but not working yet; revisit).
account_key_conf = "fs.azure.account.key.ughnovgs.dfs.core.windows.net"
spark.conf.set(account_key_conf, os.environ["AZURE_STORAGE_KEY"])

In [0]:
# Storage account and container to browse; replace with your own values.
storage_account_name = "ughnovgs"
container_name = "raw"

# List the entries at the root of the container.
container_root = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
display(dbutils.fs.ls(container_root))

In [0]:
# Location of the raw Yellow Taxi CSV file.
yellowTaxiFilePath = "abfss://raw@ughnovgs.dfs.core.windows.net/YellowTaxis_202501.csv"

# The first row holds the column names; column types are inferred from the data.
csv_reader = spark.read.options(header="true", inferSchema="true")
yellowTaxiDF = csv_reader.csv(yellowTaxiFilePath)

# Show the inferred schema, then the data itself.
yellowTaxiDF.printSchema()
display(yellowTaxiDF)

Databricks data profile. Run in Databricks to view.

In [0]:
# Read the payment types JSON file with the default reader settings and show it.
json_example = "abfss://raw@ughnovgs.dfs.core.windows.net/PaymentTypes.json"
paymentTypes = spark.read.format("json").load(json_example)
display(paymentTypes)

In [0]:
# Summary statistics for the two columns that the cleaning steps below filter on.
summary_columns = ["passenger_count", "trip_distance"]
pc = yellowTaxiDF.describe(summary_columns)
display(pc)

In [0]:
from pyspark.sql.functions import col

# Keep only trips with at least one passenger and a positive distance.
# Rows where either value is null are dropped as well, because a comparison
# against null never evaluates to true.
rows_before = yellowTaxiDF.count()
print(f"Before: {rows_before}")

has_passengers = col("passenger_count") > 0
has_distance = col("trip_distance") > 0
yellowTaxiDF = yellowTaxiDF.filter(has_passengers & has_distance)

rows_after = yellowTaxiDF.count()
print(f"After: {rows_after}")

In [0]:
from pyspark.sql.functions import col

# Drop rows in which every column is null; rows with only some nulls are kept.
print(f"Before: {yellowTaxiDF.count()}")
yellowTaxiDF = yellowTaxiDF.dropna(how="all")
print(f"After: {yellowTaxiDF.count()}")

In [0]:
# Replace nulls with explicit per-column defaults.
#
# The argument of `na.fill` is the replacement VALUE, so the previous
# `na.fill('all')` wrote the literal text "all" into every null string column
# ('all' is only meaningful as the `how` argument of `na.drop`).
#
# NOTE(review): the codes below follow the NYC TLC yellow taxi data dictionary
# (RatecodeID 99 = null/unknown, payment_type 5 = unknown); confirm these are
# the defaults wanted for this dataset.
null_replacements = {
    "RatecodeID": 99,
    "payment_type": 5,
}

# Filling never adds or removes rows, so both counts are expected to match.
print(f"Before: {yellowTaxiDF.count()}")
yellowTaxiDF = yellowTaxiDF.na.fill(null_replacements)
print(f"After: {yellowTaxiDF.count()}")

In [0]:
# Remove rows that are exact duplicates across all columns.
duplicate_check_before = yellowTaxiDF.count()
print(f"Before: {duplicate_check_before}")

yellowTaxiDF = yellowTaxiDF.distinct()

duplicate_check_after = yellowTaxiDF.count()
print(f"After: {duplicate_check_after}")

In [0]:
from pyspark.sql.types import IntegerType

# Narrow the frame to the columns used downstream. passenger_count is cast to
# an integer and trip_distance is renamed to TripDistance on the way through.
# `col` is the pyspark.sql.functions helper imported in an earlier cell.
selected_columns = [
    col('VendorID'),
    col('passenger_count').cast(IntegerType()),
    col('trip_distance').alias('TripDistance'),
    col('tpep_dropoff_datetime'),
    col('tpep_pickup_datetime'),
    col('RatecodeID'),
    col('PULocationID'),
    col('DOLocationID'),
    col('payment_type'),
    col('fare_amount'),
]

yellowTaxiDF = yellowTaxiDF.select(*selected_columns)

yellowTaxiDF.printSchema()
display(yellowTaxiDF)

In [0]:
# Map the source column names to the names used in the rest of the notebook.
# NOTE(review): fare_amount is published as TotalAmount; confirm that the fare
# alone (rather than a separate total column) is the intended value.
column_renames = {
    "tpep_pickup_datetime": "PickupTime",
    "tpep_dropoff_datetime": "DropoffTime",
    "PULocationID": "PickupLocationID",
    "DOLocationID": "DropoffLocationID",
    "fare_amount": "TotalAmount",
    "payment_type": "PaymentType",
    "passenger_count": "PassengerCount",
}

for old_name, new_name in column_renames.items():
    yellowTaxiDF = yellowTaxiDF.withColumnRenamed(old_name, new_name)

display(yellowTaxiDF)


In [0]:
from pyspark.sql.functions import year, month, dayofmonth, col, expr

# Append the year, month and day of the pickup timestamp as TripYear,
# TripMonth and TripDay (in that order, after the existing columns).
pickup_time = col("PickupTime")

yellowTaxiDF = (
    yellowTaxiDF
        .withColumn("TripYear", year(pickup_time))
        .withColumn("TripMonth", month(pickup_time))
        .withColumn("TripDay", dayofmonth(pickup_time))
)

display(yellowTaxiDF)
                                                    

In [0]:
# Import the functions module under a namespace: importing `round` by name
# would shadow Python's builtin round() for every later cell in the session.
from pyspark.sql import functions as F

# Add a calculated field: trip duration in minutes, rounded to whole minutes.
# Both timestamps are converted to epoch seconds before subtracting.
dropoffSeconds = F.unix_timestamp(F.col("DropoffTime"))
pickupSeconds = F.unix_timestamp(F.col("PickupTime"))

tripTimeInSecondsExpr = dropoffSeconds - pickupSeconds
tripTimeInMinutesExpr = F.round(tripTimeInSecondsExpr / 60)

yellowTaxiDF = yellowTaxiDF.withColumn("TripTimeInMinutes", tripTimeInMinutesExpr)

display(yellowTaxiDF)

In [0]:
from pyspark.sql.functions import when

# Derived column TripType: rate code 6 is labelled "SharedTrip"; every other
# value (including null) falls through to "SoloTrip".
is_shared_trip = col("RatecodeID") == 6
tripTypeColumn = when(is_shared_trip, "SharedTrip").otherwise("SoloTrip")

yellowTaxiDF = yellowTaxiDF.withColumn("TripType", tripTypeColumn)

display(yellowTaxiDF)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType


def _nullable_strings(*field_names):
    """Return one nullable StringType StructField per given field name."""
    return [StructField(field_name, StringType(), True) for field_name in field_names]


# Nested structs are declared first so the top-level schema stays readable.
addressSchema = StructType(
    _nullable_strings("Building", "Street", "City", "State", "Postcode")
)
geoLocationSchema = StructType(
    _nullable_strings("Latitude", "Longitude", "Location")
)

# Explicit schema for the taxi bases file; every field is nullable.
taxiBasesSchema = StructType([
    StructField("License Number", StringType(), True),
    StructField("Entity Name", StringType(), True),
    StructField("Telephone Number", LongType(), True),
    StructField("SHL Endorsed", StringType(), True),
    StructField("Type of Base", StringType(), True),
    StructField("Address", addressSchema, True),
    StructField("GeoLocation", geoLocationSchema, True),
])

# Read the JSON file with the schema above. The multiline option lets a single
# JSON record span several lines of the file.
taxiBasesFilePath = "abfss://raw@ughnovgs.dfs.core.windows.net/TaxiBases.json"

taxiBasesDF = (
    spark.read
        .schema(taxiBasesSchema)
        .option("multiline", "true")
        .json(taxiBasesFilePath)
)

display(taxiBasesDF)


In [0]:
from pyspark.sql.functions import col

# Flatten the taxi bases frame: each (source field, output name) pair becomes
# one top-level column. Dotted names reach into the nested structs.
nested_to_flat = [
    ("License Number", "BaseLicenseNumber"),
    ("Entity Name", "EntityName"),
    ("Address.Building", "AddressBuilding"),
    ("Address.Street", "AddressStreet"),
    ("Address.City", "AddressCity"),
    ("Address.State", "AddressState"),
    ("Address.Postcode", "AddressPostCode"),
    ("GeoLocation.Latitude", "GeoLatitude"),
    ("GeoLocation.Longitude", "GeoLongitude"),
]

taxiBasesFlatDF = taxiBasesDF.select(
    [col(source_field).alias(flat_name) for source_field, flat_name in nested_to_flat]
)

display(taxiBasesFlatDF)


In [0]:
# Import the functions module under a namespace: importing `sum` by name would
# shadow Python's builtin sum() for every later cell in the session.
from pyspark.sql import functions as F

# Group by pickup/drop-off location pair and aggregate: average trip time and
# summed amount per pair, ordered by pickup location ID descending.
# The source frame is yellowTaxiDF; the previously referenced `yellowTaxiTime`
# is not defined anywhere in the notebook and raised a NameError.
yellowTaxiDFReport = (
    yellowTaxiDF
        .groupBy("PickupLocationID", "DropoffLocationID")
        .agg(
            F.avg("TripTimeInMinutes").alias("AvgTripTime"),
            F.sum("TotalAmount").alias("SumAmount"),
        )
        .orderBy(F.col("PickupLocationID").desc())
)

display(yellowTaxiDFReport)

In [0]:
# Destination folder for the Parquet copy of the cleaned trips.
yellowTaxisParquetOutputPath = "abfss://raw@ughnovgs.dfs.core.windows.net/Output/YellowTaxis.parquet"

# Write the frame as Parquet, one sub-folder per VendorID, replacing any
# existing output. Other save modes: append, errorifexists, ignore.
parquet_writer = yellowTaxiDF.write.partitionBy("VendorID").mode("overwrite")
parquet_writer.parquet(yellowTaxisParquetOutputPath)

In [0]:
# Destination folder for the Delta copy of the cleaned trips. Kept in its own
# variable so it no longer overwrites the Parquet output path defined in the
# previous cell.
yellowTaxisDeltaOutputPath = "abfss://raw@ughnovgs.dfs.core.windows.net/Output/YellowTaxis.delta"

# Write output in delta format, partitioned by VendorID.

(
    yellowTaxiDF
        .write
        .mode("overwrite")                                  # Other modes: append, errorifexists, ignore
        .partitionBy("VendorID")
        .format("delta")
        .save(yellowTaxisDeltaOutputPath)
)

In [0]:
# Save the cleaned trips as the Delta table taxicatalog.rides.yellowtaxis,
# partitioned by VendorID and replacing any existing contents.
table_writer = (
    yellowTaxiDF.write
        .format("delta")
        .partitionBy("VendorID")
        .mode("overwrite")
        # .option("path", <path>)   # If not provided, files are saved in storage of metastore
)
table_writer.saveAsTable("taxicatalog.rides.yellowtaxis")

In [0]:
%sql
-- Row count of the yellowtaxis table.
SELECT COUNT(*)
FROM taxicatalog.rides.yellowtaxis


In [0]:
# Same row count as the SQL cell, obtained through the DataFrame API.
spark.table("taxicatalog.rides.yellowtaxis").count()

In [0]:
%sql
-- List the operations recorded in the table's history.
DESCRIBE HISTORY taxicatalog.rides.yellowtaxis

In [0]:
%sql
-- Create the Green Taxis ride table.
-- IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails once
-- the table already exists.
-- PickupYear / PickupMonth / PickupDay are generated columns computed from
-- PickupTime and must not be supplied on insert.

CREATE TABLE IF NOT EXISTS taxicatalog.rides.greentaxis
(
    RideId                  INT               COMMENT 'This is the primary key column',
    VendorId                INT,

    PickupTime              TIMESTAMP,
    DropTime                TIMESTAMP,

    PickupLocationId        INT,
    DropLocationId          INT,

    CabNumber               STRING,
    DriverLicenseNumber     STRING,

    PassengerCount          INT,

    TripDistance            DOUBLE,
    RatecodeId              INT,

    PaymentType             INT,

    TotalAmount             DOUBLE,
    FareAmount              DOUBLE,
    Extra                   DOUBLE,
    MtaTax                  DOUBLE,
    TipAmount               DOUBLE,

    TollsAmount             DOUBLE,
    ImprovementSurcharge    DOUBLE,

    PickupYear              INT              GENERATED ALWAYS AS (YEAR  (PickupTime))    COMMENT 'Auto-generated year from PickupTime',
    PickupMonth             INT              GENERATED ALWAYS AS (MONTH (PickupTime))    COMMENT 'Auto-generated month from PickupTime',
    PickupDay               INT              GENERATED ALWAYS AS (DAY   (PickupTime))    COMMENT 'Auto-generated day from PickupTime'
)
USING DELTA                  -- default in Databricks is Delta
PARTITIONED BY (VendorId)    -- optional
COMMENT 'This table stores ride information for Green Taxis'

In [0]:
%sql
-- Rows currently stored for vendor 3, before anything is inserted.
SELECT *
FROM taxicatalog.rides.yellowtaxis
WHERE VendorId = 3

In [0]:
%sql
-- Insert a single hand-written trip for vendor 3.
-- NOTE(review): TripTimeInMinutes is 15.3 here, while the notebook derives
-- that column by rounding to whole minutes; confirm the value is intentional.
INSERT INTO taxicatalog.rides.yellowtaxis (
    VendorId, PickupTime, DropoffTime,
    PickupLocationId, DropoffLocationId,
    PassengerCount, TripDistance, RatecodeID,
    TotalAmount, PaymentType,
    TripYear, TripMonth, TripDay,
    TripTimeInMinutes, TripType
)
VALUES (
    3, '2025-01-01T00:00:00.000Z', '2025-01-01T00:15:34.000Z',
    170, 140,
    1, 2.9, 1,
    25, 1,
    2025, 1, 1,
    15.3, 'SoloTrip'
)

In [0]:
# Load the extra trip records that will be appended to the table.
yellowTaxisAppendFilePath = "abfss://raw@ughnovgs.dfs.core.windows.net/YellowTaxis_append.csv"

# The first row holds the column names; column types are inferred from the data.
yellowTaxisAppendDF = (
    spark.read
        .format("csv")
        .options(header="true", inferSchema="true")
        .load(yellowTaxisAppendFilePath)
)

display(yellowTaxisAppendDF)


In [0]:
# Rename the append frame's drop-off columns towards the names used by the
# yellowtaxis table.
yellowTaxisAppendDF = (
    yellowTaxisAppendDF
        .withColumnRenamed("DropTime", "DropoffTime")
        # NOTE(review): "DropLocationLd" looks like a typo for "DropLocationId"
        # (a later cell renames "DropLocationId"). withColumnRenamed does
        # nothing when the source column is absent, so this rename presumably
        # has no effect. Do not correct it in isolation: the notebook appends
        # this frame twice further down, and the first attempt currently fails.
        .withColumnRenamed("DropLocationLd", "DropoffLocationId")
)
display(yellowTaxisAppendDF)

In [0]:
# Append records to delta table (first attempt).
#
# This write is expected to be rejected with a schema error because the
# frame's column names do not yet match the table's. The error is caught and
# printed so the notebook can run top to bottom; the cells that follow
# diagnose the mismatch, fix the names and retry the append.
from pyspark.sql.utils import AnalysisException

try:
    (
        yellowTaxisAppendDF
            .write
            .mode("append")
            .saveAsTable('taxicatalog.rides.yellowtaxis')
    )
except AnalysisException as schema_error:
    print(f"Append rejected: {schema_error}")



In [0]:
# The append above hit a schema error, so compare the column names of the
# append frame with those of the main frame. The comparison is an exact,
# case-sensitive match on names.
append_cols = set(yellowTaxisAppendDF.columns)
main_cols = set(yellowTaxiDF.columns)

diff_append = append_cols.difference(main_cols)
diff_main = main_cols.difference(append_cols)

print("Columns in yellowTaxisAppendDF not in yellowTaxiDF:", diff_append)
print("Columns in yellowTaxiDF not in yellowTaxisAppendDF:", diff_main)

In [0]:
# Before retrying the append, align the append frame's column names with the
# names used by the main frame.
append_column_renames = [
    ("DropLocationId", "DropoffLocationID"),
    ("PickupLocationId", "PickupLocationID"),
    ("VendorId", "VendorID"),
]

for current_name, target_name in append_column_renames:
    yellowTaxisAppendDF = yellowTaxisAppendDF.withColumnRenamed(current_name, target_name)

display(yellowTaxisAppendDF)

In [0]:
%sql
-- Vendor 3 rows before the corrected append.
SELECT *
FROM taxicatalog.rides.yellowtaxis
WHERE VendorId = 3

In [0]:
# With the column names aligned, append the records to the Delta table.
yellowTaxisAppendDF.write.saveAsTable(
    'taxicatalog.rides.yellowtaxis',
    mode="append",
)

In [0]:
%sql
-- Vendor 3 rows after the append.
SELECT *
FROM taxicatalog.rides.yellowtaxis
WHERE VendorID = 3

In [0]:
%sql
-- Set the passenger count to 1 for every trip picked up at location 249.
UPDATE taxicatalog.rides.yellowtaxis
SET PassengerCount = 1
WHERE PickupLocationID = 249;

-- Show the vendor 3 rows after the update.
SELECT *
FROM taxicatalog.rides.yellowtaxis
WHERE VendorID = 3

In [0]:
# Delete every trip picked up at location 151, issued as SQL from Python.
delete_stmt = "DELETE FROM taxicatalog.rides.yellowtaxis WHERE PickupLocationID = 151"
spark.sql(delete_stmt)

In [0]:
%sql
-- Review the table history again after the insert, append, update and delete.
DESCRIBE HISTORY taxicatalog.rides.yellowtaxis

In [0]:
%sql
-- Time travel by version number: vendor 3 rows as of table version 2.
SELECT *
FROM taxicatalog.rides.yellowtaxis VERSION AS OF 2
WHERE VendorId = 3

In [0]:
%sql
-- Time travel by timestamp: vendor 3 rows as the table stood at the given instant.
SELECT *
FROM taxicatalog.rides.yellowtaxis TIMESTAMP AS OF '2025-10-08T00:39:40.000+00:00'
WHERE VendorId = 3