# Feature store


Дословно - «магазин» фичей - удобный интерфейс для взаимодействия data-engineer и data-science процессов

Более подробно - Feature Store представляет собой хранилище данных для обучения и инференса ML-моделей и совокупность процессов по:
- хранению и версионированию признаков
- заведению новых признаков и их подробного описания
- обогащению экспертизы по feature-engineering для построения ml-моделей
- (опционально) регулярный пересчет признаков

Feature store помогает бизнесу эффективнее делиться экспертизой в создании признаков для обучения моделей между командами и бизнес-доменами, а также помогает data - отделам эффективнее мониторить расчет признаков, сдвиги распределений данных и другие свойства датасетов, влияющие на финальную точность алгоритмов машинного обучения.

![title](./images/feature_store_overview.png)

### Feature Store в компаниях

Большинство компаний формируют архитектуру Feature Store детально под свои нужды и единого бенчмарка Feature Store не существует, но ниже приведу несколько примеров архитектур Feature store в разных крупных компаниях

#### Spotify

![title](./images/spotify_feature_store.png)

#### Twitter

![title](./images/twitter_feature_store.png)

#### Linkedin

![title](./images/linkedin_feature_store.png)

Как можно заметить, внутри компаний инфраструктуры Feature Store довольно сильно отличаются

В первую очередь это связано с устройством и наличием специфичных in-house инструментов внутри крупных компаний, а также с уникальностью домена данных, Feature Store для обработки звуковых данных может сильно отличаться от Feature Store для хранения данных о банковских транзакциях.

Стоит отметить и общие моменты, например для realtime - данных зачастую используется уже зарекомендовавший себя брокер обмена сообщениями - Kafka, но более важная деталь, характеризующая Feature Store - **глобально Feature Store - это про подход работы с данными**, который с технической точки зрения может включать в себя не только Python SDK для обращения к признакам, но и инструменты для сбора данных, хранения, расчета и методологии версионирования.

Также, важно понимать, что за действительно рабочим Feature Store стоит также и работа над бизнес - процессами, связанными с хранением и созданием признаков, выделением ресурсов на аккуратное их заведение в регистр признаков и донесение информации о них другим ds - командам в компании.

Без налаженных процессов построенный Feauture Store быстро устареет и перестанет использоваться, может превратиться в дополнительную обузу на онбординге для новых разработчиков и Data Scientist'ов

### [Feast](https://github.com/feast-dev/feast)

На примере архитектуры Feature Store в компании X (быв. Twitter) можно заметить использование open-source решение для создания Feature Store - [Feast](https://feast.dev/)

Feast представляет собой интерфейс Feature store в виде Python SDK и коннекторов к различным источникам данных, как стриминговых (Kafka, Kinesis), так и к батчевым (S3, Parquet, Snowflake, BigQuery)

![title](./images/feast_overview.webp)

#### Компоненты Feast

##### Registry – Главная директория Feature store, содержащая все метаданные признакового поля

##### Store – Делится на offline/online, online не содержит истории, только самый актуальный срез признакового описания объектов

##### Serve – Интерфейс (Python SDK) для взаимодействия с данными (их выгрузки и регистрации)

![title](./images/feast_architecture.webp)

- Create Batch Features: ELT/ETL системы для преобразования данных
- Create Stream Features: потоковые сервисы поставки данных (Kafka/Kinesis)
- Feast Apply: обновление инфраструктуры feature store согласно указанным конфигурационным файлам feast
- Feast Materialize: выгрузка данных в онлайн хранилище
- Model Training: пайплайн для получения данных перед обучением модели
- Get Historical Features: экспорт истории по признакам для подачи данных в модели
- Deploy Model: запуск модели (не покрывается feast'ом)
- Prediction: получение предсказаний модели
- Get Online Features: получение потоковых данных для обучнеия модели

Feast является наиболее популярным open-source решением для внедрения Feature Store, однако при его внедрении стоит помнить о нескольких ограничениях:

#### Feast НЕ является

- ELT/ETL система, Feast не поддерживает пайплайны для обработки данных, их лучше выносить в отдельные приложения
- Оркестратор, для этого лучше выбрать Apache Airflow / Apache Nifi
- База данных, инструмент предназначен для обработки данных из БД, а не хранения их внутри себя

### Feast пример использования

#### Инициализация проекта

In [6]:
!rm -r test_project; mkdir test_project

In [5]:
!pip install feast

In [7]:
!feast init test_project
!cd test_project/feature_repo


Creating a new Feast repository in [1m[32m/Users/alexeytarasov/temp/feature_store/test_project[0m.



Посмотрим, что содержится в папке

In [14]:
!ls ./test_project/feature_repo/*

./test_project/feature_repo/__init__.py
./test_project/feature_repo/example_repo.py
./test_project/feature_repo/feature_store.yaml
./test_project/feature_repo/test_workflow.py

./test_project/feature_repo/__pycache__:
__init__.cpython-39.pyc      test_workflow.cpython-39.pyc
example_repo.cpython-39.pyc

./test_project/feature_repo/data:
driver_stats.parquet


- example_repo.py содержит определение признаков
- feature_store.yaml содержит настройки для инициализации нашего feature store
- test_workflow.py пример для запуска команд по выгрузке признаков из feature store


#### Содержание конфигурационных файлов для создания feature store с помощью Feast

`feature_store.yaml`

```
project: my_project
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
entity_key_serialization_version: 2
```

`example_repo.py`

In [None]:
# This is an example feature definition file

from datetime import timedelta

import pandas as pd

from feast import (
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    PushSource,
    RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path="%PARQUET_PATH%",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_stats_fv = FeatureView(
    # The unique name of this feature view. Two feature views in a single
    # project cannot have the same name
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    # The list of features defined below act as a schema to both define features
    # for both materialization of features into a store, and are used as references
    # during retrieval for building a training dataset or serving features
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_source,
    # Tags are user defined key/value pairs that are attached to each
    # feature view
    tags={"team": "driver_performance"},
)

# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
    name="driver_stats_push_source",
    batch_source=driver_stats_source,
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df


# This groups features into a model version
driver_activity_v1 = FeatureService(
    name="driver_activity_v1",
    features=[
        driver_stats_fv[["conv_rate"]],  # Sub-selects a feature from a feature view
        transformed_conv_rate,  # Selects all features from the feature view
    ],
)
driver_activity_v2 = FeatureService(
    name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

#### Запуск тестового workflow

Для запуска тестового процесса выгрузки признаков можно запустить:

`python test_workflow.py`

Ниже указан код `test_workflow.py`

In [None]:
import subprocess
from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode


def run_demo():
    store = FeatureStore(repo_path=".")
    print("\n--- Run feast apply ---")
    subprocess.run(["feast", "apply"])

    print("\n--- Historical features for training ---")
    fetch_historical_features_entity_df(store, for_batch_scoring=False)

    print("\n--- Historical features for batch scoring ---")
    fetch_historical_features_entity_df(store, for_batch_scoring=True)

    print("\n--- Load features into online store ---")
    store.materialize_incremental(end_date=datetime.now())

    print("\n--- Online features ---")
    fetch_online_features(store)

    print("\n--- Online features retrieved (instead) through a feature service---")
    fetch_online_features(store, source="feature_service")

    print(
        "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---"
    )
    fetch_online_features(store, source="push")

    print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
    event_df = pd.DataFrame.from_dict(
        {
            "driver_id": [1001],
            "event_timestamp": [
                datetime.now(),
            ],
            "created": [
                datetime.now(),
            ],
            "conv_rate": [1.0],
            "acc_rate": [1.0],
            "avg_daily_trips": [1000],
        }
    )
    print(event_df)
    store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)

    print("\n--- Online features again with updated values from a stream push---")
    fetch_online_features(store, source="push")

    print("\n--- Run feast teardown ---")
    subprocess.run(["feast", "teardown"])


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
    # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
    # for all entities in the offline store instead
    entity_df = pd.DataFrame.from_dict(
        {
            # entity's join key -> entity values
            "driver_id": [1001, 1002, 1003],
            # "event_timestamp" (reserved key) -> timestamps
            "event_timestamp": [
                datetime(2021, 4, 12, 10, 59, 42),
                datetime(2021, 4, 12, 8, 12, 10),
                datetime(2021, 4, 12, 16, 40, 26),
            ],
            # (optional) label name -> label values. Feast does not process these
            "label_driver_reported_satisfaction": [1, 5, 3],
            # values we're using for an on-demand transformation
            "val_to_add": [1, 2, 3],
            "val_to_add_2": [10, 20, 30],
        }
    )
    # For batch scoring, we want the latest timestamps
    if for_batch_scoring:
        entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

    training_df = store.get_historical_features(
        entity_df=entity_df,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ],
    ).to_df()
    print(training_df.head())


def fetch_online_features(store, source: str = ""):
    entity_rows = [
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ]
    if source == "feature_service":
        features_to_fetch = store.get_feature_service("driver_activity_v1")
    elif source == "push":
        features_to_fetch = store.get_feature_service("driver_activity_v3")
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ]
    returned_features = store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()
    for key, value in sorted(returned_features.items()):
        print(key, " : ", value)


if __name__ == "__main__":
    run_demo()

### Масштабирование Feast

#### Registry
Для использования Feast в production - среде доступны следующие варианты ведения registry:
- S3
- GCP
- Snowflake
- Postgres
- MySQL

Для SQL - based форматов ведения registry будет необходипо прописать подключение в `feature_store.yaml`, например - вот так:

```
project: <your project name>
provider: <provider name>
online_store: redis
offline_store: file
registry:
    registry_type: sql
    path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
    cache_ttl_seconds: 60
```

#### Offline store

Доступные варианты для offline-store компоненты:

- File
- Snowflake
- BigQuery
- Redshift
- Spark*
- Postgres*
- Trino*


#### Online store

То же самое для online-store

- Sqlite
- Redis
- DynamoDB
- Snowflake
- Postgres*
- Hbase*
- Cassandra*

\* - поддерживается feast - commiunity, не гарантируется стабильность работы