In [2]:
import sys
from pathlib import Path
import os

# Locate the project root (adjust as needed)
project_root = Path.cwd().parent  # If running from notebooks/
src_path = project_root / "src"

# Debugging outputs
print(f"Project root: {project_root}")
print(f"Checking if src exists at: {src_path} -> {src_path.exists()}")

# Add src to Python path
if src_path.exists() and str(src_path) not in sys.path:
    sys.path.append(str(src_path))
    print(f"Added {src_path} to sys.path")

# Verify the path inclusion
print("Current sys.path:", sys.path)

# Try importing the module
try:
    import src.config as config
    print("Module imported successfully!")
except ModuleNotFoundError as e:
    print(f"Error importing module: {e}")


Project root: /Users/farshid/taxi_demand_predictor
Checking if src exists at: /Users/farshid/taxi_demand_predictor/src -> True
Added /Users/farshid/taxi_demand_predictor/src to sys.path
Current sys.path: ['/Library/Frameworks/Python.framework/Versions/3.12/lib/python312.zip', '/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12', '/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/lib-dynload', '', '/Users/farshid/taxi_demand_predictor/.venv/lib/python3.12/site-packages', '/Users/farshid/taxi_demand_predictor/src']
Error importing module: No module named 'src'


In [3]:
import os

#src_path = "/Users/farshid/taxi_demand_predictor/src"
#print("Files in src:", os.listdir(src_path))


src_path = os.path.join(os.getcwd(), "src")
print("Files in src:", os.listdir(src_path) if os.path.exists(src_path) else "Directory not found")



Files in src: ['plot.py', 'config.py', 'requirements.txt', 'paths.py', '__init__.py', '__pycache__', 'model.py', 'data_split.py', '.env', 'data.py']


In [10]:
from dotenv import load_dotenv
import os
from pathlib import Path

env_path = Path.cwd().parent / "src" / ".env"
print(f"Loading .env from: {env_path}")

if load_dotenv(env_path):
    print("Environment variables loaded successfully!")
else:
    print("Failed to load .env")

print("HOPSWORKS_API_KEY:", os.getenv("HOPSWORKS_API_KEY"))


Loading .env from: /Users/farshid/taxi_demand_predictor/src/.env
Environment variables loaded successfully!
HOPSWORKS_API_KEY: RTnmIWMeThUXBxPy.AppJdHvFMv6roAecw4UqE3B2NQaF8QsXoD4sZUTewaeRKJaULhrZCNYahNpFDNKe


In [12]:
import config 

In [14]:
from datetime import datetime, timedelta, UTC
import pandas as pd

# Use timezone-aware UTC timestamp
current_date = pd.to_datetime(datetime.now(UTC)).floor('H')
print(f'{current_date=}')

# Fetch raw data for the last 28 days
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)


current_date=Timestamp('2025-02-05 22:00:00+0000', tz='UTC')


In [18]:
from datetime import datetime, timedelta
import pandas as pd
from data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
    """
    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)

    # Ensure from_date_ and to_date_ are timezone-aware
    from_date_ = from_date_.tz_convert("UTC")
    to_date_ = to_date_.tz_convert("UTC")

    print(f'{from_date=}, {to_date_=}')

    # Download 2 files from website
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides["pickup_datetime"] = pd.to_datetime(rides["pickup_datetime"], utc=True)  # Ensure timezone awareness
    rides = rides[rides.pickup_datetime >= from_date_]

    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2["pickup_datetime"] = pd.to_datetime(rides_2["pickup_datetime"], utc=True)  # Ensure timezone awareness
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]

    rides = pd.concat([rides, rides_2])

    # Shift the data to pretend this is recent data
    rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)


from_date=Timestamp('2025-01-08 22:00:00+0000', tz='UTC'), to_date_=Timestamp('2024-02-07 22:00:00+0000', tz='UTC')
File 2024-01 was already in local storage
File 2024-02 was already in local storage


In [19]:
from data import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 265/265 [00:00<00:00, 564.51it/s]


In [20]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
ts_data['pickup_ts'] = ts_data['pickup_hour'].astype(int) // 10**6

In [22]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key = ['pickup_location_id', 'pickup_ts'],
    event_time='pickup_ts',
)

2025-02-05 17:45:52,310 INFO: Initializing external client
2025-02-05 17:45:52,311 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-02-05 17:45:53,005 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1211556


In [23]:
# insert the data into the feature group
feature_group.insert(ts_data, write_options={"wait_for_job": True})


Uploading Dataframe: 100.00% |██████████| Rows 178080/178080 | Elapsed Time: 00:08 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_3_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1211556/jobs/named/time_series_hourly_feature_group_3_offline_fg_materialization/executions
2025-02-05 17:47:54,739 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-02-05 17:47:57,829 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-02-05 17:50:47,425 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-02-05 17:50:47,489 INFO: Waiting for log aggregation to finish.
2025-02-05 17:50:58,924 INFO: Execution finished successfully.


(Job('time_series_hourly_feature_group_3_offline_fg_materialization', 'SPARK'),
 None)