In [0]:
dbfs_subfolder_path = "/FileStore/tables/taxi_zones/"
dbutils.fs.mkdirs(dbfs_subfolder_path)
dbutils.fs.ls("/FileStore/tables/")

[FileInfo(path='dbfs:/FileStore/tables/01_Data_Ingention_trip_data.py', name='01_Data_Ingention_trip_data.py', size=4574, modificationTime=1732611700000),
 FileInfo(path='dbfs:/FileStore/tables/nyc_taxi_data_delta/', name='nyc_taxi_data_delta/', size=0, modificationTime=1732619284337),
 FileInfo(path='dbfs:/FileStore/tables/taxi_zone_lookup.csv', name='taxi_zone_lookup.csv', size=12331, modificationTime=1732611654000),
 FileInfo(path='dbfs:/FileStore/tables/taxi_zones/', name='taxi_zones/', size=0, modificationTime=1732619284337),
 FileInfo(path='dbfs:/FileStore/tables/taxi_zones.zip', name='taxi_zones.zip', size=1025147, modificationTime=1732611639000),
 FileInfo(path='dbfs:/FileStore/tables/yellow_tripdata_2023_01.parquet', name='yellow_tripdata_2023_01.parquet', size=47673370, modificationTime=1732611400000),
 FileInfo(path='dbfs:/FileStore/tables/yellow_tripdata_2023_02.parquet', name='yellow_tripdata_2023_02.parquet', size=47748012, modificationTime=1732611400000),
 FileInfo(path=

In [0]:
import zipfile
import os

# Define paths
dbfs_zip_path = "/FileStore/tables/taxi_zones/taxi_zones.zip"
local_zip_path = "/tmp/taxi_zones.zip"
local_extract_path = "/tmp/taxi_zones/"

# Copy the zip file from DBFS to a local path
dbutils.fs.cp(dbfs_zip_path, "file:" + local_zip_path)

# Unzip the file locally and list the contents
with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
    zip_ref.extractall(local_extract_path)

# Verify extracted files
extracted_files = os.listdir(local_extract_path)
print("Extracted files:", extracted_files)

Extracted files: ['taxi_zones.dbf', 'taxi_zones.shp.xml', 'taxi_zones.shp', 'taxi_zones.prj', 'taxi_zones.shx', 'taxi_zones.sbx', 'taxi_zones.sbn']


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import radians, cos, sin, asin, sqrt, col, udf, hour, dayofweek
from pyspark.sql.types import FloatType

# Create a Spark session
spark = SparkSession.builder \
    .appName("NYC Taxi Trip Analysis") \
    .getOrCreate()

In [0]:
# Check for the .shp file in the extracted directory
if "taxi_zones.shp" in extracted_files:
    import geopandas as gpd
    # Load the shapefile
    zones = gpd.read_file(os.path.join(local_extract_path, "taxi_zones.shp"))
    
    # Convert to WGS 84 coordinate system if necessary
    zones = zones.to_crs(epsg=4326)
else:
    print("Error: 'taxi_zones.shp' not found in extracted files.")



In [0]:
zones

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((-74.18445 40.695, -74.18449 40.6951,..."
1,2,0.433470,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((-73.84793 40.87134, -73.84725 40.870..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((-73.97177 40.72582, -73.97179 40.725..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((-74.17422 40.56257, -74.17349 40.562..."
...,...,...,...,...,...,...,...
258,259,0.126750,0.000395,Woodlawn/Wakefield,259,Bronx,"POLYGON ((-73.85107 40.91037, -73.85207 40.909..."
259,260,0.133514,0.000422,Woodside,260,Queens,"POLYGON ((-73.90175 40.76078, -73.90147 40.759..."
260,261,0.027120,0.000034,World Trade Center,261,Manhattan,"POLYGON ((-74.01333 40.70503, -74.01327 40.704..."
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,"MULTIPOLYGON (((-73.94383 40.78286, -73.94376 ..."


In [0]:
# Extract 'LocationID', 'latitude', and 'longitude' (using centroids of the polygons)
zones['latitude'] = zones.geometry.centroid.y
zones['longitude'] = zones.geometry.centroid.x


  zones['latitude'] = zones.geometry.centroid.y

  zones['longitude'] = zones.geometry.centroid.x


In [0]:
# Select 'LocationID', 'latitude', and 'longitude'
location_lat_long_df = zones[['LocationID', 'latitude', 'longitude']]

In [0]:
# Convert GeoDataFrame to Spark DataFrame
location_lat_long_spark_df = spark.createDataFrame(location_lat_long_df)

In [0]:
display(location_lat_long_spark_df)

In [0]:
# List all files in the specified DBFS directory
dbutils.fs.ls("dbfs:/dbfs/FileStore/tables/data_processed_trip_data/")

[FileInfo(path='dbfs:/dbfs/FileStore/tables/data_processed_trip_data/_delta_log/', name='_delta_log/', size=0, modificationTime=1732619326421),
 FileInfo(path='dbfs:/dbfs/FileStore/tables/data_processed_trip_data/part-00000-ee03aa54-de58-4c15-a1a4-e7305da5797c.c000.snappy.parquet', name='part-00000-ee03aa54-de58-4c15-a1a4-e7305da5797c.c000.snappy.parquet', size=54014771, modificationTime=1732618288000),
 FileInfo(path='dbfs:/dbfs/FileStore/tables/data_processed_trip_data/part-00001-1dcc2395-4d06-4d69-90b3-696a0d024b1e.c000.snappy.parquet', name='part-00001-1dcc2395-4d06-4d69-90b3-696a0d024b1e.c000.snappy.parquet', size=54046480, modificationTime=1732618282000),
 FileInfo(path='dbfs:/dbfs/FileStore/tables/data_processed_trip_data/part-00002-41e890a8-d1d6-4ae8-8918-3817dc1eb79e.c000.snappy.parquet', name='part-00002-41e890a8-d1d6-4ae8-8918-3817dc1eb79e.c000.snappy.parquet', size=53928881, modificationTime=1732618282000),
 FileInfo(path='dbfs:/dbfs/FileStore/tables/data_processed_trip_dat

In [0]:
# Load the Delta table
taxi_zones_df = spark.read.format("delta").load("dbfs:/dbfs/FileStore/tables/data_processed_taxi_zones/")

# Show the first few rows to verify
display(taxi_zones_df)

In [0]:
# Load the Delta table
taxi_df = spark.read.format("delta").load("dbfs:/dbfs/FileStore/tables/data_processed_trip_data/")

# Show the first few rows to verify
display(taxi_df)

In [0]:
location_lat_long_spark_df = location_lat_long_spark_df \
    .withColumnRenamed("latitude", "latitude") \
    .withColumnRenamed("longitude", "longitude") \
    .withColumnRenamed("LocationID", "LocationID")

In [0]:
# Step 2: Join on PULocationID for pickup coordinates
taxi_df = taxi_df.join(
    location_lat_long_spark_df.withColumnRenamed("LocationID", "PULocationID")
    .withColumnRenamed("latitude", "pickup_latitude")
    .withColumnRenamed("longitude", "pickup_longitude"),
    on="PULocationID",
    how="left"
)

In [0]:
# Step 3: Join on DOLocationID for dropoff coordinates
taxi_df = taxi_df.join(
    location_lat_long_spark_df.withColumnRenamed("LocationID", "DOLocationID")
    .withColumnRenamed("latitude", "dropoff_latitude")
    .withColumnRenamed("longitude", "dropoff_longitude"),
    on="DOLocationID",
    how="left"
)

In [0]:
# Show result to verify joins
taxi_df.show(5)

+------------+------------+-------------------+-------------------+---------------+-------------+------------+-----------+------------+---------------------+------------+-------------+------------------+-----------+---------+------------+------------+-------------------+-------------+--------------------+-----------+-----------+------------+------------+-----------------+------------------+------------------+------------------+
|DOLocationID|PULocationID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|payment_type|fare_amount|tolls_amount|improvement_surcharge|total_amount|trip_duration|pickup_day_of_week|pickup_hour|Month_Num|pickup_month|dropoff_hour|dropoff_day_of_week|dropoff_month|dropoff_week_of_year|pickup_date|pickup_time|dropoff_date|dropoff_time|  pickup_latitude|  pickup_longitude|  dropoff_latitude| dropoff_longitude|
+------------+------------+-------------------+-------------------+---------------+-------------+------------+-----------+------------+-

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

In [0]:
# Define the Haversine function
def haversine(lat1, lon1, lat2, lon2):
    # Earth radius in kilometers
    R = 6371.0  
    # Convert degrees to radians
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)

    # Haversine formula
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad
    a = (math.sin(dlat / 2) ** 2 +
         math.cos(lat1_rad) * math.cos(lat2_rad) * (math.sin(dlon / 2) ** 2))
    c = 2 * math.asin(math.sqrt(a))
    distance = R * c  # Result in kilometers
    return distance

In [0]:
# Register the Haversine UDF
haversine_udf = udf(haversine, DoubleType())

In [0]:
# Add 'distance_km' column using the Haversine UDF, ensuring no null values
taxi_df = taxi_df.withColumn(
    "distance_km",
    haversine_udf(
        col("pickup_latitude"),
        col("pickup_longitude"),
        col("dropoff_latitude"),
        col("dropoff_longitude")
    )
).na.fill({"distance_km": 0})  
# Replace null distances with 0 if needed

In [0]:
taxi_df.show(5)

+------------+------------+-------------------+-------------------+---------------+-------------+------------+-----------+------------+---------------------+------------+-------------+------------------+-----------+---------+------------+------------+-------------------+-------------+--------------------+-----------+-----------+------------+------------+-----------------+------------------+------------------+------------------+------------------+
|DOLocationID|PULocationID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|payment_type|fare_amount|tolls_amount|improvement_surcharge|total_amount|trip_duration|pickup_day_of_week|pickup_hour|Month_Num|pickup_month|dropoff_hour|dropoff_day_of_week|dropoff_month|dropoff_week_of_year|pickup_date|pickup_time|dropoff_date|dropoff_time|  pickup_latitude|  pickup_longitude|  dropoff_latitude| dropoff_longitude|       distance_km|
+------------+------------+-------------------+-------------------+---------------+-------------+-

In [0]:
# Check the Shape and Schema
# Get the number of rows and columns
num_rows = taxi_df.count()
num_cols = len(taxi_df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (37013080, 29)


In [0]:
# Get DataFrame information
taxi_df.printSchema()

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- trip_duration: float (nullable = true)
 |-- pickup_day_of_week: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- Month_Num: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- dropoff_day_of_week: integer (nullable = true)
 |-- dropoff_month: integer (nullable = true)
 |-- dropoff_week_of_year: integer (nullable = true)
 |-- pickup_date: string (nullable = true)
 |-- pickup_time: strin

In [0]:
taxi_df.printSchema()
taxi_zones_df.printSchema()

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- trip_duration: float (nullable = true)
 |-- pickup_day_of_week: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- Month_Num: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- dropoff_day_of_week: integer (nullable = true)
 |-- dropoff_month: integer (nullable = true)
 |-- dropoff_week_of_year: integer (nullable = true)
 |-- pickup_date: string (nullable = true)
 |-- pickup_time: strin

In [0]:
from pyspark.sql import functions as F

# Renaming columns in taxi_zones_df to avoid conflicts when joining
taxi_zones_df_renamed = taxi_zones_df \
    .withColumnRenamed("Borough", "pickup_borough") \
    .withColumnRenamed("Zone", "pickup_zone") \
    .withColumnRenamed("service_zone", "pickup_service_zone")

In [0]:
# Join on PULocationID to get pickup information
taxi_df = taxi_df.join(
    taxi_zones_df_renamed,
    taxi_df.PULocationID == taxi_zones_df_renamed.LocationID,
    "left"
).drop(taxi_zones_df_renamed.LocationID)

In [0]:
# Renaming columns again for dropoff information
taxi_zones_df_renamed = taxi_zones_df \
    .withColumnRenamed("Borough", "dropoff_borough") \
    .withColumnRenamed("Zone", "dropoff_zone") \
    .withColumnRenamed("service_zone", "dropoff_service_zone")

In [0]:
# Join on DOLocationID to get dropoff information
taxi_df = taxi_df.join(
    taxi_zones_df_renamed,
    taxi_df.DOLocationID == taxi_zones_df_renamed.LocationID,
    "left"
).drop(taxi_zones_df_renamed.LocationID)

In [0]:
# Show the result
taxi_df.show(truncate=False)

In [0]:
# Check for Missing Values in each column
# Check for missing values
missing_values = taxi_df.select([((taxi_df[column].isNull()).cast("int")).alias(column) for column in taxi_df.columns]) \
                                .agg({column: 'sum' for column in taxi_df.columns})

display(missing_values.limit(5))


sum(dropoff_time),sum(DOLocationID),sum(improvement_surcharge),sum(pickup_hour),sum(PULocationID),sum(trip_distance),sum(dropoff_borough),sum(pickup_service_zone),sum(pickup_date),sum(dropoff_longitude),sum(LocationID),sum(pickup_latitude),sum(dropoff_day_of_week),sum(tolls_amount),sum(dropoff_date),sum(pickup_time),sum(Month_Num),sum(pickup_zone),sum(pickup_borough),sum(payment_type),sum(fare_amount),sum(pickup_longitude),sum(passenger_count),sum(dropoff_month),sum(dropoff_hour),sum(pickup_day_of_week),sum(dropoff_zone),sum(distance_km),sum(dropoff_datetime),sum(dropoff_week_of_year),sum(trip_duration),sum(total_amount),sum(pickup_datetime),sum(dropoff_latitude),sum(dropoff_service_zone),sum(pickup_month)
0,0,0,0,0,0,545179,376552,0,545919,545179,376665,0,0,0,0,0,376552,376552,0,0,376665,0,0,0,0,545179,0,0,0,0,0,0,545919,545179,0


In [0]:
taxi_df.select("PULocationID", "DOLocationID").filter(col("PULocationID").isNull() | col("DOLocationID").isNull()).show()

+------------+------------+
|PULocationID|DOLocationID|
+------------+------------+
+------------+------------+



In [0]:
# Get distinct pickup locations and rename for joining
pickup_locations = taxi_df.select("PULocationID").distinct().withColumnRenamed("PULocationID", "LocationID")

# Get distinct dropoff locations and rename for joining
dropoff_locations = taxi_df.select("DOLocationID").distinct().withColumnRenamed("DOLocationID", "LocationID")

# Find missing pickup locations
missing_pickup_locations = pickup_locations.join(location_lat_long_spark_df, on="LocationID", how="left_anti")

# Find missing dropoff locations
missing_dropoff_locations = dropoff_locations.join(location_lat_long_spark_df, on="LocationID", how="left_anti")

In [0]:
# Display the results
print("Missing Pickup Locations:")
missing_pickup_locations.show()

Missing Pickup Locations:
+----------+
|LocationID|
+----------+
|        57|
|       264|
|       265|
|       105|
+----------+



In [0]:
print("Missing Dropoff Locations:")
missing_dropoff_locations.show()


Missing Dropoff Locations:
+----------+
|LocationID|
+----------+
|        57|
|       264|
|       265|
|       105|
+----------+



In [0]:
# Count distinct LocationID values in location_lat_long_spark_df
distinct_count = location_lat_long_spark_df.select("LocationID").distinct().count()
print(f"Distinct LocationID count: {distinct_count}")

Distinct LocationID count: 260


In [0]:
# Display all distinct LocationID values in sorted order
display(location_lat_long_spark_df.select("LocationID").distinct().orderBy("LocationID"))

LocationID
1
2
3
4
5
6
7
8
9
10


In [0]:
# Drop the geometry column if it's not needed
zones_cleaned = zones.drop(columns=['geometry'])
zone_df = spark.createDataFrame(zones_cleaned)

In [0]:
# Convert geometry to string representation (e.g., WKT format)
zones['geometry'] = zones['geometry'].apply(lambda geom: geom.wkt if geom else None)  # If using Shapely
zone_geometry_df = spark.createDataFrame(zones)

  zones['geometry'] = zones['geometry'].apply(lambda geom: geom.wkt if geom else None)  # If using Shapely


In [0]:
zone_df=zone_df[['zone','LocationID', 'latitude', 'longitude']]

In [0]:
display(zone_df)

In [0]:
# List of Location IDs to search for
location_ids_to_find = [57, 264, 265, 105]

# Filter zone_df for these Location IDs
zone_df_filtered = zone_df.filter(zone_df.LocationID.isin(location_ids_to_find))

# Show the results
zone_df_filtered.show(truncate=False)

+----+----------+--------+---------+
|zone|LocationID|latitude|longitude|
+----+----------+--------+---------+
+----+----------+--------+---------+



In [0]:
# Specify the IDs to check
ids_to_check = [57, 264, 265, 105]

# Total number of rows in the DataFrame
total_count = taxi_df.count()

# Count occurrences of specified IDs in pulocationid and dulocationid
pulocationid_count = taxi_df.filter(taxi_df.PULocationID.isin(ids_to_check)).count()
dulocationid_count = taxi_df.filter(taxi_df.DOLocationID.isin(ids_to_check)).count()

# Calculate percentages
pulocationid_percentage = (pulocationid_count / total_count) * 100
dulocationid_percentage = (dulocationid_count / total_count) * 100

print(f"Percentage of specified IDs in pulocationid: {pulocationid_percentage:.2f}%")
print(f"Percentage of specified IDs in dulocationid: {dulocationid_percentage:.2f}%")

Percentage of specified IDs in pulocationid: 1.02%
Percentage of specified IDs in dulocationid: 1.47%


In [0]:
# Remove rows containing the specified IDs in pulocationid or dulocationid
taxi_df = taxi_df.filter(~taxi_df.PULocationID.isin(ids_to_check) & ~taxi_df.DOLocationID.isin(ids_to_check))

# Show the filtered DataFrame
taxi_df.show()

In [0]:
# Check for Missing Values in each column
# Check for missing values
missing_values = taxi_df.select([((taxi_df[column].isNull()).cast("int")).alias(column) for column in taxi_df.columns]) \
                                .agg({column: 'sum' for column in taxi_df.columns})

display(missing_values.limit(5))


sum(dropoff_time),sum(DOLocationID),sum(improvement_surcharge),sum(pickup_hour),sum(PULocationID),sum(trip_distance),sum(dropoff_borough),sum(pickup_service_zone),sum(pickup_date),sum(dropoff_longitude),sum(LocationID),sum(pickup_latitude),sum(dropoff_day_of_week),sum(tolls_amount),sum(dropoff_date),sum(pickup_time),sum(Month_Num),sum(pickup_zone),sum(pickup_borough),sum(payment_type),sum(fare_amount),sum(pickup_longitude),sum(passenger_count),sum(dropoff_month),sum(dropoff_hour),sum(pickup_day_of_week),sum(dropoff_zone),sum(distance_km),sum(dropoff_datetime),sum(dropoff_week_of_year),sum(trip_duration),sum(total_amount),sum(pickup_datetime),sum(dropoff_latitude),sum(dropoff_service_zone),sum(pickup_month)
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Check the Shape and Schema
# Get the number of rows and columns
num_rows = taxi_df.count()
num_cols = len(taxi_df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (36395607, 36)


In [0]:
display(taxi_df)

In [0]:
# Define a more descriptive Delta Lake storage path
delta_path = "/dbfs/FileStore/tables/data_processed_lat_long/"
# Write the DataFrame to Delta format
taxi_df.write.format("delta").mode("overwrite").save(delta_path)