# Visual Analytics

## Assignment 3

**Instructor:** Dr. Marco D'Ambros  
**TAs:** Carmen Armenti, Mattia Giannaccari

**Contacts:** marco.dambros@usi.ch, carmen.armenti@usi.ch, mattia.giannaccari@usi.ch

**Due Date:** May 16, 2025 @ 23:55

---
The goal of this assignment is to use **Spark (PySpark)** and **Polars** in Jupyter notebooks.  
The files `trip_data.csv`, `trip_fare.csv`, and `nyc_boroughs.geojson` are available in the provided folder: [Assignment3-data](https://usi365-my.sharepoint.com/:f:/g/personal/armenc_usi_ch/Ejp7sb8QAMROoWe0XUDcAkMBoqUFk-w2Vgroup025NhAww?e=2I7SMC).

You may clean the data as needed; however, please note that specific data cleaning steps will be required in **Exercise 5**. If you choose to clean the data before Exercise 5, make sure to retain the **original dataset** for use with the Polars exercises.

- Use **Spark** to solve **Exercises 1–4**
- Use **Polars** to solve **Exercises 5–8**

You are encouraged to use [Spark window functions](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) whenever appropriate.

Please name your notebook file as `SurnameName_Assignment3.ipynb`

## Spark

### Exercise 1
Join the `trip_data` and `trip_fare` dataframes into one and consider only data on 2013-01-01. Please specify the number of rows obtained after joining the 2 datasets.

In [6]:
# Set JAVA_HOME and PATH to use conda's OpenJDK 11 for PySpark compatibility
import os
os.environ["JAVA_HOME"] = "/opt/miniconda3/envs/jyvenv"
os.environ["PATH"] = f"/opt/miniconda3/envs/jyvenv/bin:" + os.environ["PATH"]
# Now you can safely import and use PySpark in this notebook

In [6]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, count, when, isnan, isnull
import os

# Create and configure Spark session with more robust settings
spark = SparkSession.builder \
    .appName("NYC Taxi Data Analysis") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.local.dir", "/tmp") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .master("local[*]") \
    .getOrCreate()

try:
    # Get the current working directory
    current_dir = os.getcwd()
    print(f"Current working directory: {current_dir}")

    # Load the data with error handling
    print("Loading trip data...")
    trip_data = spark.read.csv(
        "data/trip_data.csv",
        header=True,
        inferSchema=True,
        mode="PERMISSIVE"
    )

    print("Loading trip fare data...")
    trip_fare = spark.read.csv(
        "data/trip_fare.csv",
        header=True,
        inferSchema=True,
        mode="PERMISSIVE"
    )

    # Clean column names in trip_fare (remove leading spaces)
    print("Cleaning column names...")
    trip_fare = trip_fare.withColumnRenamed(" hack_license", "hack_license") \
                        .withColumnRenamed(" vendor_id", "vendor_id") \
                        .withColumnRenamed(" pickup_datetime", "pickup_datetime") \
                        .withColumnRenamed(" payment_type", "payment_type") \
                        .withColumnRenamed(" fare_amount", "fare_amount") \
                        .withColumnRenamed(" surcharge", "surcharge") \
                        .withColumnRenamed(" mta_tax", "mta_tax") \
                        .withColumnRenamed(" tip_amount", "tip_amount") \
                        .withColumnRenamed(" tolls_amount", "tolls_amount") \
                        .withColumnRenamed(" total_amount", "total_amount")

    # Display schema of the dataframes
    print("\nTrip Data Schema:")
    trip_data.printSchema()

    print("\nTrip Fare Schema:")
    trip_fare.printSchema()

    # Convert pickup_datetime to date type and cache for better performance
    print("\nConverting dates...")
    trip_data = trip_data.withColumn("pickup_date", to_date(col("pickup_datetime"))).cache()
    trip_fare = trip_fare.withColumn("pickup_date", to_date(col("pickup_datetime"))).cache()

    # Filter data for January 1, 2013
    print("\nFiltering data for 2013-01-01...")
    trip_data_jan1 = trip_data.filter(col("pickup_date") == "2013-01-01")
    trip_fare_jan1 = trip_fare.filter(col("pickup_date") == "2013-01-01")

    # Join the two dataframes on common keys
    print("\nJoining datasets...")
    joined_data = trip_data_jan1.join(
        trip_fare_jan1,
        on=["medallion", "hack_license", "vendor_id", "pickup_datetime"],
        how="inner"
    )

    # Count the number of rows after joining
    print("\nCounting rows...")
    row_count = joined_data.count()
    print(f"Number of rows after joining the datasets for 2013-01-01: {row_count}")

    # Show a sample of the joined data
    print("\nSample of joined data:")
    joined_data.show(5, truncate=False)

except Exception as e:
    print(f"An error occurred: {str(e)}")
    raise
finally:
    # Clean up cached data
    if 'trip_data' in locals():
        trip_data.unpersist()
    if 'trip_fare' in locals():
        trip_fare.unpersist()

Current working directory: /Users/zitian/Visual-Analytics-SP-2025/assigment3
Loading trip data...


                                                                                

Loading trip fare data...


                                                                                

Cleaning column names...

Trip Data Schema:
root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)


Trip Fare Schema:
root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: do

                                                                                

Number of rows after joining the datasets for 2013-01-01: 412630

Sample of joined data:




+--------------------------------+--------------------------------+---------+-------------------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+------------+-----------+---------+-------+----------+------------+------------+-----------+
|medallion                       |hack_license                    |vendor_id|pickup_datetime    |rate_code|store_and_fwd_flag|dropoff_datetime   |passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_date|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|pickup_date|
+--------------------------------+--------------------------------+---------+-------------------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+----

                                                                                

### Exercise 2
Provide a graphical representation to compare the average fare amount for trips _within_ and _across_ all the boroughs. You may want to have a look at: https://docs.bokeh.org/en/latest/docs/user_guide/topics/categorical.html#categorical-heatmaps

In [11]:
# Exercise 2: Visualize average fare amount for trips within and across all boroughs

from pyspark.sql.functions import col, isnan, when, avg as spark_avg
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource, LinearColorMapper, ColorBar, BasicTicker, PrintfTickFormatter
from bokeh.palettes import Viridis256
import numpy as np
import geopandas as gpd
from shapely.geometry import Point

# Enable Bokeh output in the notebook
output_notebook()

# 1. Data cleaning: remove records with missing coordinates or fare amount
cleaned_data = joined_data.filter(
    (col("pickup_longitude").isNotNull()) & 
    (col("pickup_latitude").isNotNull()) &
    (col("dropoff_longitude").isNotNull()) & 
    (col("dropoff_latitude").isNotNull()) &
    (col("fare_amount").isNotNull())
)

# 2. Load NYC boroughs GeoJSON
boroughs_gdf = gpd.read_file("data/nyc-boroughs.geojson")

# 3. Define a function to map coordinates to borough name
def lookup_borough(lon, lat):
    point = Point(lon, lat)
    for _, row in boroughs_gdf.iterrows():
        if row['geometry'].contains(point):
            return row['borough']
    return "Other"

# 4. Register the function as a Spark UDF
from pyspark.sql.functions import udf
lookup_borough_udf = udf(lookup_borough, StringType())

# 5. Add pickup and dropoff borough columns
with_boroughs = cleaned_data \
    .withColumn("pickup_borough", lookup_borough_udf(col("pickup_longitude"), col("pickup_latitude"))) \
    .withColumn("dropoff_borough", lookup_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# 6. Compute average fare for each (pickup_borough, dropoff_borough) pair
borough_fare = with_boroughs.groupBy("pickup_borough", "dropoff_borough") \
    .agg(spark_avg("fare_amount").alias("avg_fare")) \
    .filter((col("pickup_borough") != "Other") & (col("dropoff_borough") != "Other"))

# 7. Convert to pandas DataFrame for visualization
pdf = borough_fare.toPandas().pivot(index="pickup_borough", columns="dropoff_borough", values="avg_fare")

# 8. Prepare data for Bokeh heatmap
boroughs_list = sorted(list(set(pdf.index) | set(pdf.columns)))
data = {
    "pickup_borough": [],
    "dropoff_borough": [],
    "avg_fare": []
}
for i in boroughs_list:
    for j in boroughs_list:
        data["pickup_borough"].append(i)
        data["dropoff_borough"].append(j)
        value = pdf.loc[i, j] if (i in pdf.index and j in pdf.columns) else np.nan
        data["avg_fare"].append(value)

source = ColumnDataSource(data)
mapper = LinearColorMapper(palette=Viridis256, low=np.nanmin(data["avg_fare"]), high=np.nanmax(data["avg_fare"]))

# 9. Create and show the heatmap (use width/height instead of plot_width/plot_height for Bokeh 3.6+)
p = figure(
    title="Average Fare Amount (Within and Across Boroughs)",
    x_range=boroughs_list, y_range=boroughs_list,
    x_axis_location="above", width=600, height=600,
    tools="hover,save", tooltips=[('Pickup', '@pickup_borough'), ('Dropoff', '@dropoff_borough'), ('Avg Fare', '@avg_fare{0.2f}')]
)
p.rect(x="pickup_borough", y="dropoff_borough", width=1, height=1,
       source=source,
       fill_color={'field': 'avg_fare', 'transform': mapper},
       line_color=None)

color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size="10pt",
                     ticker=BasicTicker(desired_num_ticks=10),
                     formatter=PrintfTickFormatter(format="%.2f"),
                     label_standoff=12, border_line_color=None, location=(0,0))
p.add_layout(color_bar, 'right')

show(p)

                                                                                

### Exercise 3
Consider only Manhattan, Bronx and Brooklyn boroughs. Then create a dataframe that shows the total number of trips *within* the same borough and *across* all the other boroughs mentioned before (Manhattan, Bronx, and Brooklyn) where the passengers are more or equal than 3.

For example, for Manhattan borough you should consider the total number of the following trips:
- Manhattan → Manhattan
- Manhattan → Bronx
- Manhattan → Brooklyn

You should then do the same for Bronx and Brooklyn boroughs.

In [4]:
# 1. Ensure borough columns exist
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import geopandas as gpd
from shapely.geometry import Point

# Load boroughs GeoJSON (只需加载一次)
boroughs_gdf = gpd.read_file("data/nyc-boroughs.geojson")

# Define lookup function
def lookup_borough(lon, lat):
    point = Point(lon, lat)
    for _, row in boroughs_gdf.iterrows():
        if row['geometry'].contains(point):
            return row['borough']
    return "Other"

lookup_borough_udf = udf(lookup_borough, StringType())

# Add borough columns if not already present
with_boroughs = joined_data \
    .withColumn("pickup_borough", lookup_borough_udf(col("pickup_longitude"), col("pickup_latitude"))) \
    .withColumn("dropoff_borough", lookup_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# 2. Now do Exercise 3 on with_boroughs
selected_boroughs = ['Manhattan', 'Bronx', 'Brooklyn']

filtered_df = with_boroughs.filter(
    (col("pickup_borough").isin(selected_boroughs)) &
    (col("dropoff_borough").isin(selected_boroughs)) &
    (col("passenger_count") >= 3)
).persist()

print(f"Number of trips between Manhattan, Bronx, Brooklyn with >=3 passengers: {filtered_df.count()}")

# Detailed counts for each (pickup, dropoff) pair
detailed_counts = (
    filtered_df
    .groupBy("pickup_borough", "dropoff_borough")
    .count()
    .orderBy("pickup_borough", "dropoff_borough")
)
print("Detailed trip counts (pickup_borough -> dropoff_borough):")
detailed_counts.show(truncate=False)

# Summary: for each pickup_borough, count trips to each target dropoff_borough
from pyspark.sql import functions as F
summary_df = (
    filtered_df
    .groupBy("pickup_borough")
    .agg(
        F.sum(when(col("dropoff_borough") == "Manhattan", 1).otherwise(0)).alias("to_Manhattan"),
        F.sum(when(col("dropoff_borough") == "Bronx", 1).otherwise(0)).alias("to_Bronx"),
        F.sum(when(col("dropoff_borough") == "Brooklyn", 1).otherwise(0)).alias("to_Brooklyn"),
        F.count("*").alias("total_trips")
    )
    .orderBy("pickup_borough")
)
print("Summary of trips by origin borough:")
summary_df.show(truncate=False)

filtered_df.unpersist()

                                                                                

Number of trips between Manhattan, Bronx, Brooklyn with >=3 passengers: 69389
Detailed trip counts (pickup_borough -> dropoff_borough):
+--------------+---------------+-----+
|pickup_borough|dropoff_borough|count|
+--------------+---------------+-----+
|Bronx         |Bronx          |103  |
|Bronx         |Brooklyn       |2    |
|Bronx         |Manhattan      |55   |
|Brooklyn      |Bronx          |10   |
|Brooklyn      |Brooklyn       |2018 |
|Brooklyn      |Manhattan      |1329 |
|Manhattan     |Bronx          |532  |
|Manhattan     |Brooklyn       |2778 |
|Manhattan     |Manhattan      |62562|
+--------------+---------------+-----+

Summary of trips by origin borough:
+--------------+------------+--------+-----------+-----------+
|pickup_borough|to_Manhattan|to_Bronx|to_Brooklyn|total_trips|
+--------------+------------+--------+-----------+-----------+
|Bronx         |55          |103     |2          |160        |
|Brooklyn      |1329        |10      |2018       |3357       |
|Manh

DataFrame[medallion: string, hack_license: string, vendor_id: string, pickup_datetime: string, rate_code: int, store_and_fwd_flag: string, dropoff_datetime: string, passenger_count: int, trip_time_in_secs: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, pickup_date: date, payment_type: string, fare_amount: double, surcharge: double, mta_tax: double, tip_amount: double, tolls_amount: double, total_amount: double, pickup_date: date, pickup_borough: string, dropoff_borough: string]

### Exercise 4
Create a dataframe where each row represents a driver, and there is one column per borough.
For each driver-borough, the dataframe provides the maximum number of consecutive trips
for the given driver, within the given borough. Please consider only trips which were payed by card. 

For example, if for driver A we have (sorted by time):
- Trip 1: Bronx → Bronx
- Trip 2: Bronx → Bronx
- Trip 3: Bronx → Manhattan
- Trip 4: Manhattan → Bronx.
    
The maximum number of consecutive trips for Bronx is 2.

In [8]:
# Exercise 4: Maximum consecutive trips within boroughs for card payments

from pyspark.sql.functions import col, count, lag, when, sum as spark_sum, max as spark_max
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import geopandas as gpd
from shapely.geometry import Point

# 1. Load NYC boroughs GeoJSON
boroughs_gdf = gpd.read_file("data/nyc-boroughs.geojson")

# 2. Define a robust lookup function for boroughs
def lookup_borough(lon, lat):
    # Return "Other" if coordinates are missing or clearly invalid
    if lon is None or lat is None:
        return "Other"
    # NYC bounding box (approximate, can be adjusted)
    if not (-74.3 < lon < -73.65 and 40.45 < lat < 40.95):
        return "Other"
    point = Point(lon, lat)
    for _, row in boroughs_gdf.iterrows():
        if row['geometry'].contains(point):
            return row['borough']
    return "Other"

lookup_borough_udf = F.udf(lookup_borough, StringType())

# 3. Filter out rows with missing or clearly invalid coordinates before borough mapping
filtered_data = joined_data.filter(
    (col("pickup_longitude").isNotNull()) &
    (col("pickup_latitude").isNotNull()) &
    (col("dropoff_longitude").isNotNull()) &
    (col("dropoff_latitude").isNotNull()) &
    (col("pickup_longitude") != 0) &
    (col("pickup_latitude") != 0) &
    (col("dropoff_longitude") != 0) &
    (col("dropoff_latitude") != 0)
)

# 4. Add borough columns
with_boroughs = filtered_data \
    .withColumn("pickup_borough", lookup_borough_udf(col("pickup_longitude"), col("pickup_latitude"))) \
    .withColumn("dropoff_borough", lookup_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# 5. Filter for card payments only
card_trips = with_boroughs.filter(
    col("payment_type") == "CRD"
).persist()

print(f"Total card payment trips: {card_trips.count()}")

# 6. Define window specification for each driver
driver_window = Window.partitionBy("hack_license").orderBy("pickup_datetime")

# 7. Function to calculate max consecutive trips within a borough for each driver
def calculate_max_consecutive_trips(df, borough):
    # Mark trips within the borough
    borough_trips = df.withColumn(
        "is_borough_trip",
        when(
            (col("pickup_borough") == borough) & 
            (col("dropoff_borough") == borough),
            1
        ).otherwise(0)
    )
    # Calculate group changes (when sequence of borough trips breaks)
    borough_trips = borough_trips.withColumn(
        "group_change",
        when(
            (col("is_borough_trip") == 0) & 
            (lag("is_borough_trip", 1, 0).over(driver_window) == 1),
            1
        ).otherwise(0)
    )
    # Calculate group IDs for consecutive sequences
    borough_trips = borough_trips.withColumn(
        "group_id",
        spark_sum("group_change").over(
            Window.partitionBy("hack_license")
            .orderBy("pickup_datetime")
            .rowsBetween(Window.unboundedPreceding, 0)
        )
    )
    # Calculate max consecutive trips for each driver
    return borough_trips.filter(col("is_borough_trip") == 1) \
        .groupBy("hack_license", "group_id") \
        .count() \
        .groupBy("hack_license") \
        .agg(spark_max("count").alias(borough))

# 8. List of boroughs
boroughs = ['Manhattan', 'Bronx', 'Brooklyn', 'Queens', 'Staten Island']

# 9. Calculate results for each borough
result_df = None
for borough in boroughs:
    borough_stats = calculate_max_consecutive_trips(card_trips, borough)
    if result_df is None:
        result_df = borough_stats
    else:
        result_df = result_df.join(borough_stats, on="hack_license", how="outer")

# 10. Fill null values with 0 and show results
result_df = result_df.fillna(0)

print("\nSample of results (first 10 drivers):")
result_df.orderBy("hack_license").show(10, truncate=False)

# 11. Calculate and display statistics
print("\nStatistics by borough:")
for borough in boroughs:
    stats = result_df.select(
        F.avg(col(borough)).alias("avg"),
        F.max(col(borough)).alias("max"),
        F.count(when(col(borough) > 0, 1)).alias("drivers_with_trips")
    ).first()
    print(f"\n{borough}:")
    print(f"  Average max consecutive trips: {stats['avg']:.2f}")
    print(f"  Maximum consecutive trips: {stats['max']}")
    print(f"  Number of drivers with trips: {stats['drivers_with_trips']}")

# 12. Clean up
card_trips.unpersist()

                                                                                

Total card payment trips: 160792

Sample of results (first 10 drivers):


                                                                                

+--------------------------------+---------+-----+--------+------+-------------+
|hack_license                    |Manhattan|Bronx|Brooklyn|Queens|Staten Island|
+--------------------------------+---------+-----+--------+------+-------------+
|0002555BBE359440D6CEB34B699D3932|1        |0    |0       |1     |0            |
|000A4EBF1CEB9C6BD9978D4362493C6E|0        |0    |1       |0     |0            |
|000B8D660A329BBDBF888500E4BD8B98|4        |0    |1       |0     |0            |
|000CCA239BFDC0ABE2895AC9086C4290|8        |0    |0       |0     |0            |
|00117D7CCD47D125E77163A7AC2C66EB|10       |0    |0       |0     |0            |
|0011B1575B9F5398BBC0F27EA560D631|10       |0    |1       |0     |0            |
|0012703023AC1788D34F6694908900FC|6        |0    |0       |0     |0            |
|001C8AAB90AEE49F36FCAA7B4136C81A|4        |0    |0       |0     |0            |
|0022A86EEFEE75C5224C53AE81339793|2        |0    |2       |0     |0            |
|0023CF49F98E4F04AFF553B9589

                                                                                


Manhattan:
  Average max consecutive trips: 5.31
  Maximum consecutive trips: 29
  Number of drivers with trips: 18466


                                                                                


Bronx:
  Average max consecutive trips: 0.00
  Maximum consecutive trips: 14
  Number of drivers with trips: 54


                                                                                


Brooklyn:
  Average max consecutive trips: 0.18
  Maximum consecutive trips: 11
  Number of drivers with trips: 2496


                                                                                


Queens:
  Average max consecutive trips: 0.12
  Maximum consecutive trips: 26
  Number of drivers with trips: 1899





Staten Island:
  Average max consecutive trips: 0.00
  Maximum consecutive trips: 2
  Number of drivers with trips: 5


                                                                                

DataFrame[medallion: string, hack_license: string, vendor_id: string, pickup_datetime: string, rate_code: int, store_and_fwd_flag: string, dropoff_datetime: string, passenger_count: int, trip_time_in_secs: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, pickup_date: date, payment_type: string, fare_amount: double, surcharge: double, mta_tax: double, tip_amount: double, tolls_amount: double, total_amount: double, pickup_date: date, pickup_borough: string, dropoff_borough: string]

## Polars

### Exercise 5

Please work on the merged dataset of trips and fares and perform the following data cleaning tasks:

1. Remove trips with invalid locations (i.e. not in New York City);
3. Remove trips with invalid amounts:
    - Total amount must be greater than zero;
    - Total amount must correspond to the sum of all the other amounts.
5. Remove trips with invalid time:
    - Pick-up before drop-off;
    - Valid duration.

After each data cleaning task, report how many rows where removed. Finally report:
- Are there **duplicate trips**?
- How many trips remain after cleaning?

In [2]:
# Exercise 5: Data cleaning with Polars

import polars as pl

# 1. Define explicit schemas for both datasets
trip_data_schema = {
    "medallion": pl.Utf8,
    "hack_license": pl.Utf8,
    "vendor_id": pl.Utf8,
    "rate_code": pl.Int64,
    "store_and_fwd_flag": pl.Utf8,
    "pickup_datetime": pl.Utf8,
    "dropoff_datetime": pl.Utf8,
    "passenger_count": pl.Int64,
    "trip_time_in_secs": pl.Int64,
    "trip_distance": pl.Float64,
    "pickup_longitude": pl.Float64,
    "pickup_latitude": pl.Float64,
    "dropoff_longitude": pl.Float64,
    "dropoff_latitude": pl.Float64
}
trip_fare_schema = {
    "medallion": pl.Utf8,
    " hack_license": pl.Utf8,
    " vendor_id": pl.Utf8,
    " pickup_datetime": pl.Utf8,
    " payment_type": pl.Utf8,
    " fare_amount": pl.Float64,
    " surcharge": pl.Float64,
    " mta_tax": pl.Float64,
    " tip_amount": pl.Float64,
    " tolls_amount": pl.Float64,
    " total_amount": pl.Float64
}

print("Reading data files with Polars using explicit schemas...")
try:
    # Read CSVs with explicit schema
    trip_data = pl.read_csv("data/trip_data.csv", schema=trip_data_schema)
    trip_fare = pl.read_csv("data/trip_fare.csv", schema=trip_fare_schema)
    print(f"Trip data shape: {trip_data.shape}")
    print(f"Trip fare shape: {trip_fare.shape}")

    # 2. Clean column names (remove leading spaces)
    trip_fare = trip_fare.rename({col: col.strip() for col in trip_fare.columns})

    # 3. Merge datasets
    print("Merging datasets...")
    merged_df = trip_data.join(
        trip_fare,
        left_on=["medallion", "hack_license", "vendor_id", "pickup_datetime"],
        right_on=["medallion", "hack_license", "vendor_id", "pickup_datetime"],
        how="inner"
    )
    initial_count = merged_df.height
    print(f"Initial merged dataset: {initial_count} rows")

    # --- Task 1: Remove trips with invalid locations ---
    print("\n--- Task 1: Remove trips with invalid locations ---")
    NYC_BOUNDS = {
        "lon_min": -74.25, "lon_max": -73.7,
        "lat_min": 40.5, "lat_max": 41.0
    }
    merged_df = merged_df.filter(
        (pl.col("pickup_longitude").is_between(NYC_BOUNDS["lon_min"], NYC_BOUNDS["lon_max"])) &
        (pl.col("pickup_latitude").is_between(NYC_BOUNDS["lat_min"], NYC_BOUNDS["lat_max"])) &
        (pl.col("dropoff_longitude").is_between(NYC_BOUNDS["lon_min"], NYC_BOUNDS["lon_max"])) &
        (pl.col("dropoff_latitude").is_between(NYC_BOUNDS["lat_min"], NYC_BOUNDS["lat_max"])) &
        pl.col("pickup_longitude").is_not_null() &
        pl.col("pickup_latitude").is_not_null() &
        pl.col("dropoff_longitude").is_not_null() &
        pl.col("dropoff_latitude").is_not_null()
    )
    after_loc = merged_df.height
    print(f"Removed {initial_count - after_loc} rows with invalid locations. Remaining: {after_loc}")

    # --- Task 2: Remove trips with invalid amounts ---
    print("\n--- Task 2: Remove trips with invalid amounts ---")
    merged_df = merged_df.filter(pl.col("total_amount") > 0)
    after_amount = merged_df.height
    print(f"Removed {after_loc - after_amount} rows with total_amount <= 0. Remaining: {after_amount}")

    merged_df = merged_df.with_columns(
        (pl.col("fare_amount") + pl.col("surcharge") + pl.col("mta_tax") +
         pl.col("tip_amount") + pl.col("tolls_amount")).alias("sum_of_amounts")
    )
    merged_df = merged_df.with_columns(
        (pl.col("total_amount") - pl.col("sum_of_amounts")).abs().alias("amount_diff")
    )
    merged_df = merged_df.filter(pl.col("amount_diff") <= 0.01)
    after_sum = merged_df.height
    print(f"Removed {after_amount - after_sum} rows with inconsistent amounts. Remaining: {after_sum}")

    # --- Task 3: Remove trips with invalid time ---
    print("\n--- Task 3: Remove trips with invalid time ---")
    merged_df = merged_df.with_columns([
        pl.col("pickup_datetime").str.to_datetime().alias("pickup_datetime_parsed"),
        pl.col("dropoff_datetime").str.to_datetime().alias("dropoff_datetime_parsed")
    ])
    merged_df = merged_df.filter(pl.col("pickup_datetime_parsed") < pl.col("dropoff_datetime_parsed"))
    after_time_order = merged_df.height
    print(f"Removed {after_sum - after_time_order} rows with pickup >= dropoff. Remaining: {after_time_order}")

    merged_df = merged_df.with_columns(
        (pl.col("dropoff_datetime_parsed") - pl.col("pickup_datetime_parsed")).dt.total_seconds().alias("duration_seconds")
    )
    merged_df = merged_df.filter(
        (pl.col("duration_seconds") >= 10) & (pl.col("duration_seconds") <= 86400)
    )
    after_duration = merged_df.height
    print(f"Removed {after_time_order - after_duration} rows with invalid duration. Remaining: {after_duration}")

    # --- Final: Remove duplicates ---
    print("\n--- Final Checks: Remove duplicate trips ---")
    key_columns = [
        "medallion", "hack_license", "pickup_datetime", "dropoff_datetime",
        "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"
    ]
    before_dedup = merged_df.height
    merged_df = merged_df.unique(subset=key_columns)
    after_dedup = merged_df.height
    print(f"Removed {before_dedup - after_dedup} duplicate trips. Remaining: {after_dedup}")

    # --- Summary ---
    print("\n--- Summary of Data Cleaning ---")
    print(f"Initial count: {initial_count}")
    print(f"Final count: {after_dedup}")
    print(f"Total removed: {initial_count - after_dedup} ({(initial_count - after_dedup)/initial_count*100:.2f}%)")

    # Save cleaned dataset for later use
    merged_df.write_parquet("cleaned_taxi_data.parquet")
    print("Cleaned dataset saved to 'cleaned_taxi_data.parquet'")

except Exception as e:
    print(f"Error encountered: {type(e).__name__}: {e}")
    print("Try to check your CSV files and schema definitions.")

Reading data files with Polars using explicit schemas...
Trip data shape: (14776615, 14)
Trip fare shape: (14776615, 11)
Merging datasets...
Initial merged dataset: 14776615 rows

--- Task 1: Remove trips with invalid locations ---
Removed 295789 rows with invalid locations. Remaining: 14480826

--- Task 2: Remove trips with invalid amounts ---
Removed 0 rows with total_amount <= 0. Remaining: 14480826
Removed 0 rows with inconsistent amounts. Remaining: 14480826

--- Task 3: Remove trips with invalid time ---
Removed 10098 rows with pickup >= dropoff. Remaining: 14470728
Removed 13875 rows with invalid duration. Remaining: 14456853

--- Final Checks: Remove duplicate trips ---
Removed 0 duplicate trips. Remaining: 14456853

--- Summary of Data Cleaning ---
Initial count: 14776615
Final count: 14456853
Total removed: 319762 (2.16%)
Cleaned dataset saved to 'cleaned_taxi_data.parquet'


### Exercise 6

Compute the **total revenue** (total_amount) grouped by:
- Pick-up hour of the day (0–23)
- Passenger count (group >=6 into “6+”)

Create a heatmap where:
- X-axis = hour
- Y-axis = passenger count group
- Cell value = average revenue per trip

In [6]:
# Exercise 6: Average revenue heatmap by hour and passenger count (green theme, Bokeh version)

import polars as pl
import numpy as np
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource, LinearColorMapper, ColorBar, BasicTicker, PrintfTickFormatter
from bokeh.palettes import Greens256

# 1. Read cleaned data
print("Reading cleaned taxi data...")
cleaned_df = pl.read_parquet("cleaned_taxi_data.parquet")
print(f"Shape of cleaned data: {cleaned_df.shape}")

# 2. Convert pickup_datetime to Datetime and extract pickup hour as two-digit string
cleaned_df = cleaned_df.with_columns(
    pl.col("pickup_datetime").str.to_datetime().alias("pickup_datetime_parsed")
)
cleaned_df = cleaned_df.with_columns(
    pl.col("pickup_datetime_parsed").dt.hour().alias("pickup_hour_int")
)
cleaned_df = cleaned_df.with_columns(
    pl.col("pickup_hour_int").cast(pl.Utf8).str.zfill(2).alias("pickup_hour")
)

# 3. Create passenger count group (group >=6 as "6+")
cleaned_df = cleaned_df.with_columns(
    pl.when(pl.col("passenger_count") >= 6)
    .then(pl.lit("6+"))
    .otherwise(pl.col("passenger_count").cast(pl.Utf8))
    .alias("passenger_group")
)

# 4. Filter valid data (passenger_count > 0 and not null)
valid_data = cleaned_df.filter(
    pl.col("passenger_count").is_not_null() & 
    (pl.col("passenger_count") > 0)
)

# 5. Group by hour and passenger group, calculate mean revenue
revenue_stats = valid_data.group_by(
    "pickup_hour", "passenger_group"
).agg(
    pl.col("total_amount").mean().alias("avg_revenue_per_trip")
)

# 6. Ensure all hour x passenger_group combinations are present
hours = [f"{h:02d}" for h in range(24)]
passenger_groups = ["1", "2", "3", "4", "5", "6+"]
all_combinations = pl.DataFrame({
    "pickup_hour": np.repeat(hours, len(passenger_groups)),
    "passenger_group": passenger_groups * len(hours)
})

complete_data = all_combinations.join(
    revenue_stats, on=["pickup_hour", "passenger_group"], how="left"
).with_columns([
    pl.col("avg_revenue_per_trip").fill_null(0)
])

# 7. Prepare data for Bokeh
data = {
    "pickup_hour": [],
    "passenger_group": [],
    "avg_revenue_per_trip": []
}
for row in complete_data.iter_rows(named=True):
    data["pickup_hour"].append(row["pickup_hour"])
    data["passenger_group"].append(row["passenger_group"])
    data["avg_revenue_per_trip"].append(row["avg_revenue_per_trip"])

source = ColumnDataSource(data)

# 8. Set up color mapper (green palette)
mapper = LinearColorMapper(
    palette=Greens256, 
    low=min(data["avg_revenue_per_trip"]), 
    high=max(data["avg_revenue_per_trip"])
)

# 9. Output in notebook
output_notebook()

# 10. Create Bokeh heatmap
p = figure(
    title="Average Revenue per Trip by Hour and Passenger Count (Green Theme, Bokeh)",
    x_range=hours,
    y_range=passenger_groups,  # from small to large, so y轴从下到上是1,2,3,4,5,6+
    x_axis_location="above",
    width=900, height=400,
    tools="hover,save",
    tooltips=[
        ("Hour", "@pickup_hour"),
        ("Passengers", "@passenger_group"),
        ("Avg Revenue", "@avg_revenue_per_trip{0.2f}")
    ]
)

p.rect(
    x="pickup_hour", y="passenger_group", width=1, height=1,
    source=source,
    fill_color={'field': 'avg_revenue_per_trip', 'transform': mapper},
    line_color=None
)

color_bar = ColorBar(
    color_mapper=mapper,
    major_label_text_font_size="10pt",
    ticker=BasicTicker(desired_num_ticks=10),
    formatter=PrintfTickFormatter(format="%.2f"),
    label_standoff=12, border_line_color=None, location=(0,0)
)
p.add_layout(color_bar, 'right')

p.xaxis.axis_label = "Hour of Day (0-23)"
p.yaxis.axis_label = "Passenger Count"
p.xaxis.major_label_orientation = 1.0

show(p)

Reading cleaned taxi data...
Shape of cleaned data: (14456853, 26)


### Exercise 7

Define an "anomalous trip" as one that satisfies at least two of the following:
- Fare per mile is above the 95th percentile
- Tip amount > 100% of fare
- trip_time_in_secs is less than 60 seconds but distance is more than 1 mile

Create a dataframe of anomalous trips and:
- Report how many such trips exist
- Create a scatterplot to visualize the anomaly metrics
- Describe the visualization identifying groups and outliers

In [4]:
# Exercise 7: Anomalous Taxi Trips Analysis with Log-Log Visualization
# Using log scale because:
# 1. Fare per mile and fare per second often follow power-law distributions
# 2. Log scale helps visualize both small and large values clearly
# 3. Makes it easier to identify patterns across multiple orders of magnitude
# 4. Reduces visual bias from extreme outliers

import polars as pl
import numpy as np
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource, HoverTool, Legend, LegendItem
from bokeh.transform import factor_cmap
from bokeh.palettes import Category10

output_notebook()

# 1. Read cleaned data
print("Reading cleaned taxi data...")
cleaned_df = pl.read_parquet("cleaned_taxi_data.parquet")
print(f"Shape of cleaned data: {cleaned_df.shape}")

# 2. Calculate fare per mile and fare per second
df = cleaned_df.with_columns([
    (pl.col("fare_amount") / pl.col("trip_distance")).alias("fare_per_mile"),
    (pl.col("fare_amount") / pl.col("trip_time_in_secs")).alias("fare_per_sec")
])

# 3. Calculate 95th percentile thresholds
fare_per_mile_95 = df["fare_per_mile"].quantile(0.95, interpolation="nearest")
fare_per_sec_95 = df["fare_per_sec"].quantile(0.95, interpolation="nearest")
trip_distance_95 = df["trip_distance"].quantile(0.95, interpolation="nearest")

print(f"95th percentile (fare_per_mile): ${fare_per_mile_95:.2f}/mile")
print(f"95th percentile (fare_per_sec): ${fare_per_sec_95:.4f}/sec")
print(f"95th percentile (trip_distance): {trip_distance_95:.2f} miles")

# 4. Mark anomalous conditions
df = df.with_columns([
    (pl.col("fare_per_mile") > fare_per_mile_95).alias("cond_fare_per_mile"),
    (pl.col("fare_per_sec") > fare_per_sec_95).alias("cond_fare_per_sec"),
    (pl.col("trip_distance") > trip_distance_95).alias("cond_trip_distance")
])

# 5. Mark anomalous trips (satisfying at least two conditions)
df = df.with_columns([
    (
        pl.col("cond_fare_per_mile").cast(pl.Int8) +
        pl.col("cond_fare_per_sec").cast(pl.Int8) +
        pl.col("cond_trip_distance").cast(pl.Int8)
    ).alias("anomaly_score")
])
df = df.with_columns([
    (pl.col("anomaly_score") >= 2).alias("is_anomalous")
])

# Print anomaly statistics
total_trips = df.height
anomalous_trips = df.filter(pl.col("is_anomalous")).height
print(f"\nTotal trips: {total_trips:,}")
print(f"Anomalous trips: {anomalous_trips:,}")
print(f"Anomaly percentage: {(anomalous_trips/total_trips)*100:.2f}%")

# 6. Sample data for visualization (to avoid too many points)
sample_size = 10000
if df.height > sample_size:
    plot_df = df.sample(n=sample_size, seed=42)
else:
    plot_df = df

# 7. Filter out invalid or non-positive values (log scale requires positive values)
plot_df = plot_df.filter(
    (pl.col("fare_per_mile") > 0) &
    (pl.col("fare_per_sec") > 0) &
    (pl.col("trip_distance") > 0)
)

# 8. Convert to pandas for Bokeh
pdf = plot_df.select([
    "fare_per_mile", "fare_per_sec", "trip_distance", "is_anomalous",
    "anomaly_score"  # Include anomaly score for detailed tooltips
]).to_pandas()

# 9. Create Bokeh visualization
source = ColumnDataSource(pdf)
color_map = factor_cmap('is_anomalous', palette=[Category10[3][0], Category10[3][1]], factors=[False, True])

# Create figure with log scales
p = figure(
    title="Anomalous Taxi Trips Analysis (Log-Log Scale)",
    x_axis_label="Fare per Mile ($/mile, log scale)",
    y_axis_label="Fare per Second ($/sec, log scale)",
    width=800, height=500,
    x_axis_type="log", y_axis_type="log",
    tools="pan,box_zoom,wheel_zoom,reset,save"
)

# Add scatter plot
scatter = p.scatter(
    x="fare_per_mile", y="fare_per_sec",
    source=source,
    color=color_map,
    legend_field="is_anomalous",
    alpha=0.6,
    size=6,
    muted_alpha=0.1
)

# Customize legend
p.legend.title = "Trip Classification"
p.legend.location = "top_left"
p.legend.click_policy = "mute"
p.legend.label_text_font_size = "10pt"
p.legend.title_text_font_size = "12pt"

# Add hover tooltips with detailed information
p.add_tools(HoverTool(
    tooltips=[
        ("Fare per Mile", "@fare_per_mile{$0.00}/mile"),
        ("Fare per Second", "@fare_per_sec{$0.0000}/sec"),
        ("Trip Distance", "@trip_distance{0.00} miles"),
        ("Anomaly Score", "@anomaly_score"),
        ("Classification", "@is_anomalous")
    ],
    formatters={
        "@fare_per_mile": "printf",
        "@fare_per_sec": "printf",
        "@trip_distance": "printf"
    }
))

# Add grid lines and styling
p.grid.grid_line_alpha = 0.3
p.grid.minor_grid_line_alpha = 0.1
p.axis.axis_line_width = 2
p.axis.major_label_text_font_size = "10pt"
p.axis.axis_label_text_font_size = "12pt"
p.title.text_font_size = "14pt"

# Add threshold lines (optional)
p.line(x=[fare_per_mile_95, fare_per_mile_95], y=[pdf["fare_per_sec"].min(), pdf["fare_per_sec"].max()],
       line_dash="dashed", line_color="red", alpha=0.3, legend_label="95th Percentile (Fare/Mile)")
p.line(x=[pdf["fare_per_mile"].min(), pdf["fare_per_mile"].max()], y=[fare_per_sec_95, fare_per_sec_95],
       line_dash="dashed", line_color="red", alpha=0.3, legend_label="95th Percentile (Fare/Sec)")

show(p)

Reading cleaned taxi data...
Shape of cleaned data: (14456853, 26)
95th percentile (fare_per_mile): $9.29/mile
95th percentile (fare_per_sec): $0.0285/sec
95th percentile (trip_distance): 9.40 miles

Total trips (after cleaning): 14,413,678
Anomalous trips: 270,542
Anomaly percentage: 1.88%
Any zero or NaN values in plot data?
Empty DataFrame
Columns: [fare_per_mile, fare_per_sec, trip_distance, is_anomalous, anomaly_score]
Index: []
fare_per_mile    0
fare_per_sec     0
trip_distance    0
is_anomalous     0
anomaly_score    0
dtype: int64


## Visualization Description: Groups and Outliers
This scatter plot shows NYC taxi trips by fare per mile (x-axis, log scale) and fare per second (y-axis, log scale).
* Blue points represent normal trips.
* Orange points highlight anomalous trips, which meet at least two of these criteria: high fare per mile, high fare per second, or long trip distance (all above the 95th percentile).
Most trips form a dense cluster in the lower-left, reflecting typical fares.
Anomalous trips stand out, separated from the main group, and are easily identified thanks to the log-log scale and color coding.

### Exercise 8
For each driver (hack_license), calculate the **total profit per hour worked**, where:
> profit = 0.7 * (fare_amount + tip_amount) when the trip starts between 7:01 AM and 7:00 PM\
> profit = 0.8 * (fare_amount + tip_amount) when the trip starts between 7:01PM and 7:00 AM

Estimate "hours worked" by summing trip_time_in_secs.

Plot a line chart showing the distribution of average profit per hour **for the top 10% drivers** in terms of total trips.

Which time of day offers **best earning efficiency**?

In [19]:


import polars as pl
import numpy as np
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource, HoverTool, Legend
from bokeh.palettes import Category10
from datetime import datetime, time

output_notebook()

# 1. Read cleaned data
print("Reading cleaned taxi data...")
df = pl.read_parquet("cleaned_taxi_data.parquet")
print(f"Shape of data: {df.shape}")

# 2. Calculate profit based on time of day
# First, convert pickup_datetime to datetime and extract hour
df = df.with_columns([
    pl.col("pickup_datetime").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S").dt.hour().alias("pickup_hour")
])

# Calculate profit using when().otherwise()
df = df.with_columns([
    pl.when(
        (pl.col("pickup_hour") >= 7) & (pl.col("pickup_hour") < 19)
    ).then(
        0.7 * (pl.col("fare_amount") + pl.col("tip_amount"))
    ).otherwise(
        0.8 * (pl.col("fare_amount") + pl.col("tip_amount"))
    ).alias("trip_profit")
])

# 3. Aggregate by driver
driver_stats = df.group_by("hack_license").agg([
    pl.sum("trip_profit").alias("total_profit"),
    pl.sum("trip_time_in_secs").alias("total_seconds"),
    pl.count().alias("trip_count")
])

# 4. Calculate profit per hour
driver_stats = driver_stats.with_columns([
    (pl.col("total_profit") / (pl.col("total_seconds") / 3600)).alias("profit_per_hour")
])

# 5. Select top 10% drivers by trip count
trip_count_threshold = driver_stats.select(
    pl.col("trip_count").quantile(0.9, interpolation="nearest")
).item()

top_drivers = driver_stats.filter(pl.col("trip_count") >= trip_count_threshold)

print(f"\nTotal drivers: {driver_stats.height:,}")
print(f"Top 10% drivers: {top_drivers.height:,}")
print(f"Trip count threshold: {trip_count_threshold:,} trips")

# 6. Create visualization for profit per hour distribution
# Sort by profit_per_hour for line plot
top_drivers = top_drivers.sort("profit_per_hour")

# Convert to pandas for Bokeh
pdf = top_drivers.select(["profit_per_hour"]).to_pandas()

# Create figure for line plot
p = figure(
    title="Distribution of Profit per Hour for Top 10% Drivers",
    x_axis_label="Driver Rank",
    y_axis_label="Profit per Hour ($)",
    width=800,
    height=500,
    tools="pan,box_zoom,wheel_zoom,save,reset"
)

# Create line plot 
x_values = list(range(len(pdf)))
p.line(
    x=x_values,
    y=pdf["profit_per_hour"],
    line_width=2,
    line_color="#036564",
    legend_label="Profit per Hour"
)

# Add hover tool
hover = HoverTool()
hover.tooltips = [
    ("Driver Rank", "@x"),
    ("Profit per Hour", "@y{0.00}")
]
p.add_tools(hover)

# Style the plot
p.title.text_font_size = "16pt"
p.axis.axis_label_text_font_size = "12pt"
p.axis.axis_label_text_font_style = "bold"
p.grid.grid_line_alpha = 0.3

# Add mean line
mean_profit = pdf["profit_per_hour"].mean()
p.line(
    x=[0, len(pdf)],
    y=[mean_profit, mean_profit],
    line_dash="dashed",
    line_color="red",
    legend_label=f"Mean: ${mean_profit:.2f}"
)

# Show the plot
show(p)

# 7. Print summary statistics
print("\nSummary Statistics:")
print(f"Number of top 10% drivers: {len(top_drivers)}")
print("\nProfit per Hour Statistics:")
print(top_drivers["profit_per_hour"].describe())

# 8. Additional analysis: Compare day vs night efficiency
# First, add a time period column
df = df.with_columns([
    pl.when(pl.col("pickup_hour").is_between(7, 19))
    .then(pl.lit("day"))
    .otherwise(pl.lit("night"))
    .alias("time_period")
])

# Calculate profit per hour by time period
period_stats = df.group_by(["hack_license", "time_period"]).agg([
    pl.sum("trip_profit").alias("total_profit"),
    pl.sum("trip_time_in_secs").alias("total_seconds")
])

# Calculate profit per hour for each period
period_stats = period_stats.with_columns([
    (pl.col("total_profit") / (pl.col("total_seconds") / 3600)).alias("profit_per_hour")
])

# Calculate means for each period
day_stats = period_stats.filter(pl.col("time_period") == "day")
night_stats = period_stats.filter(pl.col("time_period") == "night")

print("\nTime Period Statistics:")
print(f"Day (7:01 AM - 7:00 PM):")
print(day_stats["profit_per_hour"].describe())
print(f"\nNight (7:01 PM - 7:00 AM):")
print(night_stats["profit_per_hour"].describe())

Reading cleaned taxi data...
Shape of data: (14456853, 26)


  pl.count().alias("trip_count")



Total drivers: 32,067
Top 10% drivers: 3,223
Trip count threshold: 719.0 trips



Summary Statistics:
Number of top 10% drivers: 3223

Profit per Hour Statistics:
shape: (9, 2)
┌────────────┬───────────┐
│ statistic  ┆ value     │
│ ---        ┆ ---       │
│ str        ┆ f64       │
╞════════════╪═══════════╡
│ count      ┆ 3223.0    │
│ null_count ┆ 0.0       │
│ mean       ┆ 49.881404 │
│ std        ┆ 4.368664  │
│ min        ┆ 32.060989 │
│ 25%        ┆ 46.434277 │
│ 50%        ┆ 49.915971 │
│ 75%        ┆ 53.168233 │
│ max        ┆ 76.8921   │
└────────────┴───────────┘

Time Period Statistics:
Day (7:01 AM - 7:00 PM):
shape: (9, 2)
┌────────────┬─────────────┐
│ statistic  ┆ value       │
│ ---        ┆ ---         │
│ str        ┆ f64         │
╞════════════╪═════════════╡
│ count      ┆ 31843.0     │
│ null_count ┆ 0.0         │
│ mean       ┆ 47.172882   │
│ std        ┆ 30.816028   │
│ min        ┆ 2.724913    │
│ 25%        ┆ 43.560666   │
│ 50%        ┆ 45.52706    │
│ 75%        ┆ 48.036701   │
│ max        ┆ 2263.935484 │
└────────────┴─────────────┘


## Taxi Driver Profit Analysis: Day vs Night
### Conclusion
Night shift offers better earning efficiency:
* 34.7% higher average earnings
* 31% higher median earnings
* Higher rates (0.8 vs 0.7)
* Less traffic