In [11]:
import src.config as config

In [12]:
from datetime import datetime, timedelta, timezone

import pandas as pd

# Snap "now" (in UTC) down to the start of the current hour. The lowercase "h"
# alias is used because uppercase "H" is deprecated since pandas 2.2 and emits
# a FutureWarning there; older pandas versions accept "h" as well.
current_date = pd.to_datetime(datetime.now(timezone.utc)).floor('h')
print(f'{current_date = }')

# Fetch window: the 32 days leading up to the current (floored) hour.
fetch_date_to = current_date
fetch_date_from = current_date - timedelta(days=32)

current_date = Timestamp('2025-01-14 09:00:00+0000', tz='UTC')


In [13]:
from src.data import load_raw_data

def _iter_year_months(start: datetime, end: datetime):
    """Yield ``(year, month)`` for every calendar month from ``start`` to ``end``, inclusive."""
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        yield year, month
        month += 1
        if month > 12:
            year, month = year + 1, 1


def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """Return raw rides for ``[from_date, to_date)``, replayed from 52 weeks earlier.

    The requested window is shifted back by exactly 52 weeks (364 days, so the
    weekdays line up). Every calendar month touched by the shifted window is
    loaded with ``load_raw_data``. Only rows inside the shifted window are kept,
    and their ``pickup_datetime`` is moved forward again by the same 364 days.
    NOTE(review): presumably this simulates a "recent" batch from historical
    monthly files -- confirm against the data source.

    Args:
        from_date: Inclusive lower bound of the window.
        to_date: Exclusive upper bound of the window. Both bounds must be
            comparable with the ``pickup_datetime`` values returned by
            ``load_raw_data`` (same tz-awareness); this is not checked here.

    Returns:
        The rides, sorted by ``pickup_location_id`` then ``pickup_datetime``.

    Raises:
        ValueError: If ``from_date`` is after ``to_date``.
    """
    if from_date > to_date:
        raise ValueError(f"from_date ({from_date}) must not be after to_date ({to_date})")

    shift = timedelta(days=7 * 52)
    from_date_ = from_date - shift
    to_date_ = to_date - shift

    monthly_rides = []
    # Load EVERY month in the window, not just the two endpoint months. A
    # window can span three calendar months (e.g. Jan 31 -> Mar 4), and
    # skipping the middle one would silently drop data.
    for year, month in _iter_year_months(from_date_, to_date_):
        rides_month = load_raw_data(year=year, months=month)
        # Filter on BOTH bounds. When the window starts and ends in the same
        # month, one-sided filters would keep rows outside the window and
        # duplicate the overlap.
        in_window = (rides_month.pickup_datetime >= from_date_) & (
            rides_month.pickup_datetime < to_date_
        )
        monthly_rides.append(rides_month[in_window])

    rides = pd.concat(monthly_rides)
    # assign() returns a new frame, so we never write into a filtered slice.
    rides = rides.assign(pickup_datetime=rides.pickup_datetime + shift)
    return rides.sort_values(by=["pickup_location_id", "pickup_datetime"])

In [14]:
rides = fetch_batch_raw_data(from_date=fetch_date_from, to_date=fetch_date_to)

 File 2023-12 already exists
 File 2024-01 already exists


In [15]:
from src.data import transform_raw_data_into_ts_data
# Turn the raw ride records into the time-series table used below. The dtypes
# printed further down show columns pickup_hour, pickup_location_id and rides.
# NOTE(review): presumably one row per (location, hour), with the progress bar
# iterating over locations -- confirm in src.data.
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 256/256 [00:00<00:00, 274.86it/s]


In [16]:
# Despite the names, each set holds DISTINCT group sizes:
#   - location_id_per_pickup_hour: distinct counts of locations seen per pickup hour
#   - pickup_hour_per_location_id: distinct counts of hours seen per location
# A single value in each set means the hourly grid is uniform.
location_id_per_pickup_hour = set(ts_data.groupby('pickup_hour')['pickup_location_id'].nunique())
pickup_hour_per_location_id = set(ts_data.groupby('pickup_location_id')['pickup_hour'].nunique())

In [17]:
# Sanity check: every pickup hour has the same number of locations, and every
# location the same number of hours. The asserts are separate so a failure says
# which side is uneven. NOTE: uniform counts do not strictly prove that every
# (hour, location) pair is present.
assert len(location_id_per_pickup_hour) == 1, (
    f"pickup hours have differing location counts: {sorted(location_id_per_pickup_hour)}"
)
assert len(pickup_hour_per_location_id) == 1, (
    f"locations have differing pickup-hour counts: {sorted(pickup_hour_per_location_id)}"
)

In [18]:
import hopsworks

# Log in to Hopsworks with the project name and API key held in src.config.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY,
)
feature_store = project.get_feature_store()

# Get (or create, per the API name) the feature group for this notebook.
# Rows are keyed by (pickup_location_id, pickup_hour), and pickup_hour is the
# event time.
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
)

2025-01-14 15:05:08,613 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-01-14 15:05:08,620 INFO: Initializing external client
2025-01-14 15:05:08,621 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-14 15:05:10,563 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1207467


In [19]:
# Force location ids to int64 and show the resulting schema.
# NOTE(review): presumably the feature group expects a 64-bit integer key --
# confirm against the Hopsworks schema.
ts_data = ts_data.astype({'pickup_location_id': 'int64'})
print(ts_data.dtypes)

pickup_hour           datetime64[ns, UTC]
pickup_location_id                  int64
rides                             float64
dtype: object


In [20]:
# Upload the time-series rows without blocking on the offline materialization
# job; the cell output is the (job, None) tuple returned by insert().
feature_group.insert(ts_data, write_options=dict(wait_for_job=False))

Uploading Dataframe: 100.00% |██████████| Rows 205660/205660 | Elapsed Time: 00:13 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_v2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207467/jobs/named/time_series_hourly_feature_group_v2_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_v2_1_offline_fg_materialization', 'SPARK'),
 None)