# Citi Bike Data Analysis - Insurance Coverage and Distance Buckets

## Introduction
This notebook analyzes NYC Citi Bike trip data for January 2025. The goals are to:
- Determine how many trips exceed 30 minutes and would be covered by insurance.
- Calculate expected revenue from charging $0.20 per ride over 30 minutes.
- Analyze trip distances and visualize them in distance buckets.

## Data Preparation

- Read CSV files with schema inference  
- Select `started_at`, `ended_at`, `start_lat`, `start_lng`, `end_lat`, `end_lng` columns
- Cast timestamps and coordinates to proper types  
- Drop rows with missing values  
- Print schema and dropped row count  
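
The drop-and-count step above can be illustrated without Spark. The following is only a toy sketch using the standard library; the two-row extract is hypothetical and is not taken from the dataset, and the real pipeline uses Spark's `na.drop()` rather than this list comprehension.

```python
import csv
import io

# Columns that must be present for a row to be kept
REQUIRED = ["started_at", "ended_at", "start_lat", "start_lng", "end_lat", "end_lng"]

# Hypothetical two-row extract; the second row has no end coordinates
raw = io.StringIO(
    "started_at,ended_at,start_lat,start_lng,end_lat,end_lng\n"
    "2025-01-15 08:00:00,2025-01-15 08:12:00,40.75,-73.99,40.76,-73.98\n"
    "2025-01-15 09:00:00,2025-01-15 09:40:00,40.71,-74.00,,\n"
)

rows = list(csv.DictReader(raw))
# Keep only rows where every required column has a non-empty value
clean = [r for r in rows if all(r[c] != "" for c in REQUIRED)]
print(f"Dropped {len(rows) - len(clean)} rows with missing values")
```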

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, radians, sin, cos, asin, sqrt
from pyspark.sql.types import DoubleType, TimestampType
from geopy import distance
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd

# Initialize Spark session
spark = SparkSession.builder.appName("CitiBikeAnalysis").config("spark.ui.showConsoleProgress", "false").getOrCreate()

# Read CSV files with automatic schema inference
df = spark.read.option("header", "true").option("inferSchema", "true").csv("202501-citibike-tripdata/*.csv")

# Print schema to inspect column names and types
df.printSchema()

# Select only the necessary columns, casting types where needed
df = df.select(
    col("started_at").cast(TimestampType()),
    col("ended_at").cast(TimestampType()),
    col("start_lat").cast(DoubleType()),
    col("start_lng").cast(DoubleType()),
    col("end_lat").cast(DoubleType()),
    col("end_lng").cast(DoubleType())
)

# Count rows before cleaning so the number of dropped rows can be reported
count_rows = df.count()
# Drop rows where any of the selected columns is null or NaN
df = df.na.drop()

# Print the number of rows removed by the cleaning step
print(f"Dropped {count_rows - df.count()} rows with missing values")

df.printSchema()

## Trip Duration Calculation and Filtering Long Trips

We calculate trip duration by converting the start and end timestamps to seconds and taking the difference.  
Trips longer than **30 minutes** (1800 seconds) are kept in a separate DataFrame and counted.
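
As a worked example of the duration rule, here is a minimal plain-Python sketch. The helper name `trip_duration_seconds` and the sample timestamps are hypothetical; the notebook itself performs this step on Spark columns.

```python
from datetime import datetime

LONG_TRIP_SECONDS = 30 * 60  # 1800 seconds

def trip_duration_seconds(started_at: str, ended_at: str) -> int:
    """Return the trip length in whole seconds from two ISO-style timestamps."""
    start = datetime.fromisoformat(started_at)
    end = datetime.fromisoformat(ended_at)
    return int((end - start).total_seconds())

# Hypothetical trip of 31 minutes 30 seconds, not taken from the dataset
duration = trip_duration_seconds("2025-01-15 08:00:00", "2025-01-15 08:31:30")
is_long = duration > LONG_TRIP_SECONDS
print(duration, is_long)
```

Note that the comparison is strict, so a trip of exactly 1800 seconds does not count as a long trip.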


In [None]:
# Cast each timestamp to epoch seconds (long) and subtract to get the duration in seconds
df = df.withColumn("tripduration", col("ended_at").cast("long") - col("started_at").cast("long"))

# Filter trips longer than 30 minutes (30*60 = 1800 seconds)
long_trips = df.filter(col("tripduration") > 1800)
num_trips = long_trips.count()
print("Trips longer than 30 min:", num_trips)

## Insurance Coverage and Revenue Estimation

The expected revenue is calculated by multiplying the number of long trips by **$0.20 per ride**.
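
The arithmetic can be sketched as a small function. The trip count used here is a made-up placeholder, not a result from the data, and working in integer cents is just one possible way to keep the multiplication exact.

```python
FEE_CENTS = 20  # 0.20 dollars per qualifying ride, as stated above

def expected_revenue_dollars(num_long_trips: int, fee_cents: int = FEE_CENTS) -> float:
    """Multiply the number of long trips by the per-ride fee and return dollars."""
    return num_long_trips * fee_cents / 100

# Hypothetical count of trips over 30 minutes
print(f"Expected revenue: {expected_revenue_dollars(12_345):,.2f} USD")
```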


In [None]:
expected_revenue = num_trips * 0.2
print(f"Expected revenue: ${expected_revenue:.2f}")

## Travel Distance Calculation using the Haversine Formula

The **Haversine formula** is used to calculate the great-circle distance between the trip's start and end locations.  
We assume the Earth's radius to be **3959 miles** for distance in miles (or **6371 km** for kilometers).  

The Haversine formula is:

$$
d = 2R \cdot \arcsin \left( \sqrt{\sin^2 \left( \frac{\Delta \varphi}{2} \right) + \cos(\varphi_1) \cdot \cos(\varphi_2) \cdot \sin^2 \left( \frac{\Delta \lambda}{2} \right) } \right)
$$

Where:
- $R$ is the Earth's radius (miles or kilometers).
- $\varphi_1, \varphi_2$ are the latitudes of the start and end locations in radians.
- $\lambda_1, \lambda_2$ are the longitudes of the start and end locations in radians.
- $\Delta \varphi = \varphi_2 - \varphi_1$ (difference in latitude).
- $\Delta \lambda = \lambda_2 - \lambda_1$ (difference in longitude).

Source: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html

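An optional test compares the calculated distances with values from the `geopy` library to verify accuracy. Because `geopy.distance.distance` computes a geodesic distance on an ellipsoidal Earth model, small differences from the spherical Haversine result are expected.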
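
For a quick sanity check outside Spark, the formula can be written as a plain-Python function. This is a sketch under the same spherical-Earth assumption (radius 3959 miles); the function name `haversine_miles` and the test coordinates are illustrative and not part of the notebook.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_MILES = 3959

def haversine_miles(lat1, lng1, lat2, lng2, radius=EARTH_RADIUS_MILES):
    """Great-circle distance between two points given in decimal degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = radians(lng2 - lng1)
    a = sin(d_phi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(d_lambda / 2) ** 2
    return 2 * radius * asin(sqrt(a))

# One degree of latitude along a meridian is R * pi / 180, roughly 69.1 miles
print(round(haversine_miles(40.0, -74.0, 41.0, -74.0), 1))
```

Moving one degree along a meridian is a convenient check because the expected value follows directly from the arc length $R \cdot \pi / 180$.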


In [None]:
# Compute travel distance using the Haversine formula
R = 3959  # Earth's radius in miles; use 6371 for kilometers
df = df.withColumn(
    "distance",
    R * 2 * asin(
        sqrt(
            sin((radians(col("end_lat")) - radians(col("start_lat"))) / 2) ** 2 +
            cos(radians(col("start_lat"))) * cos(radians(col("end_lat"))) *
            sin((radians(col("end_lng")) - radians(col("start_lng"))) / 2) ** 2
        )
    )
)
# Optionally compare the calculated distances against geopy
testing = False
if testing:
    # Get the first 20 rows with the coordinates and the calculated distance
    sample_rows = df.select("start_lat", "start_lng", "end_lat", "end_lng", "distance").limit(20).collect()

    for row in sample_rows:
        geopy_dist = distance.distance((row.start_lat, row.start_lng), (row.end_lat, row.end_lng)).miles
        diff = row.distance - geopy_dist
        # Trips that start and end at the same point have zero distance; avoid dividing by zero
        pct = diff / geopy_dist * 100 if geopy_dist else 0.0
        print(f"Calculated: {row.distance:.2f} miles, Geopy: {geopy_dist:.2f} miles, Difference: {diff:.6f} miles or {pct:.2f}%")

## Categorizing Trips into Distance Buckets

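- Group trips into **0-1**, **2-4**, **4-9**, and **10+ mile** buckets. The labels are shorthand: the cutoffs applied are at most 1 mile, over 1 up to 4 miles, over 4 up to 9 miles, and over 9 miles, so every trip falls into exactly one bucket.  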
- Count the number of trips in each bucket using **Spark aggregation**.  
- Convert results to **Pandas** for visualization.  
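
To make the bucket boundaries concrete, here is a small plain-Python sketch that mirrors the cutoffs used in the aggregation (at most 1, at most 4, at most 9, otherwise 10+). The function `distance_bucket` and the sample distances are hypothetical.

```python
from collections import Counter

def distance_bucket(miles: float) -> str:
    """Map a distance in miles to the notebook's bucket label."""
    if miles <= 1:
        return "0-1"
    if miles <= 4:
        return "2-4"
    if miles <= 9:
        return "4-9"
    return "10+"

# Hypothetical distances chosen to sit on and around the cutoffs
sample = [0.4, 1.0, 1.5, 4.0, 4.2, 9.0, 9.5, 12.3]
counts = Counter(distance_bucket(d) for d in sample)
print(counts)
```

With these cutoffs, a 1.5-mile trip lands in the "2-4" bucket and a 9.5-mile trip lands in "10+", even though the labels suggest otherwise.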


In [None]:
# Aggregate distance buckets; labels are shorthand for the cutoffs <= 1, (1, 4], (4, 9], and > 9 miles
bucket_counts = df.groupBy(
    when(col("distance") <= 1, "0-1")
    .when((col("distance") > 1) & (col("distance") <= 4), "2-4")
    .when((col("distance") > 4) & (col("distance") <= 9), "4-9")
    .otherwise("10+")
    .alias("distance_bucket")
).count()

# Convert Spark DataFrame to Pandas for visualization
bucket_pd = bucket_counts.toPandas()
bucket_pd

## Visualizing Trip Distance Distribution

- Sort distance buckets in the order: **0-1, 2-4, 4-9, 10+ miles**.  
- Create a **bar chart** to display trip counts per bucket.  
- Format the y-axis to show values in **thousands (K)**.  
- Annotate each bar with the number of trips.  

This visualization highlights how trip distances are distributed across the bike-sharing system.  
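
The thousands formatting used for the bar annotations can be isolated in a small helper, sketched below. The name `format_thousands` is hypothetical, and the inputs are made-up counts rather than results from the data.

```python
def format_thousands(value: float, decimals: int = 1) -> str:
    """Format a raw count as thousands with a trailing 'K'."""
    return f"{value / 1000:.{decimals}f}K"

# Hypothetical trip counts
print(format_thousands(2_500))
print(format_thousands(1_234_567))
```

A helper like this could also be wrapped for the axis, for example `ticker.FuncFormatter(lambda x, pos: format_thousands(x))`, if the tick labels should carry the "K" suffix as well.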


In [None]:
# Define the desired bucket order
bucket_order = ["0-1", "2-4", "4-9", "10+"]
bucket_pd['distance_bucket'] = pd.Categorical(bucket_pd['distance_bucket'], categories=bucket_order, ordered=True)
bucket_pd = bucket_pd.sort_values("distance_bucket")

plt.figure(figsize=(8, 6))
bars = plt.bar(bucket_pd['distance_bucket'], bucket_pd['count'], color='cornflowerblue')
plt.xlabel('Distance Bucket (miles)')
plt.ylabel('Number of Trips (in thousands)')
plt.title('Number of Trips per Distance Bucket')

ax = plt.gca()
ax.yaxis.set_major_formatter(ticker.FuncFormatter(lambda x, pos: f'{x/1000:.1f}'))

for bar, count in zip(bars, bucket_pd['count']):
    ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(), f'{count/1000:.1f}K', ha='center', va='bottom')

plt.show()


In [None]:
spark.stop()