In [1]:
# Name of the Hopsworks project this notebook logs into (see the login cell).
HOPSWORKS_PROJECT_NAME = "taxi_demand_hwfs"

In [2]:
import os
from dotenv import load_dotenv
from src.paths import PARENT_DIR

# Load key-value pairs from the .env file located in the parent directory
# into the process environment.
load_dotenv(PARENT_DIR / '.env')

# Read the Hopsworks API key. Fail early with an actionable message (instead of
# a bare KeyError) when it is missing or empty, since every Hopsworks call
# further down depends on it.
HOPSWORKS_API_KEY = os.environ.get('HOPSWORKS_API_KEY')
if not HOPSWORKS_API_KEY:
    raise KeyError(
        f"HOPSWORKS_API_KEY is not set. Add it to {PARENT_DIR / '.env'} "
        "or set it as an environment variable."
    )

In [3]:
from datetime import datetime
import pandas as pd
from src.data import load_raw_data

# NOTE: `to_year` depends on the date the notebook is run, so re-running later
# may download (and include) more data than the saved outputs show.
from_year = 2022
to_year = datetime.now().year
print(f'Downloading raw data from {from_year} to {to_year}')

# Download one DataFrame per year and concatenate once at the end. Calling
# pd.concat inside the loop re-copies every row accumulated so far on each
# iteration (quadratic in the number of years).
yearly_rides = [load_raw_data(year) for year in range(from_year, to_year + 1)]

# pd.concat raises ValueError on an empty list, so fall back to an empty frame
# if the year range is empty (e.g. from_year later than the current year).
rides = pd.concat(yearly_rides) if yearly_rides else pd.DataFrame()

# Release the per-year frames so they don't stay in kernel memory alongside `rides`.
del yearly_rides

Downloading raw data from 2022 to 2024
File 2022-01 was already in local storage
File 2022-02 was already in local storage
File 2022-03 was already in local storage
File 2022-04 was already in local storage
File 2022-05 was already in local storage
File 2022-06 was already in local storage
File 2022-07 was already in local storage
File 2022-08 was already in local storage
File 2022-09 was already in local storage
File 2022-10 was already in local storage
File 2022-11 was already in local storage
File 2022-12 was already in local storage
File 2023-01 was already in local storage
File 2023-02 was already in local storage
File 2023-03 was already in local storage
File 2023-04 was already in local storage
File 2023-05 was already in local storage
File 2023-06 was already in local storage
File 2023-07 was already in local storage
File 2023-08 was already in local storage
File 2023-09 was already in local storage
Downloading file 2023-10
Downloading file 2023-11
Downloading file 2023-12
2023

In [4]:
# Sanity check: number of raw ride records loaded by the download cell.
n_rides = len(rides)
print(f'len(rides)={n_rides}')

len(rides)=74587606


In [5]:
from src.data import transform_raw_data_into_ts_data

ts_data = transform_raw_data_into_ts_data(rides)

# Add a `pickup_ts` column holding the pickup hour as a Unix timestamp in
# milliseconds. Datetime features such as `pickup_hour` are not a good type for
# primary keys and may cause issues there, so the integer timestamp is used as
# the key instead.
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)
assert ts_data['pickup_hour'].notna().all(), 'pickup_hour contains missing timestamps'

# Measure the offset from the epoch explicitly instead of `astype(int) // 10**6`.
# That shortcut assumes nanosecond datetime64 resolution; pandas 2 also supports
# s/ms/us units. It also breaks where `int` is int32 (Windows with NumPy < 2).
UNIX_EPOCH = pd.Timestamp('1970-01-01', tz='UTC')
ts_data['pickup_ts'] = (ts_data['pickup_hour'] - UNIX_EPOCH) // pd.Timedelta(milliseconds=1)

ts_data

100%|██████████| 265/265 [00:07<00:00, 33.91it/s]


Unnamed: 0,pickup_hour,rides,pickup_location_id,pickup_ts
0,2022-01-01 00:00:00+00:00,0,1,1640995200000
1,2022-01-01 01:00:00+00:00,0,1,1640998800000
2,2022-01-01 02:00:00+00:00,0,1,1641002400000
3,2022-01-01 03:00:00+00:00,0,1,1641006000000
4,2022-01-01 04:00:00+00:00,1,1,1641009600000
...,...,...,...,...
4445635,2023-11-30 19:00:00+00:00,3,265,1701370800000
4445636,2023-11-30 20:00:00+00:00,3,265,1701374400000
4445637,2023-11-30 21:00:00+00:00,6,265,1701378000000
4445638,2023-11-30 22:00:00+00:00,3,265,1701381600000


In [6]:
# Print the covered time range, both as UTC datetimes (pickup_hour) and as the
# Unix-millisecond keys (pickup_ts). After the loop, `date_from` / `date_to`
# hold the pickup_ts bounds.
for column in ('pickup_hour', 'pickup_ts'):
    date_from, date_to = ts_data[column].min(), ts_data[column].max()
    print(f"{column} Range From:", date_from)
    print(f"{column} Range To:", date_to)

pickup_hour Range From: 2022-01-01 00:00:00+00:00
pickup_hour Range To: 2023-11-30 23:00:00+00:00
pickup_ts Range From: 1640995200000
pickup_ts Range To: 1701385200000


In [7]:
import hopsworks

In [8]:
# Log in to the Hopsworks project with the API key read from the environment/.env.
login_kwargs = {
    'project': HOPSWORKS_PROJECT_NAME,
    'api_key_value': HOPSWORKS_API_KEY,
}
project = hopsworks.login(**login_kwargs)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/241814


In [9]:
# Get a handle to the project's feature store; the feature group is created through it below.
feature_store = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.


In [10]:
# Name and version identifying the hourly time-series feature group in Hopsworks.
FEATURE_GROUP_NAME = "time_series_hourly_feature_group"
FEATURE_GROUP_VERSION = 1

In [11]:
# Get the feature group at this name/version, creating it if it does not exist.
# Rows are keyed by (pickup_location_id, pickup_ts). The integer Unix-ms
# pickup_ts column also serves as the event-time column.
primary_key_columns = ['pickup_location_id', 'pickup_ts']
event_time_column = 'pickup_ts'

feature_group = feature_store.get_or_create_feature_group(
    name=FEATURE_GROUP_NAME,
    version=FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key=primary_key_columns,
    event_time=event_time_column,
)

In [12]:
# Upload ts_data to the feature group without blocking on the offline
# materialization job it launches. Data may therefore not be visible in the
# offline store until that job finishes (follow it via the printed job URL).
insert_options = {"wait_for_job": False}
feature_group.insert(ts_data, write_options=insert_options)

Uploading Dataframe: 0.00% |          | Rows 0/4445640 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/241814/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fd75fd71ca0>, None)