In [14]:
import redis

# Connectivity smoke test: the online store used by this notebook is a Redis
# server addressed as host "cache".
port = 6379
client = redis.Redis(host="cache", port=port)

# Last expression of the cell, so the ping result is shown as the cell output.
client.ping()

True

In [1]:
# init repo
# Scaffold a Feast feature repository called `feature_repo` and make it the
# working directory for the cells that follow (they use repo-relative paths
# such as `data/driver_stats.parquet` and `repo_path="."`).
# NOTE(review): this cell is not idempotent - when `feature_repo` already
# exists, `feast init` prints the "existing feature store repository" conflict
# warning seen in the recorded output, and the relative `%cd` depends on the
# directory the kernel is currently in.
!feast init feature_repo
%cd feature_repo

The directory [1m[32mfeature_repo[0m contains an existing feature store repository that may cause a conflict

/usr/src/feature_repo


In [2]:
%%writefile feature_store.yaml
# Feast repository configuration; everything below the magic line is written
# verbatim to feature_store.yaml (overwriting any existing file).
project: feature_repo
# Registry file path, relative to the repository directory.
registry: data/registry.db
provider: local
# Online store: Redis at host "cache", port 6379 (same endpoint the notebook
# pings with the redis client).
online_store:
  type: redis
  connection_string: "cache:6379"
# Alpha feature flags.
# NOTE(review): presumably required for store.write_to_online_store(...) used
# later in the notebook - confirm against the installed Feast version.
flags:
  alpha_features: true
  direct_ingest_to_online_store: true

Overwriting feature_store.yaml


In [3]:
# Load the sample driver statistics, rescale the trip counts and save the
# result as a second parquet file next to the original.
import pandas as pd

data = (
    pd.read_parquet("data/driver_stats.parquet")
    # Cast first so the scaled column is stored as a float.
    .astype({"avg_daily_trips": "float"})
    # Multiply every trip count by 30.
    # NOTE(review): presumably turns a daily figure into a ~monthly one - confirm.
    .assign(avg_daily_trips=lambda frame: frame["avg_daily_trips"] * 30)
)
data.to_parquet("data/test.parquet")

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
0,2022-03-28 03:00:00+00:00,1005,0.831203,0.089688,28500.0,2022-04-12 03:55:33.995
1,2022-03-28 04:00:00+00:00,1005,0.544201,0.800719,7890.0,2022-04-12 03:55:33.995
2,2022-03-28 05:00:00+00:00,1005,0.983304,0.344561,3900.0,2022-04-12 03:55:33.995
3,2022-03-28 06:00:00+00:00,1005,0.410440,0.158432,3780.0,2022-04-12 03:55:33.995
4,2022-03-28 07:00:00+00:00,1005,0.580665,0.476483,9030.0,2022-04-12 03:55:33.995
...,...,...,...,...,...,...
1802,2022-04-12 01:00:00+00:00,1001,0.287030,0.689241,11940.0,2022-04-12 03:55:33.995
1803,2022-04-12 02:00:00+00:00,1001,0.305528,0.555652,8610.0,2022-04-12 03:55:33.995
1804,2021-04-12 07:00:00+00:00,1001,0.391705,0.010529,12690.0,2022-04-12 03:55:33.995
1805,2022-04-04 15:00:00+00:00,1003,0.749271,0.945916,22650.0,2022-04-12 03:55:33.995


In [4]:
# Enable the `direct_ingest_to_online_store` alpha flag through the Feast CLI.
# NOTE(review): feature_store.yaml written earlier in this notebook already
# sets this flag to true, so this command looks redundant - confirm.
!feast alpha enable direct_ingest_to_online_store
# Apply the repository definitions; per the recorded output this reports
# registry changes and deploys infrastructure for `driver_hourly_stats`.
!feast apply

[1m[94mNo changes to registry
Deploying infrastructure for [1m[32mdriver_hourly_stats[0m


In [5]:
from datetime import datetime, timedelta, timezone
import pandas as pd

from feast import FeatureStore

# Capture the reference instant once, as a timezone-aware UTC value.
# A naive datetime.now() is local wall-clock time, yet the retrieved
# event_timestamp column comes back as datetime64[ns, UTC]; being explicit
# keeps the point-in-time join correct on hosts whose local zone is not UTC,
# and a single capture keeps the three offsets relative to the same instant.
reference_time = datetime.now(timezone.utc)

# The entity dataframe is the dataframe we want to enrich with feature values:
# one row per (driver_id, event_timestamp) plus the label column to carry along.
entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003],
        "label_driver_reported_satisfaction": [1, 5, 3],
        "event_timestamp": [
            reference_time - timedelta(minutes=11),
            reference_time - timedelta(minutes=36),
            reference_time - timedelta(minutes=73),
        ],
    }
)

# Open the feature repository in the current working directory.
store = FeatureStore(repo_path=".")

# Fetch the three driver_hourly_stats features for each entity row and
# materialise the result as a pandas DataFrame.
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
).to_df()

print("----- Feature schema -----\n")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# must not be wrapped in print() (that appended a stray "None" to the output).
training_df.info()

print()
print("----- Example features -----\n")
print(training_df.head())

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3 entries, 359 to 1081
Data columns (total 6 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   driver_id                           3 non-null      int64              
 1   label_driver_reported_satisfaction  3 non-null      int64              
 2   event_timestamp                     3 non-null      datetime64[ns, UTC]
 3   conv_rate                           3 non-null      float32            
 4   acc_rate                            3 non-null      float32            
 5   avg_daily_trips                     3 non-null      float64            
dtypes: datetime64[ns, UTC](1), float32(2), float64(1), int64(2)
memory usage: 144.0 bytes
None

----- Example features -----

      driver_id  label_driver_reported_satisfaction  \
359        1001                                   1   
720        100

In [12]:
# Push the rows retrieved into `training_df` directly into the online store.
# Relies on `store`, `training_df` and `datetime` defined by an earlier cell.
provider = store._get_provider()
feature_view = store.get_feature_view("driver_hourly_stats")

# Resolve every entity name declared on the feature view to its entity object.
# Neither `provider` nor `entities` is passed to the write call below; they are
# kept as notebook-level names for inspection.
entities = [
    store.get_entity(entity_name) for entity_name in feature_view.entities
]

# Stamp all rows with the time of this write before ingesting them.
training_df["created"] = datetime.now()
store.write_to_online_store("driver_hourly_stats", training_df)


In [13]:
from datetime import datetime, date
!feast materialize-incremental {datetime.now().isoformat()}

# !feast materialize {date.fromisoformat('2019-12-04')} {datetime.now().isoformat()}

Materializing [1m[32m1[0m feature views to [1m[32m2022-04-12 03:57:15+00:00[0m into the [1m[32mredis[0m online store.

[1m[32mdriver_hourly_stats[0m from [1m[32m2022-04-11 03:57:16+00:00[0m to [1m[32m2022-04-12 03:57:15+00:00[0m:
100%|███████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 1679.74it/s]


In [13]:
from pprint import pprint
from feast import FeatureStore

# Open the feature repository in the current working directory.
store = FeatureStore(repo_path=".")

# Features to read from the online store, as "<feature view>:<feature>" refs.
requested_features = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
]

# One lookup row per driver whose latest values we want.
driver_rows = [
    {"driver_id": 1001},
    {"driver_id": 1002},
]

online_response = store.get_online_features(
    features=requested_features,
    entity_rows=driver_rows,
)
# Convert the response to a plain dict of column name -> list of values.
feature_vector = online_response.to_dict()

pprint(feature_vector)
# Show the Python type the online store hands back for avg_daily_trips.
pprint(type(feature_vector["avg_daily_trips"][0]))

{'acc_rate': [0.5556520223617554, 0.9592219591140747],
 'avg_daily_trips': [8610.0, 18150.0],
 'conv_rate': [0.3055281639099121, 0.10407926142215729],
 'driver_id': [1001, 1002]}
<class 'float'>
