In [15]:
#import previous data
import requests
import os
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

def download_nyc_taxi_data(data_type, start_date, dest_folder,time_back):
    """
    Downloads NYC TLC trip data for a specific data type from the start date to the past 3 years.

    Args:
        data_type: "yellow" or "green" for trip data type.
        start_date: The start date in the format "yyyy-mm" (e.g., "2024-01").
    """
    # Base URL for data download
    base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

    # Validate data type
    if data_type not in ("yellow", "green"):
        raise ValueError(f"Invalid data type: {data_type}. Choose 'yellow' or 'green'.")

    # Parse the start date
    start_date = datetime.strptime(start_date, "%Y-%m")

    # Calculate the end date (3 years before the start date)
    end_date = start_date - relativedelta(months=time_back) #to go back years instead of months change months to years

    # Generate the list of year-month strings for each month in the past 3 years
    date_list = []
    current_date = start_date
    while current_date > end_date:
        date_list.append(current_date.strftime("%Y-%m"))
        current_date -= relativedelta(months=1)

    for year_month in date_list:
        # Construct filename
        filename = f"{data_type}_tripdata_{year_month}.parquet"
        filepath = os.path.join(dest_folder, filename)

        # Full download URL
        download_url = os.path.join(base_url, filename)

        # Download data
        response = requests.get(download_url)

        if response.status_code == 200:
            # Save data directly to the current directory
            with open(filepath, "wb") as f:
                f.write(response.content)
            print(f"Downloaded {data_type} trip data for {year_month} to {filepath}")
        else:
            print(f"Error downloading data for {year_month}: {response.status_code}")

# Example usage
#data_type = "yellow"
#start_date = "2024-01"




In [None]:
#import latest data
import requests
import os
from datetime import datetime, timedelta

def download_nyc_taxi_data(data_type):
    """
    Downloads NYC TLC trip data for a specific data type for four months prior to the current date.

    Args:
        data_type: "yellow" or "green" for trip data type.
    """
    # Base URL for data download
    base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

    # Validate data type
    if data_type not in ("yellow", "green"):
        raise ValueError(f"Invalid data type: {data_type}. Choose 'yellow' or 'green'.")

    # Get the current date
    current_date = datetime.now()

    # Calculate the year and month for four months ago
    four_months_ago = current_date - timedelta(days=30*4)
    year_month = four_months_ago.strftime("%Y-%m")

    # Construct filename
    filename = f"{data_type}_tripdata_{year_month}.parquet"

    # Full download URL
    download_url = os.path.join(base_url, filename)

    # Download data
    response = requests.get(download_url)

    if response.status_code == 200:
        # Save data directly to the current directory
        with open(filename, "wb") as f:
            f.write(response.content)
        print(f"Downloaded {data_type} trip data for {year_month} to {filename}")
    else:
        print(f"Error downloading data: {response.status_code}")

# Example usage
data_type = "yellow"
download_nyc_taxi_data(data_type)



In [None]:
import pyarrow as pa

def get_output_schema():
    schema = pa.schema([
        ('VendorID', pa.int64()),
        ('tpep_pickup_datetime', pa.timestamp('s')),
        ('tpep_dropoff_datetime', pa.timestamp('s')),
        ('passenger_count', pa.int64()),
        ('trip_distance', pa.float64()),
        ('RatecodeID', pa.int64()),
        ('store_and_fwd_flag', pa.string()),
        ('PULocationID', pa.int64()),
        ('DOLocationID', pa.int64()),
        ('payment_type', pa.int64()),
        ('fare_amount', pa.float64()),
        ('extra', pa.float64()),
        ('mta_tax', pa.float64()),
        ('tip_amount', pa.float64()),
        ('tolls_amount', pa.float64()),
        ('improvement_surcharge', pa.float64()),
        ('total_amount', pa.float64()),
        ('congestion_surcharge', pa.float64()),
        ('tpep_pickup_hour', pa.int64()),
        ('tpep_dropoff_hour', pa.int64()),
        ('tpep_pickup_dayofweek', pa.int64()),
        ('tpep_dropoff_dayofweek', pa.int64())
    ])
    return schema

In [2]:
import pandas as pd
def calculate_avg_distance_per_hour(df):
    # Converting to datetime type
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    # Extract hour component
    df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
    df['dropoff_hour'] = df['tpep_dropoff_datetime'].dt.hour

    # Avg distance by pickup hour
    avg_dist_pickup_hour = df.groupby('pickup_hour')['trip_distance'].mean()

    # Avg distance by dropoff hour
    avg_dist_dropoff_hour = df.groupby('dropoff_hour')['trip_distance'].mean()

    # Combine results
    avg_dist_per_hour = pd.concat([avg_dist_pickup_hour, avg_dist_dropoff_hour], axis=0)
    avg_dist_per_hour = avg_dist_per_hour.groupby(level=0).mean()

    return avg_dist_per_hour

In [7]:
def top_busiest_hours(df, top_n=3):
    
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
    
    #count number of trips for each hour
    trips_per_hour = df['pickup_hour'].value_counts().sort_index()
    
    #find the busiest hours
    top_busiest_hours = trips_per_hour.nlargest(top_n)
    
    return top_busiest_hours

In [22]:
import pandas as pd

def hour_week_level(df):
    #Convert datetime columns to datetime type
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    #Extract hour features
    df['tpep_pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
    df['tpep_dropoff_hour'] = df['tpep_dropoff_datetime'].dt.hour

    #Extract day of week (1=Monday, 2=Tuesday, ..., 7=Sunday)
    df['tpep_pickup_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek + 1
    df['tpep_dropoff_dayofweek'] = df['tpep_dropoff_datetime'].dt.dayofweek + 1
    
    return df


In [None]:
import fastavro
from fastavro.schema import load_schema
from fastavro import writer as avro_writer
def save_as_avro(df, file_path, schema_file):
    records = df.to_dict(orient='records')
    schema = load_schema(schema_file)
    with open(file_path, 'wb') as out:
        avro_writer(out, schema, records)

In [21]:
def main():
    data_type = "yellow"
    start_date = "2024-01"
    dest_folder = "./data"
    time_back = 5
    
    # Check if data is already downloaded
    all_files_exist = True
    current_date = datetime.strptime(start_date, "%Y-%m")
    end_date = current_date - relativedelta(months=time_back) #to go back years instead of months change months to years

    while current_date > end_date:
        filename = f"{data_type}_tripdata_{current_date.strftime('%Y-%m')}.parquet"
        file_path = os.path.join(dest_folder, filename)
        if not os.path.exists(file_path):
            latest_file_date = current_date + relativedelta(months=1) 
            new_date_to_start = latest_file_date - relativedelta(months=1)
            new_date_to_start = new_date_to_start.strftime("%Y-%m")
            start_date = new_date_to_start
            all_files_exist = False
            break
        current_date -= relativedelta(months=1)
    
    
    if not all_files_exist:
        #download the data
        download_nyc_taxi_data(data_type, start_date, dest_folder,time_back)
        
    all_dfs = []
    
    #process each downloaded file and append to list
    for file_name in os.listdir(dest_folder):
        if file_name.endswith('.parquet'):
            file_path = os.path.join(dest_folder, file_name)
            print(f"processing {file_path}")
            
            #load the file into dfs
            df=pd.read_parquet(file_path)
            all_dfs.append(df)
    
    #concat 
    combined_dfs = pd.concat(all_dfs, ignore_index=True)
    
    #calc avg distance per hour
    '''avg_distance_per_hour = calculate_avg_distance_per_hour(combined_dfs)
    print("Average distance per hour (combined data):\n",avg_distance_per_hour.head(10))'''
    
    #find top busiest hours
    '''busiest_hours = top_busiest_hours(combined_dfs)
    print("Top busiest hours (combined data):\n",busiest_hours)'''
    
    with_features = hour_week_level(combined_dfs)
    print("First 10 rows")
    print(with_features.head(10))
    
if __name__ == "__main__":
    main()

processing ./data\yellow_tripdata_2023-06.parquet
processing ./data\yellow_tripdata_2023-07.parquet
processing ./data\yellow_tripdata_2023-08.parquet
processing ./data\yellow_tripdata_2023-09.parquet
processing ./data\yellow_tripdata_2023-10.parquet
processing ./data\yellow_tripdata_2023-11.parquet
processing ./data\yellow_tripdata_2023-12.parquet
processing ./data\yellow_tripdata_2024-01.parquet


TypeError: list indices must be integers or slices, not str