# Feature store
A feature store is a centralized place to store and manage features. The existing frameworks provide guidance
and wiring methods to connect the different parts of a data processing pipeline. However, the underlying decisions and implementations are left to the user.

# Feast is a framework to create and maintain a feature store
- You still need to write your transformations yourself.

Key benefits:
- Treat features as model-ready artifacts that do not need cleaning, only use or further transformation in the model
- Avoid feature duplication
- Unify training and serving data transformations to avoid training-serving skew
- Enable feature sharing across teams or across researchers
- Enable feature monitoring and tracking (for instance, we can use Great Expectations to monitor data quality)
- Enable exploration of features

# Kudos to the US team:
The US team already has some notion of a feature store, where they share and store clean features.


# Main idea, continued: training-serving skew (not very applicable to our context yet, but maybe for intraday?)
We have two types of data:
- real-time data used to make predictions instantly
- batch/historical data used for training.

Training-serving skew happens when the pipeline that serves the model and the pipeline that trains it are separate from each other. It eventually leads to failures, or to quiet failures, for instance because new features and transformations were added to the training data but are not present in the serving pipeline.
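
A minimal sketch of how sharing one transformation between the batch and real-time paths avoids this kind of skew. The function and field names (`transform`, `conversions`, `offers`, `trips_per_hour`) are hypothetical and only serve as illustration; they are not part of Feast.

```python
# Hypothetical example: one transformation shared by the batch (training)
# and real-time (serving) paths, so both always compute the same features.

def transform(raw: dict) -> dict:
    """Turn a raw record into model-ready features (illustrative only)."""
    return {
        # Guard against division by zero when no offers were made.
        "conv_rate": raw["conversions"] / max(raw["offers"], 1),
        "trips_per_hour": raw["daily_trips"] / 24,
    }

# Batch path: historical records used for training.
historical = [
    {"conversions": 5, "offers": 10, "daily_trips": 48},
    {"conversions": 0, "offers": 0, "daily_trips": 24},
]
training_rows = [transform(r) for r in historical]

# Serving path: a single live record goes through the same function.
live_record = {"conversions": 3, "offers": 4, "daily_trips": 12}
serving_row = transform(live_record)

# Both paths produce the same feature names, so the model never sees a
# feature at training time that is missing at serving time.
assert set(serving_row) == set(training_rows[0])
print(serving_row)
```

If the two paths each had their own copy of `transform`, a change made to only one of them would go unnoticed until predictions degraded.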

In [None]:
!pip install feast

In [2]:
!feast init my_project


Creating a new Feast repository in /home/bla/projects/aau/mlops/notebooks/feature_store/my_project.



In [3]:
cd my_project/feature_repo

/home/bla/projects/aau/mlops/notebooks/feature_store/my_project/feature_repo




In [4]:
import pandas as pd
pd.read_parquet("data/driver_stats.parquet")

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
0,2024-08-21 13:00:00+00:00,1005,0.097577,0.495715,586,2024-09-05 13:04:00.258
1,2024-08-21 14:00:00+00:00,1005,0.576356,0.254795,340,2024-09-05 13:04:00.258
2,2024-08-21 15:00:00+00:00,1005,0.617877,0.113315,554,2024-09-05 13:04:00.258
3,2024-08-21 16:00:00+00:00,1005,0.846933,0.332277,920,2024-09-05 13:04:00.258
4,2024-08-21 17:00:00+00:00,1005,0.721526,0.165804,832,2024-09-05 13:04:00.258
...,...,...,...,...,...,...
1802,2024-09-05 11:00:00+00:00,1001,0.650213,0.260819,693,2024-09-05 13:04:00.258
1803,2024-09-05 12:00:00+00:00,1001,0.084032,0.386191,21,2024-09-05 13:04:00.258
1804,2021-04-12 07:00:00+00:00,1001,0.314292,0.412562,743,2024-09-05 13:04:00.258
1805,2024-08-29 01:00:00+00:00,1003,0.724839,0.608073,12,2024-09-05 13:04:00.258


In [5]:
!feast apply

Created entity driver
Created feature view driver_hourly_stats_fresh
Created feature view driver_hourly_stats
Created on demand feature view transformed_conv_rate
Created on demand feature view transformed_conv_rate_fresh
Created feature service driver_activity_v2
Created feature service driver_activity_v1
Created feature service driver_activity_v3

Created sqlite table my_project_driver_hourly_stats_fresh
Created sqlite table my_project_driver_hourly_stats



In [7]:
from datetime import datetime
import pandas as pd

from feast import FeatureStore

# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for
# more details on how to retrieve for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df,
    # These features would typically come as a request from a model
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Example features -----\n")
print(training_df.head())



----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   driver_id                           3 non-null      int64              
 1   event_timestamp                     3 non-null      datetime64[ns, UTC]
 2   label_driver_reported_satisfaction  3 non-null      int64              
 3   val_to_add                          3 non-null      int64              
 4   val_to_add_2                        3 non-null      int64              
 5   conv_rate                           3 non-null      float32            
 6   acc_rate                            3 non-null      float32            
 7   avg_daily_trips                     3 non-null      int32              
 8   conv_rate_plus_val1                 3 non-null      float64            
 9   conv_rate_plus_val2
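
For each row of `entity_df`, the historical retrieval joins the feature values that were known at that row's `event_timestamp`, so no information from the future leaks into the training set. A rough sketch of this point-in-time idea in plain Python, not Feast's actual implementation: the rows and the helper `point_in_time_lookup` are hypothetical, and the sketch ignores the time-to-live (TTL) that a Feast feature view can additionally apply.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature rows for one driver, sorted by event timestamp.
feature_rows = [
    (datetime(2021, 4, 12, 7, 0), {"conv_rate": 0.31}),
    (datetime(2021, 4, 12, 9, 0), {"conv_rate": 0.52}),
    (datetime(2021, 4, 12, 12, 0), {"conv_rate": 0.77}),
]

def point_in_time_lookup(rows, as_of):
    """Return the latest feature values observed at or before `as_of`."""
    timestamps = [ts for ts, _ in rows]
    # Index of the first row strictly after `as_of`.
    idx = bisect_right(timestamps, as_of)
    return rows[idx - 1][1] if idx > 0 else None

# The 12:00 row is in the future relative to 10:59:42, so it is skipped.
at_1059 = point_in_time_lookup(feature_rows, datetime(2021, 4, 12, 10, 59, 42))
# Nothing was known before 07:00, so there is no value to join.
at_0600 = point_in_time_lookup(feature_rows, datetime(2021, 4, 12, 6, 0))
print(at_1059, at_0600)
```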

In [8]:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("\n----- Example features -----\n")
print(training_df.head())




----- Example features -----

   driver_id                  event_timestamp  \
0       1002 2024-07-05 11:00:00.651808+00:00   
1       1001 2024-07-05 11:00:00.651808+00:00   
2       1003 2024-07-05 11:00:00.651808+00:00   

   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \
0                                   5           2            20   0.278025   
1                                   1           1            10   0.783102   
2                                   3           3            30   0.981281   

   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  
0  0.472908              382             2.278025            20.278025  
1  0.840802              195             1.783102            10.783102  
2  0.589697              766             3.981281            30.981281  
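
The output suggests that the two `transformed_conv_rate` features are the stored `conv_rate` plus the request-time values `val_to_add` and `val_to_add_2` (for example, 0.278025 + 2 = 2.278025). A minimal sketch of that arithmetic, assuming this is indeed what the on-demand feature view computes; `add_request_values` is a hypothetical name and not the definition from the feature repo.

```python
# Values copied from the first row of the output above (driver 1002).
row = {"conv_rate": 0.278025, "val_to_add": 2, "val_to_add_2": 20}

def add_request_values(r: dict) -> dict:
    """Hypothetical stand-in for the on-demand transformation."""
    return {
        "conv_rate_plus_val1": r["conv_rate"] + r["val_to_add"],
        "conv_rate_plus_val2": r["conv_rate"] + r["val_to_add_2"],
    }

derived = add_request_values(row)
print(derived)
```

Because the inputs `val_to_add` and `val_to_add_2` arrive with the request, such features cannot be precomputed and are derived at retrieval time.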


In [10]:
%%sh

CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME



Materializing 2 feature views to 2024-07-05 11:01:48+00:00 into the sqlite online store.

driver_hourly_stats from 2024-07-04 11:01:54+00:00 to 2024-07-05 11:01:48+00:00:
driver_hourly_stats_fresh from 2024-07-04 11:01:54+00:00 to 2024-07-05 11:01:48+00:00:


100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 310.58it/s]
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 421.78it/s]
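
Conceptually, materialization copies the most recent feature values per entity from the offline store into a key-value online store, so that serving only needs a lookup. A rough sketch of that idea in plain Python: the rows, the `materialize` helper, and the window boundary handling are assumptions for illustration, and Feast's own behavior may differ in the details.

```python
from datetime import datetime

# Hypothetical offline rows: (driver_id, event_timestamp, features).
offline_rows = [
    (1004, datetime(2024, 7, 5, 9, 0), {"conv_rate": 0.10}),
    (1004, datetime(2024, 7, 5, 10, 0), {"conv_rate": 0.22}),
    (1005, datetime(2024, 7, 5, 10, 0), {"conv_rate": 0.51}),
    (1005, datetime(2024, 7, 5, 12, 0), {"conv_rate": 0.99}),  # after the window
]

def materialize(rows, start, end):
    """Keep the latest row per driver with start < timestamp <= end."""
    latest = {}
    for driver_id, ts, features in rows:
        if not (start < ts <= end):
            continue
        if driver_id not in latest or ts > latest[driver_id][0]:
            latest[driver_id] = (ts, features)
    # The online store maps an entity key directly to its feature values.
    return {driver_id: features for driver_id, (_, features) in latest.items()}

online_store = materialize(
    offline_rows,
    start=datetime(2024, 7, 4, 11, 0),
    end=datetime(2024, 7, 5, 11, 0),
)

# Serving is then a plain key lookup.
print(online_store[1004])
```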


In [11]:
from pprint import pprint
from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {"driver_id": 1004},
        {"driver_id": 1005},
    ],
).to_dict()

pprint(feature_vector)



{'acc_rate': [0.5811399817466736, 0.3614414632320404],
 'avg_daily_trips': [509, 498],
 'conv_rate': [0.22280587255954742, 0.5063762068748474],
 'driver_id': [1004, 1005]}
