# 📕 12 - Data Pipeline for Fetching New Data

## Introduction

The New York City Taxi and Limousine Commission (TLC) publishes its trip data only once a month, so real-time data is not available. To keep our predictive models current and have the analytics dashboard reflect 'real-time' trends, I simulate real-time data fetching: older data is repurposed and its timestamps are shifted so that the rides appear recent.

First, I define a timeframe covering the last 28 days up to the current UTC hour. This delineates a clear period of interest and also builds in some redundancy. Next, I use the `fetch_batch_raw_data` function (defined below). It retrieves the data for the same window 52 weeks (364 days, roughly one year) earlier and then shifts the timestamps forward by 52 weeks, so that past data looks current. Because the shift is a whole number of weeks, every ride stays on the same day of the week. The shifted data is then transformed into time-series format and inserted into our Hopsworks feature group.
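
To make the date arithmetic concrete, here is a minimal sketch using only the standard library. The fixed `current_time` is an assumption chosen so that the result matches the window printed in the notebook output further down; the variable names `old_from`, `old_to`, and `naive_from` are illustrative only.

```python
from datetime import datetime, timedelta

# Hypothetical fixed "current time" so the example is reproducible
current_time = datetime(2023, 9, 27, 8, 0)

# 28-day window ending at the current hour
from_date = current_time - timedelta(days=28)
to_date = current_time

# The same window, 52 weeks (364 days) earlier
old_from = from_date - timedelta(weeks=52)
old_to = to_date - timedelta(weeks=52)

# 52 weeks is a whole number of weeks, so the weekday is unchanged
same_weekday = old_from.weekday() == from_date.weekday()

# A 365-day shift would land on a different weekday
naive_from = from_date - timedelta(days=365)
naive_same_weekday = naive_from.weekday() == from_date.weekday()

print(old_from, old_to, same_weekday, naive_same_weekday)
```

Keeping the weekday aligned matters for taxi demand, since weekday and weekend patterns differ; that is a plausible reason for shifting by 52 weeks rather than by a calendar year.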

Note that this approach only simulates what we would do if we had direct access to a data warehouse. As part of the project workflow, we intend to use GitHub Actions to run this data fetch every hour.


In [3]:
# import libraries
from datetime import datetime, timedelta
import pandas as pd
import hopsworks
import src.config as config
from src.data import load_raw_data
from src.data import transform_raw_data_into_ts_data

In [27]:
# Get the current UTC time as a timezone-naive timestamp, rounded down to the hour
current_time = pd.Timestamp.now(tz="UTC").tz_localize(None).floor("h")

# Calculate the start of the timeframe
from_date = current_time - timedelta(days=28)

# Set the end of the timeframe as the current time
to_date = current_time

In [28]:
def fetch_batch_raw_data(from_date: datetime, to_date: datetime) -> pd.DataFrame:
    """
    Simulate fetching recent batch data by repurposing older historical data.

    This function fetches raw data from 52 weeks (364 days, roughly 1 year) before the given date range,
    then shifts the timestamps forward by 52 weeks so that the data appears recent.

    Parameters:
    - from_date (datetime): The start date of the required data range.
    - to_date (datetime): The end date of the required data range.

    Returns:
    - pd.DataFrame: A DataFrame containing the simulated recent data.
    """
    
    # Calculate the equivalent date range 52 weeks (364 days) earlier
    from_date_ = from_date - timedelta(weeks=52)
    to_date_ = to_date - timedelta(weeks=52)

    print(f"Fetching raw data from {from_date_} to {to_date_}")

    # Load the month that contains the start of the old date range
    rides = load_raw_data(year=from_date_.year, months=from_date_.month)

    # Load the end month too, unless it is the same month (avoids duplicated rows)
    if (to_date_.year, to_date_.month) != (from_date_.year, from_date_.month):
        rides_2 = load_raw_data(year=to_date_.year, months=to_date_.month)
        rides = pd.concat([rides, rides_2])

    # Keep only rides inside the half-open window [from_date_, to_date_)
    in_window = (rides["pickup_datetime"] >= from_date_) & (rides["pickup_datetime"] < to_date_)
    rides = rides[in_window].copy()

    # Shift the data by 52 weeks to make it look recent
    rides["pickup_datetime"] += timedelta(weeks=52)
    
    # Sort the dataframe by location and datetime
    rides.sort_values(by=["pickup_location_id", "pickup_datetime"], inplace=True)

    return rides
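
The function above loads one monthly file for each end of the window. As a sketch of how the set of monthly files could be derived for an arbitrary window, including one that falls inside a single month or crosses a year boundary, the helper below lists every `(year, month)` pair touched by a half-open range. `months_in_range` is a hypothetical helper written for illustration; it is not part of `src.data`.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def months_in_range(from_date: datetime, to_date: datetime) -> List[Tuple[int, int]]:
    """Return every (year, month) touched by the half-open range [from_date, to_date)."""
    if to_date <= from_date:
        return []

    # to_date is exclusive, so look at the last instant that is still inside the range
    last = to_date - timedelta(microseconds=1)

    months = []
    year, month = from_date.year, from_date.month
    while (year, month) <= (last.year, last.month):
        months.append((year, month))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months


# Window that crosses a month boundary
print(months_in_range(datetime(2022, 8, 31, 8), datetime(2022, 9, 28, 8)))

# Window that stays inside a single month
print(months_in_range(datetime(2022, 3, 2, 8), datetime(2022, 3, 30, 8)))
```

Each pair could then be passed to `load_raw_data(year=..., months=...)`, with the date filter applied once to the combined result.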


In [29]:
# fetch batch raw data within the given 28-day timeframe
rides = fetch_batch_raw_data(from_date, to_date)

Fetching raw data from 2022-08-31 08:00:00 to 2022-09-28 08:00:00
File 2022-08 was already in local storage
File 2022-09 was already in local storage


In [50]:
# convert raw data into time series data
ts_data = transform_raw_data_into_ts_data(rides)
ts_data.head()

100%|██████████| 265/265 [00:01<00:00, 252.34it/s]


| | pickup_hour | rides | pickup_location_id |
|---|---|---|---|
| 0 | 2023-08-30 08:00:00 | 1 | 1 |
| 1 | 2023-08-30 09:00:00 | 2 | 1 |
| 2 | 2023-08-30 10:00:00 | 4 | 1 |
| 3 | 2023-08-30 11:00:00 | 1 | 1 |
| 4 | 2023-08-30 12:00:00 | 2 | 1 |
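
The implementation of `transform_raw_data_into_ts_data` lives in `src.data` and is not shown in this notebook. As a rough illustration of the kind of aggregation it appears to perform, the sketch below counts rides per location and hour and fills hours without rides with zero. It uses only the standard library, and `to_hourly_counts` and the sample rides are hypothetical. A zero-filled grid would be consistent with the 178,080 rows reported in the upload output below, which equals 265 locations times 672 hours (28 days × 24 hours).

```python
from collections import Counter
from datetime import datetime, timedelta


def to_hourly_counts(raw_rides, location_ids, from_hour, to_hour):
    """Count rides per (location, hour) over [from_hour, to_hour), zero-filling empty hours."""
    # Truncate each pickup time to the hour and count rides per (location, hour)
    counts = Counter(
        (location_id, pickup.replace(minute=0, second=0, microsecond=0))
        for pickup, location_id in raw_rides
    )

    rows = []
    for location_id in sorted(location_ids):
        hour = from_hour
        while hour < to_hour:
            # Counter returns 0 for (location, hour) pairs that never occurred
            rows.append(
                {
                    "pickup_hour": hour,
                    "rides": counts[(location_id, hour)],
                    "pickup_location_id": location_id,
                }
            )
            hour += timedelta(hours=1)
    return rows


# Hypothetical raw rides as (pickup_datetime, pickup_location_id) pairs
raw_rides = [
    (datetime(2023, 8, 30, 8, 5), 1),
    (datetime(2023, 8, 30, 9, 10), 1),
    (datetime(2023, 8, 30, 9, 45), 1),
    (datetime(2023, 8, 30, 11, 30), 2),
]

ts_rows = to_hourly_counts(
    raw_rides, [1, 2], datetime(2023, 8, 30, 8), datetime(2023, 8, 30, 12)
)
for row in ts_rows:
    print(row)
```

With real data, the same result would more naturally be produced with pandas (flooring `pickup_datetime` to the hour and grouping), but the plain-Python version keeps the logic explicit.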


In [51]:
# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time series data aggregated by hour",
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour"
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/111769
Connected. Call `.close()` to terminate connection gracefully.


In [52]:
# insert the simulated time series data into the feature group
feature_group.insert(ts_data, write_options={"wait_for_job": False})

Uploading Dataframe: 0.00% |          | Rows 0/178080 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: time_series_hourly_feature_group_1_offline_fg_backfill
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/111769/jobs/named/time_series_hourly_feature_group_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x14d037550>, None)
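
Because every hourly run pushes the full 28-day window, consecutive runs overlap in all but one hour. The sketch below illustrates why that overlap is harmless as long as rows are identified by the primary key `(pickup_location_id, pickup_hour)` declared for the feature group. It models the store as a plain dictionary and assumes that a row with an existing key replaces the old row instead of being appended; whether Hopsworks behaves this way for a given feature group configuration should be checked against its documentation. `upsert` and `make_rows` are hypothetical helpers.

```python
from datetime import datetime, timedelta


def upsert(store, rows):
    """Write rows into a dict keyed by (pickup_location_id, pickup_hour), replacing matches."""
    for row in rows:
        key = (row["pickup_location_id"], row["pickup_hour"])
        store[key] = row
    return store


def make_rows(start, hours, location_id=1):
    """Build one hypothetical row per hour; the ride count depends only on the hour itself."""
    rows = []
    for i in range(hours):
        hour = start + timedelta(hours=i)
        rows.append(
            {"pickup_location_id": location_id, "pickup_hour": hour, "rides": hour.hour % 3}
        )
    return rows


store = {}

# First run: 28 days x 24 hours = 672 hourly rows for one location
first_run = make_rows(datetime(2023, 8, 30, 8), 672)
upsert(store, first_run)

# Second run, one hour later: 671 rows overlap with the first run, 1 row is new
second_run = make_rows(datetime(2023, 8, 30, 9), 672)
upsert(store, second_run)

print(len(store))
```

Under that assumption, a skipped or failed run is also recovered automatically by the next one, since the wide window re-sends the missing hours.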