In [1]:
import os
import sys
sys.path.append("../src/")
sys.path.append("../")

import config
import data

In [2]:
from datetime import datetime, timedelta
import pandas as pd

CurrentDate = pd.to_datetime(datetime.utcnow()).floor("H")

#Fetching Data Since 28 Days ago to Today
#By doing this we are adding a lot of Redundancy to the Pipeline
#Lot of Redundancy -> Pipeline doesn't break if it misses a job
FetchDataStart = CurrentDate
FetchDataEnd = CurrentDate - timedelta(days = 28) 

In [3]:
def FetchBatchRawData(fromdate:datetime, todate:datetime) -> pd.DataFrame:
    
    #Simulate Production of new Data Points by Sampling Historical Data (from 52 weeks ago)
    #We do not have access to Real Time Taxi Data, so here's some Synthetic Data (roughly created)
    
    from_ = fromdate - timedelta(days = 7*52)
    to_ = todate - timedelta(days = 7*52)
    
    #Download 2 Files from Website
    rides = data.LoadRawData(year = from_.year, months = from_.month)
    rides = rides[rides["pickup_datetime"] >= from_]
    rides2 = data.LoadRawData(year = to_.year, months = to_.month)
    rides2 = rides2[rides2["pickup_datetime"] <= to_]
    
    rides = pd.concat([rides, rides2])
    
    #Shift Data to pretend it's recent
    rides["pickup_datetime"] += timedelta(days = 7*52)
    
    rides.sort_values(by=["pickup_location_id", "pickup_datetime"], inplace = True)
    
    return rides

In [4]:
rides = FetchBatchRawData(fromdate = FetchDataStart, todate = FetchDataEnd)

File 2022-10 was already in local storage
File 2022-09 was already in local storage


In [5]:
TS_Data = data.TransformRawDataIntoTSData(rides)

100%|███████████████████████████████████████████████████████████████████████████| 260/260 [00:01<00:00, 134.16it/s]


In [8]:
TS_Data.head()

Unnamed: 0,pickup_hour,numrides,pickup_location_id
0,2023-08-31 00:00:00,5,4
1,2023-08-31 01:00:00,1,4
2,2023-08-31 02:00:00,2,4
3,2023-08-31 03:00:00,0,4
4,2023-08-31 04:00:00,0,4


In [9]:
import hopsworks

#Connect to the Project
Project = hopsworks.login(project = config.HopsworksProjectName, api_key_value = config.HOPSWORKSAPIKEY)

#Connect to Feature Store
FeatureStore = Project.get_feature_store()

#Connect to the Feature Group
FeatureGroup = FeatureStore.get_or_create_feature_group(name = config.FeatureGroupName,
                                                        version = config.FeatureGroupVersion,
                                                        description = "TimeSeries Data at Hourly Frequency",
                                                        primary_key = ["pickup_location_id", "pickup_hour"],
                                                        event_time = "pickup_hour"
                                                       )


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/104605
Connected. Call `.close()` to terminate connection gracefully.


In [11]:
FeatureGroup.insert(TS_Data, write_options = {"wait_for_job":False})

Uploading Dataframe: 0.00% |          | Rows 0/380640 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: ts_hourly_featuregroup_updated_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/104605/jobs/named/ts_hourly_featuregroup_updated_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f6446e3a0e0>, None)

In [10]:
TS_Data

Unnamed: 0,pickup_hour,numrides,pickup_location_id
0,2023-08-31 00:00:00,5,4
1,2023-08-31 01:00:00,1,4
2,2023-08-31 02:00:00,2,4
3,2023-08-31 03:00:00,0,4
4,2023-08-31 04:00:00,0,4
...,...,...,...
380635,2023-10-30 19:00:00,0,251
380636,2023-10-30 20:00:00,0,251
380637,2023-10-30 21:00:00,0,251
380638,2023-10-30 22:00:00,0,251
