In [None]:
import os
import time

import boto3
import pyspark.pandas as ps
from botocore import UNSIGNED
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    dayofweek,
    hour,
    month,
    to_date,
)

### Setup 

Download the datasets from the public S3 bucket.

In [2]:
# Name of the S3 bucket that hosts the example datasets.
bucket_name = "bodo-example-data"

# Object keys (paths inside the bucket) of the two input files.
hvfhv_5M_path_s3 = "nyc-taxi/fhvhv_5M_rows.pq"
central_park_weather_path_s3 = "nyc-taxi/central_park_weather.csv"

In [3]:
def download_data_s3(path_to_s3: str, local_data_dir: str = "data") -> str:
    """Download an object from the S3 bucket unless a local copy already exists.

    Args:
        path_to_s3: Object key inside ``bucket_name``, e.g.
            ``"nyc-taxi/central_park_weather.csv"``.
        local_data_dir: Local directory the file is stored in; created
            (including parents) if it does not exist.

    Returns:
        The local file path: ``local_data_dir`` joined with the last
        component of ``path_to_s3``.

    Raises:
        ValueError: If ``path_to_s3`` has no file name component (it is
            empty or ends with ``"/"``).
    """
    # Use the final key component as the file name. Picking index 1 of a
    # plain split returns a directory name for keys nested more than one
    # level deep and raises IndexError for keys without any "/".
    file_name = path_to_s3.rsplit("/", 1)[-1]
    if not file_name:
        raise ValueError(f"S3 key has no file name component: {path_to_s3!r}")

    local_path = os.path.join(local_data_dir, file_name)

    # Already downloaded on a previous run: reuse the local copy.
    if os.path.exists(local_path):
        return local_path

    print("Downloading dataset from S3...")

    # Unsigned (anonymous) requests, so no AWS credentials are required.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))

    # exist_ok avoids a race between an existence check and the creation,
    # and makedirs also handles nested target directories.
    os.makedirs(local_data_dir, exist_ok=True)

    s3.download_file(bucket_name, path_to_s3, local_path)
    return local_path

In [None]:
# Fetch both input files; each download is skipped when the file already
# exists in the local data directory.
weather_path = download_data_s3(central_park_weather_path_s3)
hvfhv_5M_path = download_data_s3(hvfhv_5M_path_s3)

In [5]:
def get_monthly_travels_weather(weather_dataset, hvfhv_dataset):
    """Aggregate trips by route, month, weekday, weather and time of day.

    Joins the trip records with the weather observations on the pickup
    date, derives a precipitation flag and a time-of-day bucket, then counts
    trips and averages ``trip_miles`` per group. The result is sorted by the
    group keys and written to ``spark_monthly_trips_weather.pq``
    (overwriting any previous output). The elapsed time is printed.

    Args:
        weather_dataset: Path to the weather CSV; it must provide ``DATE``
            and ``PRCP`` columns.
        hvfhv_dataset: Path to the trip Parquet data; it must provide
            ``pickup_datetime``, ``PULocationID``, ``DOLocationID``,
            ``hvfhs_license_num`` and ``trip_miles``.
    """
    # NOTE(review): Hadoop options are normally passed to Spark with a
    # "spark.hadoop." prefix; confirm the credentials-provider key below
    # takes effect if S3 paths are ever read directly.
    spark = (
        SparkSession.builder.appName("MonthlyTravelsWeather")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config(
            "fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
        )
        .getOrCreate()
    )

    start = time.time()

    # Read the weather data using pandas-on-Spark.
    central_park_weather_observations = ps.read_csv(
        weather_dataset,
    ).rename(columns={"DATE": "date", "PRCP": "precipitation"})

    central_park_weather_observations["date"] = ps.to_datetime(
        central_park_weather_observations["date"]
    )

    # Read the trip data with the Spark API. This reads a re-written dataset
    # because Spark cannot read the original one due to schema unification
    # issues.
    fhvhv_tripdata = spark.read.parquet(hvfhv_dataset).drop("__index_level_0__")

    # Derive the date/time features with Spark functions and drop the raw
    # datetime columns: pandas-on-Spark doesn't like these datetime columns,
    # which is why the read and this conversion use the Spark APIs.
    pickup = col("pickup_datetime")
    fhvhv_tripdata = (
        fhvhv_tripdata.withColumn("date", to_date(pickup))
        .withColumn("month", month(pickup))
        .withColumn("hour", hour(pickup))
        # Spark's dayofweek is 1 = Sunday ... 7 = Saturday, so 2-6 is Mon-Fri.
        .withColumn("weekday", dayofweek(pickup).isin([2, 3, 4, 5, 6]))
        .drop(
            "pickup_datetime",
            "dropoff_datetime",
            "on_scene_datetime",
            "request_datetime",
        )
    )

    # Convert trip data to pandas-on-Spark.
    fhvhv_tripdata = ps.DataFrame(fhvhv_tripdata)

    # Join trip data with weather observations on 'date'.
    monthly_trips_weather = fhvhv_tripdata.merge(
        central_park_weather_observations, on="date", how="inner"
    )

    # Flag trips whose date had more than 0.1 precipitation.
    monthly_trips_weather["date_with_precipitation"] = (
        monthly_trips_weather["precipitation"] > 0.1
    )

    def get_time_bucket(t):
        """Map an hour of the day to a named time bucket ("other" if unmatched)."""
        bucket = "other"
        if t in (8, 9, 10):
            bucket = "morning"
        elif t in (11, 12, 13, 14, 15):
            bucket = "midday"
        elif t in (16, 17, 18):
            bucket = "afternoon"
        elif t in (19, 20, 21):
            bucket = "evening"
        return bucket

    monthly_trips_weather["time_bucket"] = monthly_trips_weather.hour.map(
        get_time_bucket
    )

    # Keys shared by the aggregation and the final sort.
    group_keys = [
        "PULocationID",
        "DOLocationID",
        "month",
        "weekday",
        "date_with_precipitation",
        "time_bucket",
    ]

    # Keep the aggregated frame: groupby/agg returns a new DataFrame, so
    # without this assignment the aggregation is discarded and the raw
    # joined rows would be sorted and written instead.
    monthly_trips_weather = monthly_trips_weather.groupby(
        group_keys,
        as_index=False,
    ).agg({"hvfhs_license_num": "count", "trip_miles": "mean"})

    sorted_data = monthly_trips_weather.sort_values(by=group_keys)

    # Write the results to a parquet file.
    sorted_data.to_parquet("spark_monthly_trips_weather.pq", mode="overwrite")
    print("Execution time:", time.time() - start)

In [None]:
# Run the pipeline on the 5M-row trip file; the function prints its execution time.
get_monthly_travels_weather(weather_path, hvfhv_5M_path)

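### Running on a Larger Dataset

Repeat the same analysis on the February 2019 trip file, a larger input than the 5M-row sample used above.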



In [None]:
# Object key of the February 2019 trip file (the variable name suggests
# roughly 20M rows — TODO confirm), followed by its download.
hvfhv_20M_path_s3 = "nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet"
hvfhv_20M_path = download_data_s3(hvfhv_20M_path_s3)

In [None]:
# Run the same pipeline on the February 2019 trip file.
get_monthly_travels_weather(weather_path, hvfhv_20M_path)