In [1]:
import os
import sys
ROOT_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(ROOT_DIR)
import boto3
import logging
import requests
import time
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from dotenv import load_dotenv


# Experimental Code for Taxi Data API

### S3 Bucket Attempt

In [2]:
# Load AWS settings from a local .env file into the process environment.
load_dotenv()

aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")  # was misspelled "ACESS", so it was always None
aws_region = os.getenv("AWS_REGION")

# boto3's default credential chain reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# from the environment, but it looks for AWS_DEFAULT_REGION rather than
# AWS_REGION, so pass the region explicitly.
session = boto3.Session(region_name=aws_region)
credentials = session.get_credentials()
if credentials is None:
    print("⚠️ No AWS credentials found; check your .env file.")

In [3]:
# Public NYC TLC bucket and the single monthly file we try to download from it.
s3_client = session.client('s3')
bucket_name = "nyc-tlc"
file_key = "trip data/yellow_tripdata_2023-03.parquet"
# Anchor the destination at the project root (ROOT_DIR, the parent of the
# notebook's working directory) instead of the notebook's cwd.
local_file = os.path.join(ROOT_DIR, "data", "raw", "yellow_tripdata_2023-03.parquet")

In [4]:
# Best-effort download: report any failure instead of raising, so the rest of
# the notebook still runs.
try:
    # download_file does not create missing parent directories itself.
    os.makedirs(os.path.dirname(local_file) or ".", exist_ok=True)
    s3_client.download_file(bucket_name, file_key, local_file)
    print(f"Downloaded {file_key} from {bucket_name} to {local_file}")
except Exception as e:
    print(f"Failed to download file: {e}")

Failed to download file: An error occurred (403) when calling the HeadObject operation: Forbidden


Ideally, the 2023 yellow taxi data would be available from an AWS S3 bucket, so we could use `pyspark.sql.SparkSession` to read the files directly from the hosting bucket into other storage efficiently. However, even with AWS credentials, we get either `Forbidden` or `Access Denied` errors when attempting to download the data or list the objects in the bucket, from both Python and the AWS CLI. This is a known issue, discussed on the [awslabs open-data-registry GitHub issues page](https://github.com/awslabs/open-data-registry/issues/1418). Below I try another method.

### Attempting API Requests Directly to NYC Open Data

Here I attempted to build the dataset using the [NYC OpenData API for the full 2023 Yellow Taxi dataset](https://data.cityofnewyork.us/Transportation/2023-Yellow-Taxi-Trip-Data/4b4i-vvec/about_data). This turned out to be far too slow given the size of the data (~38.3 million rows), the API's per-request limit of 50,000 rows, and throttling.

In [None]:
def get_existing_taxi_data_info(output_file):
    """Check if an existing Parquet file is present and return resume metadata.

    Args:
        output_file: Path of the Parquet file that is built up batch by batch.

    Returns:
        (offset, schema_columns): the number of rows already stored (used as the
        API offset to resume from) and the file's column names, or (0, None)
        when the file does not exist yet.
    """
    if not os.path.exists(output_file):
        print("🚀 Starting new data fetch...")
        return 0, None

    # The row count and column names live in the Parquet footer; reading only
    # the metadata avoids loading millions of rows just to count them.
    offset = pq.read_metadata(output_file).num_rows
    schema_columns = pq.read_schema(output_file).names
    print(f"🔄 Resuming from offset {offset} (Existing rows: {offset})")
    return offset, schema_columns


def fetch_taxi_data(api_url, offset, batch_size=50000, order="tpep_pickup_datetime ASC", max_retries=5, retry_delay=10, timeout=10):
    """Fetch one page of taxi data from the API, retrying with exponential backoff.

    Args:
        api_url: JSON endpoint queried with ``$limit``/``$offset``/``$order`` params.
        offset: Number of rows to skip (``$offset``).
        batch_size: Maximum number of rows to request (``$limit``).
        order: ``$order`` clause; a falsy value falls back to
            ``"tpep_pickup_datetime ASC"``.
        max_retries: Total number of request attempts before giving up.
        retry_delay: Base delay in seconds; waits between attempts are
            retry_delay, 2*retry_delay, 4*retry_delay, ...
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        The decoded JSON body of the first successful response, or ``None`` if
        every attempt failed.
    """
    # NOTE(review): tpep_pickup_datetime is not unique, so $offset paging over it
    # may skip or repeat rows that share a timestamp across page boundaries;
    # consider adding a unique tiebreaker (e.g. ":id") to `order`.
    params = {
        "$limit": batch_size,
        "$offset": offset,
        "$order": order or "tpep_pickup_datetime ASC",
    }

    for attempt in range(max_retries):
        try:
            response = requests.get(api_url, params=params, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Request failed (attempt {attempt + 1}/{max_retries}): {e}")
            # Exponential backoff starting at retry_delay; there is no point in
            # waiting after the final attempt.
            if attempt + 1 < max_retries:
                time.sleep(retry_delay * (2 ** attempt))

    print(f"❌ Max retries exceeded. Skipping offset {offset}.")
    return None


def _align_to_schema(table, schema):
    """Return `table` with exactly `schema`'s columns, order and types; absent columns become nulls."""
    # The API omits keys whose value is null, so a whole batch can lack a column.
    columns = [
        table.column(field.name).cast(field.type)
        if field.name in table.column_names
        else pa.nulls(table.num_rows, type=field.type)
        for field in schema
    ]
    return pa.Table.from_arrays(columns, schema=schema)


def _write_atomically(table, output_file):
    """Write `table` to a temp file next to `output_file`, then swap it into place."""
    # A crash mid-write then leaves the previously saved file intact.
    tmp_file = f"{output_file}.tmp"
    pq.write_table(table, tmp_file, compression="snappy")
    os.replace(tmp_file, output_file)


def process_and_save_data(output_file, batch_data, first_write):
    """Process one batch of records and append it to the Parquet file.

    Args:
        output_file: Parquet file to create (first batch) or extend.
        batch_data: Records for this batch, as accepted by ``pd.DataFrame``.
        first_write: True to create the file from this batch; False to append.

    Returns:
        False if `batch_data` is empty (end of dataset), True once the batch is saved.

    Notes:
        Parquet files cannot be appended in place, so each append re-reads and
        rewrites the whole file and costs time proportional to its size. The
        file's schema is fixed by the first batch: later columns not in it are
        dropped, and missing ones are filled with nulls.
    """
    if not batch_data:
        print("🏁 Reached end of dataset. Done fetching!")
        return False

    table = pa.Table.from_pandas(pd.DataFrame(batch_data))

    if first_write:
        _write_atomically(table, output_file)
        print("📝 First batch written!")
    else:
        existing_table = pq.read_table(output_file)
        # concat_tables requires identical schemas, so conform the new batch first.
        table = _align_to_schema(table, existing_table.schema)
        _write_atomically(pa.concat_tables([existing_table, table]), output_file)
        print(f"📝 Appended {table.num_rows} new rows.")

    return True


def main(api_url, output_file, batch_size=50000, max_retries=5, retry_delay=10, request_pause=2):
    """Fetch NYC Yellow Taxi data page by page and append it to a Parquet file.

    Resumes from the number of rows already stored in `output_file`, so an
    interrupted run can simply be restarted.

    Args:
        api_url: JSON endpoint to page through.
        output_file: Parquet file to create or extend.
        batch_size: Rows requested per page.
        max_retries: Attempts per page before giving up (passed to fetch_taxi_data).
        retry_delay: Base backoff delay in seconds (passed to fetch_taxi_data).
        request_pause: Seconds to wait between successful pages to avoid rate limits.
    """
    offset, _ = get_existing_taxi_data_info(output_file)
    total_rows_fetched = offset
    first_write = not os.path.exists(output_file)

    while True:
        batch_data = fetch_taxi_data(api_url, offset, batch_size, max_retries=max_retries, retry_delay=retry_delay)

        if batch_data is None:
            # Every retry failed. This is not the end of the dataset, so do not
            # report success; re-running resumes from the rows already saved.
            print(f"⛔ Stopped at offset {offset} after repeated request failures; "
                  f"{total_rows_fetched} rows saved to {output_file}. Re-run to resume.")
            return

        if not batch_data:
            break  # Empty page: no rows left at this offset.

        if not process_and_save_data(output_file, batch_data, first_write):
            break

        # Advance by the rows actually received so a short page can't skip rows.
        total_rows_fetched += len(batch_data)
        offset += len(batch_data)
        first_write = False

        print(f"✅ Fetched {total_rows_fetched} rows so far...")

        # Short delay to avoid API rate limits
        time.sleep(request_pause)

    print(f"📁 Dataset successfully saved to {output_file} with {total_rows_fetched} rows.")

    

In [None]:
# Download settings for the full 2023 dataset; the output location comes from the project config.
from src.config import RAW_DATA_DIR

params = dict(
    api_url="https://data.cityofnewyork.us/resource/4b4i-vvec.json",  # JSON endpoint for the 2023 Yellow Taxi dataset
    output_file=os.path.join(RAW_DATA_DIR, "yellow_taxi_2023_full.parquet"),
    batch_size=50000,  # rows requested per API call
    max_retries=5,  # attempts per page before giving up
    retry_delay=10,  # base backoff delay in seconds, doubled on each further retry
)

In [None]:
# Start the paginated download, or resume it if the output file already exists.
main(**params)

🔄 Resuming from offset 3250000 (Existing rows: 3250000)
⚠️ Request failed (attempt 1/5): HTTPSConnectionPool(host='data.cityofnewyork.us', port=443): Max retries exceeded with url: /resource/4b4i-vvec.json?%24limit=50000&%24offset=3250000&%24order=tpep_pickup_datetime+ASC (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001244D8DFD40>: Failed to resolve 'data.cityofnewyork.us' ([Errno 11002] getaddrinfo failed)"))
⚠️ Request failed (attempt 2/5): HTTPSConnectionPool(host='data.cityofnewyork.us', port=443): Max retries exceeded with url: /resource/4b4i-vvec.json?%24limit=50000&%24offset=3250000&%24order=tpep_pickup_datetime+ASC (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001244D9FEED0>: Failed to resolve 'data.cityofnewyork.us' ([Errno 11001] getaddrinfo failed)"))
📝 Appended 50000 new rows.
✅ Fetched 3300000 rows so far...
📝 Appended 50000 new rows.
✅ Fetched 3350000 rows so far...
⚠️ Request failed (attempt 1/5):

In [None]:
# Sanity-check what has been downloaded so far: overall shape plus 10 random rows.
table = pq.read_table(params["output_file"])
num_rows, num_columns = table.num_rows, len(table.schema)

# Choose 10 random row positions without converting the whole table to pandas,
# which can hold tens of millions of rows. pandas picks sample positions from
# the frame length and seed alone, so sampling an empty frame of the same
# length selects the same rows as sampling the full frame.
sample_positions = pd.DataFrame(index=pd.RangeIndex(num_rows)).sample(n=10, random_state=42).index
df_sample = table.take(sample_positions.to_numpy()).to_pandas()
df_sample.index = sample_positions  # keep the original row labels

print(f"📊 Dataset Shape: {num_rows} rows, {num_columns} columns")
df_sample



📊 Dataset Shape: 50000 rows, 19 columns
      vendorid     tpep_pickup_datetime    tpep_dropoff_datetime  \
33553        2  2023-01-08T15:45:11.000  2023-01-08T15:50:41.000   
9427         2  2023-01-08T11:13:26.000  2023-01-08T11:18:24.000   
199          2  2023-01-08T07:54:52.000  2023-01-08T08:04:38.000   
12447        2  2023-01-08T11:53:13.000  2023-01-08T12:00:43.000   
39489        1  2023-01-08T16:46:34.000  2023-01-08T16:53:23.000   
42724        1  2023-01-08T17:21:13.000  2023-01-08T17:37:58.000   
10822        2  2023-01-08T11:31:58.000  2023-01-08T11:46:48.000   
49498        2  2023-01-08T18:40:50.000  2023-01-08T18:49:37.000   
4144         1  2023-01-08T09:51:50.000  2023-01-08T09:57:02.000   
36958        1  2023-01-08T16:20:08.000  2023-01-08T16:41:31.000   

      trip_distance pulocationid dolocationid payment_type fare_amount extra  \
33553          1.14          161          170            1         7.9   0.0   
9427           1.59          141          263      