In [2]:
# Load key=value pairs from a local .env file into os.environ. Later cells read
# HOPSWORKS_* and FEATURE_GROUP_* via os.getenv — presumably they are defined in
# that .env file; confirm before running.
from dotenv import load_dotenv
load_dotenv()

True

In [14]:
import calendar
import os
import sys
import zipfile
from pathlib import Path

import numpy as np
import pandas as pd
import requests

# Add the parent directory of the notebook's working directory to the Python path
# so that the project-local `src` package imported below can be resolved.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from src.config import RAW_DATA_DIR  # noqa: E402  (must follow the sys.path tweak)


def fetch_raw_trip_data(year: int, month: int) -> Path:
    """
    Download one month of Jersey City CitiBike trip data and extract its CSV.

    The zip archive is saved as ``RAW_DATA_DIR / f"rides_{year}_{month:02}.zip"``
    and extracted into ``RAW_DATA_DIR / "extracted_raw"``.

    Args:
        year: Four-digit year of the trip data.
        month: Month number (1-12).

    Returns:
        Path: Path to the extracted CSV that came from *this* archive.

    Raises:
        Exception: If the download does not return HTTP 200, or the archive
            contains no CSV file.
    """
    url = f"https://s3.amazonaws.com/tripdata/JC-{year}{month:02}-citibike-tripdata.csv.zip"
    # A timeout prevents the notebook from hanging forever on a stalled connection.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        raise Exception(f"{url} is not available")

    RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
    zip_path = RAW_DATA_DIR / f"rides_{year}_{month:02}.zip"
    # write_bytes opens, writes and closes the file (the original leaked the handle).
    zip_path.write_bytes(response.content)

    extracted_dir = RAW_DATA_DIR / "extracted_raw"
    extracted_dir.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Pick the CSV from this archive's own member list: the extraction
        # directory is shared across months, so globbing it could return a
        # file left over from a previous download. macOS resource-fork entries
        # are skipped because they are not real CSVs.
        csv_members = [
            name
            for name in zip_ref.namelist()
            if name.lower().endswith(".csv") and not name.startswith("__MACOSX/")
        ]
        if not csv_members:
            raise Exception(f"No CSV file found in the extracted zip: {zip_path}")
        zip_ref.extractall(extracted_dir)

    return extracted_dir / csv_members[0]
    

def load_and_process_citi_data(
    years: list,
    months: "list | None" = None,
    data_dir: "Path | None" = None,
) -> "tuple[pd.DataFrame, pd.DataFrame]":
    """
    Load monthly CitiBike CSVs for the given years and derive a slim pickup frame.

    Files are read from ``data_dir / f"JC-{year}{month:02}-citibike-tripdata.csv"``;
    months whose file does not exist are skipped.

    Args:
        years (list): Years to load data for.
        months (list, optional): Month numbers to load for every year.
            Defaults to all twelve months.
        data_dir (Path, optional): Directory holding the CSV files.
            Defaults to ``RAW_DATA_DIR``.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(combined_rides, processed_rides)``.
        ``combined_rides`` is the raw concatenation of every loaded file.
        ``processed_rides`` drops unused columns and renames ``started_at`` /
        ``start_station_name`` / ``start_station_id`` to ``pickup_hour`` /
        ``station_name`` / ``station_id``.

    Raises:
        Exception: If no file could be loaded for any of the requested years.
    """
    if months is None:
        months = list(range(1, 13))
    data_dir = RAW_DATA_DIR if data_dir is None else Path(data_dir)

    # List to store DataFrames for each month
    monthly_rides = []
    for year in years:
        for month in months:
            file_path = data_dir / f"JC-{year}{month:02}-citibike-tripdata.csv"
            print(f"Loading data for {year}-{month:02}.")
            try:
                monthly_rides.append(pd.read_csv(file_path))
            except FileNotFoundError:
                continue

    # Combine once, after all years are read. Doing this inside the year loop
    # re-concatenated everything each year and raised as soon as an early year
    # had no files, even when later years did.
    if not monthly_rides:
        raise Exception(
            f"No data could be loaded for the years {years} and specified months: {months}"
        )

    print("Combining all monthly data...")
    combined_rides = pd.concat(monthly_rides, ignore_index=True)
    print("Data loading and processing complete!")

    columns_to_drop = [
        'ride_id', 'end_station_name', 'rideable_type', 'ended_at', 'end_station_id',
        'start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual',
    ]
    # errors="ignore": files in the legacy schema (`starttime`, `start station id`,
    # ...) do not have these columns, and drop() would otherwise raise KeyError.
    # NOTE: legacy-schema columns are not renamed, so those rows end up with NaN
    # in pickup_hour / station_id.
    processed_rides = combined_rides.drop(columns=columns_to_drop, errors="ignore").rename(
        columns={
            "started_at": "pickup_hour",
            "start_station_name": "station_name",
            "start_station_id": "station_id",
        }
    )

    return combined_rides, processed_rides

def transform_data_into_ts_data(
    rides: pd.DataFrame,
    station_ids: "list | None" = None,
) -> pd.DataFrame:
    """
    Aggregate individual rides into hourly pickup counts per station.

    The input frame is not modified, so calling this twice on the same frame
    (e.g. re-running the notebook cell) gives the same result.

    Args:
        rides: DataFrame with a ``pickup_hour`` column, either timestamp strings
            like ``"2024-12-28 09:45:30.704"`` or already-parsed datetimes, and a
            ``station_id`` column.
        station_ids: Station IDs to keep in the result. Defaults to the ten
            IDs this notebook has always filtered on.

    Returns:
        pd.DataFrame: Columns ``pickup_hour`` (floored to the hour),
        ``station_id`` and ``rides`` (number of pickups in that hour and
        station). Only hours with at least one pickup appear; gaps are NOT
        filled. Rows with a missing pickup time or station are dropped by the
        groupby. The index is the row position in the unfiltered aggregate.
    """
    if station_ids is None:
        station_ids = ["HB101", "HB202", "JC103", "HB404", "JC009", "HB201", "JC005", "JC006", "JC106", "JC115"]

    pickup = rides["pickup_hour"]
    if not pd.api.types.is_datetime64_any_dtype(pickup):
        # Drop fractional seconds so every value matches the strict format below.
        pickup = pd.to_datetime(pickup.str.split(".").str[0], format="%Y-%m-%d %H:%M:%S")

    # Build a small working frame instead of assigning into `rides`. Mutating the
    # caller's frame made a second call fail, because `.str` does not apply to
    # datetimes. It also avoids copying every unrelated column.
    hourly = pd.DataFrame({
        "pickup_hour": pickup.dt.floor("h"),
        "station_id": rides["station_id"],
    })

    agg_rides = (
        hourly.groupby(["pickup_hour", "station_id"])
        .size()
        .reset_index(name="rides")
    )

    return agg_rides[agg_rides["station_id"].isin(station_ids)]

In [15]:
# Load every available month for 2021-2024. `raw_df` keeps the untouched
# concatenation; `df` is the slimmed frame with pickup_hour / station_id columns.
years = list(range(2021, 2025))
raw_df, df = load_and_process_citi_data(years)

Loading data for 2021-01.
Loading data for 2021-02.
Loading data for 2021-03.
Loading data for 2021-04.
Loading data for 2021-05.
Loading data for 2021-06.
Loading data for 2021-07.
Loading data for 2021-08.
Loading data for 2021-09.
Loading data for 2021-10.
Loading data for 2021-11.
Loading data for 2021-12.
Combining all monthly data...
Data loading and processing complete!
Loading data for 2022-01.
Loading data for 2022-02.
Loading data for 2022-03.
Loading data for 2022-04.
Loading data for 2022-05.
Loading data for 2022-06.
Loading data for 2022-07.
Loading data for 2022-08.
Loading data for 2022-09.
Loading data for 2022-10.
Loading data for 2022-11.
Loading data for 2022-12.
Combining all monthly data...
Data loading and processing complete!
Loading data for 2023-01.
Loading data for 2023-02.
Loading data for 2023-03.
Loading data for 2023-04.
Loading data for 2023-05.
Loading data for 2023-06.
Loading data for 2023-07.
Loading data for 2023-08.
Loading data for 2023-09.
Loadin

In [16]:
# Processed rides. Rows loaded from legacy-schema files (e.g. 2021-01, which use
# `starttime` / `start station id`) carry NaN in pickup_hour / station_id, so they
# do not contribute to the hourly aggregation below.
df

Unnamed: 0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender,pickup_hour,station_name,station_id
0,266.0,2021-01-01 00:03:35.5100,2021-01-01 00:08:01.7770,3273.0,Manila & 1st,40.721651,-74.042884,3209.0,Brunswick St,40.724176,-74.050656,42494.0,Subscriber,1988.0,1.0,,,
1,1543.0,2021-01-01 00:23:32.9250,2021-01-01 00:49:16.0830,3681.0,Grand St,40.715178,-74.037683,3213.0,Van Vorst Park,40.718489,-74.047727,45343.0,Customer,1996.0,2.0,,,
2,1461.0,2021-01-01 00:23:50.7940,2021-01-01 00:48:12.5660,3681.0,Grand St,40.715178,-74.037683,3213.0,Van Vorst Park,40.718489,-74.047727,31794.0,Customer,1995.0,1.0,,,
3,793.0,2021-01-01 00:31:09.0770,2021-01-01 00:44:22.9430,3185.0,City Hall,40.717732,-74.043845,3199.0,Newport Pkwy,40.728745,-74.032108,42316.0,Customer,1969.0,0.0,,,
4,596.0,2021-01-01 00:35:52.1900,2021-01-01 00:45:48.7740,3639.0,Harborside,40.719252,-74.034234,3209.0,Brunswick St,40.724176,-74.050656,32575.0,Customer,1969.0,0.0,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3472723,,,,,,,,,,,,,,,,2024-12-28 09:45:30.704,Marin Light Rail,JC013
3472724,,,,,,,,,,,,,,,,2024-12-12 16:21:50.427,Marin Light Rail,JC013
3472725,,,,,,,,,,,,,,,,2024-12-11 19:23:24.109,Grove St PATH,JC115
3472726,,,,,,,,,,,,,,,,2024-12-12 20:48:40.471,Grove St PATH,JC115


In [17]:
# Hourly pickup counts for the tracked stations, built from the processed rides.
ts_data = transform_data_into_ts_data(df)

In [18]:
# Preview the hourly time series that is inserted into the feature group further down.
ts_data

Unnamed: 0,pickup_hour,station_id,rides
10,2021-02-03 15:00:00,JC006,1
15,2021-02-03 16:00:00,JC009,1
23,2021-02-03 18:00:00,JC106,1
24,2021-02-03 19:00:00,JC006,1
30,2021-02-03 19:00:00,JC106,2
...,...,...,...
1251995,2024-12-31 21:00:00,HB201,1
1251997,2024-12-31 21:00:00,JC006,2
1252001,2024-12-31 21:00:00,JC115,2
1252004,2024-12-31 22:00:00,JC115,1


In [19]:
# Sanity-check the time series before uploading (this cell previously just
# repeated the preview above). The feature group's primary key is
# (station_id, pickup_hour), so each pair must appear at most once.
assert not ts_data.duplicated(subset=["station_id", "pickup_hour"]).any(), "duplicate primary keys"
assert ts_data["pickup_hour"].notna().all(), "missing pickup_hour"
assert ts_data["rides"].gt(0).all(), "non-positive ride counts"
ts_data

Unnamed: 0,pickup_hour,station_id,rides
10,2021-02-03 15:00:00,JC006,1
15,2021-02-03 16:00:00,JC009,1
23,2021-02-03 18:00:00,JC106,1
24,2021-02-03 19:00:00,JC006,1
30,2021-02-03 19:00:00,JC106,2
...,...,...,...
1251995,2024-12-31 21:00:00,HB201,1
1251997,2024-12-31 21:00:00,JC006,2
1252001,2024-12-31 21:00:00,JC115,2
1252004,2024-12-31 22:00:00,JC115,1


In [20]:
# Dependency note carried over from the original cell: pip install confluent-kafka
import hopsworks

api_key = os.getenv("HOPSWORKS_API_KEY")
project_name = os.getenv("HOPSWORKS_PROJECT_NAME")

# Fail fast with a clear message instead of passing None into hopsworks.login
# and then printing "connected to None".
missing_vars = [
    var_name
    for var_name, value in (
        ("HOPSWORKS_API_KEY", api_key),
        ("HOPSWORKS_PROJECT_NAME", project_name),
    )
    if not value
]
if missing_vars:
    raise EnvironmentError(
        f"Missing environment variable(s): {', '.join(missing_vars)} - check your .env file"
    )

# Initialize connection to Hopsworks
project = hopsworks.login(
    api_key_value=api_key,
    project=project_name,
)
print(f"Successfully connected to Hopsworks project: {project_name}")

2025-05-10 11:51:55,302 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-10 11:51:55,311 INFO: Initializing external client
2025-05-10 11:51:55,311 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 11:51:56,299 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1230996
Successfully connected to Hopsworks project: cityB


In [21]:
# Handle to the project's feature store, used below to get or create the feature group.
feature_store = project.get_feature_store()

In [22]:
# Feature-group identity comes from the environment. os.getenv always returns a
# string (or None), but Hopsworks expects an int version, so convert it here.
FEATURE_GROUP_NAME = os.getenv("FEATURE_GROUP_NAME")
feature_group_version_raw = os.getenv("FEATURE_GROUP_VERSION")
FEATURE_GROUP_VERSION = int(feature_group_version_raw) if feature_group_version_raw else None

In [23]:
# NOTE(review): the existing group is named `time_series_6_hour_...`, but
# transform_data_into_ts_data floors pickups to 1 hour — confirm the intended
# granularity. The description below now matches the data actually produced
# (it only takes effect when the group is first created).
feature_group = feature_store.get_or_create_feature_group(
    name=FEATURE_GROUP_NAME,
    version=FEATURE_GROUP_VERSION,
    description="Time series data aggregated by hour",
    primary_key=["station_id", "pickup_hour"],
    event_time="pickup_hour",
)

In [24]:
# Upload the hourly series; wait_for_job blocks until the offline
# materialization job has finished (see the job log in the output).
feature_group.insert(ts_data, write_options=dict(wait_for_job=True))

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1230996/fs/1213535/fg/1458527


Uploading Dataframe: 100.00% |██████████| Rows 155629/155629 | Elapsed Time: 00:13 | Remaining Time: 00:00


Launching job: time_series_6_hour_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1230996/jobs/named/time_series_6_hour_feature_group_1_offline_fg_materialization/executions
2025-05-10 11:52:43,710 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-10 11:52:46,795 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-10 11:54:41,474 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-10 11:54:41,559 INFO: Waiting for log aggregation to finish.
2025-05-10 11:54:49,909 INFO: Execution finished successfully.


(Job('time_series_6_hour_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)