In [2]:
import src.config as config

In [3]:
from datetime import datetime, timedelta
import pandas as pd

# Timestamp for when the notebook runs: the current UTC time truncated to the hour.
# `datetime.utcnow()` is deprecated since Python 3.12, so take a timezone-aware "now"
# and drop the tz info. The value stays timezone-naive, which keeps comparisons with
# the naive pickup timestamps used later working.
current_date = pd.Timestamp.now(tz='UTC').tz_localize(None).floor('h')
print(f"{current_date=}")

# We only need the previous hour but fetch data for the last 28 days to add redundancy to the data pipeline.
# This way, if a run fails to grab the current hour, a later run can still backfill it.
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

current_date=Timestamp('2024-09-26 12:00:00')


In [4]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulates production data by sampling historical data from 52 weeks = 1 year ago.

    The requested window [from_date, to_date) is shifted back by 52 weeks, every
    monthly raw file touched by the shifted window is loaded exactly once, rows
    outside the window are dropped, and the remaining pickup timestamps are shifted
    forward by 52 weeks so they look like recent data.

    Args:
        from_date: Inclusive start of the requested window.
        to_date: Exclusive end of the requested window.

    Returns:
        Rides whose shifted-back 'pickup_datetime' lies in the window, with
        'pickup_datetime' moved forward by 52 weeks, sorted by
        'pickup_location_id' and then 'pickup_datetime'.

    Raises:
        ValueError: If from_date is later than to_date.
    """
    if from_date > to_date:
        raise ValueError(
            f"from_date ({from_date}) must not be later than to_date ({to_date})"
        )

    shift = timedelta(days=7*52)
    from_date_ = from_date - shift
    to_date_ = to_date - shift

    # Download one file per calendar month covered by the shifted window.
    # Loading each month once avoids duplicated rows when both bounds fall in the
    # same month, and walking month by month also covers windows longer than two months.
    monthly_rides = []
    year, month = from_date_.year, from_date_.month
    while (year, month) <= (to_date_.year, to_date_.month):
        monthly_rides.append(load_raw_data(year=year, months=month))
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    rides = pd.concat(monthly_rides)

    # Apply BOTH bounds to the combined data; copy so the shift below does not
    # write into a view of the loaded frames.
    pickup = rides['pickup_datetime']
    rides = rides[(pickup >= from_date_) & (pickup < to_date_)].copy()

    # Shift the date to pretend this is recent data
    rides['pickup_datetime'] += shift

    rides = rides.sort_values(by=['pickup_location_id', 'pickup_datetime'])

    return rides

In [5]:
# Pull the simulated raw rides for the whole look-back window defined above
rides = fetch_batch_raw_data(
    from_date=fetch_data_from,
    to_date=fetch_data_to,
)

File 2023-08 was already in local storage
File 2023-09 was already in local storage


In [6]:
# Turn the raw ride records into time-series form
from src.data import transform_raw_data_into_ts_data

ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 258/258 [00:00<00:00, 1094.58it/s]


In [7]:
# Cast the location id column to int64; the feature group insertion needs this datatype to work.
ts_data = ts_data.astype({'pickup_location_id': 'int64'})

In [8]:
import hopsworks

# Log in to the Hopsworks project named in the config, using the configured API key
project = hopsworks.login(project=config.HOPSWORKS_PROJECT_NAME, api_key_value=config.HOPSWORKS_API_KEY)

# Handle to the project's feature store
feature_store = project.get_feature_store()

# Handle to the time-series feature group (fetched if it exists, created otherwise).
# Rows are keyed by (pickup_location_id, pickup_hour), with pickup_hour as the event time.
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME, version=config.FEATURE_GROUP_VERSION,
    description='Time-series data at hourly frequency',
    primary_key=['pickup_location_id', 'pickup_hour'], event_time='pickup_hour',
)

Connected. Call `.close()` to terminate connection gracefully.




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (3.7) by running 'pip install hopsworks==3.7.*'



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1051798
Connected. Call `.close()` to terminate connection gracefully.


In [9]:
# Upload the time-series rows; 'wait_for_job': False returns without waiting for the launched job
feature_group.insert(
    ts_data,
    write_options={'wait_for_job': False},
)

Uploading Dataframe: 0.00% |          | Rows 0/173376 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1051798/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x29e83dc10>, None)