# Assignment
You were asked to analyse the last month of bike-sharing data in NYC (data source: https://citibikenyc.com/system-data ), and your manager thinks we could add insurance coverage for **rides that last more than 30 minutes**.

1. How many trips would be covered?

2. If your manager thinks we could charge 0.2 USD for each ride that takes longer than 30 minutes, how much revenue could we expect?

3. Your manager wants to understand the travel distance in distance buckets (0-1, 2-4, 4-9, 10+). Please make a diagram.
 
Please present your analysis in a Jupyter notebook.

In [None]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, LongType

In [None]:
from helpers import haversine, get_distance_bucket
from ingest import download_zip_file, extract_zip_archive

# Ingestion
Download the February 2024 trip data from https://citibikenyc.com/system-data and extract the zip archive. The extracted CSV files are stored in the `raw` folder.

In [None]:
file_name = "202402-citibike-tripdata.csv"
download_base_url = "https://s3.amazonaws.com/tripdata"

zip_archive_path = download_zip_file(file_name, download_base_url)

In [None]:
extract_zip_archive(zip_archive_path)
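The `ingest` module is not shown in this notebook. The sketch below is one possible shape for its two helpers. It assumes that the archive is published as `<file_name>.zip` under the base URL and that its CSV files should land in the `raw` folder. The `build_download_url` helper and the `target_dir` parameters are hypothetical additions, not taken from the original module.

```python
import urllib.request
import zipfile
from pathlib import Path
from typing import List


def build_download_url(file_name: str, base_url: str) -> str:
    """Hypothetical: assumes the archive is published as '<file_name>.zip'."""
    return f"{base_url.rstrip('/')}/{file_name}.zip"


def download_zip_file(file_name: str, base_url: str, target_dir: str = "downloads") -> Path:
    """Download the zip archive for `file_name` and return its local path."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    zip_path = target / f"{file_name}.zip"
    urllib.request.urlretrieve(build_download_url(file_name, base_url), zip_path)
    return zip_path


def extract_zip_archive(zip_archive_path: Path, target_dir: str = "raw") -> List[str]:
    """Extract every member of the archive into `target_dir` and return the member names."""
    with zipfile.ZipFile(zip_archive_path) as archive:
        archive.extractall(target_dir)
        return archive.namelist()
```

Under these assumptions, `download_zip_file(file_name, download_base_url)` would fetch `https://s3.amazonaws.com/tripdata/202402-citibike-tripdata.csv.zip`.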

In [None]:
# create a locally hosted Spark session
spark: SparkSession = (
    SparkSession.builder
    .master("local[*]")
    .appName("Citibike Assignment")
    .getOrCreate()
)

In [None]:
# read all csv files from the local "raw" folder
df: DataFrame = spark.read.csv("raw", header=True, pathGlobFilter="*.csv")

## Explore the data
Check the schema, look at sample values and display descriptive statistics.

In [None]:
df.printSchema()

In [None]:
df.sample(fraction=0.1).show()

In [None]:
df.describe().show()

## Prepare data for analysis

In [None]:
# cast columns into suitable data types
# - station IDs are identifiers, so they stay strings (a float cast would turn any
#   non-numeric ID into null)
# - to_timestamp without a fixed pattern also accepts values with fractional seconds
# - coordinates use double precision
staged_df = (
    df.withColumn("started_at", F.to_timestamp(df.started_at))
    .withColumn("ended_at", F.to_timestamp(df.ended_at))
    .withColumn("start_lat", df.start_lat.cast("double"))
    .withColumn("start_lng", df.start_lng.cast("double"))
    .withColumn("end_lat", df.end_lat.cast("double"))
    .withColumn("end_lng", df.end_lng.cast("double"))
)

In [None]:
staged_df.show(10)

In [None]:
staged_df.printSchema()

# Solve the assignment

In [None]:
df_with_trip_duration = staged_df.withColumn(
    "trip_duration_in_minutes",
    (staged_df.ended_at.cast(LongType()) - staged_df.started_at.cast(LongType()))
    / 60,  # cast timestamps to Unix seconds, subtract, and convert seconds to minutes
)

In [None]:
# filter for trips that last more than 30 minutes (strictly longer, as in the assignment)
filtered_df = df_with_trip_duration.filter(df_with_trip_duration.trip_duration_in_minutes > 30)
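The assignment covers rides that last *more than* 30 minutes, so a ride of exactly 30 minutes should not count. This small pure-Python sketch of the duration calculation and the coverage rule makes that boundary explicit. The function names and the `COVERAGE_THRESHOLD_MINUTES` constant are hypothetical and only illustrate the logic of the Spark cells.

```python
from datetime import datetime

# Threshold from the assignment: "rides that last more than 30 minutes"
COVERAGE_THRESHOLD_MINUTES = 30


def trip_duration_minutes(started_at: datetime, ended_at: datetime) -> float:
    """Trip duration in minutes, computed from the difference of the two timestamps."""
    return (ended_at - started_at).total_seconds() / 60


def is_covered(duration_minutes: float) -> bool:
    """Strictly longer than the threshold: a ride of exactly 30 minutes is not covered."""
    return duration_minutes > COVERAGE_THRESHOLD_MINUTES
```

For example, a ride from 09:00:00 to 09:30:00 lasts 30.0 minutes and is not covered, while one ending at 09:30:01 is.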

In [None]:
filtered_df.sample(0.1).show(10)

1. How many trips would be covered?

In [None]:
count_covered_trips = filtered_df.count()
print(f"There are {count_covered_trips:,d} trips that would be covered.")

2. If your manager thinks we could charge 0.2 USD for each ride that takes longer than 30 minutes, how much revenue could we expect?

In [None]:
expected_revenue = count_covered_trips * 0.2
print(f"We can expect a revenue of USD {expected_revenue:,.2f}.")
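Question 2 is a single multiplication: revenue = number of covered trips × 0.20 USD. Because 0.2 has no exact binary floating-point representation, this sketch uses Python's `decimal` module to keep the amount exact to the cent. `revenue_for_covered_trips` and `FEE_PER_COVERED_RIDE` are hypothetical names.

```python
from decimal import ROUND_HALF_UP, Decimal

# Fee per covered ride in USD, as proposed in the assignment
FEE_PER_COVERED_RIDE = Decimal("0.20")


def revenue_for_covered_trips(count_covered_trips: int, fee: Decimal = FEE_PER_COVERED_RIDE) -> Decimal:
    """Revenue = number of covered trips x fee, rounded to whole cents."""
    return (fee * count_covered_trips).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```

For the printed total with two decimals the float version gives the same result. The `Decimal` variant matters mainly if the amount is summed or stored further.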

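3. Your manager wants to understand the travel distance in distance buckets (0-1, 2-4, 4-9, 10+). Please make a diagram.

The buckets in the assignment overlap and leave gaps, so they are interpreted as the half-open intervals [0, 1), [1, 4), [4, 10) and [10, ∞) km, plus an `n/a` bucket for trips without a computable distance.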

In [None]:
# calculate haversine distance between coordinates to get travel distance
haversine_udf = F.udf(haversine, "double")  # without a return type, F.udf defaults to StringType
df_with_distance = staged_df.withColumn(
    "distance_travelled",
    haversine_udf(
        F.col("start_lng"), F.col("start_lat"), F.col("end_lng"), F.col("end_lat")
    ),
)
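`helpers.haversine` is not shown in this notebook. This minimal sketch assumes it takes `(lon1, lat1, lon2, lat2)` in degrees (the order used in the UDF call) and returns kilometres using a mean Earth radius of 6,371 km. Returning `None` for missing coordinates lets Spark store a null instead of failing the UDF.

```python
import math
from typing import Optional

# Mean Earth radius in km; an assumption about what helpers.haversine uses
EARTH_RADIUS_KM = 6371.0


def haversine(lon1, lat1, lon2, lat2) -> Optional[float]:
    """Great-circle distance in km between two (lon, lat) points given in degrees.

    Returns None when any coordinate is missing, e.g. trips without end coordinates.
    """
    if any(v is None for v in (lon1, lat1, lon2, lat2)):
        return None
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
```

This is the straight-line distance between the start and end points, not the route actually ridden. A round trip back to the starting station therefore comes out at roughly 0 km.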

In [None]:
# assign each trip to one of the distance buckets from the assignment
distance_bucket_udf = F.udf(get_distance_bucket)
df_with_distance_buckets = df_with_distance.withColumn(
    "distance_bucket",
    distance_bucket_udf(F.col("distance_travelled")),
)
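`helpers.get_distance_bucket` is not shown either. The sketch below produces exactly the labels used in the plotting cell, assuming half-open buckets in km. `BUCKET_EDGES` and `BUCKET_LABELS` are hypothetical names. `bisect_right` returns how many edges are less than or equal to the distance, which is directly the bucket index, so a distance of exactly 1 km falls into `[1, 4)`.

```python
import math
from bisect import bisect_right
from typing import Optional, Union

# Hypothetical bucket edges in km and the labels used when plotting
BUCKET_EDGES = [1, 4, 10]
BUCKET_LABELS = ["[0, 1)", "[1, 4)", "[4, 10)", "[10, inf)"]


def get_distance_bucket(distance_km: Optional[Union[float, str]]) -> str:
    """Map a distance in km to a half-open bucket label, or 'n/a' if unusable."""
    if distance_km is None:
        return "n/a"
    distance = float(distance_km)  # also accepts numeric strings, e.g. from an untyped UDF
    if math.isnan(distance) or distance < 0:
        return "n/a"
    return BUCKET_LABELS[bisect_right(BUCKET_EDGES, distance)]
```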

In [None]:
# calculate counts by bucket
distance_hist = df_with_distance_buckets.groupBy(
    "distance_bucket"
).count()

In [None]:
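# convert to a pandas-on-Spark DataFrame (plotted with plotly, its default backend) and plot it
df_pd = distance_hist.pandas_api(index_col="distance_bucket")
# fix the bucket order; buckets without any trips get a count of 0 instead of NaN
df_pd_sorted = df_pd.reindex(["[0, 1)", "[1, 4)", "[4, 10)", "[10, inf)", "n/a"], fill_value=0)
fig = df_pd_sorted.plot.bar(title="Count of rides by distance travelled in km")
# label each bar with its count, slightly above the bar
for i, val in enumerate(df_pd_sorted["count"].to_numpy()):
    fig.add_annotation(x=i, y=val * 1.005, text=f"{val:,d}")
fig.show()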