In [5]:
# Retrieve a training frame from the offline store via the "driver_activity_v1" feature service.
from datetime import datetime

import pandas as pd

from feast import FeatureStore

# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for
# more details on how to retrieve for all entities in the offline store instead.
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2023, 4, 12, 10, 59, 42),
            datetime(2023, 4, 12, 8, 12, 10),
            datetime(2023, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # Request-data inputs for the service's on-demand features; the output shows
        # conv_rate_plus_val1 = conv_rate + val_to_add and
        # conv_rate_plus_val2 = conv_rate + val_to_add_2.
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

# When True, every row's event_timestamp above is replaced by one shared "now" (UTC)
# timestamp, so the point-in-time join looks up the latest feature values instead of
# values as of the historical timestamps.
# NOTE(review): presumably done because the demo data does not cover 2023-04-12 — confirm.
# Set to False for a true point-in-time join on the per-row timestamps.
USE_CURRENT_TIMESTAMP = True
if USE_CURRENT_TIMESTAMP:
    entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

store = FeatureStore(repo_path="./feast_project/feature_repo")
feature_service = store.get_feature_service("driver_activity_v1")

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=feature_service,
).to_df()

print("----- Feature schema -----\n")
# .info() prints the schema itself and returns None; wrapping it in print()
# emitted a stray "None" line.
training_df.info()

print()
print("----- Example features -----\n")
print(training_df.head())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  for column, series in pdf.iteritems():
24/02/23 05:35:43 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 18ae1d4c-cb5b-455b-89fa-3315b90909c3
24/02/23 05:35:45 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 8 columns):
 #   Column                              Non-Null Count  Dtype         
---  ------                              --------------  -----         
 0   driver_id                           3 non-null      int64         
 1   event_timestamp                     3 non-null      datetime64[ns]
 2   label_driver_reported_satisfaction  3 non-null      int64         
 3   val_to_add                          3 non-null      int64         
 4   val_to_add_2                        3 non-null      int64         
 5   conv_rate                           3 non-null      float32       
 6   conv_rate_plus_val1                 3 non-null      float64       
 7   conv_rate_plus_val2                 3 non-null      float64       
dtypes: datetime64[ns](1), float32(1), float64(2), int64(4)
memory usage: 308.0 bytes
None

----- Example features -----

   driver_id            event_

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
  series = series.astype(t, copy=False)
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [6]:
# Display the training frame returned by get_historical_features in the previous cell.
training_df

Unnamed: 0,driver_id,event_timestamp,label_driver_reported_satisfaction,val_to_add,val_to_add_2,conv_rate,conv_rate_plus_val1,conv_rate_plus_val2
0,1003,2024-02-23 05:35:37.204273,3,3,30,0.745193,3.745193,30.745193
1,1001,2024-02-23 05:35:37.204273,1,1,10,0.81271,1.81271,10.81271
2,1002,2024-02-23 05:35:37.204273,5,2,20,0.131389,2.131389,20.131389


In [7]:
%%bash
# Ingest data into the online store: materialize feature values up to the current UTC time.
# Abort on any failure, so a failed `cd` does not run feast in the wrong directory.
set -euo pipefail
cd feast_project/feature_repo
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental "$CURRENT_TIME"



Materializing [1m[32m2[0m feature views to [1m[32m2024-02-23 05:37:02+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mdriver_hourly_stats_fresh[0m from [1m[32m2022-10-11 05:37:09+00:00[0m to [1m[32m2024-02-23 05:37:02+00:00[0m:


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Pulling latest features from spark offline store


24/02/23 05:37:13 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 936fbc71-f1da-47dd-a138-f4d1a5019565
24/02/23 05:37:21 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
100%|█████████████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 29.85it/s]


[1m[32mdriver_hourly_stats[0m from [1m[32m2022-10-11 05:37:29+00:00[0m to [1m[32m2024-02-23 05:37:02+00:00[0m:




Pulling latest features from spark offline store


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
100%|█████████████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 36.02it/s]


In [9]:
# Fetch feature vectors for inference from the online store, using explicit
# "<feature_view>:<feature>" references.
from pprint import pprint

from feast import FeatureStore

store = FeatureStore(repo_path="./feast_project/feature_repo")

online_feature_refs = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
]

# One dict per entity: the driver_id join key plus extra request values.
# None of the requested features consume val_to_add / val_to_add_2 (the output
# only contains driver_id and the three features), so they are carried along unused.
online_entity_rows = [
    {"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000},
    {"driver_id": 1002, "val_to_add": 3000, "val_to_add_2": 4000},
]

online_response = store.get_online_features(
    features=online_feature_refs,
    entity_rows=online_entity_rows,
)
feature_vector = online_response.to_dict()

pprint(feature_vector)


{'acc_rate': [0.011025987565517426, 0.3090273141860962],
 'avg_daily_trips': [711, 44],
 'conv_rate': [0.8127095699310303, 0.13138850033283234],
 'driver_id': [1001, 1002]}


In [13]:
# Fetch online features through the "driver_activity_v1" feature service
# instead of listing individual feature references.
from pprint import pprint

from feast import FeatureStore

feature_store = FeatureStore(repo_path="./feast_project/feature_repo")
feature_service = feature_store.get_feature_service("driver_activity_v1")

# driver_id is the join key; val_to_add / val_to_add_2 are request data for the
# service's on-demand features (the output shows conv_rate_plus_val1 = conv_rate + val_to_add
# and conv_rate_plus_val2 = conv_rate + val_to_add_2).
service_entity_rows = [
    {"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000},
    {"driver_id": 1002, "val_to_add": 3000, "val_to_add_2": 4000},
]

feature_vector = feature_store.get_online_features(
    features=feature_service, entity_rows=service_entity_rows
).to_dict()

pprint(feature_vector)

{'conv_rate': [0.8127095699310303, 0.13138850033283234],
 'conv_rate_plus_val1': [1000.812709569931, 3000.131388500333],
 'conv_rate_plus_val2': [2000.812709569931, 4000.131388500333],
 'driver_id': [1001, 1002]}


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 47618)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/cdsw/.local/lib/python3.9/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/home/cdsw/.local/lib/python3.9/site-packages/pyspark/accumulator

In [None]:
%%bash
# Step 4: Browse your features with the Web UI (experimental)
# Run from the same feature repo the other cells in this notebook use.
# NOTE: `feast ui` starts a web server, so this cell keeps running until interrupted.
set -euo pipefail
cd feast_project/feature_repo
feast ui