In [1]:
# autoreload
%load_ext autoreload
%autoreload 2

# set current working directory to root
import os
os.chdir('..')

import src.config as config

In [9]:
from datetime import datetime, timedelta, timezone

import pandas as pd

# How many days of raw data to fetch on each run. The pipeline runs every hour,
# so 28 days is a deliberately conservative window kept as a redundancy.
FETCH_WINDOW_DAYS = 28

# Current UTC time floored to the hour. `datetime.utcnow()` is deprecated since
# Python 3.12, so take an aware UTC timestamp and drop the tz: the result is the
# same naive UTC Timestamp the previous code produced.
current_date = pd.Timestamp(datetime.now(timezone.utc)).tz_localize(None).floor('H')
print(f'{current_date=}')

# Half-open window [fetch_data_from, fetch_data_to) of raw data to fetch.
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=FETCH_WINDOW_DAYS)

current_date=Timestamp('2023-03-03 01:00:00')


In [10]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """Fetch raw ride records with `pickup_datetime` in [from_date, to_date).

    Production data is only simulated here because recent data is not
    accessible. The requested window is shifted back 52 weeks and the
    historical rides in that window are loaded. Their `pickup_datetime` is
    then shifted forward by the same 52 weeks so they look like current data.

    Every calendar month touched by the shifted window is loaded exactly once.
    Windows inside a single month therefore produce no duplicated rows, and
    windows spanning more than two months are not truncated.

    Args:
        from_date: Inclusive start of the window.
        to_date: Exclusive end of the window.

    Returns:
        Rides inside the window, sorted by `pickup_datetime` then
        `pickup_location_id`.

    Raises:
        ValueError: If `from_date` is later than `to_date`.
    """
    if from_date > to_date:
        raise ValueError(
            f'from_date ({from_date}) must not be later than to_date ({to_date})'
        )

    # 52 weeks == 364 days, a whole number of weeks, so weekdays in the
    # historical period line up with the simulated one.
    shift = timedelta(weeks=52)
    historical_from = from_date - shift
    historical_to = to_date - shift

    # Load each month overlapping the window once (the window may lie inside
    # a single month or span several).
    monthly_rides = [
        load_raw_data(year=month.year, months=month.month)
        for month in pd.period_range(start=historical_from, end=historical_to, freq='M')
    ]
    rides = pd.concat(monthly_rides, axis=0)

    # Apply both bounds to the combined data instead of one bound per month.
    in_window = (
        (rides['pickup_datetime'] >= historical_from)
        & (rides['pickup_datetime'] < historical_to)
    )
    # `.assign` builds a new frame, avoiding a write into a filtered slice
    # (SettingWithCopyWarning).
    rides = rides.loc[in_window].assign(
        pickup_datetime=lambda df: df['pickup_datetime'] + shift
    )

    return rides.sort_values(['pickup_datetime', 'pickup_location_id'])

In [11]:
rides = fetch_batch_raw_data(fetch_data_from, fetch_data_to)

File for 2022-02 already exists
File for 2022-03 already exists


In [12]:
from src.data import transform_raw_data_into_ts_data

# Turn the raw ride records into the time series data that gets uploaded below.
# Judging by the output, it has one row per (pickup_hour, pickup_location_id)
# with a `rides` count. The bare last expression renders the frame.
ts_data = transform_raw_data_into_ts_data(rides)
ts_data

100%|██████████| 255/255 [00:00<00:00, 910.20it/s]


Unnamed: 0,pickup_hour,rides,pickup_location_id
0,2023-02-03 01:00:00,2,4
1,2023-02-03 02:00:00,1,4
2,2023-02-03 03:00:00,0,4
3,2023-02-03 04:00:00,1,4
4,2023-02-03 05:00:00,0,4
...,...,...,...
171355,2023-03-02 20:00:00,0,109
171356,2023-03-02 21:00:00,0,109
171357,2023-03-02 22:00:00,0,109
171358,2023-03-02 23:00:00,0,109


In [14]:
# connect to hopsworks feature store
import hopsworks

# log in to the project using the credentials from src.config
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY,
)

# connect to feature store
feature_store = project.get_feature_store()

# Connect to (or create) the feature group holding the hourly time series.
# The primary key columns must exist in ts_data, whose columns are
# `pickup_hour`, `rides` and `pickup_location_id`. A row is identified by the
# location and the hour.
# NOTE(review): get_or_create presumably returns an already-existing group
# with its stored key unchanged — if this name/version was created with a
# different primary key, bump FEATURE_GROUP_VERSION. Confirm against Hopsworks docs.
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description='Time series data at hourly frequency',
    primary_key=['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour',
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/20648
Connected. Call `.close()` to terminate connection gracefully.


In [15]:
# Upload the time series to the feature group. With `wait_for_job` set to False
# the call returns once the offline backfill job has been launched (see the
# output below). It does not block until that job finishes.
feature_group.insert(
    ts_data,
    write_options={'wait_for_job': False},
)

Uploading Dataframe: 0.00% |          | Rows 0/171360 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/20648/jobs/named/time_series_hourly_feature_group_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x17f956dc0>, None)