Action 1.1: Download and Understand the Dataset & Dictionary

Download the Yellow Taxi Trip Records for January 2024 (Parquet format) from the NYC TLC website. (Direct link: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet)
Download the corresponding data dictionary. Thoroughly read and internalize the data dictionary. This is critical for all subsequent tasks.

Deliverable Snippet (for your report): 

In your notebook (and later, your report), create a markdown section summarizing:
- The key columns you anticipate being most relevant for an initial ETL and potential business analysis (justify your choices).
- Any coded values (e.g., RatecodeID, payment_type) and their meanings as per the dictionary.
- Initial thoughts on data types and potential data quality issues based on the dictionary's descriptions.

## Summary for Initial ETL and Business Analysis

### Key Columns
- `tpep_pickup_datetime` and `tpep_dropoff_datetime`: Essential for time-based analysis and trip duration calculation.
- `passenger_count`: Important for understanding trip occupancy and demand.
- `trip_distance`: Key for distance-based fare and efficiency analysis.
- `RatecodeID`: Indicates fare type, useful for segmenting trips.
- `PULocationID` and `DOLocationID`: Critical for geographic analysis of trip origins and destinations.
- `payment_type`: Important for payment method analysis.
- `fare_amount`, `tip_amount`, `total_amount`: Core financial metrics for revenue analysis.
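- `congestion_surcharge`, `airport_fee`, `cbd_congestion_fee`: Relevant for surcharge impact studies. Note that `cbd_congestion_fee` only applies from January 5, 2025, so it is absent from the January 2024 file, and the airport fee column is named `Airport_fee` in that file.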

### Coded Values and Meanings
- `RatecodeID`:
    - 1 = Standard
    - 2 = JFK
    - 3 = Newark
    - 4 = Nassau/Westchester
    - 5 = Negotiated fare
    - 6 = Group ride
    - 99 = Null/unknown
- `payment_type`:
    - 0 = Flex Fare
    - 1 = Credit card
    - 2 = Cash
    - 3 = No charge
    - 4 = Dispute
    - 5 = Unknown
    - 6 = Voided trip
- `store_and_fwd_flag`:
    - Y = store and forward trip
    - N = not a store and forward trip
- `VendorID`:
    - 1 = Creative Mobile Technologies, LLC
    - 2 = Curb Mobility, LLC
    - 6 = Myle Technologies Inc
    - 7 = Helix
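
The coded values above can be kept in plain lookup tables so that labels are applied the same way everywhere. The following is a minimal sketch under stated assumptions, not part of the original notebook: `label_for` and `with_label` are hypothetical helper names, and `with_label` expects a Spark DataFrame and imports PySpark lazily so that the dictionaries remain usable without a cluster.

```python
from itertools import chain

# Code tables copied from the data dictionary summary above
RATECODE_LABELS = {
    1: "Standard", 2: "JFK", 3: "Newark", 4: "Nassau/Westchester",
    5: "Negotiated fare", 6: "Group ride", 99: "Null/unknown",
}
PAYMENT_TYPE_LABELS = {
    0: "Flex Fare", 1: "Credit card", 2: "Cash", 3: "No charge",
    4: "Dispute", 5: "Unknown", 6: "Voided trip",
}


def label_for(mapping, code, default="Unmapped code"):
    """Return the label for a code, or a default for codes missing from the dictionary."""
    return mapping.get(code, default)


def with_label(df, code_col, mapping, label_col):
    """Hypothetical helper: add label_col to a Spark DataFrame by looking up code_col."""
    from pyspark.sql import functions as F  # lazy import; requires a Spark runtime

    lookup = F.create_map(*[F.lit(x) for x in chain.from_iterable(mapping.items())])
    # Cast to int so the lookup key type matches the integer keys of the map
    return df.withColumn(label_col, lookup[F.col(code_col).cast("int")])


print(label_for(RATECODE_LABELS, 2))
print(label_for(PAYMENT_TYPE_LABELS, 7))
```

Keeping an explicit default makes codes that fall outside the dictionary visible instead of silently dropping them.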

### Data Types and Potential Quality Issues
- Date/time fields (`tpep_pickup_datetime`, `tpep_dropoff_datetime`) should be datetime types; watch for invalid or missing timestamps.
- Numeric fields (`passenger_count`, `trip_distance`, `fare_amount`, etc.) should be floats or integers; check for negative or zero values where not logical.
- Categorical fields (`RatecodeID`, `payment_type`, `store_and_fwd_flag`) require mapping to meaningful labels; watch for unknown or null codes (e.g., 99 in RatecodeID).
- Potential missing or inconsistent data in location IDs (`PULocationID`, `DOLocationID`) could affect spatial analysis.
- Tip amounts may be zero or missing for cash payments, which should be considered in analysis.

---

### Data Dictionary (Excerpt)

| Field Name             | Description                                                                                   |
|------------------------|----------------------------------------------------------------------------------------------|
| VendorID               | TPEP provider: 1=Creative Mobile Technologies, 2=Curb Mobility, 6=Myle Technologies, 7=Helix |
| tpep_pickup_datetime   | Date and time when the meter was engaged                                                     |
| tpep_dropoff_datetime  | Date and time when the meter was disengaged                                                  |
| passenger_count        | Number of passengers in the vehicle                                                          |
| trip_distance          | Trip distance in miles reported by the taximeter                                             |
| RatecodeID             | Final rate code (see above for codes)                                                        |
| store_and_fwd_flag     | Y=store and forward trip, N=not a store and forward trip                                     |
| PULocationID           | TLC Taxi Zone where meter was engaged                                                        |
| DOLocationID           | TLC Taxi Zone where meter was disengaged                                                     |
| payment_type           | Payment method (see above for codes)                                                         |
| fare_amount            | Meter-calculated fare                                                                        |
| extra                  | Miscellaneous extras and surcharges                                                          |
| mta_tax                | Tax triggered by metered rate                                                                |
| tip_amount             | Tip amount (credit card tips only)                                                           |
| tolls_amount           | Total tolls paid                                                                             |
| improvement_surcharge  | Surcharge at flag drop (since 2015)                                                          |
| total_amount           | Total charged to passengers (excludes cash tips)                                             |
| congestion_surcharge   | NYS congestion surcharge amount                                                              |
| airport_fee            | Fee for pickups at LaGuardia and JFK                                                         |
| cbd_congestion_fee     | MTA Congestion Relief Zone fee (since Jan 5, 2025)                                           |

Action 1.2: Set up Azure Storage and Upload Raw Data

## Storage Account Details

### Basic Information
- **Subscription**: Azure Subscription 1
- **Resource Group**: myResourceGroup
- **Location**: East US
- **Storage Account Name**: innovateretaildatalake

### Performance and Redundancy
- **Performance**: Standard
- **Replication**: Geo-redundant storage with read access (RA-GRS)

### Advanced Settings
- **Hierarchical Namespace**: Enabled
- **SFTP**: Disabled
- **NFS v3**: Disabled
- **Cross-tenant Replication**: Disabled
- **Access Tier**: Hot
- **Large File Shares**: Enabled

### Security
- **Secure Transfer**: Enabled
- **Anonymous Blob Access**: Enabled
- **Storage Account Key Access**: Enabled
- **Default Azure AD Authorization**: Disabled
- **Minimum TLS Version**: 1.2
- **Allowed Copy Scope**: From any storage account

### Networking
- **Network Connectivity**: Public endpoint (all networks)
- **Default Routing Tier**: Microsoft network routing

### Data Protection
- **Point-in-time Restore**: Disabled
- **Soft Delete for Blobs**: Enabled (7 days retention)
- **Soft Delete for Containers**: Enabled (7 days retention)
- **Soft Delete for File Shares**: Enabled (7 days retention)
- **Versioning**: Disabled
- **Blob Change Feed**: Disabled
- **Version-level Immutability**: Disabled

### Encryption
- **Encryption Type**: Microsoft-managed keys (MMK)
- **Customer-managed Keys Support**: Blobs and files only
- **Infrastructure Encryption**: Disabled

## Blob Container Details

- **Name**: nyc-taxi-data
- **URL**: [https://innovateretaildatalake.blob.core.windows.net/nyc-taxi-data](https://innovateretaildatalake.blob.core.windows.net/nyc-taxi-data)
- **Last Modified**: 06/06/2025 11:34:29 AM
- **ETag**: 0x8DDA4CC9298F722
- **Lease Status**: Unlocked
- **Lease State**: Available
- **Encryption Scope**: $account-encryption-key
- **Version-level Immutability**: Disabled

## Data Directory

- **Location**: nyc-taxi-data/raw/yellow_taxi/2024/01
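
The directory above follows a year/month layout, which can be generated rather than typed by hand when more months are added. This is a sketch only: `raw_blob_path` and `abfss_uri` are hypothetical helper names, and the `abfss://` form is the ADLS Gen2 URI scheme that becomes available because hierarchical namespace is enabled (the notebook itself mounts over `wasbs://`).

```python
ACCOUNT = "innovateretaildatalake"
CONTAINER = "nyc-taxi-data"


def raw_blob_path(year: int, month: int) -> str:
    """Path of one monthly Yellow Taxi file inside the container."""
    return (
        f"raw/yellow_taxi/{year:04d}/{month:02d}/"
        f"yellow_tripdata_{year:04d}-{month:02d}.parquet"
    )


def abfss_uri(path: str) -> str:
    """ADLS Gen2 URI for a path inside the container (assumes the dfs endpoint is reachable)."""
    return f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net/{path}"


print(raw_blob_path(2024, 1))
print(abfss_uri(raw_blob_path(2024, 1)))
```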

Action 1.3: Configure Databricks Cluster for Larger Data

**Final Cluster Configuration:**

- **Workers:** 2–4 (autoscaling enabled)
- **Worker Type:** Standard_DS3_v2 (14 GB memory, 4 cores each)
- **Driver:** Standard_DS3_v2 (14 GB memory, 4 cores)
- **Runtime:** 15.4.x-scala2.12
- **Unity Catalog:** Enabled
- **Photon:** Enabled
- **DBU/hour:** 3–5

**Justification:**

The NYC Taxi dataset for one month (about 50 MB as Parquet, roughly 2.96 million rows) is much larger than the mock data used in Week 1. To ensure reasonable processing times and handle the shuffle operations expected during transformations, I increased the number of workers and enabled autoscaling. This configuration balances cost and performance, providing enough resources for efficient data processing without over-provisioning. The adjustment from Week 1 (which used a single worker) is necessary to accommodate the larger dataset and more complex operations.
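
For reference, the configuration above can be written down as a cluster specification. The sketch below uses field names from the Databricks Clusters API as I understand them (`autoscale`, `node_type_id`, `runtime_engine`); the cluster name is a hypothetical placeholder, and `worker_capacity` is an illustrative helper that only does the arithmetic on the node sizes listed above.

```python
import json

NODE_MEMORY_GB = 14  # Standard_DS3_v2, as listed above
NODE_CORES = 4

cluster_spec = {
    "cluster_name": "nyc-taxi-etl",  # hypothetical name
    "spark_version": "15.4.x-scala2.12",
    "node_type_id": "Standard_DS3_v2",
    "driver_node_type_id": "Standard_DS3_v2",
    "autoscale": {"min_workers": 2, "max_workers": 4},
    "runtime_engine": "PHOTON",
}


def worker_capacity(spec):
    """Total worker memory and cores at the lower and upper autoscaling bounds."""
    lo = spec["autoscale"]["min_workers"]
    hi = spec["autoscale"]["max_workers"]
    return {
        "memory_gb": (lo * NODE_MEMORY_GB, hi * NODE_MEMORY_GB),
        "cores": (lo * NODE_CORES, hi * NODE_CORES),
    }


print(json.dumps(cluster_spec, indent=2))
print(worker_capacity(cluster_spec))
```

At the lower bound the workers offer 28 GB and 8 cores in total, and at the upper bound 56 GB and 16 cores.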

Action 1.4: Create New Notebook and Load Raw Data into Spark DataFrame

## Mounting Azure

In [0]:
%python

storage_account_name = "innovateretaildatalake"
container_name = "nyc-taxi-data"
# A connection string is not needed for mounting with dbutils.fs.mount
# Best practice: read the account key from a secret scope instead of hard-coding it
storage_account_key = dbutils.secrets.get(scope="azure-keyvault-scope", key="storage-account-key")
source_uri = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
mount_point = f"/mnt/{container_name}"  # Choose a mount point name

# Check if already mounted, unmount if necessary for re-configuration
mounts = [m.mountPoint for m in dbutils.fs.mounts()]
if mount_point in mounts:
    dbutils.fs.unmount(mount_point)

try:
    dbutils.fs.mount(
        source=source_uri,
        mount_point=mount_point,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key},
    )
    print(f"Successfully mounted {source_uri} to {mount_point}")
except Exception as e:
    print(f"Error mounting: {e}")
    # Tolerate only the "already mounted" error; re-raise anything else
    if f"Directory {mount_point} already mounted" not in str(e):
        raise
    print(f"{mount_point} is already mounted. Proceeding.")

/mnt/nyc-taxi-data has been unmounted.
Successfully mounted wasbs://nyc-taxi-data@innovateretaildatalake.blob.core.windows.net/ to /mnt/nyc-taxi-data


In [0]:
# List files in the mount to verify
display(dbutils.fs.ls(mount_point + "/raw/yellow_taxi/2024/01/"))

path,name,size,modificationTime
dbfs:/mnt/nyc-taxi-data/raw/yellow_taxi/2024/01/yellow_tripdata_2024-01.parquet,yellow_tripdata_2024-01.parquet,49961641,1749196676000


In [0]:
raw_data_path_mounted = f"{mount_point}/raw/yellow_taxi/2024/01/yellow_tripdata_2024-01.parquet"
df = spark.read.parquet(raw_data_path_mounted) 
display(df.limit(10))
#df.show(10)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01T00:57:55,2024-01-01T01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01T00:03:00,2024-01-01T00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01T00:17:06,2024-01-01T00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
1,2024-01-01T00:36:38,2024-01-01T00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
1,2024-01-01T00:46:51,2024-01-01T00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
1,2024-01-01T00:54:08,2024-01-01T01:26:31,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0
2,2024-01-01T00:49:44,2024-01-01T01:15:47,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75
1,2024-01-01T00:30:40,2024-01-01T00:58:40,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0
2,2024-01-01T00:26:01,2024-01-01T00:54:12,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0
2,2024-01-01T00:28:08,2024-01-01T00:29:16,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0


Action 1.5: Initial Data Profiling and Understanding

In [0]:
# Initial Data Profiling and Understanding
# Display the schema
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [0]:
# Get the row count
row_count = df.count()
display(row_count)
#No of rows: 2964624

2964624

In [0]:
# Display a sample of rows
display(df.limit(3))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01T00:57:55,2024-01-01T01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01T00:03:00,2024-01-01T00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01T00:17:06,2024-01-01T00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0


In [0]:
# Statistical overview of numerical and string columns
display(df.summary())

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
count,2964624.0,2824462.0,2964624.0,2824462.0,2824462,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2824462.0,2824462.0
mean,1.7542042431013174,1.3392808966805003,3.6521691789577058,2.069359403666964,,166.01788354948215,165.11671227110082,1.1612707041432573,18.175061916786696,1.4515984320439976,0.4833823108765226,3.3358700158961274,0.5270212040355687,0.9756318507844316,26.801504770889355,2.2561220508542865,0.141161130863152
stddev,0.4325902017035976,0.850281692480088,225.46257238219965,9.823218952795497,,63.6239144874133,69.31534978524881,0.5808685566109416,18.949547705905296,1.8041024767539,0.1177600301537983,3.896550599806768,2.1283096763989,0.2183644577274297,23.385577429672534,0.8232746699398359,0.4876238872392771
min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-899.0,-7.5,-0.5,-80.0,-80.0,-1.0,-900.0,-2.5,-1.75
25%,2.0,1.0,1.0,1.0,,132.0,114.0,1.0,8.6,0.0,0.5,1.0,0.0,1.0,15.38,2.5,0.0
50%,2.0,1.0,1.68,1.0,,162.0,162.0,1.0,12.8,1.0,0.5,2.7,0.0,1.0,20.1,2.5,0.0
75%,2.0,1.0,3.11,1.0,,234.0,234.0,1.0,20.5,2.5,0.5,4.12,0.0,1.0,28.56,2.5,0.0
max,6.0,9.0,312722.3,99.0,Y,265.0,265.0,4.0,5000.0,14.25,4.0,428.0,115.92,1.0,5000.0,2.5,1.75


In [0]:
from pyspark.sql.functions import countDistinct

# Calculate the number of distinct values for key categorical columns
columns_to_count = ["VendorID", "RatecodeID", "payment_type", "PULocationID", "DOLocationID"]
# Use `c` as the loop variable so it does not shadow pyspark.sql.functions.col
distinct_values = df.agg(*(countDistinct(c).alias(c) for c in columns_to_count))
display(distinct_values)

VendorID,RatecodeID,payment_type,PULocationID,DOLocationID
3,7,5,260,261


In [0]:
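%python
from pyspark.sql.functions import col, sum as spark_sum  # alias avoids shadowing Python's built-in sum

# Count null values in every column with a single aggregation
null_values = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
display(null_values)

# Fetch the single result row once instead of running a Spark job per column
null_counts = null_values.first()

report = "Null Value Report:\n\n"
for col_name in df.columns:
    report += f"- {col_name}: {null_counts[col_name]} null values\n"

# Print the report; dbutils.notebook.exit(report) would stop the notebook before later cells run
print(report)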

Task 2: Data Cleaning, Validation, and Transformation Strategy (Est. 10 - 14 hours)

Action 2.1: Develop a Data Cleaning Strategy

## Data Cleaning Strategy

### 1. Invalid Trip Distances
**Issue**: Rows with trip distances <= 0 or >= 100 miles.
**Percentage of invalid trip distances**: 2.04%
**Action**: Filter out these rows.
**Justification**: Negative or zero distances are physically impossible, and extremely high values are likely errors. These represent a small fraction of the data, so filtering them out will not significantly impact the dataset.

### 2. Missing Passenger Count
**Issue**: Rows with missing passenger_count.
**Action**: Impute missing values with the median passenger count.
**Justification**: Removing these rows would cause significant data loss. Imputing with the median preserves the data and maintains realistic values.

### 3. Invalid Fare Amounts
**Issue**: Rows with fare amounts <= 0.
**Action**: Filter out these rows.
**Justification**: Negative or zero fare amounts are not valid and likely indicate data entry errors. Filtering these rows ensures data integrity.

### 4. Missing RatecodeID
**Issue**: Rows with missing RatecodeID.
**Action**: Impute missing values with the most frequent RatecodeID.
**Justification**: RatecodeID is important for fare calculation. Imputing with the most frequent value maintains the dataset's usability without introducing significant bias.

### 5. Outlier Passenger Counts
**Issue**: Rows with passenger counts > 6.
**Action**: Cap passenger counts at 6.
**Justification**: Most taxis have a maximum capacity of 6 passengers. Capping ensures realistic values while retaining the majority of the data.

### 6. Missing Store_and_fwd_flag
**Issue**: Rows with missing store_and_fwd_flag.
**Action**: Impute missing values with the most frequent flag.
**Justification**: Store_and_fwd_flag indicates whether the trip record was held in vehicle memory before sending to the vendor. Imputing with the most frequent value maintains data consistency.
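
To make the six rules above concrete, here is a pure-Python reference version that applies them to a single trip represented as a dict. It is a sketch for spot-checking, not the Spark implementation: `clean_row` is a hypothetical name, and the default imputation values (`median_passengers`, `top_ratecode`, `top_flag`) are placeholders that should be computed from the data rather than assumed.

```python
def clean_row(row, median_passengers=1, top_ratecode=1, top_flag="N"):
    """Apply the six strategy rules to one trip; return None if the row is filtered out."""
    # Rule 1: drop distances <= 0 or >= 100 miles
    if not (0 < row["trip_distance"] < 100):
        return None
    # Rule 3: drop fares <= 0
    if row["fare_amount"] <= 0:
        return None

    cleaned = dict(row)
    # Rule 2: impute a missing passenger_count; Rule 5: cap at 6
    if cleaned["passenger_count"] is None:
        cleaned["passenger_count"] = median_passengers
    cleaned["passenger_count"] = min(cleaned["passenger_count"], 6)
    # Rule 4: impute a missing RatecodeID with the most frequent value
    if cleaned["RatecodeID"] is None:
        cleaned["RatecodeID"] = top_ratecode
    # Rule 6: impute a missing store_and_fwd_flag with the most frequent value
    if cleaned["store_and_fwd_flag"] is None:
        cleaned["store_and_fwd_flag"] = top_flag
    return cleaned


sample = {
    "trip_distance": 2.5, "fare_amount": 12.0, "passenger_count": None,
    "RatecodeID": None, "store_and_fwd_flag": None,
}
print(clean_row(sample))
print(clean_row({**sample, "trip_distance": 0.0}))
```

In Spark, the same rules would typically map to `filter` for rules 1 and 3, `fillna` for rules 2, 4 and 6, and `least` for the cap in rule 5.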

In [0]:
from pyspark.sql.functions import col, when, count

# 1. Rows with invalid trip distances (<=0 or unreasonably high)
invalid_trip_distance = df.filter((col("trip_distance") <= 0) | (col("trip_distance") >= 100))
#display(invalid_trip_distance)

total_count = df.count()
invalid_count = invalid_trip_distance.count()
invalid_percentage = (invalid_count / total_count) * 100

print(f"Percentage of invalid trip distances: {invalid_percentage:.2f}%")

Percentage of invalid trip distances: 2.04%


In [0]:
# 2. Rows with negative fare or total amounts
negative_fare_total = df.filter((col("fare_amount") < 0) | (col("total_amount") < 0))
# display(negative_fare_total)
total_count = df.count()
invalid_count = negative_fare_total.count()
invalid_percentage = (invalid_count / total_count) * 100

print(f"Percentage of negative fare or total amounts: {invalid_percentage:.2f}%")

Percentage of negative fare or total amounts: 1.27%


In [0]:
# 3. Rows where pickup is not strictly before dropoff, or pickup falls outside January 2024
invalid_datetime = df.filter(
    (col("tpep_pickup_datetime") >= col("tpep_dropoff_datetime")) |
    (col("tpep_pickup_datetime") < "2024-01-01") |
    (col("tpep_pickup_datetime") >= "2024-02-01")
)
# display(invalid_datetime)

total_count = df.count()
invalid_count = invalid_datetime.count()
invalid_percentage = (invalid_count / total_count) * 100

print(f"Percentage of invalid datetime rows: {invalid_percentage:.2f}%")


Percentage of invalid datetime rows: 0.03%


In [0]:
# 4. Rows with invalid location IDs (NYC taxi zone IDs are 1-263; 264 and 265 are placeholder codes for unknown or out-of-city zones)
invalid_location_ids = df.filter(
    (~col("PULocationID").between(1, 263)) |
    (~col("DOLocationID").between(1, 263))
)
# display(invalid_location_ids)

total_count = df.count()
invalid_count = invalid_location_ids.count()
invalid_percentage = (invalid_count / total_count) * 100

print(f"Percentage of invalid location ID rows: {invalid_percentage:.2f}%")


Percentage of invalid location ID rows: 1.06%


In [0]:
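# 5. Rows with passenger_count <= 0 or unreasonably high (>8)
# Note: rows with a NULL passenger_count are not matched here, because comparisons with NULL evaluate to NULL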
invalid_passenger_count = df.filter((col("passenger_count") <= 0) | (col("passenger_count") > 8))
# display(invalid_passenger_count)

total_count = df.count()
invalid_count = invalid_passenger_count.count()
invalid_percentage = (invalid_count / total_count) * 100

print(f"Percentage of invalid passenger count rows: {invalid_percentage:.2f}%")

Percentage of invalid passenger count rows: 1.06%


In [0]:
# 6. Report missing payment_type (imputation with 'Unknown' happens in the cleaning cell of Action 2.2)
missing_payment_type = df.filter(col("payment_type").isNull())
# display(missing_payment_type)

total_count = df.count()
missing_count = missing_payment_type.count()
missing_percentage = (missing_count / total_count) * 100

print(f"Percentage of missing payment_type rows: {missing_percentage:.2f}%")




Percentage of missing payment_type rows: 0.00%
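
The profiling cells above repeat the same count-and-divide pattern and each one triggers its own `df.count()`. As a hedged alternative, the sketch below evaluates several named conditions in one aggregation. `percentage` and `invalid_percentages` are hypothetical helpers, and the Spark part is imported lazily so the arithmetic can be checked without a cluster.

```python
def percentage(part, total):
    """Share of rows as a percentage; 0.0 when there are no rows."""
    return 100.0 * part / total if total else 0.0


def invalid_percentages(df, checks):
    """Hypothetical helper: evaluate named Column conditions in a single Spark job.

    `checks` maps a name to a boolean Column, for example
    {"trip_distance": (F.col("trip_distance") <= 0) | (F.col("trip_distance") >= 100)}.
    """
    from pyspark.sql import functions as F  # lazy import; requires a Spark runtime

    aggs = [F.count(F.lit(1)).alias("_total")]
    # A NULL condition falls through to otherwise(0), so NULL rows are not counted as invalid
    aggs += [F.sum(F.when(cond, 1).otherwise(0)).alias(name) for name, cond in checks.items()]
    row = df.agg(*aggs).first()
    return {name: percentage(row[name], row["_total"]) for name in checks}


# 60,430 rows fail the trip_distance rule out of 2,964,624 (difference of the reported row counts)
print(f"{percentage(60430, 2964624):.2f}%")
```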


Action 2.2: Implement Data Cleaning and Type Casting



In [0]:
%python
from pyspark.sql.functions import col, when, count, isnan, to_timestamp

# Cast columns to appropriate data types
df_casted = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
    .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime"))) \
    .withColumn("passenger_count", col("passenger_count").cast("integer")) \
    .withColumn("trip_distance", col("trip_distance").cast("double")) \
    .withColumn("fare_amount", col("fare_amount").cast("double")) \
    .withColumn("total_amount", col("total_amount").cast("double")) \
    .withColumn("PULocationID", col("PULocationID").cast("integer")) \
    .withColumn("DOLocationID", col("DOLocationID").cast("integer")) \
    .withColumn("payment_type", col("payment_type").cast("string"))

# Check nulls after casting (isnan is only applied to the floating-point columns listed here)
numeric_columns = ["trip_distance", "fare_amount", "total_amount"]

display(df_casted.select(
    [count(when(col(c).isNull() | isnan(c), c)).alias(c) if c in numeric_columns
     else count(when(col(c).isNull(), c)).alias(c)
     for c in df_casted.columns]
))

# Step 1: Filter trip_distance
df_cleaned = df_casted.filter((col("trip_distance") > 0) & (col("trip_distance") < 100))
#display(df_cleaned.filter((col("trip_distance") <= 0) | (col("trip_distance") >= 100)))
print(f"Rows after trip_distance filter: {df_cleaned.count()}")

# Step 2: Filter fare_amount and total_amount (negative values are removed; zero amounts are kept)
df_cleaned = df_cleaned.filter((col("fare_amount") >= 0) & (col("total_amount") >= 0))
#display(df_cleaned.filter((col("fare_amount") < 0) | (col("total_amount") < 0)))
print(f"Rows after fare/total amount filter: {df_cleaned.count()}")

# Step 3: Filter datetime logic
df_cleaned = df_cleaned.filter(
    (col("tpep_pickup_datetime") < col("tpep_dropoff_datetime")) &
    (col("tpep_pickup_datetime") >= "2024-01-01") &
    (col("tpep_pickup_datetime") < "2024-02-01")
)
#display(df_cleaned.filter(
#    (col("tpep_pickup_datetime") >= col("tpep_dropoff_datetime")) |
#    (col("tpep_pickup_datetime") < "2024-01-01") |
#    (col("tpep_pickup_datetime") >= "2024-02-01")
#))
print(f"Rows after datetime filter: {df_cleaned.count()}")

# Step 4: Filter location IDs
df_cleaned = df_cleaned.filter(
    (col("PULocationID").between(1, 263)) &
    (col("DOLocationID").between(1, 263))
)
#display(df_cleaned.filter(
#    (~col("PULocationID").between(1, 263)) |
#    (~col("DOLocationID").between(1, 263))
#))
print(f"Rows after location ID filter: {df_cleaned.count()}")

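# Step 5: Filter passenger_count
# Note: rows with a NULL passenger_count are dropped here too, because a NULL comparison never evaluates to true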
df_cleaned = df_cleaned.filter((col("passenger_count") > 0) & (col("passenger_count") <= 8))
#display(df_cleaned.filter((col("passenger_count") <= 0) | (col("passenger_count") > 8)))
print(f"Rows after passenger_count filter: {df_cleaned.count()}")

# Step 6: Impute missing payment_type
df_cleaned = df_cleaned.withColumn(
    "payment_type",
    when(col("payment_type").isNull(), "Unknown").otherwise(col("payment_type"))
)
#display(df_cleaned.filter(col("payment_type") == "Unknown"))
print(f"Rows with payment_type 'Unknown': {df_cleaned.filter(col('payment_type') == 'Unknown').count()}")

# Final null check
# display(df_cleaned.select(
#     [count(when(col(c).isNull() | (isnan(c) if c in numeric_columns else col(c).isNull()), c)).alias(c) for c in df_cleaned.columns]
# ))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,0,0,140162,0,140162,140162,0,0,0,0,0,0,0,0,0,0,140162,140162


Rows after trip_distance filter: 2904194
Rows after fare/total amount filter: 2870052
Rows after datetime filter: 2869923
Rows after location ID filter: 2841701
Rows after passenger_count filter: 2697620
Rows with payment_type 'Unknown': 0
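
The row counts printed above can be turned into a small attrition table showing how many rows each filter removed. This sketch only does arithmetic on the reported numbers; `attrition` is a hypothetical helper name.

```python
# Row counts copied from the output above, in filter order
counts = [
    ("raw", 2964624),
    ("trip_distance", 2904194),
    ("fare/total amount", 2870052),
    ("datetime", 2869923),
    ("location ID", 2841701),
    ("passenger_count", 2697620),
]


def attrition(steps):
    """Return (step, rows_dropped, pct_of_raw) for each filter step."""
    raw = steps[0][1]
    result = []
    for (_, before), (name, after) in zip(steps, steps[1:]):
        dropped = before - after
        result.append((name, dropped, round(100.0 * dropped / raw, 2)))
    return result


for name, dropped, pct in attrition(counts):
    print(f"{name}: {dropped} rows dropped ({pct}% of raw)")

retained_pct = round(100.0 * counts[-1][1] / counts[0][1], 2)
print(f"Retained: {retained_pct}% of raw rows")
```

The passenger_count step removes by far the most rows (144,081), a figure close to the 140,162 null passenger counts in the post-casting null check, which suggests that most of that loss comes from missing values rather than from out-of-range counts.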


Action 2.3: Derive New Features (Feature Engineering)

1. **trip_duration_min**: Calculates the trip duration in minutes. Useful for analyzing trip lengths and identifying outliers.
2. **avg_speed_mph**: Computes the average speed in miles per hour. Helps in understanding traffic patterns and driver behavior.
3. **pickup_day_of_week**: Extracts the day of the week from the pickup datetime. Valuable for identifying trends based on days.
4. **pickup_hour**: Extracts the hour from the pickup datetime. Useful for time-based analysis and peak hour identification.
5. **time_of_day_slot**: Categorizes the pickup time into Morning, Afternoon, Evening, and Night. Helps in segmenting data based on time slots.
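
The five features can be checked by hand with a pure-Python reference before trusting the Spark output. The sketch below mirrors the slot boundaries used in the notebook's SQL and numbers weekdays the way Spark's `dayofweek` does (1 = Sunday through 7 = Saturday); `derive_features` and `time_of_day_slot` are hypothetical helper names.

```python
from datetime import datetime


def time_of_day_slot(hour):
    """Map an hour (0-23) to the slot names used in the notebook's SQL."""
    if 5 <= hour <= 11:
        return "Morning"
    if 12 <= hour <= 16:
        return "Afternoon"
    if 17 <= hour <= 20:
        return "Evening"
    return "Night"


def derive_features(pickup, dropoff, trip_distance):
    """Compute the five derived features for a single trip."""
    seconds = (dropoff - pickup).total_seconds()
    return {
        "trip_duration_min": seconds / 60.0,
        # Speed is undefined for zero or negative durations
        "avg_speed_mph": trip_distance / (seconds / 3600.0) if seconds > 0 else None,
        # isoweekday() is 1=Monday..7=Sunday; convert to 1=Sunday..7=Saturday
        "pickup_day_of_week": pickup.isoweekday() % 7 + 1,
        "pickup_hour": pickup.hour,
        "time_of_day_slot": time_of_day_slot(pickup.hour),
    }


# First row of the sample data: 1.72 miles, 00:57:55 to 01:17:43 on 2024-01-01 (a Monday)
features = derive_features(
    datetime(2024, 1, 1, 0, 57, 55), datetime(2024, 1, 1, 1, 17, 43), 1.72
)
print(features)
```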

In [0]:
%python
# Register the DataFrame as a temporary view
df_cleaned.createOrReplaceTempView("df_cleaned")

# Now you can run your SQL query
query = """
SELECT
  *,
  -- Trip Duration in minutes
  (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 60.0 AS trip_duration_min,
  -- Filter out zero or negative durations for speed calculation
  CASE 
    WHEN (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) > 0
    THEN trip_distance / ((unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600.0)
    ELSE NULL
  END AS avg_speed_mph,
  -- Day of Week (Spark's dayofweek returns 1=Sunday through 7=Saturday)
  dayofweek(tpep_pickup_datetime) AS pickup_day_of_week,
  -- Hour of Day
  hour(tpep_pickup_datetime) AS pickup_hour,
  -- Time of Day Slot
  CASE
    WHEN hour(tpep_pickup_datetime) BETWEEN 5 AND 11 THEN 'Morning'
    WHEN hour(tpep_pickup_datetime) BETWEEN 12 AND 16 THEN 'Afternoon'
    WHEN hour(tpep_pickup_datetime) BETWEEN 17 AND 20 THEN 'Evening'
    ELSE 'Night'
  END AS time_of_day_slot
FROM
  df_cleaned
WHERE
  unix_timestamp(tpep_dropoff_datetime) > unix_timestamp(tpep_pickup_datetime)
"""

# Execute the query
result_df = spark.sql(query)
display(result_df.limit(3))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot
2,2024-01-01T00:57:55Z,2024-01-01T01:17:43Z,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8,5.212121212121212,2,0,Night
1,2024-01-01T00:03:00Z,2024-01-01T00:09:36Z,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6,16.363636363636363,2,0,Night
1,2024-01-01T00:17:06Z,2024-01-01T00:35:01Z,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916667,15.73954074029423,2,0,Night


Action 2.4: Data Validation Post-Transformation

In [0]:
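from pyspark.sql.functions import col, count, when, isnan

# After all cleaning and transformations, perform a final validation.
# numeric_columns is reused from the cleaning cell in Action 2.2.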

# Display the schema of the final transformed DataFrame
result_df.printSchema()

# Recalculate summary statistics and null counts
display(result_df.describe())
display(result_df.select(
    [count(when(col(c).isNull() | (isnan(c) if c in numeric_columns else col(c).isNull()), c)).alias(c) for c in result_df.columns]
))

# Explicitly check if your cleaning rules were effective
assert result_df.filter(col("trip_distance") <= 0).count() == 0, "There are rows with non-positive trip_distance"
assert result_df.filter(col("fare_amount") < 0).count() == 0, "There are rows with negative fare_amount"
assert result_df.filter(col("total_amount") < 0).count() == 0, "There are rows with negative total_amount"
assert result_df.filter(col("tpep_pickup_datetime") >= col("tpep_dropoff_datetime")).count() == 0, "There are rows with pickup datetime >= dropoff datetime"
assert result_df.filter(~col("PULocationID").between(1, 263)).count() == 0, "There are rows with invalid PULocationID"
assert result_df.filter(~col("DOLocationID").between(1, 263)).count() == 0, "There are rows with invalid DOLocationID"
assert result_df.filter((col("passenger_count") <= 0) | (col("passenger_count") > 8)).count() == 0, "There are rows with invalid passenger_count"

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- trip_duration_min: decimal(27,6) (nullable = true)
 |-- avg_speed_mph: double (nullable = true)
 |-- pickup_day_of_week: int

summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot
count,2697620.0,2697620.0,2697620.0,2697620.0,2697620,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620.0,2697620
mean,1.767625907281233,1.3538081716476005,3.229592804026712,2.032152045136083,,166.13226733194446,164.73021923028446,1.1850764748185438,18.11029795152948,1.5309953180952114,0.4974484916333657,3.4395620917712835,0.5256130848650278,0.9998639912218904,27.030179717658143,2.332507729035224,0.1429233917304883,15.6766981387,11.873919001604998,4.081087032272892,14.267973991889146,
stddev,0.4223463506123043,0.843769864150112,4.23663351794273,9.764326127981024,,63.01814444862045,68.83811659140156,0.4559298809349903,16.015901901398284,1.7924573439326803,0.0358442986294347,3.717198272509116,2.019571711211677,0.0105873262491545,20.51892777885544,0.6250408381138748,0.4792587242498916,35.812409784988844,68.7322667121902,1.918411153736949,5.626889565623755,
min,1.0,1.0,0.01,1.0,N,1.0,1.0,1.0,0.0,-2.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.016667,0.0004181281814589337,1.0,0.0,Afternoon
max,2.0,8.0,80.0,99.0,Y,263.0,263.0,4.0,2221.3,14.25,4.0,422.7,95.46,1.0,2225.3,2.5,1.75,9455.4,41726.61870503597,7.0,23.0,Night


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Task 3: Advanced Transformations and Business Logic (Est. 8 - 12 hours)
Objective: Apply more complex transformations, potentially including window functions or UDFs for sophisticated business logic, to further refine the dataset for specific analytical scenarios.

Action 3.1: Define a Scenario for Advanced Transformation



- I want to calculate the average tip percentage for trips that start at airport locations versus those that don't. To do this, I'll first identify which PULocationIDs correspond to airports (I'll need to look these up or make some reasonable assumptions). Then, I'll filter the trips accordingly and use window functions to get the average tip percentage for both groups.

- I'm interested in finding trips that are much longer or shorter than the average for a given route (from PULocationID to DOLocationID) and time of day. My plan is to use window functions to calculate the average trip duration for each route and time window, then flag trips that are significant outliers.

- I want to see which payment types are most popular within each rate code. I'll use window functions to partition the data by rate code and rank the payment types by how often they're used.

- I plan to detect unusual fare amounts for specific vendor and rate code combinations. I'll calculate the mean and standard deviation of fares within each group using window functions, then identify trips where the fare is an outlier.

- I'm curious about how weather events, like heavy rain, affect trip duration and fare in different boroughs. I'll join weather data with the trip data, then use window functions to compare trip durations and fares during bad weather versus normal conditions.

- Finally, I want to segment customers based on how often they ride and how much they spend on average, so I can identify high-value customer profiles. The TLC trip records carry no customer identifier, so this scenario only runs if a `customer_id` column is available; in that case I'll aggregate these metrics for each customer and then group them accordingly.

Action 3.2: Implement Advanced Transformations (Window Functions / UDFs)

In [0]:
from pyspark.sql.functions import col, avg, count, when, expr, stddev, mean, lit, countDistinct, sum as spark_sum, row_number
from pyspark.sql.window import Window

# 1. Average tip percentage for airport vs non-airport pickups
airport_pulocationids = [1, 132, 138]  # TLC taxi zone IDs: Newark (EWR)=1, JFK=132, LaGuardia=138
# tip_percentage is the tip as a share of total_amount (which itself includes the tip);
# trips with a non-positive total_amount get null and are ignored by avg().
result_df_with_tip = result_df.withColumn(
    "tip_percentage", when(col("total_amount") > 0, col("tip_amount") / col("total_amount") * 100).otherwise(None)
).withColumn(
    "is_airport_pickup", when(col("PULocationID").isin(airport_pulocationids), "Airport").otherwise("Non-Airport")
)
avg_tip_pct = result_df_with_tip.groupBy("is_airport_pickup").agg(avg("tip_percentage").alias("avg_tip_percentage"))
display(avg_tip_pct)

# 2. Trips significantly longer/shorter than average for route and time of day
route_stats = result_df.groupBy("PULocationID", "DOLocationID", "time_of_day_slot").agg(
    avg("trip_duration_min").alias("avg_duration"),
    stddev("trip_duration_min").alias("std_duration")
)
joined = result_df.join(
    route_stats,
    on=["PULocationID", "DOLocationID", "time_of_day_slot"],
    how="left"
).withColumn(
    "duration_zscore", (col("trip_duration_min") - col("avg_duration")) / col("std_duration")
)
significant_trips = joined.filter((col("duration_zscore") > 2) | (col("duration_zscore") < -2))
display(significant_trips.select("PULocationID", "DOLocationID", "time_of_day_slot", "trip_duration_min", "avg_duration", "duration_zscore").limit(4))

# 3. Ranking payment types by usage frequency within rate codes
window_spec = Window.partitionBy("RatecodeID").orderBy(col("trip_count").desc())
payment_rank = result_df.groupBy("RatecodeID", "payment_type").agg(
    count("*").alias("trip_count")
).withColumn(
    "rank", row_number().over(window_spec)
)
display(payment_rank.limit(4))

# 4. Detecting anomalous fare amounts for vendor and rate code (using 3-sigma rule)
fare_stats = result_df.groupBy("VendorID", "RatecodeID").agg(
    mean("fare_amount").alias("mean_fare"),
    stddev("fare_amount").alias("std_fare")
)
fare_anomalies = result_df.join(
    fare_stats, on=["VendorID", "RatecodeID"], how="left"
).withColumn(
    "fare_zscore", (col("fare_amount") - col("mean_fare")) / col("std_fare")
).filter((col("fare_zscore") > 3) | (col("fare_zscore") < -3))
display(fare_anomalies.select("VendorID", "RatecodeID", "fare_amount", "mean_fare", "fare_zscore").limit(4))

# 5. Impact of weather events (requires weather data joined by date and borough)
# Assume weather_df with columns: date, borough, heavy_rain (boolean)
# Assume result_df has 'pickup_date' and 'pickup_borough' columns
# Join and analyze
if "weather_df" in locals():
    weather_joined = result_df.join(
        weather_df,
        (result_df.tpep_pickup_datetime.cast("date") == weather_df.date) &
        (result_df.pickup_borough == weather_df.borough),
        how="left"
    )
    rain_impact = weather_joined.groupBy("pickup_borough", "heavy_rain").agg(
        avg("trip_duration_min").alias("avg_duration"),
        avg("fare_amount").alias("avg_fare"),
        count("*").alias("trip_count")
    )
    display(rain_impact.limit(4))

# 6. Segmenting customers by ride frequency and average spend (assuming unique customer_id)
if "customer_id" in result_df.columns:
    customer_seg = result_df.groupBy("customer_id").agg(
        count("*").alias("ride_count"),
        avg("total_amount").alias("avg_spend"),
        spark_sum("total_amount").alias("total_spend")
    )
    high_value = customer_seg.filter((col("ride_count") > 10) & (col("avg_spend") > 30))
    display(high_value.limit(4))

is_airport_pickup,avg_tip_percentage
Airport,11.25589703287847
Non-Airport,12.463699630120871


PULocationID,DOLocationID,time_of_day_slot,trip_duration_min,avg_duration,duration_zscore
233,43,Night,23.916667,11.5180556667,2.8097546681729093
163,186,Night,37.3,10.0351207652,6.6587307687170485
141,79,Night,21.65,13.7109118161,2.090375707393651
141,79,Night,26.483333,13.7109118161,3.3630006808047463


RatecodeID,payment_type,trip_count,rank
1,1,2137773,1
1,2,396164,2
1,4,21163,3
1,3,9118,4


VendorID,RatecodeID,fare_amount,mean_fare,fare_zscore
1,99,77.5,33.50606889414547,3.361369472353549
1,99,76.5,33.50606889414547,3.2849641730802084
1,99,74.5,33.50606889414547,3.1321535745335267
1,99,78.5,33.50606889414547,3.4377747716268896


Action 3.3: Validate Advanced Transformation Results

In [0]:
from pyspark.sql.functions import col, avg, count, when, expr, stddev, mean, lit, countDistinct, sum as spark_sum, row_number
from pyspark.sql.window import Window

# 1. Average tip percentage for airport vs non-airport pickups
# (reuses result_df_with_tip from Action 3.2, which already carries the is_airport_pickup flag)
avg_tip_pct = result_df_with_tip.groupBy("is_airport_pickup").agg(avg("tip_percentage").alias("avg_tip_percentage"))
display(avg_tip_pct.filter(col("is_airport_pickup") == "Airport").limit(4))

# 2. Trips significantly longer/shorter than average for route and time of day
significant_trips = joined.filter((col("duration_zscore") > 2) | (col("duration_zscore") < -2))
display(significant_trips.filter(col("duration_zscore") > 2).limit(4))

# 3. Ranking payment types by usage frequency within rate codes
payment_rank = result_df.groupBy("RatecodeID", "payment_type").agg(
    count("*").alias("trip_count")
).withColumn(
    "rank", row_number().over(window_spec)
)
display(payment_rank.filter(col("rank") == 1).limit(4))

# 4. Detecting anomalous fare amounts for vendor and rate code (using 3-sigma rule)
fare_anomalies = result_df.join(
    fare_stats, on=["VendorID", "RatecodeID"], how="left"
).withColumn(
    "fare_zscore", (col("fare_amount") - col("mean_fare")) / col("std_fare")
).filter((col("fare_zscore") > 3) | (col("fare_zscore") < -3))
display(fare_anomalies.filter(col("fare_zscore") > 3).limit(4))

# 5. Impact of weather events (requires weather data joined by date and borough)
if "weather_df" in locals():
    rain_impact = weather_joined.groupBy("pickup_borough", "heavy_rain").agg(
        avg("trip_duration_min").alias("avg_duration"),
        avg("fare_amount").alias("avg_fare"),
        count("*").alias("trip_count")
    )
    display(rain_impact.filter(col("heavy_rain") == True).limit(4))

# 6. Segmenting customers by ride frequency and average spend (assuming unique customer_id)
if "customer_id" in result_df.columns:
    high_value = customer_seg.filter((col("ride_count") > 10) & (col("avg_spend") > 30))
    display(high_value.filter(col("ride_count") > 20).limit(4))

is_airport_pickup,avg_tip_percentage
Airport,11.25589703287847


PULocationID,DOLocationID,time_of_day_slot,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,avg_duration,std_duration,duration_zscore
148,141,Night,1,2024-01-01T00:54:08Z,2024-01-01T01:26:31Z,1,4.7,1,N,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0,32.383333,8.708186807282267,2,0,16.2415010828,4.336702979450849,3.7221437561407584
246,170,Night,1,2024-01-01T00:36:30Z,2024-01-01T01:13:53Z,2,1.7,1,N,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0,37.383333,2.728486684984977,2,0,12.0217592222,4.014319296486399,6.317776914257456
79,41,Night,2,2024-01-01T00:49:31Z,2024-01-01T01:35:41Z,2,8.89,1,N,1,47.8,1.0,0.5,7.92,0.0,1.0,60.72,2.5,0.0,46.166667,11.553797287391935,2,0,25.0342766887,6.210815733804803,3.402514454949721
68,125,Night,2,2024-01-01T00:58:11Z,2024-01-01T01:24:47Z,1,2.42,1,N,1,23.3,1.0,0.5,0.0,0.0,1.0,28.3,2.5,0.0,26.6,5.458650720790016,2,0,10.7756944417,4.024266311522048,3.9322212630393674


RatecodeID,payment_type,trip_count,rank
1,1,2137773,1
2,1,77278,1
3,1,5177,1
4,1,387,1


VendorID,RatecodeID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot,mean_fare,std_fare,fare_zscore
1,1,2024-01-01T00:35:16Z,2024-01-01T01:11:52Z,2,8.2,N,246,190,1,59.0,3.5,0.5,14.15,6.94,1.0,85.09,2.5,0.0,36.6,13.442622950819672,2,0,Night,14.901246000401962,10.62049779920962,4.152230416438661
2,1,2024-01-01T00:54:16Z,2024-01-01T01:27:40Z,1,13.74,N,239,95,1,56.9,1.0,0.5,13.77,6.94,1.0,82.61,2.5,0.0,33.4,24.682619950526973,2,0,Night,16.00500293806924,12.028247858981686,3.39991306642337
2,1,2024-01-01T00:50:28Z,2024-01-01T01:38:39Z,1,20.34,N,132,26,1,80.0,1.0,0.5,2.0,0.0,1.0,86.25,0.0,1.75,48.183333,25.328246099898383,2,0,Night,16.00500293806924,12.028247858981686,5.320392280921005
2,1,2024-01-01T00:55:31Z,2024-01-01T01:21:15Z,1,18.42,N,132,213,2,68.1,1.0,0.5,0.0,6.94,1.0,79.29,0.0,1.75,25.733333,42.9481754020271,2,0,Night,16.00500293806924,12.028247858981686,4.331054503755556
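
As a quick sanity check on the z-score logic, the plain-Python sketch below recomputes two of the z-scores from the values printed in the validation rows above. The helper name `zscore` is introduced here for illustration only and is not part of the notebook.

```python
def zscore(value: float, mean: float, std: float) -> float:
    """Standard score: how many standard deviations `value` lies from `mean`."""
    return (value - mean) / std


# First duration-outlier row above (route 148 -> 141, Night):
# trip_duration_min, avg_duration, std_duration
duration_z = zscore(32.383333, 16.2415010828, 4.336702979450849)

# First fare-anomaly row above (VendorID 1, RatecodeID 1):
# fare_amount, mean_fare, std_fare
fare_z = zscore(59.0, 14.901246000401962, 10.62049779920962)

print(round(duration_z, 4), round(fare_z, 4))
```

Both values agree with the `duration_zscore` and `fare_zscore` columns in the printed rows, which suggests the joined group statistics were applied to the intended rows.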


Task 4: Loading Data into Delta Lake and Basic Optimization (Est. 6 - 8 hours)
Objective: Load the fully processed and transformed taxi data into a Delta Lake table. Apply basic Delta Lake optimization techniques and understand their purpose.

Action 4.1: Define Schema and Partitioning Strategy for Delta Table

**Schema Review:**  
The final transformed DataFrame (`final_taxi_df`) includes all necessary columns for analysis, such as trip times, locations, fare details, and timestamps.

**Partitioning Strategy:**  
We will partition the Delta table by `pickup_year`, `pickup_month`, and `pickup_day`, which are derived from the `tpep_pickup_datetime` column.

**Justification:**  
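Partitioning by pickup date components is chosen because most analytical queries on taxi data are time-based (e.g., daily, monthly trends, or filtering by specific dates). For queries that filter on these columns, Delta Lake can prune partitions during reads and skip files for dates outside the filter, which reduces scan time and resource usage. The trade-off is that a single month of data split by day yields many relatively small partitions, so the benefit depends on how often queries actually filter by date.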
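
To make the partition layout concrete, here is a minimal plain-Python sketch of the directory each pickup timestamp would map to, assuming the default Hive-style `column=value` layout. The helper `partition_path` is hypothetical and only mirrors the three partition columns named above.

```python
from datetime import datetime, timedelta


def partition_path(pickup: datetime) -> str:
    """Hive-style partition directory for one pickup timestamp (assumed layout)."""
    return (
        f"pickup_year={pickup.year}/"
        f"pickup_month={pickup.month}/"
        f"pickup_day={pickup.day}"
    )


# One timestamp per calendar day of January 2024
start = datetime(2024, 1, 1)
january = [start + timedelta(days=d) for d in range(31)]
partitions = {partition_path(ts) for ts in january}

print(partition_path(datetime(2024, 1, 16, 23, 45, 34)))
print(len(partitions))
```

Under these assumptions a clean January dataset maps to 31 partition directories; any stray pickup dates outside January would each add a further directory.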

Action 4.2: Write to Delta Lake Table



In [0]:
%python
from pyspark.sql.functions import year, month, dayofmonth, col

# Use the fully transformed DataFrame from the earlier tasks as the final dataset
final_taxi_df = result_df

# Add partitioning columns if they don't exist
if not {"pickup_year", "pickup_month", "pickup_day"}.issubset(set(final_taxi_df.columns)):
    final_taxi_df = final_taxi_df.withColumn("pickup_year", year(col("tpep_pickup_datetime"))) \
                                 .withColumn("pickup_month", month(col("tpep_pickup_datetime"))) \
                                 .withColumn("pickup_day", dayofmonth(col("tpep_pickup_datetime")))

# Write to Delta Lake table with partitioning
final_taxi_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("pickup_year", "pickup_month", "pickup_day") \
    .save("/mount_point/delta/taxi_trips")

Action 4.3: Register Delta Table (Optional but Recommended)

In [0]:
%python
# Read the Delta table into a DataFrame
df = spark.read.format("delta").load("dbfs:/mount_point/delta/taxi_trips")

# Display the DataFrame
display(df.limit(3))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-16T23:45:34Z,2024-01-16T23:53:08Z,1,3.09,1,N,132,10,1,14.2,1.0,0.5,0.1,0.0,1.0,18.55,0.0,1.75,7.566667,24.502224231034564,3,23,Night,2024,1,16
2,2024-01-16T23:59:51Z,2024-01-17T00:01:22Z,1,0.28,1,N,263,263,2,4.4,1.0,0.5,0.0,0.0,1.0,9.4,2.5,0.0,1.516667,11.076825698235622,3,23,Night,2024,1,16
2,2024-01-16T23:59:42Z,2024-01-17T00:30:37Z,1,6.63,1,N,114,257,1,33.8,1.0,0.5,9.7,0.0,1.0,48.5,2.5,0.0,30.916667,12.866840812144124,3,23,Night,2024,1,16
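
The cell above reads the Delta files by path but does not register a table. One way to register it, sketched here under the assumption that a metastore is available, is a `CREATE TABLE ... USING DELTA LOCATION` statement. The table name `taxi_trips` and the helper `register_delta_table_sql` are hypothetical; the path is the one written in Action 4.2.

```python
def register_delta_table_sql(table_name: str, path: str) -> str:
    """Build a statement that points a metastore table at an existing Delta path."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} "
        f"USING DELTA LOCATION '{path}'"
    )


# `taxi_trips` is a hypothetical table name chosen for illustration.
register_sql = register_delta_table_sql("taxi_trips", "/mount_point/delta/taxi_trips")
print(register_sql)

# In the notebook, where `spark` is defined, the statement would be run as:
# spark.sql(register_sql)
# display(spark.sql("SELECT COUNT(*) AS trips FROM taxi_trips"))
```

Once registered, later queries could refer to the table by name instead of repeating the path.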


Action 4.4: Perform Basic Delta Lake Optimization

Run OPTIMIZE to compact small files and Z-order the data by PULocationID and DOLocationID, so that rows with similar location IDs are stored in the same files:

In [0]:
# Run OPTIMIZE on your Delta table.
spark.sql("OPTIMIZE delta.`/mount_point/delta/taxi_trips` ZORDER BY (PULocationID, DOLocationID)")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

Action 4.5: Basic DML and Time Travel (Conceptual/Optional Practical)

In [0]:
%python
# Read the Delta table into a DataFrame
df = spark.read.format("delta").load("dbfs:/mount_point/delta/taxi_trips")

# Display the DataFrame
display(df.limit(3))

# Perform a simple UPDATE on a small subset of data
spark.sql("""
    UPDATE delta.`/mount_point/delta/taxi_trips`
    SET fare_amount = fare_amount * 1.1
    WHERE PULocationID = 1
""")

# Query a previous version of the table using the versionAsOf reader option
previous_version_df = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/mount_point/delta/taxi_trips")

# Display the previous version of the DataFrame
display(previous_version_df.limit(3))

# Run OPTIMIZE on your Delta table.
spark.sql("OPTIMIZE delta.`/mount_point/delta/taxi_trips` ZORDER BY (PULocationID, DOLocationID)")

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-01T00:57:55Z,2024-01-01T01:17:43Z,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8,5.212121212121212,2,0,Night,2024,1,1
1,2024-01-01T00:03:00Z,2024-01-01T00:09:36Z,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6,16.363636363636363,2,0,Night,2024,1,1
1,2024-01-01T00:17:06Z,2024-01-01T00:35:01Z,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916667,15.73954074029423,2,0,Night,2024,1,1


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_min,avg_speed_mph,pickup_day_of_week,pickup_hour,time_of_day_slot,pickup_year,pickup_month,pickup_day
2,2024-01-09T23:56:19Z,2024-01-10T00:06:29Z,1,1.52,1,N,186,230,2,11.4,1.0,0.5,0.0,0.0,1.0,16.4,2.5,0.0,10.166667,8.970515332499232,3,23,Night,2024,1,9
2,2024-01-09T23:59:39Z,2024-01-10T00:19:26Z,1,5.92,1,N,230,87,1,26.8,1.0,0.5,6.36,0.0,1.0,38.16,2.5,0.0,19.783333,17.954519261681053,3,23,Night,2024,1,9
2,2024-01-09T23:54:39Z,2024-01-10T00:07:46Z,1,3.71,1,N,164,263,1,17.7,1.0,0.5,4.54,0.0,1.0,27.24,2.5,0.0,13.116667,16.970783720855767,3,23,Night,2024,1,9


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

## Deliverable Snippet (for your report)

### ACID Transactions in Delta Lake
Delta Lake provides ACID (Atomicity, Consistency, Isolation, Durability) transactions: each transaction either commits in full or has no effect, the table always moves from one valid state to another, concurrent operations do not interfere with each other, and committed changes survive failures.

### Time Travel in Delta Lake
Time Travel allows you to query previous versions of your Delta Lake table. This is useful for auditing, debugging, and reproducing experiments.

### DML and Time Travel Commands
- **UPDATE Command**: We performed an UPDATE to increase the `fare_amount` by 10% where `PULocationID` is 1.
- **Time Travel Command**: We queried the table as of version 0 (the initial write) using the `versionAsOf` reader option, to see the data before the UPDATE.

### Outcome
- The UPDATE command modified the `fare_amount` for records with `PULocationID` 1.
- The Time Travel command allowed us to view the table's state before the UPDATE, demonstrating the ability to access historical data.
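
Version 0 predates the UPDATE, but it also predates the OPTIMIZE run and any later rewrites. A more targeted approach, sketched below, is to look up the version immediately before the UPDATE in the table history. The helper `version_before` and the sample history rows are hypothetical; in the notebook the rows would come from `DESCRIBE HISTORY`, which exposes `version` and `operation` columns.

```python
from typing import Dict, List, Optional


def version_before(history: List[Dict], operation: str) -> Optional[int]:
    """Return the version immediately preceding the most recent `operation` commit."""
    matching = [row["version"] for row in history if row["operation"] == operation]
    if not matching:
        return None
    latest = max(matching)
    return latest - 1 if latest > 0 else None


# Hypothetical history for illustration: initial write, OPTIMIZE, then UPDATE.
sample_history = [
    {"version": 2, "operation": "UPDATE"},
    {"version": 1, "operation": "OPTIMIZE"},
    {"version": 0, "operation": "WRITE"},
]
pre_update_version = version_before(sample_history, "UPDATE")
print(pre_update_version)

# In the notebook, where `spark` is defined:
# rows = spark.sql("DESCRIBE HISTORY delta.`/mount_point/delta/taxi_trips`").collect()
# v = version_before([r.asDict() for r in rows], "UPDATE")
# before = spark.read.format("delta").option("versionAsOf", v).load("dbfs:/mount_point/delta/taxi_trips")
# display(before.filter("PULocationID = 1").limit(3))
```

Filtering both the current and the earlier version on `PULocationID = 1` would show the changed `fare_amount` values side by side.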

Task 5: Data Analysis and Insights from Delta Table (Est. 6 - 8 hours)
Objective: Query the optimized Delta Lake table using Spark SQL to derive meaningful insights and answer analytical questions.



Action 5.1: Formulate Analytical Questions

Based on the enriched and processed data in your Delta table, we aim to answer 3-5 analytical questions that can provide business value or interesting insights into taxi trip patterns.
- How does average fare amount vary by time_of_day_slot and pickup_day_of_week?
- What are the top 10 busiest routes (PULocationID to DOLocationID pairs) during peak hours?
- Is there a correlation between trip_duration_min and tip_amount for different payment_types?
- How do airport trips (trips to/from known airport LocationIDs) differ in terms of average distance, fare, and tip percentage compared to non-airport trips?

Action 5.2: Write Spark SQL Queries for Analysis

In [0]:
%python
# How does average fare amount vary by time_of_day_slot and pickup_day_of_week?
query = """
    WITH avg_fare AS (
        SELECT 
            time_of_day_slot, 
            pickup_day_of_week, 
            AVG(fare_amount) AS avg_fare_amount
        FROM delta.`/mount_point/delta/taxi_trips`
        GROUP BY time_of_day_slot, pickup_day_of_week
    )
    SELECT * FROM avg_fare
    ORDER BY pickup_day_of_week, time_of_day_slot
"""

df = spark.sql(query)
display(df.limit(3))

time_of_day_slot,pickup_day_of_week,avg_fare_amount
Afternoon,1,18.195038825540003
Evening,1,19.137561389697545
Morning,1,17.72485988255403


In [0]:
%python
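# What are the top 10 busiest routes (PULocationID to DOLocationID pairs) during peak hours?
# Note: the filter below uses 'morning_peak' and 'evening_peak', but the time_of_day_slot labels
# seen in earlier outputs are Morning, Afternoon, Evening and Night, so this query returns no rows.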
query = """
    WITH peak_hours AS (
        SELECT 
            PULocationID, 
            DOLocationID, 
            COUNT(*) AS trip_count
        FROM delta.`/mount_point/delta/taxi_trips`
        WHERE time_of_day_slot IN ('morning_peak', 'evening_peak')
        GROUP BY PULocationID, DOLocationID
    )
    SELECT * FROM peak_hours
    ORDER BY trip_count DESC
    LIMIT 10
"""
df = spark.sql(query)
display(df.limit(3))

PULocationID,DOLocationID,trip_count
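
The empty result above is consistent with the filter values `'morning_peak'` and `'evening_peak'` not matching any `time_of_day_slot` label; the labels that appear in earlier outputs are `Morning`, `Afternoon`, `Evening`, and `Night`. A possible correction is sketched below. Treating `Morning` and `Evening` as the peak periods is an assumption, and the helper `busiest_routes_sql` is hypothetical.

```python
from typing import List


def busiest_routes_sql(table_path: str, peak_slots: List[str], top_n: int = 10) -> str:
    """Build the busiest-routes query for the given time_of_day_slot labels."""
    slot_list = ", ".join(f"'{slot}'" for slot in peak_slots)
    return f"""
    SELECT PULocationID, DOLocationID, COUNT(*) AS trip_count
    FROM delta.`{table_path}`
    WHERE time_of_day_slot IN ({slot_list})
    GROUP BY PULocationID, DOLocationID
    ORDER BY trip_count DESC
    LIMIT {top_n}
    """


# Assumption: the Morning and Evening slots stand in for the peak periods.
peak_query = busiest_routes_sql("/mount_point/delta/taxi_trips", ["Morning", "Evening"])
print(peak_query)

# In the notebook, where `spark` is defined:
# display(spark.sql(peak_query))
```

If peak hours are defined more narrowly, filtering on `pickup_hour` ranges instead of slot labels would be the natural alternative.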


In [0]:
# Is there a correlation between trip_duration_min and tip_amount for different payment_types?
df = spark.sql("""
    WITH correlation_data AS (
        SELECT 
            payment_type, 
            trip_duration_min, 
            tip_amount
        FROM delta.`/mount_point/delta/taxi_trips`
    )
    SELECT 
        payment_type, 
        corr(trip_duration_min, tip_amount) AS correlation
    FROM correlation_data
    GROUP BY payment_type
""")
display(df.limit(3))

payment_type,correlation
3,0.0054343052755681
1,0.200201572692807
4,0.0014307468306927
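
Spark SQL's `corr` aggregate returns the Pearson correlation coefficient. For readers who want to see the formula behind the numbers above, here is a small plain-Python sketch; the input values are made up for illustration and are not taken from the dataset.

```python
import math
from typing import Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Made-up trip durations (minutes) and tips (dollars), for illustration only
durations = [5.0, 10.0, 15.0, 20.0]
tips_linear = [1.0, 2.0, 3.0, 4.0]   # tip grows in lockstep with duration
tips_zigzag = [2.0, 1.0, 2.0, 1.0]   # tip barely tracks duration

r_linear = pearson(durations, tips_linear)
r_zigzag = pearson(durations, tips_zigzag)
print(round(r_linear, 3), round(r_zigzag, 3))
```

A coefficient near 0, as reported above for payment types 3 and 4, indicates almost no linear relationship, while the roughly 0.2 for payment type 1 indicates a weak positive one.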


In [0]:
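# How do airport trips (trips to/from known airport LocationIDs) differ in terms of average distance, fare, and tip percentage compared to non-airport trips?
# Note: the CASE expression below matches zone IDs 1, 2 and 3, whereas Action 3.2 treats
# 1 (Newark), 132 (JFK) and 138 (LaGuardia) as airports; the output shown reflects the query as written.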
df = spark.sql("""
    WITH airport_trips AS (
        SELECT 
            CASE 
                WHEN PULocationID IN (1, 2, 3) OR DOLocationID IN (1, 2, 3) THEN 'airport'
                ELSE 'non_airport'
            END AS trip_type,
            trip_distance,
            fare_amount,
            tip_amount / fare_amount * 100 AS tip_percentage
        FROM delta.`/mount_point/delta/taxi_trips`
    )
    SELECT 
        trip_type, 
        AVG(trip_distance) AS avg_distance, 
        AVG(fare_amount) AS avg_fare, 
        AVG(tip_percentage) AS avg_tip_percentage
    FROM airport_trips
    GROUP BY trip_type
""")
display(df.limit(3))

trip_type,avg_distance,avg_fare,avg_tip_percentage
non_airport,3.1926727705607414,17.928830773198005,21.472844461001984
airport,18.39588154480233,92.76895061822624,15.65056132718484
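
The tip percentages here are not directly comparable with those in Action 3.2: this query divides `tip_amount` by `fare_amount`, while Action 3.2 divides by `total_amount`. The plain-Python sketch below shows the size of the gap for a single trip, using the values from the first duration-outlier row printed in Action 3.3. The helper `tip_pct` is hypothetical.

```python
from typing import Optional


def tip_pct(tip: float, base: float) -> Optional[float]:
    """Tip as a percentage of `base`, or None when the base is not positive."""
    return tip / base * 100 if base > 0 else None


# tip_amount, fare_amount and total_amount from the first outlier row in Action 3.3
tip_amount, fare_amount, total_amount = 6.9, 29.6, 41.5

pct_of_total = tip_pct(tip_amount, total_amount)  # definition used in Action 3.2
pct_of_fare = tip_pct(tip_amount, fare_amount)    # definition used in this query
print(round(pct_of_total, 2), round(pct_of_fare, 2))
```

The same tip comes out several percentage points higher when measured against the fare alone, so any comparison across the two tasks should first settle on one denominator.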


Action 5.3: Interpret and Document Insights