# NYC Taxi Spark Project
**Thomas Ernest CERESA, DISS Master, Lyon1**

Date: October 14, 2025

Please, feel free to read the report of this project, before.

## 1. Exploration

In [2]:
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "shapely", "-q"])

import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, broadcast
from pyspark.sql.types import StringType, FloatType, BooleanType
from shapely.geometry import shape, Point
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.window import Window

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local").setAppName("NYC Taxi Spark Project")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

First exploration for comprehension of data:

In [None]:
CSV_PATH = "../data/Sample NYC Data.csv"

In [4]:
taxi_rdd = sc.textFile(CSV_PATH)
taxi_rdd.take(5)

['medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude',
 '89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,01-01-13 15:11,01-01-13 15:18,4,-73.978165,40.757977,-73.989838,40.751171',
 '0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,06-01-13 00:18,06-01-13 00:22,1,-74.006683,40.731781,-73.994499,40.75066',
 '0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,05-01-13 18:49,05-01-13 18:54,1,-74.004707,40.73777,-74.009834,40.726002',
 'DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,07-01-13 23:54,07-01-13 23:58,2,-73.974602,40.759945,-73.984734,40.759388']

In [69]:
GEOJSON_PATH = "../data/nyc-boroughs.geojson.json"

In [70]:
try:
    with open(GEOJSON_PATH, 'r') as f:
        geojson_data = json.load(f)
    print(f"Number of features: {len(geojson_data['features'])}")
except FileNotFoundError:
    print(f"file not found")
except Exception as e:
    print(f"current error: {e}")

Number of features: 104


In [7]:
geojson_data["features"][:3]

[{'type': 'Feature',
  'id': 0,
  'properties': {'boroughCode': 5,
   'borough': 'Staten Island',
   '@id': 'http://nyc.pediacities.com/Resource/Borough/Staten_Island'},
  'geometry': {'type': 'Polygon',
   'coordinates': [[[-74.05050806403247, 40.566422034160816],
     [-74.04998352562575, 40.56639592492827],
     [-74.04931640362088, 40.56588774778044],
     [-74.04923629842045, 40.5653627363681],
     [-74.05002620158643, 40.565318180621134],
     [-74.05090601705089, 40.5660943421306],
     [-74.05067916748614, 40.5663108457364],
     [-74.05107159803778, 40.5667224933978],
     [-74.05050806403247, 40.566422034160816]]]}},
 {'type': 'Feature',
  'id': 1,
  'properties': {'boroughCode': 5,
   'borough': 'Staten Island',
   '@id': 'http://nyc.pediacities.com/Resource/Borough/Staten_Island'},
  'geometry': {'type': 'Polygon',
   'coordinates': [[[-74.05314036821109, 40.577702715545755],
     [-74.05406044939875, 40.57711644523887],
     [-74.05489778210804, 40.57778244091981],
     [

## 2. Enrichment & Analysis
### 2.1 Spatial

Creation of the polygones thanks to Shapely feature GeoJSON → Polygons
I want to check if all geometries are polygon too

In [71]:
geojson_enriched = []

for comp, feature in enumerate(geojson_data['features']):
    props = feature['properties']
    borough_name = props.get('borough')
    borough_code = props.get('boroughCode')
    borough_at_id = props.get('@id')
    
    # GeoJSON en objet Shapely Polygon
    polygon = shape(feature['geometry'])
    area = polygon.area
    
    # DEBUG: Check geometry type
    geom_type = feature['geometry'].get('type')
    
    geojson_enriched.append({
        'name': borough_name,
        'code': borough_code,
        'polygon': polygon,
        'area': area
    })
    
    print(f"{comp}. {borough_name:15} | Code: {borough_code} | Area: {area} | Type: {geom_type} | Valid: {polygon.is_valid}")

0. Staten Island   | Code: 5 | Area: 1.2242288022741093e-06 | Type: Polygon | Valid: True
1. Staten Island   | Code: 5 | Area: 5.013645098231966e-06 | Type: Polygon | Valid: True
2. Staten Island   | Code: 5 | Area: 7.6442837783665e-06 | Type: Polygon | Valid: True
3. Staten Island   | Code: 5 | Area: 0.01603547601912208 | Type: Polygon | Valid: True
4. Queens          | Code: 4 | Area: 3.969785765133643e-05 | Type: Polygon | Valid: True
5. Queens          | Code: 4 | Area: 2.027805254332613e-05 | Type: Polygon | Valid: True
6. Queens          | Code: 4 | Area: 3.929145017066745e-06 | Type: Polygon | Valid: True
7. Queens          | Code: 4 | Area: 2.0365800895741225e-06 | Type: Polygon | Valid: True
8. Queens          | Code: 4 | Area: 1.5554287776358482e-05 | Type: Polygon | Valid: True
9. Queens          | Code: 4 | Area: 5.434306511324284e-05 | Type: Polygon | Valid: True
10. Queens          | Code: 4 | Area: 2.569079559815976e-08 | Type: Polygon | Valid: True
11. Queens          |

For gaining time, I will sort it by code (which indicates main places), and then area. That's quite obvious that an investigate will be faster, searching before in the biggest areas.

In [72]:
"""Ordre attendu:
Manhattan (code=1), plus grand polygone au plus petit
Bronx (code=2), plus grand polygone au plus petit
Brooklyn (code=3), plus grand polygone au plus petit"""

geojson_sorted = sorted(geojson_enriched, key=lambda x: (x['code'], -x['area']))

In [10]:
for item in geojson_sorted:
    print(f"{item['code']}. {item['name']} | Area: {item['area']}")

1. Manhattan | Area: 0.005859077996035753
1. Manhattan | Area: 0.00023271655856762013
1. Manhattan | Area: 7.6037752599342e-05
1. Manhattan | Area: 6.23157479510608e-05
1. Manhattan | Area: 3.2658591272044954e-05
1. Manhattan | Area: 1.1828883137677094e-05
1. Manhattan | Area: 6.143638903459381e-06
1. Manhattan | Area: 3.3831273674444417e-06
1. Manhattan | Area: 2.858823502476497e-06
1. Manhattan | Area: 2.393654308790746e-06
1. Manhattan | Area: 2.3345540772235924e-06
1. Manhattan | Area: 2.2627340976761147e-06
1. Manhattan | Area: 2.2465297475388565e-06
1. Manhattan | Area: 2.24116737819868e-06
1. Manhattan | Area: 2.2094455927089308e-06
1. Manhattan | Area: 2.0486362960502745e-06
1. Manhattan | Area: 1.3300494774255711e-06
1. Manhattan | Area: 1.1111770551654054e-06
1. Manhattan | Area: 2.354033827718773e-07
1. Manhattan | Area: 2.0929022926724522e-07
1. Manhattan | Area: 1.846462957014482e-07
1. Manhattan | Area: 1.4422339519800844e-07
1. Manhattan | Area: 1.3331706776569288e-07
1.

Now attempt to set well up the boroughs. In the json there are only 5 boroughs, so it is quite intresting to broadcast them. (It keeps an unique local memory.)

In [73]:
geojson_bc = sc.broadcast(geojson_sorted)

User Defines Functions

In [74]:
def find_borough(latitude, longitude):
    """
    Find the borough having the point (lat, lon)
    Return: borough name or "Unknown"
    """
    if latitude is None or longitude is None:
        return "Unknown"
    
    try:
        point = Point(longitude, latitude)
        for borough in geojson_bc.value:
            if point.within(borough['polygon']):
                return borough['name']
        return "Unknown"
    except:
        return "Unknown"

find_borough_udf = udf(find_borough, StringType())

In [75]:
def is_same_location(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon, eps=0.0001):
    """
    Check if pickup and dropoff coordinates are essentially the same
    tolerance: maximum difference in degrees (default ~11 meters)
    Return: boolean or None
    """
    if any(x is None for x in [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]):
        return None
    
    lat_diff = abs(pickup_lat - dropoff_lat)
    lon_diff = abs(pickup_lon - dropoff_lon)
    return (lat_diff < eps) and (lon_diff < eps)

is_same_location_udf = udf(is_same_location, BooleanType())

In [76]:
def get_trip_type(pickup_borough, dropoff_borough):
    """
    Classify trip as intra-borough or inter-borough
    """
    if pickup_borough is None or dropoff_borough is None:
        return "Unknown"
    if pickup_borough == "Unknown" or dropoff_borough == "Unknown":
        return "Unknown"
    if pickup_borough == dropoff_borough:
        return "Intra-borough"
    else:
        return "Inter-borough"

get_trip_type_udf = udf(get_trip_type, StringType())

Alongside of the project, I take sides with using more Spark SQL than RDD, because this is more natural with the columns, and plus catalyst automatic optimizer may be intresting.

In [77]:
taxi_df = spark.read.csv(CSV_PATH, header=True, inferSchema=True)
taxi_df.take(5)

[Row(medallion='89D227B655E5C82AECF13C3F540D4CF4', hack_license='BA96DE419E711691B9445D6A6307C170', vendor_id='CMT', rate_code=1, store_and_fwd_flag='N', pickup_datetime='01-01-13 15:11', dropoff_datetime='01-01-13 15:18', passenger_count=4, pickup_longitude=-73.978165, pickup_latitude=40.757977, dropoff_longitude=-73.989838, dropoff_latitude=40.751171),
 Row(medallion='0BD7C8F5BA12B88E0B67BED28BEA73D8', hack_license='9FD8F69F0804BDB5549F40E9DA1BE472', vendor_id='CMT', rate_code=1, store_and_fwd_flag='N', pickup_datetime='06-01-13 00:18', dropoff_datetime='06-01-13 00:22', passenger_count=1, pickup_longitude=-74.006683, pickup_latitude=40.731781, dropoff_longitude=-73.994499, dropoff_latitude=40.75066),
 Row(medallion='0BD7C8F5BA12B88E0B67BED28BEA73D8', hack_license='9FD8F69F0804BDB5549F40E9DA1BE472', vendor_id='CMT', rate_code=1, store_and_fwd_flag='N', pickup_datetime='05-01-13 18:49', dropoff_datetime='05-01-13 18:54', passenger_count=1, pickup_longitude=-74.004707, pickup_latitude=

Now we want to add two columns "pickup_borough" and "dropoff_borough"

In [78]:
taxi_enriched = taxi_df.withColumn("pickup_borough", 
                                    find_borough_udf(col("pickup_latitude"), col("pickup_longitude")))
taxi_enriched = taxi_enriched.withColumn("dropoff_borough", 
                                         find_borough_udf(col("dropoff_latitude"), col("dropoff_longitude")))

In [15]:
taxi_enriched.select("pickup_borough", "dropoff_borough", "pickup_latitude", "pickup_longitude").take(30)

[Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.757977, pickup_longitude=-73.978165),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.731781, pickup_longitude=-74.006683),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.73777, pickup_longitude=-74.004707),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.759945, pickup_longitude=-73.974602),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.748528, pickup_longitude=-73.97625),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.764252, pickup_longitude=-73.966743),
 Row(pickup_borough='Manhattan', dropoff_borough='Manhattan', pickup_latitude=40.743977, pickup_longitude=-73.995804),
 Row(pickup_borough='Manhattan', dropoff_borough='Queens', pickup_latitude=40.756775, pickup_longitude=-73.989937),
 Row(pickup_borough='Manhattan', dropoff_borough='Man

Okay it seems to be okay, however, we have to be cautionned. Are there unknown ones? What are their distributions?

In [16]:
taxi_df.select('pickup_latitude', 'pickup_longitude').describe().show()

+-------+-----------------+------------------+
|summary|  pickup_latitude|  pickup_longitude|
+-------+-----------------+------------------+
|  count|            99999|             99999|
|   mean|40.04864264028555|-72.65573395207987|
| stddev|5.795547771378353| 9.784012203735834|
|    min|              0.0|        -98.116669|
|    max|        646.43829|               0.0|
+-------+-----------------+------------------+



In [17]:
taxi_enriched.groupBy('pickup_borough').count().orderBy(F.desc('count')).show()

+--------------+-----+
|pickup_borough|count|
+--------------+-----+
|     Manhattan|90099|
|        Queens| 5912|
|      Brooklyn| 1965|
|       Unknown| 1940|
|         Bronx|   81|
| Staten Island|    2|
+--------------+-----+



In [18]:
taxi_enriched.groupBy('dropoff_borough').count().orderBy(F.desc('count')).show()

+---------------+-----+
|dropoff_borough|count|
+---------------+-----+
|      Manhattan|88164|
|         Queens| 5475|
|       Brooklyn| 3595|
|        Unknown| 2357|
|          Bronx|  395|
|  Staten Island|   13|
+---------------+-----+



I'd like to analyse the Unknowns, why are they unknowns? For this, I add some flags of the quality of the unkonwn borough

In [79]:
NYC_LAT_MIN, NYC_LAT_MAX = 40.4, 41.0
NYC_LON_MIN, NYC_LON_MAX = -74.3, -73.7

In [80]:
taxi_enriched = (taxi_enriched
    # Flag 1 : (0,0) ?
    .withColumn('flag_coords_zero',
                ((F.col('pickup_latitude') == 0.0) & (F.col('pickup_longitude') == 0.0)) |
                ((F.col('dropoff_latitude') == 0.0) & (F.col('dropoff_longitude') == 0.0)))
    
    # Flag 2 : Out NYC
    .withColumn('flag_out_of_nyc',
                ~F.col('pickup_latitude').between(NYC_LAT_MIN, NYC_LAT_MAX) |
                ~F.col('pickup_longitude').between(NYC_LON_MIN, NYC_LON_MAX) |
                ~F.col('dropoff_latitude').between(NYC_LAT_MIN, NYC_LAT_MAX) |
                ~F.col('dropoff_longitude').between(NYC_LON_MIN, NYC_LON_MAX))
    
    # Flag 3 : In NYC but Unknown
    .withColumn('flag_in_nyc_but_unknown',
                ((F.col('pickup_borough') == 'Unknown') | (F.col('dropoff_borough') == 'Unknown')) &
                ~F.col('flag_coords_zero') &
                ~F.col('flag_out_of_nyc'))
    
    # Flag 4 : Sum up
    .withColumn('spatial_quality',
                F.when(F.col('flag_coords_zero'), 'COORDS_ZERO')
                .when(F.col('flag_out_of_nyc'), 'OUT_OF_NYC')
                .when(F.col('flag_in_nyc_but_unknown'), 'IN_NYC_UNKNOWN')
                .when((F.col('pickup_borough') == 'Unknown') | (F.col('dropoff_borough') == 'Unknown'), 'OTHER_UNKNOWN')
                .otherwise('VALID')))

Here is the distribution:

In [21]:
unknown_stats = taxi_enriched.select(
    F.sum(F.col('flag_coords_zero').cast('int')).alias('coords_zero'),
    F.sum(F.col('flag_out_of_nyc').cast('int')).alias('out_of_nyc'),
    F.sum(F.col('flag_in_nyc_but_unknown').cast('int')).alias('in_nyc_unknown'),
    F.sum(((F.col('pickup_borough') == 'Unknown') | (F.col('dropoff_borough') == 'Unknown')).cast('int')).alias('total_unknown'),
    F.count('*').alias('total_records')
)

unknown_stats.show()

stats = unknown_stats.first()
coords_zero = stats['coords_zero']
out_nyc = stats['out_of_nyc']
in_nyc_unknown = stats['in_nyc_unknown']
total_unknown = stats['total_unknown']

print(f"""
CATEGORIES D'UNKNOWN:
- (0,0):              {coords_zero:>6} ({coords_zero/total_unknown*100:>5.1f}% of Unknown)
- OUT NYC:            {out_nyc:>6} ({out_nyc/total_unknown*100:>5.1f}% of Unknown)
- In NYC But unknown: {in_nyc_unknown:>6} ({in_nyc_unknown/total_unknown*100:>5.1f}% of Unknown)
----------------------------------------
- TOTAL Unknown:      {total_unknown:>6} ({total_unknown/stats['total_records']*100:>5.2f}% of dataset)
""")

+-----------+----------+--------------+-------------+-------------+
|coords_zero|out_of_nyc|in_nyc_unknown|total_unknown|total_records|
+-----------+----------+--------------+-------------+-------------+
|       1932|      2035|           457|         2492|        99999|
+-----------+----------+--------------+-------------+-------------+


CATEGORIES D'UNKNOWN:
- (0,0):                1932 ( 77.5% of Unknown)
- OUT NYC:              2035 ( 81.7% of Unknown)
- In NYC But unknown:    457 ( 18.3% of Unknown)
----------------------------------------
- TOTAL Unknown:        2492 ( 2.49% of dataset)



### 2.2 Time

Plus, I add columns for transforming into timestamp. Before DC, I want to put some flags into unwaited cells' comportements too

In [81]:
from pyspark.sql import functions as F

max_duration_hours = 4
max_seconds = max_duration_hours * 3600

taxi_enriched = (taxi_enriched
    # 1. Conversion into timestamps
    .withColumn('pickup_ts', F.to_timestamp('pickup_datetime', 'dd-MM-yy HH:mm'))
    .withColumn('dropoff_ts', F.to_timestamp('dropoff_datetime', 'dd-MM-yy HH:mm'))
    
    # 2. Conversion in second or minutes. Please not to_timestamp returns time in seconds
    .withColumn('duration_seconds',
                F.col('dropoff_ts').cast('long') - F.col('pickup_ts').cast('long'))
    .withColumn('duration_minutes', F.col('duration_seconds') / 60)
    .withColumn('duration_hours', F.col('duration_seconds') / 3600)
    
    # 3. Some straight forward flags
    .withColumn('flag_parsing_error',
                F.col('pickup_ts').isNull() | F.col('dropoff_ts').isNull())
    .withColumn('flag_negative_duration',
                F.when(F.col('duration_seconds') < 0, True).otherwise(False))
    .withColumn('flag_zero_duration',
                F.when(F.col('duration_seconds') == 0, True).otherwise(False))
    .withColumn('flag_excessive_duration',
                F.when(F.col('duration_seconds') > max_seconds, True).otherwise(False))
    .withColumn('flag_too_short',
                F.when((F.col('duration_seconds') > 0) & (F.col('duration_seconds') < 30), True).otherwise(False))
    
    # 4. I catch up the flags here in one column
    .withColumn('data_quality_label',
                F.when(F.col('flag_parsing_error'), 'PARSING_ERROR')
                .when(F.col('flag_negative_duration'), 'NEGATIVE_DURATION')
                .when(F.col('flag_zero_duration'), 'ZERO_DURATION')
                .when(F.col('flag_excessive_duration'), 'EXCESSIVE_DURATION')
                .when(F.col('flag_too_short'), 'TOO_SHORT')
                .otherwise('VALID'))
    
    # 5. Flag global sum up
    .withColumn('has_anomaly', F.col('data_quality_label') != 'VALID')
)

# Rapport
print("Report of quality: ")
taxi_enriched.groupBy('data_quality_label').count().orderBy(F.desc('count')).show()

print("\nflags stats: ")
taxi_enriched.select(
    F.sum(F.col('flag_parsing_error').cast('int')).alias('parsing_errors'),
    F.sum(F.col('flag_negative_duration').cast('int')).alias('negative_durations'),
    F.sum(F.col('flag_zero_duration').cast('int')).alias('zero_durations'),
    F.sum(F.col('flag_excessive_duration').cast('int')).alias('excessive_durations'),
    F.sum(F.col('flag_too_short').cast('int')).alias('too_short_trips'),
    F.sum(F.col('has_anomaly').cast('int')).alias('total_anomalies'),
    F.count('*').alias('total_records')
).show()

# quelques exemples
print("Some examples:")
taxi_enriched.select('pickup_datetime', 'dropoff_datetime', 'duration_minutes', 'data_quality_label').show(10)

Report of quality: 
+------------------+-----+
|data_quality_label|count|
+------------------+-----+
|             VALID|99549|
|     ZERO_DURATION|  450|
+------------------+-----+


flags stats: 
+--------------+------------------+--------------+-------------------+---------------+---------------+-------------+
|parsing_errors|negative_durations|zero_durations|excessive_durations|too_short_trips|total_anomalies|total_records|
+--------------+------------------+--------------+-------------------+---------------+---------------+-------------+
|             0|                 0|           450|                  0|              0|            450|        99999|
+--------------+------------------+--------------+-------------------+---------------+---------------+-------------+

Some examples:
+---------------+----------------+----------------+------------------+
|pickup_datetime|dropoff_datetime|duration_minutes|data_quality_label|
+---------------+----------------+----------------+--------

We remark 450 duration equals to 0. We would like to understand why:

In [34]:
print("\nExample with duration of ZERO")
zero_duration_trips = taxi_enriched.filter(F.col('flag_zero_duration') == True)
print(f"Nombre total: {zero_duration_trips.count()}")

print("\nExemples détaillés:")
zero_duration_trips.select(
    'pickup_datetime',
    'dropoff_datetime',
    'pickup_borough',
    'dropoff_borough',
    'pickup_latitude',
    'pickup_longitude',
    'dropoff_latitude',
    'dropoff_longitude',
    'duration_seconds',
    'passenger_count'
).show(20, truncate=False)

print("\nDistribution over boroughs?")
zero_duration_trips.groupBy('pickup_borough', 'dropoff_borough').count().orderBy(F.desc('count')).show()

print("\nIdentical coords from pickup to dropoff?")
same_location = zero_duration_trips.withColumn(
    'is_same_location',
    is_same_location_udf(
        F.col('pickup_latitude'),
        F.col('pickup_longitude'),
        F.col('dropoff_latitude'),
        F.col('dropoff_longitude')
    )
)
print(f"Nombre avec coordonnées identiques: {same_location.count()}")


Example with duration of ZERO
Nombre total: 450

Exemples détaillés:
+---------------+----------------+--------------+---------------+---------------+----------------+----------------+-----------------+----------------+---------------+
|pickup_datetime|dropoff_datetime|pickup_borough|dropoff_borough|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|duration_seconds|passenger_count|
+---------------+----------------+--------------+---------------+---------------+----------------+----------------+-----------------+----------------+---------------+
|08-01-13 07:51 |08-01-13 07:51  |Manhattan     |Manhattan      |40.80209       |-73.945396      |40.802025       |-73.945412       |0               |1              |
|13-01-13 11:14 |13-01-13 11:14  |Manhattan     |Manhattan      |40.758301      |-73.991867      |40.755203       |-73.985252       |0               |1              |
|13-01-13 11:10 |13-01-13 11:10  |Unknown       |Unknown        |0.0            |0.0           

To sum up, we have analised the main problems of this dataset and flagged them.
- **Space**: the unknown borough, why are they unknown?
- **Time**: the hypothesis of anormality.

The next section will clean and make precise selections of the data – so as to create other datasets.

### 2.3 ID Quality

Simple check but important. Apparantly, all IDs are correct. Let us pass to DC.

In [18]:
taxi_enriched = taxi_enriched.withColumn(
    'flag_invalid_hack_license',
    F.col('hack_license').isNull() | 
    (F.col('hack_license') == '') | 
    (F.length('hack_license') < 10) 
).withColumn(
    'flag_invalid_medallion',
    F.col('medallion').isNull() | 
    (F.col('medallion') == '') | 
    (F.length('medallion') < 10)
).withColumn(
    'identifier_quality',
    F.when(
        F.col('flag_invalid_hack_license') | F.col('flag_invalid_medallion'),
        'INVALID_IDENTIFIER'
    ).otherwise('VALID_IDENTIFIER')
)

taxi_enriched.groupBy('identifier_quality').count().show()

+------------------+-----+
|identifier_quality|count|
+------------------+-----+
|  VALID_IDENTIFIER|99999|
+------------------+-----+



### 2.4 Passenger Count Quality

In [82]:
taxi_enriched = taxi_enriched.filter(
    F.col('passenger_count') > 0
)

## 3. Data Cleaning

Let us begin to put a strict dataset where all raws are VALID.This one will be the most use as the final dataset. But I want to explore partial dataset too.

In [83]:
taxi_strict = taxi_enriched.filter(
    (F.col('spatial_quality') == 'VALID') &
    (F.col('data_quality_label') == 'VALID')
)

strict_count = taxi_strict.count()
print(f" {strict_count:,} ({strict_count/99999*100:.2f}% of dataset)")

 97,375 (97.38% of dataset)


 Dataset with "IN_NYC_UNKNOWN" which might be recoverable. 
These trips are within NYC limits but do not correspond to any borough

In [84]:
taxi_recoverable = taxi_enriched.filter(
    (F.col('spatial_quality') == 'IN_NYC_UNKNOWN') &
    (F.col('data_quality_label') == 'VALID')
)

print(f"\nDataset 'Recoverable Unknown': {taxi_recoverable.count():,}")


Dataset 'Recoverable Unknown': 451


We can also split according to the partial validity of the raws (
If you need to analyze time without spatial constraint, or vice and versas)

In [85]:

taxi_spatial_only = taxi_enriched.filter(
    (F.col('spatial_quality') == 'VALID') &
    (F.col('data_quality_label') != 'VALID')
)
print(f"Dataset spatial only (temps anormal): {taxi_spatial_only.count():,}")


taxi_temporal_only = taxi_enriched.filter(
    (F.col('spatial_quality') != 'VALID') &
    (F.col('data_quality_label') == 'VALID')
)
print(f"Dataset temporal only (lieu unknown): {taxi_temporal_only.count():,}")

Dataset spatial only (temps anormal): 132
Dataset temporal only (lieu unknown): 2,173


Dataset without zero duration but keep the other anomalies

In [86]:

taxi_no_zero_duration = taxi_enriched.filter(
    F.col('flag_zero_duration') == False
)

print(f"\nDataset sans durées nulles: {taxi_no_zero_duration.count():,}")


Dataset sans durées nulles: 99,548


Dataset with raisonnable criterions (less strict than VALID)

In [98]:

taxi_reasonable = taxi_enriched.filter(
    # Spatial: soit VALID, soit IN_NYC_UNKNOWN (on accepte les imprécisions)
    (F.col('spatial_quality').isin(['VALID', 'IN_NYC_UNKNOWN'])) &
    # Temporal: durée entre 1 min et 4h
    (F.col('duration_minutes') >= 1) &
    (F.col('duration_minutes') <= 240) &
    # Pas de parsing errors
    ~F.col('flag_parsing_error')
)

reasonable_count = taxi_reasonable.count()
print(f"\nDataset 'Reasonable': {reasonable_count:,} ({reasonable_count/99999*100:.2f}%)")


Dataset 'Reasonable': 97,826 (97.83%)


In the report, composite datasets are also designed. As followed:

In [88]:
taxi_extended_spatial = taxi_enriched.filter(
    (
        (F.col('spatial_quality') == 'VALID') |
        (F.col('spatial_quality') == 'IN_NYC_UNKNOWN')
    ) &
    (F.col('data_quality_label') == 'VALID')
)

# D_complete_temporal: (D_strict ∪ D_temporal) ∧ duration > 0
taxi_complete_temporal = taxi_enriched.filter(
    (F.col('data_quality_label') == 'VALID') &
    (F.col('duration_seconds') > 0)
)

# D_spatial_valid: D_strict ∪ D_spatial
taxi_spatial_valid = taxi_enriched.filter(
    (F.col('spatial_quality') == 'VALID')
)


There is also a wish to explore a geographic stratisfication :

In [89]:
# D_outer: Trips entirely outside Manhattan (pickup AND dropoff NOT Manhattan)
taxi_outer = taxi_strict.filter(
    (F.col('pickup_borough') != 'Manhattan') &
    (F.col('dropoff_borough') != 'Manhattan')
)

taxi_manhattan = taxi_strict.filter(
    (F.col('pickup_borough') == 'Manhattan') |
    (F.col('dropoff_borough') == 'Manhattan')
)

In [63]:
print("\n COMPOSITE DATASETS : ")
print(f"D_extended_spatial:    {taxi_extended_spatial.count():>6,} ({taxi_extended_spatial.count()/99999*100:>5.2f}%)")
print(f"D_complete_temporal:   {taxi_complete_temporal.count():>6,} ({taxi_complete_temporal.count()/99999*100:>5.2f}%)")
print(f"D_spatial_valid:       {taxi_spatial_valid.count():>6,} ({taxi_spatial_valid.count()/99999*100:>5.2f}%)")

print("\n GEOGRAPHIC STRATIFICATION :")
print(f"D_manhattan:           {taxi_manhattan.count():>6,} ({taxi_manhattan.count()/taxi_strict.count()*100:>5.2f}% of D_strict)")
print(f"D_outer:               {taxi_outer.count():>6,} ({taxi_outer.count()/taxi_strict.count()*100:>5.2f}% of D_strict)")


 COMPOSITE DATASETS : 
D_extended_spatial:    97,826 (97.83%)
D_complete_temporal:   99,548 (99.55%)
D_spatial_valid:       97,507 (97.51%)

 GEOGRAPHIC STRATIFICATION :
D_manhattan:           94,077 (96.61% of D_strict)
D_outer:                3,298 ( 3.39% of D_strict)


Other dataset can be imagined. For the moment, we will use first the following:

In [90]:
taxi_final = taxi_strict.withColumn(
    'trip_type',
    get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
).select(
    'medallion',
    'hack_license',
    'pickup_ts',
    'dropoff_ts',
    'duration_seconds',
    'duration_minutes',
    'duration_hours',
    'pickup_borough',
    'dropoff_borough',
    'trip_type',
    'passenger_count',
    'pickup_latitude',
    'pickup_longitude',
    'dropoff_latitude',
    'dropoff_longitude'
)

print(f"\n FINAL Dataset: {taxi_final.count():,}")


# because we are gonna use it a much..
taxi_final.cache()


 FINAL Dataset: 97,375


DataFrame[medallion: string, hack_license: string, pickup_ts: timestamp, dropoff_ts: timestamp, duration_seconds: bigint, duration_minutes: double, duration_hours: double, pickup_borough: string, dropoff_borough: string, trip_type: string, passenger_count: int, pickup_latitude: double, pickup_longitude: double, dropoff_latitude: double, dropoff_longitude: double]

## 4. Queries

### 4.1 Utilisation

In [91]:
MAX_IDLE_SECONDS = 4 * 3600  # 4 hours of pause maximum

In [114]:
def utilisation(dataset, max_idle_seconds=MAX_IDLE_SECONDS, aggregator='hack_license'):

    # shuffle
    dataset = dataset.repartition('hack_license') 
    window_spec = Window.partitionBy('hack_license').orderBy('pickup_ts')
    
    taxi_with_lag = dataset.withColumn(
        'previous_dropoff_ts',
        F.lag('dropoff_ts').over(window_spec)
    )

    # then, here, it is shuffled and ordered with the previous ones
    
    taxi_with_idle = taxi_with_lag.withColumn(
        'idle_seconds',
        F.when(  # no idle, first session
            F.col('previous_dropoff_ts').isNull(),
            0
        ).when(  # do not be over 4 hours → new session → idle = 0
            (F.col('pickup_ts').cast('long') - F.col('previous_dropoff_ts').cast('long')) > max_idle_seconds,
            0
        ).otherwise(  # all is okay, just calculate the time of inactivity
            F.col('pickup_ts').cast('long') - F.col('previous_dropoff_ts').cast('long')
        )
    )

    # aggregation
    utilisation_stats = taxi_with_idle.groupBy(aggregator).agg(
        F.sum('duration_seconds').alias('total_occupied_seconds'),
        F.sum('idle_seconds').alias('total_idle_seconds'),
        F.count('*').alias('num_trips'),
        F.min('pickup_ts').alias('first_pickup'),
        F.max('dropoff_ts').alias('last_dropoff')
    )

    # sum up 
    utilisation_final = utilisation_stats.withColumn(
        'total_active_seconds',
        F.col('total_occupied_seconds') + F.col('total_idle_seconds')
    ).withColumn(
        'utilisation_rate',
        F.when(
            F.col('total_active_seconds') > 0,
            F.col('total_occupied_seconds') / F.col('total_active_seconds')
        ).otherwise(0)
    ).withColumn(
        'utilisation_percentage',
        F.round(F.col('utilisation_rate') * 100, 2)
    )

    return utilisation_final


In [27]:
def best_bottom_utilisation(utilisation_final, n=10):
    # results
    print("\n Top 10 drivers by utilisation:")
    utilisation_final.select(
        'hack_license',
        'num_trips',
        'utilisation_percentage',
        'total_occupied_seconds',
        'total_idle_seconds'
    ).orderBy(F.desc('utilisation_percentage')).show(n, truncate=False)

    print("\n Bottom 10 drivers by utilisation:")
    utilisation_final.select(
        'hack_license',
        'num_trips',
        'utilisation_percentage',
        'total_occupied_seconds',
        'total_idle_seconds'
    ).orderBy('utilisation_percentage').show(n, truncate=False)

In [30]:
utilisation_final = utilisation(taxi_final)

In [31]:
best_bottom_utilisation(utilisation_final)


 Top 10 drivers by utilisation:
+--------------------------------+---------+----------------------+----------------------+------------------+
|hack_license                    |num_trips|utilisation_percentage|total_occupied_seconds|total_idle_seconds|
+--------------------------------+---------+----------------------+----------------------+------------------+
|01549C8DFA3BA91714B7444DEB4195F5|1        |100.0                 |480                   |0                 |
|010149C4A7812863669DB7DBFC91ED60|2        |100.0                 |1380                  |0                 |
|012F5AA15F8535EF648C6726EAA8B72F|1        |100.0                 |480                   |0                 |
|00374328FBA75FBFCA7522671250F573|1        |100.0                 |960                   |0                 |
|00B938EAB7C933C9B6F30F95D72DAFEB|1        |100.0                 |300                   |0                 |
|0078BA33E03313B58559834168E7495F|1        |100.0                 |240                 

Here, it is not really interesting to see this result. It is not representative at all. That is why mabe we have to keep drivers who had done more than 10 trips.

In [33]:
best_bottom_utilisation(utilisation_final.filter(F.col('num_trips') >= 10))


 Top 10 drivers by utilisation:
+--------------------------------+---------+----------------------+----------------------+------------------+
|hack_license                    |num_trips|utilisation_percentage|total_occupied_seconds|total_idle_seconds|
+--------------------------------+---------+----------------------+----------------------+------------------+
|9C868A366E5435A3AF21869F9EB3C432|11       |78.63                 |11040                 |3000              |
|837B990B2A90D37AD61B83AFE261E50E|10       |77.95                 |9120                  |2580              |
|52A96B36A1BB00A5DEA977FF0611EB9C|11       |74.57                 |10380                 |3540              |
|586151FDDA73DFFAC1BA0616A6FE087E|17       |74.09                 |12180                 |4260              |
|588F20354D2C3711D1150B136F812E24|15       |73.27                 |13980                 |5100              |
|7B3DAEAD0556C7DC4BB925B0A8BED5D7|30       |72.95                 |21360               

In [42]:
def stats_utilisation(utilisation_final):
    utilisation_final.select(
    F.count('*').alias('total_drivers'),
    F.round(F.mean('utilisation_percentage'), 2).alias('avg_utilization'),
    F.round(F.stddev('utilisation_percentage'), 2).alias('std_utilization'),
    F.round(F.min('utilisation_percentage'), 2).alias('min_utilization'),
    F.round(F.max('utilisation_percentage'), 2).alias('max_utilization'),
    F.round(F.mean('num_trips'), 2).alias('avg_trips_per_driver')
    ).show()

In [43]:
stats_utilisation(utilisation_final)

+-------------+---------------+---------------+---------------+---------------+--------------------+
|total_drivers|avg_utilization|std_utilization|min_utilization|max_utilization|avg_trips_per_driver|
+-------------+---------------+---------------+---------------+---------------+--------------------+
|         9846|          53.03|          25.55|           2.97|          100.0|                9.89|
+-------------+---------------+---------------+---------------+---------------+--------------------+



*Note :* for sure scorer functions shall be best solutions to compare drivers -- keeping proportions between num_trips and the utilisation_percentage. But it is out of scope of this project.

### 4.2 Next Fare

The main idea here is to compute the times of the next fares, and then group by drop off bouroughs.

In [93]:
def average_time_to_next_fare(dataset, max_idle_seconds=MAX_IDLE_SECONDS):

    
    # shuffle
    dataset = dataset.repartition('hack_license')

    window_spec = Window.partitionBy('hack_license').orderBy('pickup_ts')
    
    # adding the previous departure
    dataset_with_context = dataset.withColumn(
        'previous_dropoff_ts',
        F.lag('dropoff_ts').over(window_spec)
    ).withColumn(
        'previous_dropoff_borough',
        F.lag('dropoff_borough').over(window_spec)
    ).withColumn(
        'next_pickup_ts',
        F.lead('pickup_ts').over(window_spec)
    )
    
    # compute times
    dataset_with_wait = dataset_with_context.withColumn(
        'wait_time_seconds',
        F.when(
            F.col('next_pickup_ts').isNotNull(),
            F.col('next_pickup_ts').cast('long') - F.col('dropoff_ts').cast('long')
        ).otherwise(None)
    )
    
    # filtering the differences
    valid_waits = dataset_with_wait.filter(
        (F.col('wait_time_seconds').isNotNull()) &
        (F.col('wait_time_seconds') > 0) &
        (F.col('wait_time_seconds') <= max_idle_seconds)
    )
    
    # group by borough of drop-off and computes stats
    stats_per_borough = valid_waits.groupBy('dropoff_borough').agg(
        F.count('wait_time_seconds').alias('num_observations'),
        F.round(F.mean('wait_time_seconds'), 2).alias('avg_wait_seconds'),
        F.round(F.mean('wait_time_seconds') / 60, 2).alias('avg_wait_minutes'),
        F.round(F.stddev('wait_time_seconds'), 2).alias('std_wait_seconds'),
        F.round(F.min('wait_time_seconds'), 2).alias('min_wait_seconds'),
        F.round(F.max('wait_time_seconds'), 2).alias('max_wait_seconds'),
        F.round(F.percentile_approx('wait_time_seconds', 0.5), 2).alias('median_wait_seconds')
    ).orderBy('avg_wait_seconds')
    
    return stats_per_borough

In [56]:
next_fare_stats = average_time_to_next_fare(taxi_final)
next_fare_stats.select(
    'dropoff_borough',
    'avg_wait_minutes',
    'num_observations',
    'median_wait_seconds'
).show(truncate=False)

+---------------+----------------+----------------+-------------------+
|dropoff_borough|avg_wait_minutes|num_observations|median_wait_seconds|
+---------------+----------------+----------------+-------------------+
|Manhattan      |15.46           |77892           |420                |
|Brooklyn       |35.25           |2569            |1380               |
|Bronx          |37.02           |298             |1560               |
|Queens         |45.2            |4081            |2040               |
|Staten Island  |88.14           |7               |4800               |
+---------------+----------------+----------------+-------------------+



### 4.3-4 Intra-bourough, inter-bourough

In the enrichment section, we have already provided trip_type. I can just invite you to scroll up.

In [59]:
print("\n Repartition:")
taxi_final.groupBy('trip_type').count().orderBy(F.desc('count')).show()


 Repartition:
+-------------+-----+
|    trip_type|count|
+-------------+-----+
|Intra-borough|85944|
|Inter-borough|11431|
+-------------+-----+



To complete this, I will group by the distinct bouroughs

In [57]:
inter_borough_flows = taxi_final.filter(
    F.col('trip_type') == 'Inter-borough'
).groupBy('pickup_borough', 'dropoff_borough').agg(
    F.count('*').alias('trip_count')
).orderBy(F.desc('trip_count'))


total_inter = inter_borough_flows.agg(F.sum('trip_count')).first()[0]
inter_borough_flows = inter_borough_flows.withColumn(
    'percentage',
    F.round((F.col('trip_count') / total_inter) * 100, 2)
)


inter_borough_flows.select(
    'pickup_borough',
    'dropoff_borough', 
    'trip_count',
    'percentage'
).show(15, truncate=False)

+--------------+---------------+----------+----------+
|pickup_borough|dropoff_borough|trip_count|percentage|
+--------------+---------------+----------+----------+
|Manhattan     |Queens         |3943      |34.49     |
|Queens        |Manhattan      |3697      |32.34     |
|Manhattan     |Brooklyn       |1923      |16.82     |
|Brooklyn      |Manhattan      |773       |6.76      |
|Queens        |Brooklyn       |597       |5.22      |
|Manhattan     |Bronx          |244       |2.13      |
|Brooklyn      |Queens         |115       |1.01      |
|Queens        |Bronx          |100       |0.87      |
|Bronx         |Manhattan      |25        |0.22      |
|Manhattan     |Staten Island  |9         |0.08      |
|Bronx         |Queens         |2         |0.02      |
|Queens        |Staten Island  |2         |0.02      |
|Staten Island |Queens         |1         |0.01      |
+--------------+---------------+----------+----------+



In [58]:
intra_borough_flows = taxi_final.filter(
    F.col('trip_type') == 'Intra-borough'
).groupBy('pickup_borough', 'dropoff_borough').agg(
    F.count('*').alias('trip_count')
).orderBy(F.desc('trip_count'))


total_intra = intra_borough_flows.agg(F.sum('trip_count')).first()[0]
intra_borough_flows = intra_borough_flows.withColumn(
    'percentage',
    F.round((F.col('trip_count') / total_inter) * 100, 2)
)

intra_borough_flows.select(
    'pickup_borough',
    'dropoff_borough', 
    'trip_count',
    'percentage'
).show(15, truncate=False)

+--------------+---------------+----------+----------+
|pickup_borough|dropoff_borough|trip_count|percentage|
+--------------+---------------+----------+----------+
|Manhattan     |Manhattan      |83463     |730.15    |
|Queens        |Queens         |1369      |11.98     |
|Brooklyn      |Brooklyn       |1062      |9.29      |
|Bronx         |Bronx          |49        |0.43      |
|Staten Island |Staten Island  |1         |0.01      |
+--------------+---------------+----------+----------+



### 4.4 Overall results

In [103]:
QUERY_CONFIGS = {
    'Q1_utilization': {
        'datasets': ['strict', 'reasonable', 'complete_temporal', 'manhattan', 'outer'],
    },
    'Q2_next_fare': {
        'datasets': ['strict', 'reasonable', 'extended_spatial', 'manhattan', 'outer'],
    },
    'Q3_Q4_flows': {
        'datasets': ['strict', 'extended_spatial', 'spatial_valid', 'manhattan', 'outer'],
    }
}

In [119]:
# Dataset registry
DATASETS = {
    'strict': taxi_final,
    'reasonable': taxi_reasonable.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
    'extended_spatial': taxi_extended_spatial.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
    'complete_temporal': taxi_complete_temporal.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
    'spatial_valid': taxi_spatial_valid.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
    'manhattan': taxi_manhattan.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
    'outer': taxi_outer.withColumn(
        'trip_type',
        get_trip_type_udf(F.col('pickup_borough'), F.col('dropoff_borough'))
    ),
}

In [115]:
# for the query 1
for d in QUERY_CONFIGS['Q1_utilization']['datasets']:
    print(d)
    dataset.cache() #for gaining time over others queries
    dataset = DATASETS[d]
    utilisation_stats = utilisation(dataset)
    stats_utilisation(utilisation_stats)

strict
+-------------+---------------+---------------+---------------+---------------+--------------------+
|total_drivers|avg_utilization|std_utilization|min_utilization|max_utilization|avg_trips_per_driver|
+-------------+---------------+---------------+---------------+---------------+--------------------+
|         9846|          53.03|          25.55|           2.97|          100.0|                9.89|
+-------------+---------------+---------------+---------------+---------------+--------------------+

reasonable
+-------------+---------------+---------------+---------------+---------------+--------------------+
|total_drivers|avg_utilization|std_utilization|min_utilization|max_utilization|avg_trips_per_driver|
+-------------+---------------+---------------+---------------+---------------+--------------------+
|         9861|           53.1|          25.48|           2.97|          100.0|                9.92|
+-------------+---------------+---------------+---------------+---------

In [112]:
# for query 2
for d in QUERY_CONFIGS['Q2_next_fare']['datasets']:
    print(d)
    dataset = DATASETS[d]
    next_fare_stats = average_time_to_next_fare(dataset)
    next_fare_stats.select(
    'dropoff_borough',
    'avg_wait_minutes',
    'num_observations',
    'median_wait_seconds'
    ).show(truncate=False)
    

strict
+---------------+----------------+----------------+-------------------+
|dropoff_borough|avg_wait_minutes|num_observations|median_wait_seconds|
+---------------+----------------+----------------+-------------------+
|Manhattan      |15.46           |77892           |420                |
|Brooklyn       |35.25           |2569            |1380               |
|Bronx          |37.02           |298             |1560               |
|Queens         |45.2            |4081            |2040               |
|Staten Island  |88.14           |7               |4800               |
+---------------+----------------+----------------+-------------------+

reasonable
+---------------+----------------+----------------+-------------------+
|dropoff_borough|avg_wait_minutes|num_observations|median_wait_seconds|
+---------------+----------------+----------------+-------------------+
|Manhattan      |15.33           |77994           |420                |
|Brooklyn       |34.88           |2576       

In [120]:
# for query 3 and 4 INTRA
for d in QUERY_CONFIGS['Q3_Q4_flows']['datasets']:
    print(d)
    dataset = DATASETS[d]
    intra_borough_flows = dataset.filter(
        F.col('trip_type') == 'Intra-borough'
    ).groupBy('pickup_borough', 'dropoff_borough').agg(
        F.count('*').alias('trip_count')
    ).orderBy(F.desc('trip_count'))
    
    
    total_intra = intra_borough_flows.agg(F.sum('trip_count')).first()[0]
    intra_borough_flows = intra_borough_flows.withColumn(
        'percentage',
        F.round((F.col('trip_count') / total_inter) * 100, 2)
    )
    
    intra_borough_flows.select(
        'pickup_borough',
        'dropoff_borough', 
        'trip_count',
        'percentage'
    ).show(15, truncate=False)

strict
+--------------+---------------+----------+----------+
|pickup_borough|dropoff_borough|trip_count|percentage|
+--------------+---------------+----------+----------+
|Manhattan     |Manhattan      |83463     |730.15    |
|Queens        |Queens         |1369      |11.98     |
|Brooklyn      |Brooklyn       |1062      |9.29      |
|Bronx         |Bronx          |49        |0.43      |
|Staten Island |Staten Island  |1         |0.01      |
+--------------+---------------+----------+----------+

reasonable
+--------------+---------------+----------+----------+
|pickup_borough|dropoff_borough|trip_count|percentage|
+--------------+---------------+----------+----------+
|Manhattan     |Manhattan      |83463     |730.15    |
|Queens        |Queens         |1369      |11.98     |
|Brooklyn      |Brooklyn       |1062      |9.29      |
|Bronx         |Bronx          |49        |0.43      |
|Staten Island |Staten Island  |1         |0.01      |
+--------------+---------------+----------+---

In [121]:
# for query 3 and 4 INTER
for d in QUERY_CONFIGS['Q3_Q4_flows']['datasets']:
    print(d)
    dataset = DATASETS[d]
    inter_borough_flows = dataset.filter(
    F.col('trip_type') == 'Inter-borough'
    ).groupBy('pickup_borough', 'dropoff_borough').agg(
        F.count('*').alias('trip_count')
    ).orderBy(F.desc('trip_count'))
    
    
    total_inter = inter_borough_flows.agg(F.sum('trip_count')).first()[0]
    inter_borough_flows = inter_borough_flows.withColumn(
        'percentage',
        F.round((F.col('trip_count') / total_inter) * 100, 2)
    )
    
    
    inter_borough_flows.select(
        'pickup_borough',
        'dropoff_borough', 
        'trip_count',
        'percentage'
    ).show(15, truncate=False)

strict
+--------------+---------------+----------+----------+
|pickup_borough|dropoff_borough|trip_count|percentage|
+--------------+---------------+----------+----------+
|Manhattan     |Queens         |3943      |34.49     |
|Queens        |Manhattan      |3697      |32.34     |
|Manhattan     |Brooklyn       |1923      |16.82     |
|Brooklyn      |Manhattan      |773       |6.76      |
|Queens        |Brooklyn       |597       |5.22      |
|Manhattan     |Bronx          |244       |2.13      |
|Brooklyn      |Queens         |115       |1.01      |
|Queens        |Bronx          |100       |0.87      |
|Bronx         |Manhattan      |25        |0.22      |
|Manhattan     |Staten Island  |9         |0.08      |
|Bronx         |Queens         |2         |0.02      |
|Queens        |Staten Island  |2         |0.02      |
|Staten Island |Queens         |1         |0.01      |
+--------------+---------------+----------+----------+

reasonable
+--------------+---------------+----------+---