In [101]:
!feast version

Feast SDK Version: "feast 0.31.1"


## How Feast Works as a Feature Store in Machine Learning

<img src='../feast architecture.png'>

The architecture diagram above is from the official Feast documentation:
https://143p6r3s46y330wuikz3ok81-wpengine.netdna-ssl.com/wp-content/uploads/2022/02/feast-marchitecture-220201.svg
https://feast.dev/

Important Features:
Feast is one of the most widely used feature store tools in machine learning.
Feature stores make it easy to:

* Productionize new features without extensive engineering support
* Automate feature computation, backfills, and logging
* Share and reuse feature pipelines across teams
* Track feature versions, lineage, and metadata
* Achieve consistency between training and serving data
* Monitor the health of feature pipelines in production

### Topics Covered in this Notebook:

    1. Prepare the data set and store it in parquet format
    2. Run feast init
    3. Define the feature definitions in a Python file inside the feature repo directory (created using feast init)
    4. Run feast apply
    5. Generate training data from the offline store
    6. Train the model
    7. Materialize the features to push them into the online store (materialize only those for which a prediction is needed)
    8. Load features from the online store (it serves features from a low-latency online feature store)
    9. Run the prediction function (or an API, if you created one using Seldon Core or any other platform)
    10. Review the output

### 1. Prepare data set and store in parquet format

In [80]:
import pandas as pd

In [81]:
data = pd.read_csv('https://raw.githubusercontent.com/TripathiAshutosh/feast/main/Feast%20Live%20Demo/diabetes.csv')

In [82]:
data.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [83]:
predictors_df = data.loc[:,data.columns!='Outcome']
target_df = data['Outcome']

In [84]:
predictors_df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age
0,6,148,72,35,0,33.6,0.627,50
1,1,85,66,29,0,26.6,0.351,31
2,8,183,64,0,0,23.3,0.672,32
3,1,89,66,23,94,28.1,0.167,21
4,0,137,40,35,168,43.1,2.288,33


**Create timestamps to be added as the event_timestamp column in the data set.**

In [85]:
timestamps = pd.date_range(end = pd.Timestamp.now(),
                           periods = len(data),freq = 'D').to_frame(name = 'event_timestamp', index = False)

In [86]:
timestamps

Unnamed: 0,event_timestamp
0,2021-07-07 18:23:45.753531
1,2021-07-08 18:23:45.753531
2,2021-07-09 18:23:45.753531
3,2021-07-10 18:23:45.753531
4,2021-07-11 18:23:45.753531
...,...
763,2023-08-09 18:23:45.753531
764,2023-08-10 18:23:45.753531
765,2023-08-11 18:23:45.753531
766,2023-08-12 18:23:45.753531


**Add the event_timestamp column to the predictors and target dataframes.**

In [87]:
predictors_df = pd.concat(objs = [predictors_df, timestamps], axis = 1)
target_df = pd.concat(objs = [target_df, timestamps], axis =1)

In [88]:
predictors_df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,event_timestamp
0,6,148,72,35,0,33.6,0.627,50,2021-07-07 18:23:45.753531
1,1,85,66,29,0,26.6,0.351,31,2021-07-08 18:23:45.753531
2,8,183,64,0,0,23.3,0.672,32,2021-07-09 18:23:45.753531
3,1,89,66,23,94,28.1,0.167,21,2021-07-10 18:23:45.753531
4,0,137,40,35,168,43.1,2.288,33,2021-07-11 18:23:45.753531


In [89]:
target_df.head()

Unnamed: 0,Outcome,event_timestamp
0,1,2021-07-07 18:23:45.753531
1,0,2021-07-08 18:23:45.753531
2,1,2021-07-09 18:23:45.753531
3,0,2021-07-10 18:23:45.753531
4,1,2021-07-11 18:23:45.753531


**Create a patient_id column to uniquely identify each record.**

In [90]:
dataLen = len(data)
idsList = list(range(dataLen))

In [91]:
#idsList

In [92]:
patient_ids = pd.DataFrame(data = idsList, columns = ['patient_id'])

In [93]:
#patient_ids

In [94]:
predictors_df = pd.concat(objs = [predictors_df, patient_ids], axis = 1)
target_df = pd.concat(objs = [target_df, patient_ids], axis =1)

In [95]:
predictors_df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,event_timestamp,patient_id
0,6,148,72,35,0,33.6,0.627,50,2021-07-07 18:23:45.753531,0
1,1,85,66,29,0,26.6,0.351,31,2021-07-08 18:23:45.753531,1
2,8,183,64,0,0,23.3,0.672,32,2021-07-09 18:23:45.753531,2
3,1,89,66,23,94,28.1,0.167,21,2021-07-10 18:23:45.753531,3
4,0,137,40,35,168,43.1,2.288,33,2021-07-11 18:23:45.753531,4


In [96]:
predictors_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 768 entries, 0 to 767
Data columns (total 10 columns):
 #   Column                    Non-Null Count  Dtype         
---  ------                    --------------  -----         
 0   Pregnancies               768 non-null    int64         
 1   Glucose                   768 non-null    int64         
 2   BloodPressure             768 non-null    int64         
 3   SkinThickness             768 non-null    int64         
 4   Insulin                   768 non-null    int64         
 5   BMI                       768 non-null    float64       
 6   DiabetesPedigreeFunction  768 non-null    float64       
 7   Age                       768 non-null    int64         
 8   event_timestamp           768 non-null    datetime64[ns]
 9   patient_id                768 non-null    int64         
dtypes: datetime64[ns](1), float64(2), int64(7)
memory usage: 60.1 KB
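
The preparation steps above (a daily `event_timestamp`, a sequential `patient_id`, and the `pd.concat` calls) can also be written in one pass with `DataFrame.assign`. The sketch below uses a three-row stand-in frame rather than the diabetes CSV, and the fixed end date is an assumption chosen only to make the result reproducible.

```python
import pandas as pd

# Tiny stand-in for the diabetes data (illustrative values only).
data = pd.DataFrame({"Glucose": [148, 85, 183], "Outcome": [1, 0, 1]})

# One timestamp per row, spaced one day apart and ending at a fixed date.
event_timestamps = pd.date_range(
    end=pd.Timestamp("2023-08-13"), periods=len(data), freq="D"
)

# Attach the timestamp and a unique integer ID to every row in one step.
keyed = data.assign(event_timestamp=event_timestamps, patient_id=range(len(data)))

# Split into predictor and target frames; both keep the join key and timestamp.
key_columns = ["event_timestamp", "patient_id"]
predictors_df = keyed.drop(columns="Outcome")
target_df = keyed[["Outcome"] + key_columns]

print(predictors_df)
print(target_df)
```

Both frames end up with the same `event_timestamp` and `patient_id` values, which is what later allows Feast to join them.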


In [106]:
pwd

'C:\\Users\\Ashutosh Tripathi\\Documents\\projects\\feast\\Feast Live Demo\\feature_repo'

In [108]:
predictors_df.to_parquet(path='./data/predictors_df.parquet')
target_df.to_parquet(path='./data/target_df.parquet')

In [99]:
predictors_df.tail()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,event_timestamp,patient_id
763,10,101,76,48,180,32.9,0.171,63,2023-08-09 18:23:45.753531,763
764,2,122,70,27,0,36.8,0.34,27,2023-08-10 18:23:45.753531,764
765,5,121,72,23,112,26.2,0.245,30,2023-08-11 18:23:45.753531,765
766,1,126,60,0,0,30.1,0.349,47,2023-08-12 18:23:45.753531,766
767,1,93,70,31,0,30.4,0.315,23,2023-08-13 18:23:45.753531,767


In [100]:
#!pip install feast

### 2. Do feast init

This step is optional: feast init only creates the feature repo directory structure. You could instead create a directory using mkdir and, inside it, create a feature_store.yaml file and a feature_definitions.py file. Still, it is easier to run feast init and then modify the generated files.

In [21]:
!feast init feature_repo

The directory feature_repo contains an existing feature store repository that may cause a conflict



Once the above command has run successfully, you will see that a directory named feature_repo has been created with the following content:
* A data folder, which contains the default driver_stats data file in parquet format. You can use it for a Feast demo, or copy in the diabetes data set parquet files (predictors_df and target_df) that we use in this demo.
* A feature_store.yaml file, which holds the references to your offline and online feature stores.
* You can update the online store and local store paths in the feature_store.yaml file if needed.
* example.py, the default feature definition file, which uses the driver_stats dataset. You can replace it with the feature_def.py file that we create in the next step.
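
For orientation, a local `feature_store.yaml` usually looks something like the file written by the sketch below. The project name and the two paths are assumptions for illustration; the file that `feast init` generates for your Feast version is the authoritative one and may carry extra keys.

```python
from pathlib import Path
import tempfile

# Illustrative contents of a local feature_store.yaml (values are assumptions).
FEATURE_STORE_YAML = """\
project: feature_repo
registry: data/registry.db
provider: local
online_store:
    type: sqlite
    path: data/online_store.db
"""

# Write into a scratch directory so an existing repo is not overwritten.
repo_dir = Path(tempfile.mkdtemp()) / "feature_repo"
(repo_dir / "data").mkdir(parents=True)
config_path = repo_dir / "feature_store.yaml"
config_path.write_text(FEATURE_STORE_YAML)

print(config_path.read_text())
```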

<img src="../feast init.png">

### 3. Define Feature definitions in a python file inside feature repo directory (created using feast init)
This step is known as registering and deploying the features.
The code below must live inside the feature repo, so I recommend creating a feature_def.py file there and copying the code into it. Then run feast apply (next step) from inside the feature repo.

In [28]:
# Feature definitions for the diabetes demo (save as feature_def.py inside the feature repo)

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64

# Entity: every record is keyed by the patient_id column
patient = Entity(name="patient", join_keys=["patient_id"])

## Predictors Feature View
# timestamp_field is the current name of the deprecated event_timestamp_column argument
file_source = FileSource(
    path=r"data/predictors_df.parquet",
    timestamp_field="event_timestamp",
)

df1_fv = FeatureView(
    name="predictors_df_feature_view",
    ttl=timedelta(days=2),
    entities=[patient],
    schema=[
        Field(name="Pregnancies", dtype=Int64),
        Field(name="Glucose", dtype=Int64),
        Field(name="BloodPressure", dtype=Int64),
        Field(name="SkinThickness", dtype=Int64),
        Field(name="Insulin", dtype=Int64),
        Field(name="BMI", dtype=Float64),
        Field(name="DiabetesPedigreeFunction", dtype=Float64),
        Field(name="Age", dtype=Int64),
    ],
    source=file_source,
    online=True,
    tags={},
)

## Target Feature View
target_source = FileSource(
    path=r"data/target_df.parquet",
    timestamp_field="event_timestamp",
)

target_fv = FeatureView(
    name="target_df_feature_view",
    ttl=timedelta(days=2),
    entities=[patient],
    schema=[
        Field(name="Outcome", dtype=Int64),
    ],
    source=target_source,
    online=True,
    tags={},
)
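
Before running `feast apply`, it can help to confirm that every name in a feature view's `schema`, plus the join key and the timestamp column, actually exists in the parquet file. The helper below, `missing_columns`, is a hypothetical convenience and not part of Feast; it only compares names.

```python
def missing_columns(available, schema_fields, join_key, timestamp_column):
    """Return the required column names that are absent from `available`, sorted."""
    required = set(schema_fields) | {join_key, timestamp_column}
    return sorted(required - set(available))


# Column names as written to predictors_df.parquet earlier in the notebook.
parquet_columns = [
    "Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin",
    "BMI", "DiabetesPedigreeFunction", "Age", "event_timestamp", "patient_id",
]

# All required names are present, so nothing is reported.
ok = missing_columns(parquet_columns, ["Glucose", "BMI"], "patient_id", "event_timestamp")
print(ok)

# Names are case-sensitive: "glucose" does not match "Glucose".
bad = missing_columns(parquet_columns, ["glucose"], "patient_id", "event_timestamp")
print(bad)
```

In a real notebook, `available` could come from `pd.read_parquet(path).columns`.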

### 4. Do feast apply

Run feast apply from inside the feature_repo directory.

In [102]:
pwd

'C:\\Users\\Ashutosh Tripathi\\Documents\\projects\\feast\\Feast Live Demo\\feature_repo'

In [103]:
cd feature_repo

[WinError 2] The system cannot find the file specified: 'feature_repo'
C:\Users\Ashutosh Tripathi\Documents\projects\feast\Feast Live Demo\feature_repo


In practice, these data source files would be fetched from a data warehouse. For this demo, I am placing them inside the feature_repo/data/ directory. Wherever you keep them, define the same path in your feature_def.py file.

Execute the commands below in the terminal to copy the created data files into the data directory.

1. cp ../predictors_df.parquet feature_repo/data/
2. cp ../target_df.parquet feature_repo/data/
3. cp ../feature_def.py feature_repo/

In [109]:
!feast apply

No changes to registry
No changes to infrastructure


  schema = ParquetDataset(path).schema


### 5. Generate Training Data Set from offline feature store

In [110]:
from feast import FeatureStore
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage

store = FeatureStore(repo_path='.')

# The entity dataframe supplies the join key (patient_id), the timestamps and the label
entity_df = pd.read_parquet(path='data/target_df.parquet')

training_data = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "predictors_df_feature_view:Pregnancies",
        "predictors_df_feature_view:Glucose",
        "predictors_df_feature_view:BloodPressure",
        "predictors_df_feature_view:SkinThickness",
        "predictors_df_feature_view:Insulin",
        "predictors_df_feature_view:BMI",
        "predictors_df_feature_view:DiabetesPedigreeFunction",
        "predictors_df_feature_view:Age",
    ],
)

# Persist the retrieved training set as a saved dataset
dataset = store.create_saved_dataset(
    from_=training_data,
    name="diabetes_dataset",
    storage=SavedDatasetFileStorage('data/diabetes_dataset1.parquet'),
)
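
Feature references passed to `get_historical_features` follow the pattern `<feature view name>:<feature name>`, and the same list is needed again for online retrieval. One way to avoid typing it twice is to build it from a list of names; `feature_refs` below is a hypothetical helper, not a Feast function.

```python
PREDICTOR_VIEW = "predictors_df_feature_view"
PREDICTOR_NAMES = [
    "Pregnancies", "Glucose", "BloodPressure", "SkinThickness",
    "Insulin", "BMI", "DiabetesPedigreeFunction", "Age",
]


def feature_refs(view_name, feature_names):
    """Build 'view:feature' reference strings for a single feature view."""
    return [f"{view_name}:{name}" for name in feature_names]


refs = feature_refs(PREDICTOR_VIEW, PREDICTOR_NAMES)
print(refs[0])  # predictors_df_feature_view:Pregnancies
```

The resulting `refs` list can be passed as the `features` argument in both the offline and the online retrieval calls.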



In [111]:
training_data.to_df().tail()

Unnamed: 0,Outcome,event_timestamp,patient_id,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age
763,0,2023-08-09 18:23:45.753531+00:00,763,10,101,76,48,180,32.9,0.171,63
764,0,2023-08-10 18:23:45.753531+00:00,764,2,122,70,27,0,36.8,0.34,27
765,0,2023-08-11 18:23:45.753531+00:00,765,5,121,72,23,112,26.2,0.245,30
766,1,2023-08-12 18:23:45.753531+00:00,766,1,126,60,0,0,30.1,0.349,47
767,0,2023-08-13 18:23:45.753531+00:00,767,1,93,70,31,0,30.4,0.315,23
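
Conceptually, `get_historical_features` performs a point-in-time join: for each row of the entity dataframe it picks the most recent feature row for the same `patient_id` whose timestamp is not later than the entity row's timestamp and not older than the feature view's `ttl`. The sketch below imitates that behaviour with `pandas.merge_asof` on made-up rows; it is an analogy for intuition, not Feast's actual implementation.

```python
import pandas as pd

# Entity rows: which patient, and "as of" which moment we want features.
entity_df = pd.DataFrame({
    "patient_id": [1, 2],
    "event_timestamp": pd.to_datetime(["2023-08-10", "2023-08-10"]),
})

# Feature rows recorded at various times (values are made up).
features_df = pd.DataFrame({
    "patient_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(["2023-08-08", "2023-08-09", "2023-08-01"]),
    "Glucose": [100, 110, 120],
})

# For each entity row, take the latest feature row at or before its timestamp,
# but no further back than two days (mirroring ttl = 2 days).
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="patient_id",
    direction="backward",
    tolerance=pd.Timedelta(days=2),
)
print(joined)
```

Patient 1 receives the value recorded on 2023-08-09, while patient 2 gets a missing value because its only feature row is more than two days old. In the notebook the entity and feature timestamps are identical, so every row finds a match.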


### 6. Model Training

In [61]:
pwd

'C:\\Users\\Ashutosh Tripathi\\Documents\\projects\\feast\\Feast Live Demo\\feature_repo'

In [112]:
# Importing dependencies
from feast import FeatureStore
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from joblib import dump

# Getting our FeatureStore
store = FeatureStore(repo_path=".")

# Converting the retrieved training data to a DataFrame
# (alternative: load the saved dataset with store.get_saved_dataset(name="diabetes_dataset").to_df())
training_df = training_data.to_df()

# Separating the features and labels
y = training_df['Outcome']
X = training_df.drop(
    labels=['Outcome', 'event_timestamp', "patient_id"], 
    axis=1)

# Splitting the dataset into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, 
                                                    y, 
                                                    stratify=y)

# Creating and training LogisticRegression
reg = LogisticRegression(max_iter = 200)
reg.fit(X=X_train[sorted(X_train)], y=y_train)

# Saving the model
dump(value=reg, filename="model.joblib")

['model.joblib']

### 8. Prepare online feature store
(Loading the features into the online store)

There are two ways to load features into your online store:
- materialize

materialize loads the latest feature values between two dates.

`feast materialize 2020-01-01T00:00:00 2022-01-01T00:00:00`

- materialize-incremental

materialize-incremental loads the feature values that have arrived since the previous materialization, up to the provided end date:

`feast materialize-incremental 2022-01-01T00:00:00`
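
The CLI expects ISO 8601 timestamps written with plain ASCII hyphens. A small sketch for producing such strings from Python follows; the 30-day window is an arbitrary assumption, and the times are taken in UTC on the assumption that this is what the CLI should receive.

```python
from datetime import datetime, timedelta, timezone

# Window to materialize (the 30-day length is only an example).
end = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
start = end - timedelta(days=30)

# isoformat() on a naive datetime without microseconds yields YYYY-MM-DDTHH:MM:SS.
materialize_cmd = f"feast materialize {start.isoformat()} {end.isoformat()}"
incremental_cmd = f"feast materialize-incremental {end.isoformat()}"

print(materialize_cmd)
print(incremental_cmd)
```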

In [113]:
# Importing dependencies
from feast import FeatureStore
from datetime import datetime, timedelta

# Getting our FeatureStore
store = FeatureStore(repo_path=".")

#store.materialize_incremental(end_date = datetime.now())

store.materialize(start_date=datetime.utcnow() - timedelta(days=530), end_date=datetime.utcnow() - timedelta(days=10))

Materializing 2 feature views from 2022-03-01 13:04:32+05:30 to 2023-08-03 13:04:32+05:30 into the sqlite online store.

target_df_feature_view:


100%|███████████████████████████████████████████████████████████| 520/520 [00:00<00:00, 8142.24it/s]


predictors_df_feature_view:


100%|███████████████████████████████████████████████████████████| 520/520 [00:00<00:00, 2050.87it/s]
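
What a `materialize` call does can be pictured as follows: keep only rows whose `event_timestamp` lies inside the window, then store the newest remaining row per entity key. The sketch below mimics this with plain Python dictionaries; the rows are made up and the dictionary merely stands in for the online store, so treat it as an illustration rather than Feast's code.

```python
from datetime import datetime

# (patient_id, event_timestamp, Glucose) rows; values are made up.
rows = [
    (1, datetime(2023, 7, 1), 100),
    (1, datetime(2023, 7, 20), 105),
    (2, datetime(2023, 7, 15), 130),
    (2, datetime(2023, 8, 13), 126),  # falls after the window end
]


def materialize_window(rows, start, end):
    """Return {patient_id: (timestamp, value)} with the newest in-window row per patient."""
    online = {}
    for patient_id, ts, value in rows:
        if not (start <= ts <= end):
            continue  # rows outside the window are skipped
        if patient_id not in online or ts > online[patient_id][0]:
            online[patient_id] = (ts, value)
    return online


online_store = materialize_window(rows, datetime(2023, 7, 1), datetime(2023, 8, 3))
print(online_store)
```

In this toy example, patient 2's row from 13 August is not picked up because it is later than the window end; a later call covering that date would be needed to load it.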


#### Sometimes you get a strange-looking output such as "0it [00:00, ?it/s]".
For this, please refer to this 4-minute explanation of that line: https://www.youtube.com/watch?v=v5uhwaST4uo
        

### 9. Get online features for prediction

In [114]:
# Importing dependencies
from feast import FeatureStore
import pandas as pd
from joblib import load

# Getting our FeatureStore
store = FeatureStore(repo_path=".")

# Defining our features names
feast_features = [
        "predictors_df_feature_view:Pregnancies",
        "predictors_df_feature_view:Glucose",
        "predictors_df_feature_view:BloodPressure",
        "predictors_df_feature_view:SkinThickness",
        "predictors_df_feature_view:Insulin",
        "predictors_df_feature_view:BMI",
        "predictors_df_feature_view:DiabetesPedigreeFunction",
        "predictors_df_feature_view:Age",
    ]

# Getting the latest features
features = store.get_online_features(
    features=feast_features,    
    entity_rows=[{"patient_id": 767}, {"patient_id": 766}]
).to_dict()

# Converting the features to a DataFrame
features_df = pd.DataFrame.from_dict(data=features)



In [115]:
features_df.head()

Unnamed: 0,patient_id,SkinThickness,BMI,Insulin,BloodPressure,Pregnancies,DiabetesPedigreeFunction,Glucose,Age
0,767,31,30.4,0,70,1,0.315,93,23
1,766,0,30.1,0,60,1,0.349,126,47


### 10. Call the predict function and see the output

In [116]:
# Loading our model and doing inference
reg = load("model.joblib")

# Drop the ID and order the columns alphabetically, matching the order used in training
model_input = features_df[sorted(features_df.drop("patient_id", axis=1))]
predictions = reg.predict(model_input)
print(predictions)
prediction_probabilities = reg.predict_proba(model_input)
print(prediction_probabilities)

[0 0]
[[0.94607327 0.05392673]
 [0.73135484 0.26864516]]
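
The `sorted(...)` calls in training and inference serve one purpose: the online features came back in a different column order from the one requested, while a scikit-learn model expects columns in the order it was trained on. Sorting both sides alphabetically is one convention; another is to store the training column order and reuse it, as sketched below with a subset of the values shown earlier and a hypothetical `TRAINING_COLUMNS` list.

```python
import pandas as pd

# Column order used when the model was fitted (hypothetical, saved next to the model).
TRAINING_COLUMNS = ["Age", "BMI", "Glucose"]

# Online features as a dict of lists, in arbitrary column order.
online_features = {
    "patient_id": [767, 766],
    "Glucose": [93, 126],
    "Age": [23, 47],
    "BMI": [30.4, 30.1],
}

features_df = pd.DataFrame.from_dict(online_features)

# Select and reorder in one step; a missing column raises KeyError instead of
# silently feeding the model misaligned inputs.
model_input = features_df[TRAINING_COLUMNS]
print(list(model_input.columns))  # ['Age', 'BMI', 'Glucose']
```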


### A full explanation of this notebook is available at: https://www.youtube.com/watch?v=iZ8R_EUf_pM


### References:

https://docs.feast.dev/getting-started/quickstart