In [1]:
import sys
import os

# Make the directory above the notebook's working directory importable,
# so that the `src` package can be resolved by the imports that follow.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))
import src.config as config

In [2]:
from datetime import datetime, timedelta

import pandas as pd

# Current instant as a timezone-naive pandas Timestamp expressed in UTC.
# `datetime.utcnow()` is deprecated (Python 3.12+), so take an aware UTC
# timestamp and strip the tz info; the resulting value has the same type and
# naive-UTC meaning as before, which keeps comparisons against naive
# datetime64 columns working.
current_date = pd.Timestamp.now(tz="UTC").tz_localize(None)
print(f"{current_date}")

2025-05-09 02:03:35.386049


In [3]:
# Display the same instant as a numpy.datetime64 value; this is the form
# fetch_batch_raw_data uses when filtering on `start_datetime`.
current_date.to_datetime64()

numpy.datetime64('2025-05-09T02:03:35.386049000')

In [4]:
# Fetch window: the trailing 29 days that end at the current UTC timestamp.
fetch_data_from = current_date - timedelta(days=29)
fetch_data_to = current_date

In [5]:
# Display the end of the fetch window (later passed as `to_date`).
fetch_data_to

Timestamp('2025-05-09 02:03:35.386049')

In [6]:
# Display the start of the fetch window (later passed as `from_date`).
fetch_data_from

Timestamp('2025-04-10 02:03:35.386049')

In [13]:
from datetime import datetime, timedelta
from typing import Union
import pandas as pd
from src.data_utils import load_and_process_bike_data

def fetch_batch_raw_data(
    from_date: Union[datetime, str], to_date: Union[datetime, str]
) -> pd.DataFrame:
    """
    Simulate production data by replaying historical rides from 52 weeks earlier.

    The requested window ``[from_date, to_date)`` is shifted back by 52 weeks
    (364 days), every calendar month touched by the shifted window is loaded
    with ``load_and_process_bike_data``, rows outside the shifted window are
    dropped, and the remaining ``start_datetime`` values are shifted forward by
    52 weeks so they fall inside the requested window.

    Args:
        from_date (datetime or str): Inclusive start of the data batch. Plain
            ``datetime``, ``pd.Timestamp`` and ISO-format strings are accepted.
        to_date (datetime or str): Exclusive end of the data batch. Same
            accepted types as ``from_date``.

    Returns:
        pd.DataFrame: The simulated production data, sorted by
        ``start_station_id`` and then ``start_datetime``. When more than one
        month is loaded the index is reset; for a single month the loader's
        index labels are kept.

    Raises:
        ValueError: If a string is not valid ISO format, if either argument is
            not a datetime after conversion, or if ``from_date`` is not
            strictly earlier than ``to_date``.
    """

    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Validate input dates
    if not isinstance(from_date, datetime) or not isinstance(to_date, datetime):
        raise ValueError(
            "Both 'from_date' and 'to_date' must be datetime objects or valid ISO format strings."
        )
    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift dates back by 52 weeks (1 year). Wrapping in pd.Timestamp makes
    # `.to_datetime64()` available for plain `datetime` inputs as well (a
    # pd.Timestamp passes through unchanged).
    shift = timedelta(weeks=52)
    historical_from_date = pd.Timestamp(from_date) - shift
    historical_to_date = pd.Timestamp(to_date) - shift
    window_start = historical_from_date.to_datetime64()
    window_end = historical_to_date.to_datetime64()

    # Load every calendar month the historical window touches, one month per
    # loader call, and clip each month to the window. Applying both bounds to
    # every month ensures the upper bound is honoured even when the window
    # lies inside a single month.
    monthly_frames = []
    year, month = historical_from_date.year, historical_from_date.month
    last_month = (historical_to_date.year, historical_to_date.month)
    while (year, month) <= last_month:
        month_rides = load_and_process_bike_data(year=year, months=[month])
        in_window = (month_rides["start_datetime"] >= window_start) & (
            month_rides["start_datetime"] < window_end
        )
        monthly_frames.append(month_rides[in_window])
        # Advance to the next calendar month, rolling over the year in December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    if len(monthly_frames) == 1:
        rides = monthly_frames[0]
    else:
        # Combine the filtered data
        rides = pd.concat(monthly_frames, ignore_index=True)

    # Shift the data forward by 52 weeks to simulate recent data. `assign`
    # builds a new frame, so the loader's data is never mutated through a
    # filtered slice.
    rides = rides.assign(start_datetime=rides["start_datetime"] + shift)

    # Sort the data for consistency
    return rides.sort_values(by=["start_station_id", "start_datetime"])

In [11]:
# Pull the simulated batch for the fetch window defined earlier.
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

File already exists for 2024-04.
Loading data for 2024-04...


  rides = pd.read_csv(file_path)


Total records: 3,214,453
Valid records: 3,211,063
Records dropped: 3,390 (0.11%)
Successfully processed data for 2024-04.
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-05.
Loading data for 2024-05...


  rides = pd.read_csv(file_path)


Total records: 4,228,540
Valid records: 4,221,736
Records dropped: 6,804 (0.16%)
Successfully processed data for 2024-05.
Combining all monthly data...
Data loading and processing complete!


In [12]:
# Preview the fetched rides; `start_datetime` has already been shifted
# forward by 52 weeks inside fetch_batch_raw_data.
rides

Unnamed: 0,start_datetime,start_station_id
742384,2025-04-10 07:20:44.123,2733.03
542920,2025-04-10 08:12:28.871,2733.03
1001433,2025-04-10 11:05:15.790,2733.03
1789763,2025-04-10 14:51:08.590,2733.03
991605,2025-04-10 17:46:02.053,2733.03
...,...,...
3229278,2025-05-08 13:26:54.000,8897.05
3231290,2025-05-08 17:43:13.000,8897.05
2909109,2025-05-08 19:10:30.000,8897.05
2805203,2025-05-08 19:11:35.000,8897.05


In [14]:
from src.data_utils import transform_raw_data_into_ts_data
# Convert the raw ride events into time-series form. The transformation itself
# lives in src.data_utils and is not shown in this notebook.
ts_data = transform_raw_data_into_ts_data(rides)

In [16]:
# Station IDs to keep; all other stations are dropped from the time series.
locations = [5308.04, 5872.1, 6230.02]
ts_data = ts_data.loc[ts_data["start_station_id"].isin(locations)]

In [17]:
# Preview the time-series rows that remain after the station filter.
ts_data

Unnamed: 0,start_hour,start_station_id,rides
515780,2025-04-10 02:00:00,5308.04,1
515781,2025-04-10 03:00:00,5308.04,1
515782,2025-04-10 04:00:00,5308.04,0
515783,2025-04-10 05:00:00,5308.04,0
515784,2025-04-10 06:00:00,5308.04,2
...,...,...,...
768089,2025-05-08 22:00:00,6230.02,8
768090,2025-05-08 23:00:00,6230.02,11
768091,2025-05-09 00:00:00,6230.02,7
768092,2025-05-09 01:00:00,6230.02,0


In [18]:
import hopsworks

# Connect to the Hopsworks project. The project name and API key are read from
# src.config rather than being hard-coded in the notebook.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# Get a handle to the project's feature store.
feature_store = project.get_feature_store()

# Look up the target feature group by the name and version set in src.config.
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)



2025-05-08 22:16:04,001 INFO: Initializing external client
2025-05-08 22:16:04,004 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2025-05-08 22:16:07,135 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1213682


In [19]:
# Upload the filtered time-series rows to the feature group.
# NOTE(review): `wait_for_job=False` presumably returns as soon as the
# materialization job is launched instead of blocking until it finishes;
# confirm against the Hopsworks client documentation.
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |███████████████████████████| Rows 2091/2091 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213682/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)