In [1]:
import sys
import os
from pathlib import Path
from dotenv import load_dotenv

# Resolve the notebook's working directory and derive the repository layout from it
current_directory = Path.cwd()
project_root = current_directory.parent
src_path = project_root / 'src'

# Echo the resolved locations so a wrong launch directory is easy to spot
for label, location in (
    ("Current Directory:", current_directory),
    ("Project Root:", project_root),
    ("SRC Path:", src_path),
):
    print(label, location)

# Make the repository root importable so that `src` resolves as a package
sys.path.append(str(project_root))

# Load the .env file *before* importing src.config, preserving the original order
# (presumably config reads environment variables at import time — confirm)
from src.paths import PARENT_DIR
load_dotenv(PARENT_DIR / '.env')

# Project configuration module, referenced by later cells as `config`
import src.config as config


Current Directory: /Users/eugene/Github/taxi_demand_predictor/notebooks
Project Root: /Users/eugene/Github/taxi_demand_predictor
SRC Path: /Users/eugene/Github/taxi_demand_predictor/src


In [2]:
from datetime import datetime, timedelta

import pandas as pd

# Current UTC time truncated to the hour.
# - pd.Timestamp.now(tz='UTC') replaces datetime.utcnow(), deprecated since Python 3.12.
# - tz_localize(None) drops the tz marker so the value stays timezone-naive, exactly
#   like the value the previous utcnow()-based expression produced.
# - 'h' replaces the 'H' frequency alias, deprecated since pandas 2.2.
current_date = pd.Timestamp.now(tz='UTC').tz_localize(None).floor('h')
print(f'{current_date=}')

# fetch raw data for the last 28 days to add redundancy to our data pipeline
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

current_date=Timestamp('2024-07-01 16:00:00')


In [3]:
from src.data import load_raw_data

def fetch_batch_raw_data(
        from_date:datetime,
        to_date:datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago.

    The requested window [from_date, to_date) is shifted back by 52 weeks, the
    monthly raw file(s) containing the shifted bounds are loaded with
    `load_raw_data`, rows outside the shifted window are discarded, and the
    remaining rows have `pickup_datetime` shifted forward by 52 weeks so they
    look like recent data.

    Args:
        from_date: inclusive lower bound of the window to simulate.
        to_date: exclusive upper bound of the window to simulate.

    Returns:
        DataFrame of rides sorted by `pickup_location_id` and `pickup_datetime`.

    Note:
        Only the month files containing the two shifted bounds are loaded, so a
        window spanning more than two calendar months would miss the months in
        between.
    """
    shift = timedelta(days=7*52)
    from_date_ = from_date - shift
    to_date_ = to_date - shift

    def _load_window(year: int, month: int) -> pd.DataFrame:
        # Load one monthly file and keep only rows inside [from_date_, to_date_)
        month_rides = load_raw_data(year=year, months=month)
        in_window = (
            (month_rides.pickup_datetime >= from_date_)
            & (month_rides.pickup_datetime < to_date_)
        )
        return month_rides[in_window]

    frames = [_load_window(from_date_.year, from_date_.month)]

    # Load the second file only when the window crosses a month boundary;
    # loading the same month twice would duplicate every ride in the overlap.
    if (to_date_.year, to_date_.month) != (from_date_.year, from_date_.month):
        frames.append(_load_window(to_date_.year, to_date_.month))

    # pd.concat returns a new frame, so the in-place shift below never writes
    # into a slice of the loaded data
    rides = pd.concat(frames)

    # shift the data to pretend this is recent data
    rides['pickup_datetime'] += shift

    return rides.sort_values(by=['pickup_location_id', 'pickup_datetime'])

In [4]:
# Pull the (simulated) raw rides for the window defined by fetch_data_from / fetch_data_to
rides = fetch_batch_raw_data(
    from_date=fetch_data_from,
    to_date=fetch_data_to,
)

Checking for file: /Users/eugene/Github/taxi_demand_predictor/data/raw/rides_2023-06.parquet
File 2023-06 was already in local storage: /Users/eugene/Github/taxi_demand_predictor/data/raw/rides_2023-06.parquet
Files in RAW_DATA_DIR: ['rides_2024-01.parquet', 'rides_2023-03.parquet', 'rides_2022-05.parquet', 'rides_2023-02.parquet', 'rides_2023-12.parquet', 'rides_2022-04.parquet', 'rides_2023-10.parquet', 'rides_2022-06.parquet', 'rides_2023-09.parquet', 'rides_2024-02.parquet', 'rides_2023-08.parquet', 'rides_2024-03.parquet', 'rides_2023-01.parquet', 'rides_2023-11.parquet', 'rides_2022-07.parquet', 'rides_2022-12.parquet', 'rides_2022-02.parquet', 'rides_2023-04.parquet', 'rides_2022-03.parquet', 'rides_2023-05.parquet', 'rides_2022-08.parquet', 'rides_2022-11.parquet', 'rides_2022-01.parquet', 'rides_2023-07.parquet', 'rides_2022-10.parquet', 'rides_2023-06.parquet', 'rides_2022-09.parquet', 'rides_2024-04.parquet']
Successfully loaded file: /Users/eugene/Github/taxi_demand_predict

In [20]:
from src.data import transform_raw_data_into_ts_data
# Build `ts_data` from the raw rides with the project helper. The following cell
# reads its `pickup_hour` and `pickup_location_id` columns.
# NOTE(review): presumably one row per (location, hour) — confirm against src.data.
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 257/257 [00:00<00:00, 296.03it/s]


In [21]:
# NOTE: this cell rebinds `ts_data` to a frame without `pickup_hour`, so re-run the
# transform cell before re-running this one.

# string to timezone-aware (UTC) datetime
pickup_hour_utc = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# Unix epoch milliseconds, computed as a timedelta from the epoch so the result does
# not depend on the datetime column's storage resolution (ns/us/ms/s) or on the
# platform width of `int`, both of which `astype(int) // 10**6` silently relies on.
unix_epoch = pd.Timestamp('1970-01-01', tz='UTC')
pickup_ts = (pickup_hour_utc - unix_epoch) // pd.Timedelta(milliseconds=1)

# Keep only the columns that are part of the feature group schema, both as int64
ts_data = pd.DataFrame({
    'pickup_location_id': ts_data['pickup_location_id'].astype('int64'),
    'pickup_ts': pickup_ts.astype('int64'),
})

In [23]:
import hopsworks

# Log in to Hopsworks using the project name and API key held in `config`
project = hopsworks.login(
    api_key_value=config.HOPSWORKS_API_KEY,
    project=config.HOPSWORKS_PROJECT_NAME,
)

# Handle to the project's feature store
feature_store = project.get_feature_store()

from hsfs.feature import Feature

# Explicit schema: both columns are declared as BIGINT
feature_group_schema = [
    Feature(name=column_name, type='BIGINT')
    for column_name in ('pickup_location_id', 'pickup_ts')
]

# Fetch the feature group if it already exists, otherwise create it with this schema.
# NOTE(review): the version is pinned to 2 here rather than taken from
# config.FEATURE_GROUP_VERSION — confirm this is intended.
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=2,
    description='Time-series data at hourly frequency',
    primary_key=['pickup_location_id', 'pickup_ts'],
    event_time='pickup_ts',
    features=feature_group_schema,
)

# Show the schema the feature group object reports
print("Feature Group Schema:", feature_group.features)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/837781
Connected. Call `.close()` to terminate connection gracefully.
Feature Group Schema: [Feature('pickup_location_id', 'BIGINT', None, False, False, None, None, None), Feature('pickup_ts', 'BIGINT', None, False, False, None, None, None)]
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/837781/fs/833604/fg/955767


Uploading Dataframe: 0.00% |          | Rows 0/156256 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/837781/jobs/named/time_series_hourly_feature_group_2_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fa110f65b20>, None)

In [None]:
# Upload `ts_data` to the feature group.
# NOTE(review): 'wait_for_job': False presumably makes the call return without
# waiting for the materialization job to finish — confirm in the Hopsworks docs.
insert_options = {'wait_for_job': False}
feature_group.insert(ts_data, write_options=insert_options)