In [1]:
import sys
sys.path.append('../')

import src.config as config

In [2]:
from datetime import datetime, timedelta, timezone

import pandas as pd

# "Now" in UTC, truncated down to the start of the current hour
current_date = pd.to_datetime(datetime.now(tz=timezone.utc)).floor(freq='h')
print(f"{current_date=}")

# The fetch window ends at the current hour and reaches back 4 weeks (28 days);
# the long look-back adds redundancy to the data pipeline
fetch_data_to = current_date
fetch_data_from = fetch_data_to - timedelta(weeks=4)

current_date=Timestamp('2025-12-31 18:00:00+0000', tz='UTC')


We need to fetch the most recent data. Since we don't have access to the NYC Association Data Warehouse, we are going to simulate a call to a data warehouse instead.

In [3]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime, loader=None) -> pd.DataFrame:
    """
    Simulate production data by replaying historical rides from 52 weeks
    (364 days) earlier.

    The requested window [from_date, to_date) is moved 52 weeks back, every
    monthly raw file touched by the moved window is loaded exactly once, rows
    outside the window are dropped, and the remaining pickup timestamps are
    moved 52 weeks forward again so that they look like recent data. Shifting
    by whole weeks keeps the day of the week of every ride unchanged.

    Args:
        from_date: inclusive start of the window to simulate.
        to_date: exclusive end of the window to simulate.
        loader: optional callable invoked as ``loader(year=..., months=...)``
            that returns the raw rides of one month as a DataFrame. Defaults
            to ``load_raw_data``; mainly useful to inject a stub in tests.

    Returns:
        Rides whose tz-naive ``pickup_datetime`` lies in [from_date, to_date),
        sorted by ``pickup_location_id`` and then ``pickup_datetime``.
    """
    if loader is None:
        loader = load_raw_data

    shift = timedelta(days=7*52)

    # Window bounds in the historical period; tz info is dropped because the
    # pickup timestamps are made tz-naive before they are compared.
    lower = (from_date - shift).replace(tzinfo=None)
    upper = (to_date - shift).replace(tzinfo=None)

    # Every (year, month) touched by the historical window, in chronological
    # order. Each month is listed once, so a window that starts and ends in
    # the same month no longer loads (and duplicates) the same file twice.
    periods = [(lower.year, lower.month)]
    while periods[-1] < (upper.year, upper.month):
        year, month = periods[-1]
        periods.append((year + 1, 1) if month == 12 else (year, month + 1))

    frames = []
    for year, month in periods:
        month_rides = loader(year=year, months=month)
        month_rides['pickup_datetime'] = pd.to_datetime(month_rides['pickup_datetime']).dt.tz_localize(None)
        # Apply BOTH bounds to every file so nothing outside the window leaks in
        in_window = (month_rides.pickup_datetime >= lower) & (month_rides.pickup_datetime < upper)
        frames.append(month_rides[in_window])

    rides = pd.concat(frames)

    # Shift the data to pretend this is recent data
    rides['pickup_datetime'] += shift

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [4]:
# Pull the simulated "recent" raw rides for the fetch window
rides = fetch_batch_raw_data(
    from_date=fetch_data_from,
    to_date=fetch_data_to,
)

File 2024-12 was already in local storage
File 2025-01 was already in local storage


In [5]:
from src.data import transform_raw_data_into_ts_data
# Turn the raw rides into time-series data.
# NOTE(review): presumably hourly ride counts per pickup location (the feature
# group below is keyed by pickup_location_id / pickup_hour) — confirm in src.data.
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 259/259 [00:01<00:00, 170.76it/s]


In [6]:
import hopsworks

# Log in to Hopsworks with the project name and API key taken from config
project = hopsworks.login(
    api_key_value=config.HOPSWORKS_API_KEY,
    project=config.HOPSWORKS_PROJECT_NAME,
)

# Handle to the project's feature store
feature_store = project.get_feature_store()

# Get (or create) the feature group, keyed by location and hour, with
# pickup_hour as the event time
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
    description="Time series data at hourly frequency",
)

  from .autonotebook import tqdm as notebook_tqdm


2025-12-31 12:50:10,065 INFO: Initializing external client
2025-12-31 12:50:10,066 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2025-12-31 12:50:11,040 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1329302


In [7]:
# Cast the location id to int32 before uploading
ts_data = ts_data.astype({'pickup_location_id': 'int32'})

# Upload the frame; "wait_for_job": False is passed so the call does not block
# on the materialization job (NOTE(review): per the option name — confirm in
# the Hopsworks docs)
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 678321/678321 | Elapsed Time: 00:43 | Remaining Time: 00:00
Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/nyc_taxiride_demand/Resources/jobs/time_series_hourly_feature_group_1_offline_fg_materialization/config_1767206643318) to trigger the materialization job again.


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)