### Automatre the feature pipeline

In this notebook we work on automating the feature pipeline.

In [1]:
import sys 
sys.path.append(r"C:\Users\User\capstone_project")

In [2]:
import src.config as config

In [3]:
from datetime import datetime, timedelta

import pandas as pd

current_date = pd.to_datetime(datetime.utcnow()).floor('H')
print(f'{current_date=}')

# we fetch raw data for the last 28 days, to add redundancy to our data pipeline
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=28)

current_date=Timestamp('2024-01-07 17:00:00')


In [4]:
from src.data import load_raw_data

def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e. 1 year)
    """
    from_date_ = from_date - timedelta(days=7*52)
    to_date_ = to_date - timedelta(days=7*52)
    print(f'{from_date=}, {to_date_=}')

    # download 2 files from website
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)
    rides = rides[rides.pickup_datetime >= from_date_]
    rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
    rides_2 = rides_2[rides_2.pickup_datetime < to_date_]

    rides = pd.concat([rides, rides_2])

    # shift the data to pretend this is recent data
    rides['pickup_datetime'] += timedelta(days=7*52)

    rides.sort_values(by=['pickup_location_id', 'pickup_datetime'], inplace=True)

    return rides

In [5]:
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

from_date=Timestamp('2023-12-10 17:00:00'), to_date_=Timestamp('2023-01-08 17:00:00')
File 2022-12 was already in local storage
File 2023-01 was already in local storage


In [6]:
from src.data import transform_raw_data_into_ts_data
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 265/265 [00:00<00:00, 455.14it/s]


In [7]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
ts_data['pickup_ts'] = ts_data['pickup_hour'].astype('int64') // 10**6


In [8]:
%pip install hopsworks==3.4.*

Note: you may need to restart the kernel to use updated packages.


In [9]:
%pip install hsfs[python] 

Note: you may need to restart the kernel to use updated packages.


Connecting to feature store

In [12]:
"""
import hopsworks

# connect to the project
import hsfs
conn = hsfs.connection()
feature_store = conn.get_feature_store()

project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time-series data at hourly frequency",
    primary_key = ['pickup_location_id', 'pickup_hour'],
    event_time='pickup_hour',
)
"""

'\nimport hopsworks\n\n# connect to the project\nimport hsfs\nconn = hsfs.connection()\nfeature_store = conn.get_feature_store()\n\nproject = hopsworks.login(\n    project=config.HOPSWORKS_PROJECT_NAME,\n    api_key_value=config.HOPSWORKS_API_KEY\n)\n\n# connect to the feature store\nfeature_store = project.get_feature_store()\n\n# connect to the feature group\nfeature_group = feature_store.get_or_create_feature_group(\n    name=config.FEATURE_GROUP_NAME,\n    version=config.FEATURE_GROUP_VERSION,\n    description="Time-series data at hourly frequency",\n    primary_key = [\'pickup_location_id\', \'pickup_hour\'],\n    event_time=\'pickup_hour\',\n)\n'

Inserting data into feature group

In [13]:
"""
feature_group.insert(ts_data, write_options={"wait_for_job": True})
"""

'\nfeature_group.insert(ts_data, write_options={"wait_for_job": True})\n'