In [1]:
import os
import sys
from datetime import datetime, timedelta, timezone
import pytz
import pandas as pd

import hopsworks

In [2]:
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))

In [3]:
import src.config as config

In [4]:
current_date = pd.to_datetime(datetime.utcnow()).floor('H')
print(f'Current date: {current_date}')

fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)


Current date: 2024-05-10 01:00:00


In [5]:
print(f'Fetching data from {fetch_data_from} to {fetch_data_to}')

Fetching data from 2024-04-12 01:00:00 to 2024-05-10 01:00:00


In [6]:
from src.data import load_raw_data

In [7]:
import pandas as pd
from datetime import datetime, timedelta
import calendar
from dateutil.relativedelta import relativedelta  # For precise month additions

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    '''
    Fetch data from the specified historical date range and simulate it as if it is from March 1, 2024, onward.
    '''

    # Adjust the date range backward by one year to fetch historical data
    from_date_ = from_date - relativedelta(years=1)
    to_date_ = to_date - relativedelta(years=1)
    print(f'Adjusted from_date: {from_date_}, to_date: {to_date_}')

    rides = pd.DataFrame()
    current_date = from_date_

    while current_date <= to_date_:
        year, month = current_date.year, current_date.month
        try:
            month_data = load_raw_data(year=year, months=[month])
            month_data = month_data[(month_data.pickup_datetime >= from_date_) & (month_data.pickup_datetime <= to_date_)]
            rides = pd.concat([rides, month_data])
        except FileNotFoundError:
            print(f"No data available for {year}-{month}, skipping...")
        current_date += timedelta(days=calendar.monthrange(year, month)[1])

    # Define the base date for simulation (March 1, 2024)
    base_date = pd.Timestamp('2024-03-01')

    # Calculate the difference in days from the earliest date in 'rides' to 'base_date'
    earliest_date_in_data = rides['pickup_datetime'].min()
    days_difference = (base_date - earliest_date_in_data).days

    # Shift all 'pickup_datetime' in 'rides' by this difference
    rides['pickup_datetime'] += timedelta(days=days_difference)

    # Sort data by location ID and datetime
    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [8]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

Adjusted from_date: 2023-04-12 01:00:00, to_date: 2023-05-10 01:00:00
2023-04 file is already in local storage


In [9]:
from src.data import transform_raw_data_into_timeseries_data

In [10]:
ts_data = transform_raw_data_into_timeseries_data(rides)





100%|██████████| 265/265 [00:01<00:00, 213.94it/s]


In [11]:
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)
ts_data['pickup_ts'] = ts_data['pickup_hour'].astype(int) // 10**6

In [12]:
# Connect to Hopsworks
project = hopsworks.login(project=config.HOPSWORKS_PROJECT_NAME, api_key_value=config.HOPSWORKS_API_KEY)

# Get the feature store handle for the project
feature_store = project.get_feature_store()

# Create the feature group
feature_group = feature_store.get_or_create_feature_group(name=config.FEATURE_GROUP_NAME, 
                                                          version=config.FEATURE_GROUP_VERSION,
                                                          description='Timeseries data with hourly frequency', 
                                                          primary_key=['pickup_location_id', 'pickup_ts'], 
                                                          event_time='pickup_hour')

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/699541
Connected. Call `.close()` to terminate connection gracefully.


In [13]:
feature_group.insert(ts_data, write_options={'wait_for_job': False})

Uploading Dataframe: 0.00% |          | Rows 0/120575 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: timeseries_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/699541/jobs/named/timeseries_hourly_feature_group_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x13c31f070>, None)