In [58]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [59]:
# Show all output for a cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [60]:
import sys
import os

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
import src.config as config

In [61]:
from datetime import datetime, timedelta, timezone
from typing import Union
import pandas as pd
from src.data_utils import load_and_process_citibike_data

def fetch_batch_raw_data(from_date: Union[datetime, str], to_date: Union[datetime, str]) -> pd.DataFrame:
    """
    Simulates production batch data for Citi Bike by sampling data from 52 weeks earlier.

    Args:
        from_date (datetime or str): Start of the batch window.
        to_date (datetime or str): End of the batch window.

    Returns:
        pd.DataFrame: Shifted Citi Bike ride data for the given time window.
    """
    # Parse string inputs if needed
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift 52 weeks back and remove timezone info
    historical_from_date = (from_date - timedelta(weeks=52)).replace(tzinfo=None)
    historical_to_date = (to_date - timedelta(weeks=52)).replace(tzinfo=None)

    # Load historical month(s) and remove timezone
    rides_from = load_and_process_citibike_data(year=historical_from_date.year, months=[historical_from_date.month])
    rides_from['started_at'] = pd.to_datetime(rides_from['started_at']).dt.tz_localize(None)
    rides_from = rides_from[rides_from['started_at'] >= historical_from_date]

    if historical_to_date.month != historical_from_date.month:
        rides_to = load_and_process_citibike_data(year=historical_to_date.year, months=[historical_to_date.month])
        rides_to['started_at'] = pd.to_datetime(rides_to['started_at']).dt.tz_localize(None)
        rides_to = rides_to[rides_to['started_at'] < historical_to_date]
        rides = pd.concat([rides_from, rides_to], ignore_index=True)
    else:
        rides = rides_from

    # Shift timestamps forward again by 52 weeks to simulate "now"
    rides['started_at'] += timedelta(weeks=52)

    # Sort by station and timestamp for consistency
    rides.sort_values(by=['start_station_id', 'started_at'], inplace=True)

    return rides

In [62]:
current_date = pd.to_datetime(datetime.utcnow())
print(f"{current_date}")
type(current_date)
current_date = pd.to_datetime(datetime.now(timezone.utc)).floor("h")
current_date.to_datetime64()
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=29)
fetch_data_to
fetch_data_from

2025-05-11 14:30:55.764268


pandas._libs.tslibs.timestamps.Timestamp

numpy.datetime64('2025-05-11T14:00:00.000000000')

Timestamp('2025-05-11 14:00:00+0000', tz='UTC')

Timestamp('2025-04-12 14:00:00+0000', tz='UTC')

In [63]:
citi_bike_rides = fetch_batch_raw_data(fetch_data_from, fetch_data_to)

📁 File already exists: citi_bike_rides_raw_2024_04.parquet
📖 Loading data from: citi_bike_rides_raw_2024_04.parquet


🔍 Filtering data for 2024-04...
✅ Saved filtered data to: /Users/yashmathur/Documents/MS_DS/Python_Spring_25/CDA_500/Test_Final/data/processed/citi_bike_rides_processed_2024_04.parquet
✅ Successfully combined all filtered months.
📁 File already exists: citi_bike_rides_raw_2024_05.parquet
📖 Loading data from: citi_bike_rides_raw_2024_05.parquet
🔍 Filtering data for 2024-05...
✅ Saved filtered data to: /Users/yashmathur/Documents/MS_DS/Python_Spring_25/CDA_500/Test_Final/data/processed/citi_bike_rides_processed_2024_05.parquet
✅ Successfully combined all filtered months.


In [64]:
citi_bike_rides

Unnamed: 0,started_at,start_station_id
1408117,2025-04-12 15:45:28.770,2733.03
709133,2025-04-12 17:27:16.857,2733.03
623163,2025-04-13 12:38:50.285,2733.03
1688686,2025-04-13 17:22:18.671,2733.03
1497914,2025-04-13 18:07:27.791,2733.03
...,...,...
2493403,2025-05-03 09:00:32.000,SYS038
3060911,2025-05-03 19:02:21.000,SYS038
2320329,2025-05-06 21:04:19.000,SYS038
2320328,2025-05-07 17:03:17.000,SYS038


In [65]:
from src.data_utils import transform_raw_data_into_ts_data
citi_bike_ts_data = transform_raw_data_into_ts_data(citi_bike_rides)

In [66]:
citi_bike_ts_data

Unnamed: 0,hour,start_station_id,ride_count
0,2025-04-12 14:00:00,5329.03,13
1,2025-04-12 14:00:00,5905.14,31
2,2025-04-12 14:00:00,6140.05,33
3,2025-04-12 15:00:00,5329.03,19
4,2025-04-12 15:00:00,5905.14,28
...,...,...,...
2083,2025-05-11 12:00:00,5905.14,20
2084,2025-05-11 12:00:00,6140.05,7
2085,2025-05-11 13:00:00,5329.03,14
2086,2025-05-11 13:00:00,5905.14,17


In [67]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time series data at hourly freaquency",
    primary_key=["start_station_id", "hour"],
    event_time="hour"
)

2025-05-11 10:31:11,090 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 10:31:11,104 INFO: Initializing external client
2025-05-11 10:31:11,105 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 10:31:11,870 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231009


In [68]:
feature_group.insert(citi_bike_ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 2088/2088 | Elapsed Time: 00:00 | Remaining Time: 00:00
Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/Citi_Bike_3010/Resources/jobs/citi_bike_time_series_hourly_feature_group_1_offline_fg_materialization/config_1746959707034) to trigger the materialization job again.


(Job('citi_bike_time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)