# 👩🏽‍💻🛒 Vertex AI Feature Store 

O Feature Store da Vertex AI oferece um repositório centralizado para organizar, armazenar e exibir recursos de ML.

Um `featurestore` central permite que as organizações compartilhem, descubram e utilizem novamente atributos de ML com eficiência, o que pode aumentar a velocidade de desenvolvimento e implantação de novos aplicativos de ML.

Neste laboratório, vamos entender os principios fundamentais de um Feature Store e entender de forma prática como podemos utilizar dessa estrutura para gerenciar features de modelos de Machine Learning.

Disponível em: https://cloud.google.com/vertex-ai/docs/featurestore

In [None]:
# Importando pacotes
import numpy as np
import pandas as pd
import random 
import pickle

from sklearn.datasets import load_iris
import sklearn
from google.cloud import aiplatform
from datetime import datetime, timedelta

In [None]:
# Definindo variáveis globais
PROJECT = "garrido-ml-demos"
LOCATION = "us-central1"
FEATURESTORE_NAME = "iris_fs"
ENTITY_TYPE = "id"

## 🌺 Carregando e entendendo o conjunto de dados: Iris Dataset
O Iris Dataset contém quatro características (comprimento e largura das sépalas e pétalas) de 50 amostras de três espécies de Iris (Iris setosa, Iris virginica e Iris versicolor). Essas medidas foram utilizadas para criar um modelo linear discriminante para classificar as espécies. O conjunto de dados é freqüentemente utilizado na mineração de dados, exemplos de classificação e agrupamento e para testar algoritmos.

Neste laboratório, trabalharemos com a biblioteca `scikit-learn` para realizar a ingestão do dataset e construiremos algumas amostras, introduzindo uma certa quantidade de ruído, para simular pequenas diferenças históricas nos nossos dados (de forma a representar um pouco melhor um cenário real de armazenamento de features).

In [None]:
# Carregando o Iris Datasets 
iris = load_iris()

# Construindo Dataframe
df = pd.DataFrame(data= np.c_[iris['data'], iris['target']],
                 columns= iris['feature_names'] + ['target'])

df['species'] = pd.Categorical.from_codes(iris.target, iris.target_names)
columns = ['id', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species', 'targets']

# Adicionando id das flores e timestamp
df.reset_index(inplace=True)
df.columns = columns

df['id'] = df['id'].apply(lambda x: f'flower_{x}')
df['timestamp'] = pd.Timestamp(datetime.today())

In [None]:
# Gerando amostras de features
samples = 50

# Instanciando uma cópia do dataframe
stage = df.copy()

for day in range(samples):
    # Gerando cópias para introdução de ruído nos dados
    sample = df.copy()
    
    # Definindo marcas de timestamp retrógradas para cada amostra
    sample['timestamp'] = sample['timestamp'].apply(lambda x: x - timedelta(days=1))
    
    # Adicionando ruído às features
    sample[columns[1:-2]] += random.uniform(0, .5)
    
    # Concatenando as amostras conforme são geradas
    df = pd.concat([stage, sample]) 

In [None]:
# Visualizando resultado do incremento de amostras para um id específico
df.loc[df['id'] == 'flower_0'].head(15)

Unnamed: 0,id,sepal_length,sepal_width,petal_length,petal_width,species,targets,timestamp
0,flower_0,5.1,3.5,1.4,0.2,0.0,setosa,2022-06-25 17:09:41.443744
0,flower_0,5.3115,3.7115,1.6115,0.4115,0.0,setosa,2022-06-24 17:09:41.443744
0,flower_0,5.783277,4.183277,2.083277,0.883277,0.0,setosa,2022-06-23 17:09:41.443744
0,flower_0,6.227935,4.627935,2.527935,1.327935,0.0,setosa,2022-06-22 17:09:41.443744
0,flower_0,6.248496,4.648496,2.548496,1.348496,0.0,setosa,2022-06-21 17:09:41.443744
0,flower_0,6.360848,4.760848,2.660848,1.460848,0.0,setosa,2022-06-20 17:09:41.443744
0,flower_0,6.481017,4.881017,2.781017,1.581017,0.0,setosa,2022-06-19 17:09:41.443744
0,flower_0,6.578152,4.978152,2.878152,1.678152,0.0,setosa,2022-06-18 17:09:41.443744
0,flower_0,6.762937,5.162937,3.062937,1.862937,0.0,setosa,2022-06-17 17:09:41.443744
0,flower_0,7.024087,5.424087,3.324087,2.124087,0.0,setosa,2022-06-16 17:09:41.443744


## 🧱 Inicializando um Feature Store

Uma vez definido o nosso conjunto de dados histórico, é possível inicializar um novo Feature Store de forma bem simples, utilizando o SDK da Vertex AI.

O Feature Store provisionará uma infraestrutura distribuída (baseada em quantidades de nós de computação), escalável e altamente disponível para que tenhamos as nossas features disponíveis para recuperação de `baixa latência` e também de forma `histórica`, sendo estas opções definidas por dois canais, chamados `online` e `offline` Feature Store, respectivamente.

In [None]:
# Iniciando Vertex AI
aiplatform.init(project=PROJECT, location=LOCATION)

In [None]:
# Criando o Feature Store
fs = aiplatform.Featurestore.create(
        featurestore_id=FEATURESTORE_NAME,
        online_store_fixed_node_count=1,
        sync=True
    )

Creating Featurestore
Create Featurestore backing LRO: projects/348385944272/locations/us-central1/featurestores/iris_fs/operations/1038639984002727936
Featurestore created. Resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs
To use this Featurestore in another session:
featurestore = aiplatform.Featurestore('projects/348385944272/locations/us-central1/featurestores/iris_fs')


## 👷🏽‍♀️ Realizando a ingestão de features para o BigQuery

A partir do Iris dataset em memória (i.e., o nosso `Pandas Dataframe`), podemos gerar um arquivo `Parquet` de forma bem simples para persistir os dados de forma binária no disco. A partir disso, será possível realizar um processo de ingestão em batch para o `BigQuery`, um processo *sem custo* que disponibilizará os nossos dados em uma estrutura serverless, de armazenamento colunar, com baixo custo e orientada à recuperação e processamento massivo de dados. 

O Vertex AI Feature Store possui mecanismos de ingestão de features em batch a partir de arquivos binários no Google Cloud Storage, tabelas no BigQuery e Pandas Dataframes. 

In [None]:
%%bash
# Vamos começar criando uma camada lógica de dados no BigQuery, o nosso dataset
{
    bq --location=us-central1 mk -d iris
} || { # catch
    echo "Dataset já existente"
}

BigQuery error in mk operation: Dataset 'garrido-ml-demos:iris' already exists.
Dataset já existente


In [None]:
%%bigquery 
# Em seguida, vamos garantir que a nossa tabela será recriada do zero
DROP TABLE IF EXISTS iris.iris

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 969.56query/s] 


In [None]:
%%bigquery
# Vamos definir a nossa tabela, utilizando o statement DDL do BigQuery
CREATE OR REPLACE TABLE iris.iris(
    id STRING,
    petal_length FLOAT64,
    sepal_length FLOAT64,
    petal_width FLOAT64,
    sepal_width FLOAT64,
    species FLOAT64,
    targets STRING,
    timestamp TIMESTAMP
)

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 843.25query/s] 


In [None]:
# Extraindo os dados em Parquet para realizar ingestão
df.to_parquet('features.parquet.gzip',
              compression='gzip',
             index=False)

In [None]:
%%bash
bq load \
    --source_format=PARQUET \
    iris.iris \
    features.parquet.gzip

Upload complete.
Waiting on bqjob_r3ba7f803b2835ac9_000001819bd7e764_1 ... (1s) Current status: DONE   


In [None]:
%%bigquery 
SELECT * FROM iris.iris

Query complete after 0.00s: 100%|██████████| 2/2 [00:00<00:00, 1013.85query/s]                        
Downloading: 100%|██████████| 7650/7650 [00:01<00:00, 6773.32rows/s]


Unnamed: 0,id,petal_length,sepal_length,petal_width,sepal_width,species,targets,timestamp
0,flower_23,1.700000,5.100000,0.500000,3.300000,0.0,setosa,2022-06-25 17:09:41.443744+00:00
1,flower_23,5.662628,9.062628,4.462628,7.262628,0.0,setosa,2022-06-08 17:09:41.443744+00:00
2,flower_6,2.083277,5.283277,0.983277,4.083277,0.0,setosa,2022-06-23 17:09:41.443744+00:00
3,flower_17,2.083277,5.783277,0.983277,4.183277,0.0,setosa,2022-06-23 17:09:41.443744+00:00
4,flower_18,2.383277,6.383277,0.983277,4.483277,0.0,setosa,2022-06-23 17:09:41.443744+00:00
...,...,...,...,...,...,...,...,...
7645,flower_89,13.144353,14.644353,10.444353,11.644353,1.0,versicolor,2022-05-22 17:09:41.443744+00:00
7646,flower_94,13.344353,14.744353,10.444353,11.844353,1.0,versicolor,2022-05-22 17:09:41.443744+00:00
7647,flower_96,13.344353,14.844353,10.444353,12.044353,1.0,versicolor,2022-05-22 17:09:41.443744+00:00
7648,flower_97,13.444353,15.344353,10.444353,12.044353,1.0,versicolor,2022-05-22 17:09:41.443744+00:00


## 👷🏿 Definindo as features do Feature Store

Uma vez definido o repositório de staging de features no BigQuery, podemos executar a criação das `features` pela própria UI do Feature Store. 

As features representam informações que serão armazenada dentro da estrutura lógica de uma `entidade` (um identificador, tal como um id, um SKU, um nome, etc.), ou seja, para cada entidade, teremos n features, seguidas de um timestamp, que representa um carimbo do quão atual é aquele valor dentro do Feature Store:

```Entidade``` --> [`feature_1`, `feature_2`, `...`, `feature_n`] [`timestamp`]

Podemos nos basear em um exemplo bem simples, presente na documentação da Vertex Feature Store e que representa a associação de características de um filme (features) a uma entidade (o id do filme):

In [None]:
# Criando uma nova entidade (id)
aiplatform.EntityType.create(
        entity_type_id=ENTITY_TYPE, featurestore_name=FEATURESTORE_NAME
    )

Creating EntityType
Create EntityType backing LRO: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id/operations/3017972020232060928
EntityType created. Resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id
To use this EntityType in another session:
entity_type = aiplatform.EntityType('projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id')


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fd4e206bad0> 
resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id

In [None]:
# Instanciando o Entity Type de IDs (criado pela UI do Feature Store)
ids = aiplatform.featurestore.EntityType(
    entity_type_name=ENTITY_TYPE, featurestore_id=FEATURESTORE_NAME
)

In [None]:
columns[1:-2]

['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

In [None]:
# Definindo e criando as features no Feature Store (de forma síncrona)
FEATURES = columns[1:-2]

FEATURE_CONFIGS = {
        feature : {
            "value_type": "DOUBLE",
            "description": f"Representa a {feature} da flor"
        } for feature in FEATURES
}

ids.batch_create_features(feature_configs=FEATURE_CONFIGS, sync=True)

Batch creating features EntityType entityType: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id
Batch create Features EntityType entityType backing LRO: projects/348385944272/locations/us-central1/featurestores/iris_fs/operations/8514615365437751296
EntityType entityType Batch created features. Resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fd4e207f310> 
resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id

## 👷🏾‍♀️ Ingestão e Recuperação de Features 

Agora vamos pular rapidinho pra console e definir um novo job de ingestão de features a partir do `BigQuery` para posteriormente poder recuperá-las do canal de online serving.

In [None]:
# Criando um job de ingestão de features do source (BigQuery) para o Feature Store de forma síncrona
ids.ingest_from_bq(
    feature_ids=FEATURES,
    feature_time="timestamp",
    bq_source_uri=f"bq://{PROJECT}.iris.iris",
    entity_id_field="id",
    sync=True
)

Importing EntityType feature values: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id
Import EntityType feature values backing LRO: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id/operations/426150434680340480
EntityType feature values imported. Resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fd4e207f310> 
resource name: projects/348385944272/locations/us-central1/featurestores/iris_fs/entityTypes/id

In [None]:
# Após finalizado o job de ingestão, vamos recuperar features para um determinado id
entity_id = "flower_113"
features = ids.read(entity_ids=[entity_id], feature_ids=["*"])

In [None]:
# Visualizando as features recuperadas
features

Unnamed: 0,entity_id,petal_length,petal_width,sepal_width,sepal_length
0,flower_113,5.0,2.0,2.5,5.7


In [None]:
# Ajustando features para previsão
np.array([features.to_dict("split")["data"][0][1:]])

## 👩🏾‍🔬 Treinando um classificador 

Para que possamos posteriormente consumir as nossas features para um propósito de inferências, vamos utilizar o Iris Dataset que está armazenado em memória para treinar um classificador do tipo `DecisionTreeClassifier`.

In [None]:
from sklearn.model_selection import train_test_split

X = iris.data
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                                    test_size=0.2, 
                                                    random_state=42)

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier

clf = DecisionTreeClassifier()
clf.fit(X_train,y_train)

In [None]:
# Persistindo o modelo em disco
filename = "model.pkl"
pickle.dump(clf, open(filename, 'wb'))

In [None]:
# Verificando score do modelo para os dados de teste
clf.score(X_test, y_test) 

## 👩🏾‍💻 Realizando predições para entidades recuperadas do Feature Store

Agora que já temos um modelo treinado, podemos consumir as nossasfeatures do Feature store para realizar predições.

Com isso, é possível recuperar a informação de maneira centralizada e rápida para inferência a partir de alguma dada entidade.

É válido que ressaltar que o método de leitura `read` de uma `entidade` no Feature Store sempre recupera o **timestamp mais atual para o serving online.**

A depender da disponibilidade do Feature Store, é possível perceber uma maior latência nas primeiras requisições, mas que rapidamente se ajustam, de modo que a estrutura possa receber uma quantidade massiva de requisições subsequentes, sem perda de performance.

In [None]:
# Indexação do target
indexes = pd.Series(df.targets.unique()).to_dict()
indexes

In [None]:
# Instanciando uma entidade
entity = "flower_34"

# Recuperando as features
features = ids.read(entity_ids=[entity], feature_ids=["*"]).to_dict("split")["data"][0][1:]

# Resultado da predição
print(f'A flor {entity} é do tipo "{indexes[clf.predict(np.array([features]))[0]]}".')

In [None]:
# Executando 10 inferências síncronas em série
from random import randrange

for _ in range(30):
    entity = f"flower_{randrange(150)}"
    # Recuperando as features
    features = ids.read(entity_ids=[entity], feature_ids=["*"]).to_dict("split")["data"][0][1:]

    # Resultado da predição
    print(f'A flor {entity} é do tipo "{indexes[clf.predict([features])[0]]}".')

## 🕒 Point-in-time Recovery

Através do Feature Store, também é possível recuperar features em batch para diferentes timestamps. É através de um canal denominado `Offline Feature Store` que podemos consumir de slices de features em diferentes intervalos de timestamp e exportá-los para arquivos binários no Cloud Storage, tabelas no BigQuery ou Pandas Dataframes. 

Esse processo reforça a ideia da construção de um repositório centralizado de features, com características híbridas, proporcionando baixa latência de recuperação e consistência de recuperação histórica de valores utilizando uma estrutura unificada. 

No exemplo a seguir, vamos recuperar features em batch em um timestamp retroativo:

In [None]:
# Instanciando o Feature Store
fs = aiplatform.featurestore.Featurestore(featurestore_name=FEATURESTORE_NAME)

In [None]:
# Definindo features para recuperação dentro de entity types
SERVING_FEATURE_IDS = {
    "id": ["*"]
}

In [None]:
# Definindo um timestamp de recuperação point in time
point_in_time = pd.Timestamp(datetime.today() - timedelta(days=20))

In [None]:
# Definindo esquema de recuperação Point-in-Time para uma determinado entidade
entity = "flower_139"

pit_config = pd.DataFrame(
    data=[
        {
        "id": entity,
        "timestamp": point_in_time
        }
    ],
)

In [None]:
# Recuperando features Point-in-Time do offline Feature Store
pit_features = fs.batch_serve_to_df(
    serving_feature_ids=SERVING_FEATURE_IDS,
    read_instances_df=pit_config,
)

# Visualizando features Point-in-Time recuperadas
pit_features

In [None]:
# Recuperando as features mais recentes para comparação
ids.read(entity_ids=[entity], feature_ids=["*"])

In [None]:
# Definindo esquema de recuperação histórico para todos os entities
historical_config = pd.DataFrame(
    data=[
        {
        "id": [f"flower_{n}" for n in range(150)],
        "timestamp": pd.date_range(start="2022-06-10",end="2022-06-15").tolist()
        }
    ],
).explode(["id"]).explode(["timestamp"])

In [None]:
# Recuperando features históricas do offline Feature Store
historical_features = fs.batch_serve_to_df(
    serving_feature_ids=SERVING_FEATURE_IDS,
    read_instances_df=historical_config,
)

# Visualizando features históricas recuperadas 
historical_features

## 👨🏽‍🎓 Conclusão

Neste laboratório, pudemos entender um pouco melhor sobre os conceitos básicos de um Feature Store, bem como:
- Componentes da Arquitetura (canal `online` e `offline`)
- Modelo de dados do Feature Store (`featurestores`, `entidades`, `features`, `timestamps`)
- Como `ingerir/recuperar` features do Vertex AI Feature Store, para inferência e point-in-time check

Para excluir o feature store, siga para as próximas células.
Time to clean! 

In [None]:
fs.delete(sync=True, force=True)