# Snowflake and Feast Feature Store Demo

Feast abstracts away the data engineering work of building datasets for training machine learning models, and of reproducing the same feature values when serving those models in production.

Feast decouples models from data infrastructure by providing a single data access layer that abstracts feature storage from feature retrieval. Feast also provides a consistent means of referencing feature data for retrieval, and therefore ensures that models remain portable when moving from training to serving.

### Feast documentation
https://docs.feast.dev/ 

### Core concepts
The top-level namespace within Feast is a project. 
- Users define one or more feature views within a project. 
- Each feature view contains one or more features. These features typically relate to one or more entities. 
- A feature view must always have a data source, which in turn is used during the generation of training datasets and when materializing feature values into the online store.
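
The relationships in the list above can be pictured with a few plain Python dataclasses. This is a hypothetical model for illustration only, not Feast's own classes: `ProjectSpec`, `FeatureViewSpec`, `FeatureSpec` and `DataSourceSpec` are names invented for this sketch.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DataSourceSpec:
    # Hypothetical stand-in for a batch source such as a table or Parquet file.
    name: str
    timestamp_field: str


@dataclass
class FeatureSpec:
    # A feature is only a name and a type; Feast does not transform data.
    name: str
    dtype: str


@dataclass
class FeatureViewSpec:
    name: str
    entities: List[str]          # zero or more entity names
    features: List[FeatureSpec]  # one or more features
    source: DataSourceSpec       # a data source is always required

    def __post_init__(self) -> None:
        if not self.features:
            raise ValueError(f"feature view {self.name!r} needs at least one feature")


@dataclass
class ProjectSpec:
    # Top-level namespace that owns the feature views.
    name: str
    feature_views: List[FeatureViewSpec] = field(default_factory=list)


driver_source = DataSourceSpec(name="driver_stats", timestamp_field="event_timestamp")
driver_view = FeatureViewSpec(
    name="driver_hourly_stats",
    entities=["driver"],
    features=[FeatureSpec("conv_rate", "float32"), FeatureSpec("acc_rate", "float32")],
    source=driver_source,
)
project = ProjectSpec(name="demo_project", feature_views=[driver_view])
```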

<img src="images/concepts.png" width=500/>

### Feast objects
| Feast object | Description |
|--------------|-------------|
| Data Source | The data source refers to raw underlying data (e.g. a table in BigQuery). Feast uses a time-series data model to represent data. This data model is used to interpret feature data in data sources in order to build training datasets or when materializing features into an online store.
| Dataset | Feast datasets allow for conveniently saving dataframes that include both features and entities to be subsequently used for data analysis and model training. Datasets are created from results of historical retrieval. <br><br> Dataset vs Feature View: Feature views contain the schema of data and a reference to where data can be found (through its data source). Datasets are the actual data manifestation of querying those data sources. <br><br> Dataset vs Data Source: Datasets are the output of historical retrieval, whereas data sources are the inputs. One or more data sources can be used in the creation of a dataset. 
| Entity | An entity is a collection of semantically related features. Users define entities to map to the domain of their use case, e.g. account number, customer id or driver id. <br><br> Entities are typically defined as part of feature views. The entity name is used to reference the entity from a feature view definition, and the join key identifies the physical primary key on which feature values are stored and retrieved.
| Feature | A feature is an individual measurable property. It is typically a property observed on a specific entity, but does not have to be associated with an entity. For example, a feature of a customer entity could be the number of transactions they make in an average month, while a feature that is not observed on a specific entity could be the total number of posts made by all users in the last month. <br><br> Features are defined as part of feature views. Since Feast does not transform data, a feature is essentially a schema that only contains a name and a type.
| Feature View | A feature view is an object that represents a logical group of time-series feature data as it is found in a data source. Feature views consist of zero or more entities, one or more features, and a data source. Feature views allow Feast to model your existing feature data in a consistent way in both an offline (training) and online (serving) environment.
| Stream feature view | A stream feature view is an extension of a normal feature view. The primary difference is that stream feature views have both stream and batch data sources, whereas a normal feature view only has a batch data source.
| Feature Services | A feature service is an object that represents a logical group of features from one or more feature views. Feature services allow features from within a feature view to be used as needed by an ML model. Users can expect to create one feature service per model version, which makes it possible to track the features used by each model.
| Registry | The Feast registry is where all applied Feast objects (e.g. feature views, entities) are stored. The registry exposes methods to apply, list, retrieve and delete these objects. The registry is an abstraction with multiple possible implementations. <br><br>The registry can be either file-based or SQL-based. |
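
A toy, in-memory version of the registry's apply/list/retrieve/delete surface, keyed by object kind and name. The class `ToyRegistry`, the `RegistryEntry` type and their method names are hypothetical and do not mirror Feast's API; a real registry persists the same information to a file or a SQL database. In this sketch, applying an object that already exists replaces it.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class RegistryEntry:
    kind: str  # e.g. "entity", "feature_view", "feature_service"
    name: str
    spec: str  # hypothetical serialized definition


class ToyRegistry:
    """In-memory registry keyed by (kind, name)."""

    def __init__(self) -> None:
        self._entries: Dict[Tuple[str, str], RegistryEntry] = {}

    def apply(self, entry: RegistryEntry) -> None:
        # Upsert: add a new object, or replace one with the same kind and name.
        self._entries[(entry.kind, entry.name)] = entry

    def get(self, kind: str, name: str) -> RegistryEntry:
        # Raises KeyError if the object was never applied.
        return self._entries[(kind, name)]

    def list_names(self, kind: str) -> List[str]:
        return sorted(name for (k, name) in self._entries if k == kind)

    def delete(self, kind: str, name: str) -> None:
        del self._entries[(kind, name)]


registry = ToyRegistry()
registry.apply(RegistryEntry("entity", "driver", "join_keys=['driver_id']"))
registry.apply(RegistryEntry("feature_view", "driver_hourly_stats", "ttl=52 weeks"))
registry.apply(RegistryEntry("feature_view", "driver_hourly_stats", "ttl=1 week"))  # replaces
print(registry.list_names("feature_view"))  # ['driver_hourly_stats']
print(registry.get("feature_view", "driver_hourly_stats").spec)  # ttl=1 week
registry.delete("entity", "driver")
print(registry.list_names("entity"))  # []
```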

### Feast architecture
<img src="images/feast_architecture2.jpeg" width=800/>
<br><br>
<img src="images/feast_architecture.png" width=500/>

## Installing Feast and initialising a feature store

        pip install 'feast[snowflake]'
        feast init <project-name> -t snowflake

## Define entities, data sources, feature views

**Entity**: Primary key such as an account number, customer id etc. <br>
**Data source**: Underlying data source from where features are retrieved. <br>
**Feature view**: Logical group of features read from a data source, used for both offline (training) and online (serving) retrieval.


In [1]:
from datetime import datetime, timedelta
import yaml
from feast import FeatureStore, Entity, FeatureService, FeatureView, Field, SnowflakeSource
from feast.types import Float32, Int64
from feast.infra.offline_stores.snowflake_source import SavedDatasetSnowflakeStorage
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast import FileSource
from feast.data_format import ParquetFormat
import pandas as pd

In [20]:
!feast teardown

In [21]:
# Load the feature store from the current path
fs = FeatureStore(repo_path=".")

# Create entity
driver = Entity(
    name="driver",
    join_keys=["driver_id"],
)

driver_stats_source = FileSource(
    file_format=ParquetFormat(),
    path="data/driver_stats.parquet",    
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_stats_fv = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(weeks=52),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    batch_source=driver_stats_source,
    online=True
)

driver_stats_fs = FeatureService(name="driver_activity", features=[driver_stats_fv])

# Deploy the feature store
print("Deploying feature store...")
fs.apply([driver, driver_stats_fv, driver_stats_fs])

Deploying feature store...




## Create a streaming source

In [None]:
#todo

## Retrieving data from the feature store

Features can be retrieved from one or more feature views in a single call:

In [2]:
# Load the feature store from the current path
fs = FeatureStore(repo_path=".")

# Define feature views and features to retrieve
features = ["driver_hourly_stats:conv_rate", 
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips"]

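Each string in `features` is a feature reference of the form `<feature_view>:<feature>`. The hypothetical helper `group_feature_refs` below (not part of Feast) shows how such references split into a feature view name and a feature name, grouped per view:

```python
from collections import defaultdict
from typing import Dict, List


def group_feature_refs(refs: List[str]) -> Dict[str, List[str]]:
    """Group 'feature_view:feature' references by feature view name."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for ref in refs:
        # Split on the first ':' only.
        view, sep, feature = ref.partition(":")
        if not sep or not view or not feature:
            raise ValueError(f"expected '<feature_view>:<feature>', got {ref!r}")
        grouped[view].append(feature)
    return dict(grouped)


feature_refs = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
]
print(group_feature_refs(feature_refs))
# {'driver_hourly_stats': ['conv_rate', 'acc_rate', 'avg_daily_trips']}
```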
In [10]:
# Create an entity dataframe. This is the dataframe that will be enriched with historical features
entity_df = pd.DataFrame(
    {
        "event_timestamp": [
            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
            for dt in pd.date_range(
                start=datetime.now() - timedelta(days=3),
                end=datetime.now(),
                periods=3,
            )
        ],
        "driver_id": [1001, 1002, 1003],
    }
)

print(entity_df.head())

                   event_timestamp  driver_id
0 2022-07-29 13:56:53.219000+00:00       1001
1 2022-07-31 01:56:53.219000+00:00       1002
2 2022-08-01 13:56:53.219000+00:00       1003


In [24]:
# Retrieve point-in-time correct historical features by joining the entity dataframe to the feature view's batch source
training_df = fs.get_historical_features(
    features=features, entity_df=entity_df
).to_df()

print(training_df.head())


                   event_timestamp  driver_id  conv_rate  acc_rate  \
0 2022-07-29 11:58:25.895000+00:00       1001   0.138443  0.845902   
1 2022-07-30 23:58:25.895000+00:00       1002   0.037272  0.990824   
2 2022-08-01 11:58:25.895000+00:00       1003   0.483464  0.183020   

   avg_daily_trips  
0              275  
1              655  
2              705  
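
`get_historical_features` performs a point-in-time join: for each entity row it takes the most recent feature row for the same join key whose `event_timestamp` is at or before the entity row's timestamp and no older than the feature view's `ttl`, with the `created` column breaking ties between rows that share an event timestamp. Below is a stdlib-only sketch of that rule on hypothetical in-memory rows; it is not Feast's implementation, which runs the join inside the offline store.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

FeatureRow = Dict[str, Any]


def point_in_time_lookup(
    entity_id: int,
    entity_ts: datetime,
    feature_rows: List[FeatureRow],
    ttl: timedelta,
) -> Optional[FeatureRow]:
    """Return the latest feature row for entity_id visible at entity_ts, or None."""
    candidates = [
        row
        for row in feature_rows
        if row["driver_id"] == entity_id
        # Never look into the future, and ignore rows older than the ttl window.
        and entity_ts - ttl <= row["event_timestamp"] <= entity_ts
    ]
    if not candidates:
        return None
    # Latest event timestamp wins; the created timestamp breaks ties.
    return max(candidates, key=lambda r: (r["event_timestamp"], r["created"]))


utc = timezone.utc
rows = [  # hypothetical feature rows for one driver
    {"driver_id": 1001, "event_timestamp": datetime(2022, 7, 28, 12, tzinfo=utc),
     "created": datetime(2022, 7, 28, 12, 5, tzinfo=utc), "conv_rate": 0.10},
    {"driver_id": 1001, "event_timestamp": datetime(2022, 7, 29, 12, tzinfo=utc),
     "created": datetime(2022, 7, 29, 12, 5, tzinfo=utc), "conv_rate": 0.20},
    # Same event time, written later: treated as a correction of the row above.
    {"driver_id": 1001, "event_timestamp": datetime(2022, 7, 29, 12, tzinfo=utc),
     "created": datetime(2022, 7, 29, 13, 0, tzinfo=utc), "conv_rate": 0.25},
    # After the entity timestamp, so it must not leak into the training row.
    {"driver_id": 1001, "event_timestamp": datetime(2022, 7, 30, 12, tzinfo=utc),
     "created": datetime(2022, 7, 30, 12, 5, tzinfo=utc), "conv_rate": 0.30},
]
entity_ts = datetime(2022, 7, 29, 13, 56, tzinfo=utc)

print(point_in_time_lookup(1001, entity_ts, rows, timedelta(weeks=52))["conv_rate"])  # 0.25
print(point_in_time_lookup(1001, entity_ts, rows, timedelta(hours=1)))  # None (outside ttl)
print(point_in_time_lookup(1002, entity_ts, rows, timedelta(weeks=52)))  # None (no rows)
```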


## Feast UI

In [9]:
!feast ui -h localhost -p 9090

INFO:     Started server process [36369]
08/01/2022 11:51:17 AM uvicorn.error INFO: Started server process [36369]
INFO:     Waiting for application startup.
08/01/2022 11:51:17 AM uvicorn.error INFO: Waiting for application startup.
INFO:     Application startup complete.
08/01/2022 11:51:17 AM uvicorn.error INFO: Application startup complete.
INFO:     Uvicorn running on http://localhost:9090 (Press CTRL+C to quit)
08/01/2022 11:51:17 AM uvicorn.error INFO: Uvicorn running on http://localhost:9090 (Press CTRL+C to quit)
INFO:     ::1:53807 - "GET / HTTP/1.1" 304 Not Modified
INFO:     ::1:53807 - "GET /static/js/main.3d0f2d20.js HTTP/1.1" 200 OK
INFO:     ::1:53808 - "GET /static/css/main.4b6d8029.css HTTP/1.1" 200 OK
INFO:     ::1:53807 - "GET /static/media/feast-icon-grey.e5aa0a305d8efd775c4b.svg HTTP/1.1" 304 Not Modified

## Save data into Snowflake
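For example, say we want to save a copy of a training dataset that we have used to build a model. The cell below writes the dataset to local files with `SavedDatasetFileStorage`; `SavedDatasetSnowflakeStorage` (imported earlier) can be used instead to store it in a Snowflake table.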

<font color='red'> WARNING: In this demo, creating a saved dataset appeared to break the Feast UI (cause not yet identified). </font>

In [9]:
# Load the feature store from the current path
fs = FeatureStore(repo_path=".")

# Define feature views and features to retrieve
features = ["driver_hourly_stats:conv_rate", 
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips"]

In [11]:
df = fs.get_historical_features(features=features, entity_df=entity_df)

dataset = fs.create_saved_dataset(
    from_=df,
    name='driver_training_dataset',
    storage=SavedDatasetFileStorage('data/my_training_dataset'),
    tags={'author': 'geoff'}
)



In [12]:
df.to_df().head()

|   | event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips |
|---|---|---|---|---|---|
| 0 | 2022-07-29 13:56:53.219000+00:00 | 1001 | 0.838634 | 0.54903 | 218 |
| 1 | 2022-07-31 01:56:53.219000+00:00 | 1002 | 0.681086 | 0.069958 | 956 |
| 2 | 2022-08-01 13:56:53.219000+00:00 | 1003 | 0.483464 | 0.18302 | 705 |


## Data validation
https://docs.feast.dev/tutorials/validating-historical-features 

In [13]:
from great_expectations.dataset import Dataset
from great_expectations.core.expectation_suite import ExpectationSuite
from feast.dqm.profilers.ge_profiler import ge_profiler

In [14]:
# Create an entity dataframe. This is the dataframe that will be enriched with historical features
entity_df = pd.DataFrame(
    {
        "event_timestamp": [
            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
            for dt in pd.date_range(
                start=datetime.now() - timedelta(days=3),
                end=datetime.now(),
                periods=3,
            )
        ],
        "driver_id": [1001, 1002, 1003],
    }
)

print(entity_df.head())

                   event_timestamp  driver_id
0 2022-07-29 13:57:49.692000+00:00       1001
1 2022-07-31 01:57:49.692000+00:00       1002
2 2022-08-01 13:57:49.692000+00:00       1003


In [22]:
@ge_profiler
def manual_profiler(ds):
    ds.expect_column_values_to_be_between(
        "conv_rate",
        min_value=0,
        max_value=0.85,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "acc_rate",
        min_value=0,
        max_value=1.0,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "avg_daily_trips",
        min_value=0.0,
        max_value=955.0,
        mostly=0.99  # allow some outliers
    )

    return ds.get_expectation_suite()

In [23]:
ds = fs.get_saved_dataset('driver_training_dataset')
ds.to_df().head()



|   | event_timestamp | acc_rate | avg_daily_trips | driver_id | conv_rate |
|---|---|---|---|---|---|
| 0 | 2022-07-29 13:56:53.219000+00:00 | 0.54903 | 218 | 1001 | 0.838634 |
| 1 | 2022-07-31 01:56:53.219000+00:00 | 0.069958 | 956 | 1002 | 0.681086 |
| 2 | 2022-08-01 13:56:53.219000+00:00 | 0.18302 | 705 | 1003 | 0.483464 |


In [24]:
ds.get_profile(profiler=manual_profiler)

<GEProfile with expectations: [
  {
    "meta": {},
    "kwargs": {
      "column": "conv_rate",
      "min_value": 0,
      "max_value": 0.85,
      "mostly": 0.99
    },
    "expectation_type": "expect_column_values_to_be_between"
  },
  {
    "meta": {},
    "kwargs": {
      "column": "acc_rate",
      "min_value": 0,
      "max_value": 1.0,
      "mostly": 0.99
    },
    "expectation_type": "expect_column_values_to_be_between"
  }
]>
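
With `mostly=0.99`, an `expect_column_values_to_be_between` check passes only if at least 99% of the non-null values fall inside `[min_value, max_value]` (both bounds inclusive by default). The sketch below re-implements that rule in plain Python for illustration (it is not Great Expectations code; the helper name `values_between` and its empty-input behaviour are assumptions) and applies it to the saved dataset values shown above. For `avg_daily_trips`, 956 exceeds `max_value=955.0`, so only 2 of 3 values (about 67%) are in range, well below the 99% threshold.

```python
from typing import Optional, Sequence


def values_between(
    values: Sequence[Optional[float]],
    min_value: float,
    max_value: float,
    mostly: float = 1.0,
) -> bool:
    """Pass if at least `mostly` of the non-null values lie in [min_value, max_value]."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return True  # nothing to check (assumption of this sketch)
    in_range = sum(min_value <= v <= max_value for v in non_null)
    return in_range / len(non_null) >= mostly


# Values taken from the saved dataset output above.
conv_rate = [0.838634, 0.681086, 0.483464]
acc_rate = [0.54903, 0.069958, 0.18302]
avg_daily_trips = [218, 956, 705]

print(values_between(conv_rate, 0, 0.85, mostly=0.99))           # True
print(values_between(acc_rate, 0, 1.0, mostly=0.99))             # True
print(values_between(avg_daily_trips, 0.0, 955.0, mostly=0.99))  # False (2/3 in range)
```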

In [25]:
validation_reference = ds.as_reference(profiler=manual_profiler, name='profiler_test')

In [26]:
job = fs.get_historical_features(features=features, entity_df=entity_df)
job.to_df(
    validation_reference=fs
        .get_saved_dataset("driver_training_dataset")
        .as_reference(profiler=manual_profiler, name="profiler_test")
)



|   | event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips |
|---|---|---|---|---|---|
| 0 | 2022-07-29 13:57:49.692000+00:00 | 1001 | 0.838634 | 0.54903 | 218 |
| 1 | 2022-07-31 01:57:49.692000+00:00 | 1002 | 0.681086 | 0.069958 | 956 |
| 2 | 2022-08-01 13:57:49.692000+00:00 | 1003 | 0.483464 | 0.18302 | 705 |


In [27]:
_ = job.to_df(validation_reference=validation_reference)



## Load data into the online feature store
Materialise the latest feature values for each entity into the online feature store.

In [24]:
fs.materialize_incremental(end_date=datetime.now())

Materializing 3 feature views to 2022-07-28 10:36:20+01:00 into the dynamodb online store.

driver_hourly_stats from 2021-07-29 09:36:20+01:00 to 2022-07-28 10:36:20+01:00:


100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 31.71it/s]


driver_avg_daily_trips from 2021-07-29 09:36:21+01:00 to 2022-07-28 11:36:20+01:00:


100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 130.27it/s]


driver_orders from 2021-07-29 09:36:22+01:00 to 2022-07-28 11:36:20+01:00:


0it [00:00, ?it/s]
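
`materialize_incremental` chooses a start time per feature view: on the first run it starts at the current time minus the feature view's `ttl`, and on later runs it resumes from the end of the previous materialization. That is consistent with the log above, where `driver_hourly_stats` starts roughly 52 weeks (the `ttl` used in the feature view definition earlier) before the end date. A sketch of that window calculation, assuming this behaviour of the Feast version used here; the helper `incremental_window` is hypothetical, and raising when there is no `ttl` on a first run is also an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def incremental_window(
    end_date: datetime,
    now: datetime,
    ttl: Optional[timedelta],
    last_end: Optional[datetime],
) -> Tuple[datetime, datetime]:
    """Compute the (start, end) interval for one incremental materialization run."""
    if last_end is not None:
        start = last_end  # resume where the previous run stopped
    elif ttl is not None:
        start = now - ttl  # first run: only rows still within the ttl are useful online
    else:
        raise ValueError("first incremental run needs a ttl (assumption of this sketch)")
    return start, end_date


now = datetime(2022, 7, 28, 9, 36, 20, tzinfo=timezone.utc)
first = incremental_window(now, now, timedelta(weeks=52), last_end=None)
print(first[0])  # 2021-07-29 09:36:20+00:00

later = now + timedelta(days=1)
second = incremental_window(later, later, timedelta(weeks=52), last_end=first[1])
print(second[0] == first[1])  # True
```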


## Get data from online feature store

We use `get_online_features()` to retrieve the latest feature values for specific entities from the online feature store.

In [25]:
online_features = fs.get_online_features(
    features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
).to_df()

online_features

|   | driver_id | acc_rate | conv_rate | avg_daily_trips | trip_completed |
|---|---|---|---|---|---|
| 0 | 1001 | 0.466293 | 0.913352 | 975 |  |
| 1 | 1002 | 0.374785 | 0.025624 | 552 |  |
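
Conceptually, the online store behaves like a key-value store: materialization writes the latest feature values per entity key, and `get_online_features` looks them up by join key, returning nulls for features with no stored value (like the empty `trip_completed` column above). A dictionary-based sketch of that idea; `ToyOnlineStore`, `write` and `read` are hypothetical names, not Feast's API, and the simple overwrite-on-write rule is an assumption of the sketch.

```python
from typing import Dict, List, Optional, Tuple

EntityKey = Tuple[str, int]  # (join key name, join key value), e.g. ("driver_id", 1001)


class ToyOnlineStore:
    """Hypothetical key-value online store: one row of feature values per entity key."""

    def __init__(self) -> None:
        self._rows: Dict[EntityKey, Dict[str, object]] = {}

    def write(self, key: EntityKey, values: Dict[str, object]) -> None:
        # Materialization overwrites stored values with the latest ones.
        self._rows.setdefault(key, {}).update(values)

    def read(
        self, join_key: str, ids: List[int], features: List[str]
    ) -> List[Dict[str, Optional[object]]]:
        results = []
        for entity_id in ids:
            stored = self._rows.get((join_key, entity_id), {})
            row: Dict[str, Optional[object]] = {join_key: entity_id}
            # Features with no stored value come back as None (a null in the DataFrame).
            row.update({name: stored.get(name) for name in features})
            results.append(row)
        return results


store = ToyOnlineStore()
store.write(("driver_id", 1001), {"conv_rate": 0.1, "acc_rate": 0.9})
store.write(("driver_id", 1001), {"conv_rate": 0.5})  # newer value replaces the old one
print(store.read("driver_id", [1001, 1002], ["conv_rate", "acc_rate"]))
# [{'driver_id': 1001, 'conv_rate': 0.5, 'acc_rate': 0.9}, {'driver_id': 1002, 'conv_rate': None, 'acc_rate': None}]
```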


## Feast CLI

In [27]:
!feast --help

Usage: feast [OPTIONS] COMMAND [ARGS]...

  Feast CLI

  For more information, see our public docs at https://docs.feast.dev/

  For any questions, you can reach us at https://slack.feast.dev/

Options:
  -c, --chdir TEXT  Switch to a different feature repository directory before
                    executing the given subcommand.
                    CRITICAL (case-insensitive).
  --help            Show this message and exit.

Commands:
  alpha                    Access alpha features
  apply                    Create or update a feature store deployment
  data-sources             Access data sources
  endpoint                 Display feature server endpoints
  entities                 Access entities
  feature-services         Access feature services
  feature-views            Access feature views
  init                     Create a new Feast repository
  materialize              Run a (non-incremental) materialization job to...
  materialize-incremental  Run an incremental materializ

In [112]:
!feast feature-views list

NAME                    ENTITIES    TYPE
driver_hourly_stats     {'driver'}  FeatureView
driver_avg_daily_trips  {'driver'}  FeatureView
driver_orders           {'driver'}  FeatureView


In [113]:
!feast entities list

NAME    DESCRIPTION    TYPE
driver                 ValueType.UNKNOWN


In [32]:
!feast registry-dump

{
  "dataSources": [
    {
      "name": "DRIVER_ORDERS",
      "snowflakeOptions": {
        "database": "FEAST_TEST",
        "schema": "PUBLIC",
        "table": "DRIVER_ORDERS",
        "warehouse": "COMPUTE_WH"
      },
      "timestampField": "EVENT_TIMESTAMP",
      "type": "BATCH_SNOWFLAKE"
    },
    {
      "name": "driver_orders",
      "snowflakeOptions": {
        "database": "FEAST_TEST",
        "schema": "PUBLIC",
        "table": "driver_orders",
        "warehouse": "COMPUTE_WH"
      },
      "timestampField": "event_timestamp",
      "type": "BATCH_SNOWFLAKE"
    },
    {
      "createdTimestampColumn": "created",
      "name": "feast_test_feast_driver_hourly_stats",
      "snowflakeOptions": {
        "database": "FEAST_TEST",
        "schema": "PUBLIC",
        "table": "feast_test_feast_driver_hourly_stats",
        "warehouse": "COMPUTE_WH"
      },
      "timestampField": "event_timestamp",
      "type": "BATCH_SNOWFLAKE"
    }
  ],
  "entities": [
    {
      