# Silver - Chicago Taxi Rides

## 0. Setup

In [1]:
import operator
from collections.abc import Callable
from datetime import datetime

import findspark

findspark.init()

In [2]:
from pyspark import RDD
from pyspark import StorageLevel
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import types as t

from utils import setup_spark

spark: SparkSession = setup_spark(title="Silver Chicago")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/11/28 13:53:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
debug_exploration: bool = True
debug_dropped: bool = True
explore_storage_level: StorageLevel = StorageLevel.MEMORY_AND_DISK

In [4]:
from schemas import schema_chicago_bronze

df_chicago: DataFrame = spark.read.schema(schema_chicago_bronze).csv(
    path="data/chicago/chicago_taxi_trips_*.csv",
    header=True,
    mode="PERMISSIVE",
)

rdd_chicago: RDD[t.Row] = df_chicago.rdd

## 1. Prerequisites

In [5]:
import shared

FIFTEEN_MINUTES = 15 * 60
PAYMENT_VALID_VALUES: set[str] = {str(p.value) for p in list(shared.PaymentTypeCHI)}


def is_time_rounded(dt: datetime) -> bool:
    seconds = dt.minute * 60 * dt.second

    return abs(seconds % FIFTEEN_MINUTES) < shared.DATETIME_ROUND_TOLERANCE


def is_taxi_id_valid(x: int | None) -> bool:
    if x is None:
        return False
    return x > 0


def is_trip_start_timestamp_valid(x: datetime | None) -> bool:
    if x is None:
        return False
    return is_time_rounded(x)


def is_trip_end_timestamp_valid(x: datetime | None) -> bool:
    if x is None:
        return False
    return is_time_rounded(x)


def is_trip_seconds_valid(x: int | None) -> bool:
    if x is None:
        return False
    return shared.SECONDS_MIN <= x <= shared.SECONDS_MAX


def is_trip_miles_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.MILES_MIN <= x <= shared.MILES_MAX


def is_pickup_census_tract_valid(x: int | None) -> bool:
    if x is None:
        return False
    return 0 <= x


def is_dropoff_census_tract_valid(x: int | None) -> bool:
    if x is None:
        return False
    return 0 <= x


def is_pickup_community_area_valid(x: int | None) -> bool:
    if x is None:
        return True
    return 1 <= x <= shared.COMMUNITY_AREAS_AMOUNT


def is_dropoff_community_area_valid(x: int | None) -> bool:
    if x is None:
        return True
    return 1 <= x <= shared.COMMUNITY_AREAS_AMOUNT


def is_fare_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.FARE_MIN <= x <= shared.FARE_MAX


def is_tips_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.TIPS_MIN <= x <= shared.TIPS_MAX


def is_tolls_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.TOLLS_MIN <= x <= shared.TOLLS_MAX


def is_extras_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.EXTRAS_MIN <= x <= shared.EXTRAS_MAX


def is_trip_total_valid(x: float | None) -> bool:
    if x is None:
        return False
    return shared.TOTAL_MIN <= x <= shared.TOTAL_MAX


def is_payment_type_valid(x: str | None) -> bool:
    return x in PAYMENT_VALID_VALUES


def is_company_valid(x: int | None) -> bool:
    if x is None:
        return True
    return 0 <= x


def is_pickup_location_valid(x: str | None) -> bool:
    return x is not None


def is_dropoff_location_valid(x: str | None) -> bool:
    return x is not None


# Longitude and Latitude was dropped from dataset
def is_dropoff_longitude_valid(_: int | None) -> bool:
    return True


def is_dropoff_latitude_valid(_: int | None) -> bool:
    return True


def is_pickup_longitude_valid(_: int | None) -> bool:
    return True


def is_pickup_latitude_valid(_: int | None) -> bool:
    return True


def is_calculated_total_valid(x: t.Row) -> bool:
    calculated_total: float = x.fare + x.tips + x.tolls + x.extras
    return abs(x.trip_total - calculated_total) < shared.CALCULATED_TOTAL_TOLERANCE


def is_start_end_trip_order_valid(x: t.Row) -> bool:
    return x.trip_start_timestamp <= x.trip_end_timestamp

## 2 Exploration

### Taxi ID

In [6]:
if debug_exploration:
    taxi_ids = rdd_chicago.map(lambda row: row.taxi_id).persist(explore_storage_level)
    missing_taxis = taxi_ids.filter(lambda x: not is_taxi_id_valid(x)).count()
    unique_taxis = taxi_ids.filter(is_taxi_id_valid).distinct().count()
    taxi_ids.unpersist()

    print("missing taxi ids", missing_taxis)
    print("unique taxi ids", unique_taxis)

missing taxi ids 7471
unique taxi ids 7665


### Trip-start time

In [7]:
if debug_exploration:
    start_times = rdd_chicago.map(lambda row: row.trip_start_timestamp).persist(explore_storage_level)
    missing_start_time = start_times.filter(lambda x: not is_trip_start_timestamp_valid(x)).count()
    start_times.unpersist()

    print("Missing end times", missing_start_time)

Missing end times 0


### Trip-end time

In [8]:
if debug_exploration:
    end_times = rdd_chicago.map(lambda row: row.trip_end_timestamp).persist(explore_storage_level)
    missing_end_time = end_times.filter(lambda x: not is_trip_end_timestamp_valid(x)).count()
    end_times.unpersist()

    print("Missing end times", missing_end_time)

Missing end times 2413


### trip seconds

In [9]:
if debug_exploration:
    trip_seconds = rdd_chicago.map(lambda row: row.trip_seconds).persist(explore_storage_level)
    missing_seconds = trip_seconds.filter(lambda x: not is_trip_seconds_valid(x)).count()
    nonzero_miles = rdd_chicago.filter(
        lambda row: row.trip_seconds is not None
        and row.trip_seconds == 0
        and row.trip_miles is not None
        and row.trip_miles > 0
    ).count()
    trip_seconds.unpersist()

    print("Missing seconds", missing_seconds)
    print("nonezero miles", nonzero_miles)

Missing seconds 3215
nonezero miles 51934


### trip_miles 

In [10]:
if debug_exploration:
    trip_miles = rdd_chicago.map(lambda row: row.trip_miles).persist(explore_storage_level)
    missing_trip_miles = trip_miles.filter(lambda x: not is_trip_miles_valid(x)).count()
    trip_miles.unpersist()

    print("Missing trip miles", missing_trip_miles)

Missing trip miles 3244


### pickup census


In [11]:
if debug_exploration:
    pickup_census = rdd_chicago.map(lambda row: row.pickup_census_tract).persist(explore_storage_level)
    missing_pickup_census = pickup_census.filter(lambda x: not is_pickup_census_tract_valid(x)).count()
    pickup_census.unpersist()

    print("missing pickup census", missing_pickup_census)

missing pickup census 19866157


### dropoff census

In [12]:
if debug_exploration:
    dropoff_census = rdd_chicago.map(lambda row: row.dropoff_census_tract).persist(explore_storage_level)
    missing_dropoff_census = dropoff_census.filter(lambda x: not is_dropoff_census_tract_valid(x)).count()
    unique_dropoff_tract = dropoff_census.distinct().count()
    dropoff_census.unpersist()

    print("missing dropoff", missing_dropoff_census)
    print("unique dropoff tract", unique_dropoff_tract)

missing dropoff 7734854
unique dropoff tract 877


### pickup community area

In [13]:
if debug_exploration:
    pickup_community_areas = rdd_chicago.map(lambda row: row.pickup_community_area).persist(explore_storage_level)
    missing_pickup_commmunity, none_pickup_community = pickup_community_areas.map(
        lambda x: (not is_pickup_community_area_valid(x), x is None)
    ).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    unique_pickpoint_area_count = pickup_community_areas.filter(lambda x: x is not None).distinct().count()
    pickup_community_areas.unpersist()

    print("missing pickup:", missing_pickup_commmunity)
    print("unique pickup points", unique_pickpoint_area_count)
    print("None pickup community areas", none_pickup_community)

missing pickup: 0
unique pickup points 77
None pickup community areas 2755980


### dropoff community area

In [14]:
if debug_exploration:
    dropoff_community_areas = rdd_chicago.map(lambda row: row.dropoff_community_area).persist(explore_storage_level)
    missing_dropoff_community = dropoff_community_areas.filter(lambda x: x is None).count()
    unique_dropoff_area_count = dropoff_community_areas.filter(lambda x: x is not None).distinct().count()
    dropoff_community_areas.unpersist()

    print("missing dropoff:", missing_dropoff_community)
    print("unique dropoff points", unique_dropoff_area_count)

missing dropoff: 3086050
unique dropoff points 77


### fare

In [15]:
if debug_exploration:
    fares = rdd_chicago.map(lambda row: row.fare).persist(explore_storage_level)
    missing_fare = fares.filter(lambda x: not is_fare_valid(x)).count()
    fares_min, fares_max, fares_sum, fares_count = (
        fares.filter(is_fare_valid)
        .map(lambda x: (x, x, x, 1))
        .reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), operator.add(a[2], b[2]), operator.add(a[3], b[3])))
    )
    fares.unpersist()

    print("missing fares", missing_fare)
    print("min for validated fares:", fares_min)
    print("max for validated fares:", fares_max)
    print("mean for validated fares:", fares_sum / fares_count)

missing fares 630
min for validated fares: 0.0
max for validated fares: 1000.0
mean for validated fares: 13.81899632316784


### tip

In [16]:
def is_cash(x: t.Row) -> bool:
    return (x.payment_type is not None) and (x.payment_type == "Cash")


if debug_exploration:
    tips = rdd_chicago.map(lambda row: row.tips).persist(explore_storage_level)
    missing_tips_rdd = tips.filter(lambda x: not is_tips_valid(x)).count()
    tips_min, tips_max, tips_sum, tips_count = (
        tips.filter(is_tips_valid)
        .map(lambda x: (x, x, x, 1))
        .reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), operator.add(a[2], b[2]), operator.add(a[3], b[3])))
    )
    tips.unpersist()

    print("Missing tips", missing_tips_rdd)
    print("Valid tips count", tips_count)
    print("min for validated tips:", tips_min)
    print("max for validated tips:", tips_max)
    print("mean for validated tips:", tips_sum / tips_count)

Missing tips 300
Valid tips count 19865857
min for validated tips: 0.0
max for validated tips: 496.5
mean for validated tips: 1.6447692599134545


### toll

In [17]:
if debug_exploration:
    tolls = rdd_chicago.map(lambda row: row.tolls).persist(explore_storage_level)
    missing_toll_rdd = tolls.filter(lambda x: not is_tolls_valid(x)).count()
    tolls_min, tolls_max, tolls_sum, tolls_count = (
        tolls.filter(is_tolls_valid)
        .map(lambda x: (x, x, x, 1))
        .reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2], a[3] + b[3]))
    )
    tolls.unpersist()

    print("Missing tolls", missing_toll_rdd)
    print("Valid tolls count", tolls_count)
    print("min for validated tolls:", tolls_min)
    print("max for validated tolls:", tolls_max)
    print("mean for validated tolls:", tolls_sum / tolls_count)

Missing tolls 300
Valid tolls count 19865857
min for validated tolls: 0.0
max for validated tolls: 999.989990234375
mean for validated tolls: 0.003279728626845826


### Extras

In [18]:
if debug_exploration:
    extras = rdd_chicago.map(lambda row: row.extras).persist(explore_storage_level)
    missing_extras_rdd = extras.filter(lambda x: not is_extras_valid(x)).count()
    extra_min, extra_max, extra_sum, extra_count = (
        extras.filter(is_extras_valid)
        .map(lambda x: (x, x, x, 1))
        .reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2], a[3] + b[3]))
    )
    extras.unpersist()

    print("Missing extras", missing_extras_rdd)
    print("Extras count", extra_count)
    print("min for validated extras:", extra_min)
    print("max for validated extras:", extra_max)
    print("mean for validated extras:", extra_sum / extra_count)

Missing extras 692
Extras count 19865465
min for validated extras: 0.0
max for validated extras: 999.989990234375
mean for validated extras: 0.9603079575414082


### Trip total

In [19]:
if debug_exploration:
    totals = rdd_chicago.map(lambda row: row.trip_total).persist(explore_storage_level)
    missing_total_rdd = totals.filter(lambda x: not is_trip_total_valid(x)).count()
    totals_min, totals_max, totals_sum, totals_count = (
        totals.filter(is_trip_total_valid)
        .map(lambda x: (x, x, x, 1))
        .reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), operator.add(a[2], b[2]), operator.add(a[3], b[3])))
    )
    totals.unpersist()

    print("Missing trip total", missing_total_rdd)
    print("trip total price count", totals_count)
    print("min for validated trip total:", totals_min)
    print("max for validated trip total:", totals_max)
    print("mean for validated trip total:", totals_sum / totals_count)

Missing trip total 1036
trip total price count 19865121
min for validated trip total: 0.0
max for validated trip total: 1000.0
mean for validated trip total: 16.47593088679625


### Payment type

In [20]:
if debug_exploration:
    payment_types = rdd_chicago.map(lambda row: row.payment_type).persist(explore_storage_level)
    missing_payment_type = payment_types.filter(lambda x: not is_payment_type_valid(x)).count()
    payment_counts_rdd = payment_types.filter(is_payment_type_valid).countByValue()
    payment_types.unpersist()

    print("payment type counts:", sorted(payment_counts_rdd.items(), key=lambda x: -x[1]))
    print("missing payments types:", missing_payment_type)

payment type counts: [('Cash', 10449095), ('Credit Card', 9258402), ('No Charge', 99817), ('Unknown', 37692), ('Dispute', 11948), ('Pcard', 5250), ('Prcard', 3942)]
missing payments types: 11


###  id code for company

In [21]:
if debug_exploration:
    companies = rdd_chicago.map(lambda row: row.company).persist(explore_storage_level)
    missing_company_id = companies.filter(lambda x: not is_company_valid(x)).count()
    company_count_rdd = companies.filter(is_company_valid).distinct().count()
    companies.unpersist()

    print("missing company ids:", missing_company_id)
    print("unique companies:", company_count_rdd)

missing company ids: 0
unique companies: 60


### pickup  latitude

In [22]:
if debug_exploration:
    pickup_latitudes = rdd_chicago.map(lambda row: row.pickup_latitude).persist(explore_storage_level)
    pickup_latitiude_missing = pickup_latitudes.filter(lambda x: x is None).count()
    pickup_latitude_count = pickup_latitudes.filter(lambda x: x is not None).take(10)
    pickup_latitudes.unpersist()

    print("Hidden census", pickup_latitiude_missing)
    print("validated pickup latitude count", pickup_latitude_count)

Hidden census 2755600
validated pickup latitude count [199, 686, 385, 599, 527, 210, 161, 419, 385, 419]


### pickup location

In [23]:
if debug_exploration:
    pickup_locations = rdd_chicago.map(lambda row: row.pickup_location).persist(explore_storage_level)
    pickup_location_missing = pickup_locations.filter(lambda x: not is_pickup_location_valid(x)).count()
    pickup_location_count = pickup_locations.filter(is_pickup_location_valid).countByValue()
    pickup_locations.unpersist()

    print("Hidden pickup location:", pickup_location_missing)
    print("different pickup location based on count", sorted(pickup_location_count.items(), key=lambda x: -x[1])[:10])

Hidden pickup location: 3047662
different pickup location based on count [('18', 1750473), ('744', 1011854), ('210', 655687), ('686', 649922), ('754', 603985), ('599', 593149), ('225', 526307), ('688', 525415), ('411', 506136), ('419', 494363)]


### dropoff location

In [24]:
if debug_exploration:
    dropoff_locations = rdd_chicago.map(lambda row: row.dropoff_location).persist(explore_storage_level)
    dropoff_location_missing = dropoff_locations.filter(lambda x: not is_dropoff_location_valid(x)).count()
    dropoff_location_count = dropoff_locations.filter(is_pickup_location_valid).countByValue()
    dropoff_locations.unpersist()

    print("Hidden dropoff location:", dropoff_location_missing)
    print("different dropoff location based on count", sorted(dropoff_location_count.items(), key=lambda x: -x[1])[:10])

Hidden dropoff location: 19866157
different dropoff location based on count []


## 3 Filtering

In [None]:
validators_column: list[tuple[str, Callable[[str], bool]]] = [
    ("taxi_id", is_taxi_id_valid),
    ("trip_start_timestamp", is_trip_start_timestamp_valid),
    ("trip_end_timestamp", is_trip_end_timestamp_valid),
    ("trip_seconds", is_trip_seconds_valid),
    ("trip_miles", is_trip_miles_valid),
    ("pickup_community_area", is_pickup_community_area_valid),
    ("dropoff_community_area", is_dropoff_community_area_valid),
    ("fare", is_fare_valid),
    ("tips", is_tips_valid),
    ("tolls", is_tolls_valid),
    ("extras", is_extras_valid),
    ("trip_total", is_trip_total_valid),
    ("payment_type", is_payment_type_valid),
    ("company", is_company_valid),
]

validators_whole: list[tuple[str, Callable[[t.Row], bool]]] = [
    ("calculated_total", is_calculated_total_valid),
    ("start_end_trip_order", is_start_end_trip_order_valid),
]


def is_row_valid(row: t.Row) -> bool:
    columns_valid = all(fil(row[col]) for col, fil in validators_column)
    whole_valid = columns_valid and all(fil(row) for _, fil in validators_whole)

    return columns_valid and whole_valid


rdd_chicago_filtered = rdd_chicago.filter(is_row_valid)

### Dropped Rows

In [26]:
from schemas import schema_chicago_dropped_silver


def add_reason(row: t.Row) -> t.Row:
    failed_column = next((col for col, fil in validators_column if not fil(row[col])), None)
    failed_whole = failed_column or next((col for col, fil in validators_whole if not fil(row)), None)
    return t.Row(reason=failed_column or failed_whole, **row.asDict())


if debug_dropped:
    rdd_chicago_dropped = rdd_chicago.filter(lambda x: not is_row_valid(x)).map(add_reason)

    rdd_chicago_count = rdd_chicago.count()
    rdd_chicago_dropped_count = rdd_chicago_dropped.count()
    print(f"{rdd_chicago_dropped_count} dropped ({rdd_chicago_dropped_count / rdd_chicago_count:.2%})")
    if rdd_chicago_dropped_count > 0:
        rdd_chicago_dropped.toDF(schema=schema_chicago_dropped_silver).show()

14864 dropped (0.07%)


+------------------+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-------+----+-----+------+----------+------------+-------+---------------+----------------+---------------+----------------+-----------------+----------------+
|            reason|taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|pickup_census_tract|dropoff_census_tract|pickup_community_area|dropoff_community_area|   fare|tips|tolls|extras|trip_total|payment_type|company|pickup_latitude|pickup_longitude|pickup_location|dropoff_latitude|dropoff_longitude|dropoff_location|
+------------------+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-------+----+-----+------+----------+------------+-------+---------------+----------------+---------------+----------------+-----------------+-

## 4. Saving results

In [27]:
from schemas import schema_chicago_silver
from shared import PATH_SILVER_CHICAGO

df_chicago_silver = rdd_chicago_filtered.toDF(schema=schema_chicago_silver)
df_chicago_silver.write.parquet(PATH_SILVER_CHICAGO, mode="overwrite")

In [28]:
spark.stop()