In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

This notebook shows common data engineering transformations that:

* Calculate customer spending behaviour features
* Calculate terminal spending behaviour features
* Calculate terminal risk factors features 
* Calculate terminal risk index features

and obtain the wide features table to ingest in the feature store. 

Notice, we made an assumption to simplify the demo. We consider target as a feature. This would simplify the ML pipeline avoiding a join operation. 

### Objective

In the following notebook, you will learn how to create a feature store and use the SDK to implement the feature management operations.

## Before you begin

### Load config settings

In [None]:
# Ask gcloud for the active project; the `!` shell capture returns the command
# output as a list of lines, so the project id is the first element.
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
# The bucket name is set equal to the project id.
BUCKET_NAME = PROJECT_ID
# Fetch the shared settings file stored in that bucket.
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
print(config.n)
# NOTE(review): exec() runs whatever the bucket object contains inside this
# kernel — only use a bucket you trust. Names used by later cells but not
# defined in this notebook (e.g. REGION, FEATURESTORE_ID) presumably come from
# this file — confirm against the printed output above.
exec(config.n)

### Import libraries

In [None]:
import os
import sys
import random
import time
import datetime as dt

# Data Engineering
import numpy as np
import pandas as pd
from google.cloud import bigquery

#Vertex AI and Vertex AI Feature Store 
from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import Featurestore, EntityType, Feature

### Define constants

In [None]:
# --- Project layout -----------------------------------------------------------
DATA_DIR = os.path.join("..", "data")  # data folder, relative to this notebook
RAW_DATA_DIR = os.path.join(DATA_DIR, "raw")  # raw data sub-folder

# --- Data engineering ---------------------------------------------------------

# Range of transactions used for training (only a few days, for the demo).
START_DATE_TRAIN = "2022-01-01"
END_DATE_TRAIN = "2022-01-31"

# BigQuery dataset and raw tables.
BQ_DATASET = "tx"
RAW_TABLE_TRANSACTIONS = "tx"
RAW_TABLE_LABELS = "txlabels"

# Time windows for the behavioural features, in days and in seconds.
DAY_WINDOWS = [1, 7, 14]
SECONDS_WINDOWS = [days * 24 * 60 * 60 for days in DAY_WINDOWS]

# Delay period, in days and in seconds.
DELAY_DAYS = 7
DELAY_SECONDS = DELAY_DAYS * 24 * 60 * 60

# Each window extended by the delay period. Despite the name, the values are
# expressed in seconds.
DELAY_DAY_WINDOWS = [DELAY_SECONDS + window_seconds for window_seconds in SECONDS_WINDOWS]

# Name of the wide features table, suffixed with the end of the training range.
FEATURES_TABLE = f"wide_features_table_{END_DATE_TRAIN}"

# --- Vertex AI Feature Store --------------------------------------------------
# NOTE(review): the endpoint is pinned to us-central1 while other cells use
# REGION — confirm they refer to the same region.
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"  # @param {type:"string"}

EVENTS_TABLE_NAME = f"events_{END_DATE_TRAIN}"
EVENTS_COLUMNS = [
    "TX_DATETIME",
    "TRANSACTION_ID",
    "CUSTOMER_ID",
    "TERMINAL_ID",
    "TX_AMOUNT",
    "TX_FRAUD",
]

CUSTOMERS_TABLE_NAME = f"customers_{END_DATE_TRAIN}"
CUSTOMERS_COLUMNS = [
    "TX_DATETIME",
    "CUSTOMER_ID",
    "CUSTOMER_ID_NB_TX_1DAY_WINDOW",
    "CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW",
    "CUSTOMER_ID_NB_TX_7DAY_WINDOW",
    "CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW",
    "CUSTOMER_ID_NB_TX_14DAY_WINDOW",
    "CUSTOMER_ID_AVG_AMOUNT_14DAY_WINDOW",
]

TERMINALS_TABLE_NAME = f"terminals_{END_DATE_TRAIN}"
TERMINALS_COLUMNS = [
    "TX_DATETIME",
    "TERMINAL_ID",
    "TERMINAL_ID_NB_TX_1DAY_WINDOW",
    "TERMINAL_ID_RISK_1DAY_WINDOW",
    "TERMINAL_ID_NB_TX_7DAY_WINDOW",
    "TERMINAL_ID_RISK_7DAY_WINDOW",
    "TERMINAL_ID_NB_TX_14DAY_WINDOW",
    "TERMINAL_ID_RISK_14DAY_WINDOW",
]

### Initialize clients

In [None]:
# Configure the Vertex AI SDK defaults (project, region, staging bucket) used
# by the SDK calls in the rest of the notebook.
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
)

## Data Engineering

### Customer ID and Terminal ID transformations

These transformations are perfect examples of features to store in a feature store:

- they need to be precalculated
- they need to be shared

Below is an example of a wide features table for fraud detection.




In [None]:
# BigQuery statement that (re)creates the wide features table
# `{PROJECT_ID}.{BQ_DATASET}.{FEATURES_TABLE}`. The CTEs run in this order:
#   get_raw_table                   - transactions up to END_DATE_TRAIN,
#                                     LEFT JOINed with their fraud labels
#   get_customer_spending_behaviour - per-customer COUNT / AVG over each
#                                     SECONDS_WINDOWS range
#   get_variables_delay_window      - per-terminal SUM / COUNT of TX_FRAUD over
#                                     the delay period and over each
#                                     DELAY_DAY_WINDOWS range
#   get_risk_factors                - (delay + window) minus (delay) counts
#   get_risk_index                  - fraud count / transaction count; the
#                                     +0.0001 presumably guards against a zero
#                                     denominator
# The final SELECT joins the customer and terminal features back together and
# lower-cases the column names.
# NOTE(review): COUNT(TX_FRAUD) counts only rows whose label is non-NULL, and
# the labels come from a LEFT JOIN — confirm every transaction has a label,
# otherwise the "number of transactions" features undercount.
sql_query = \
f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{BQ_DATASET}.{FEATURES_TABLE}` AS
WITH
  # query to join labels with features -------------------------------------------------------------------------------------------
  get_raw_table AS (
  SELECT
    raw_tx.TX_TS,
    raw_tx.TX_ID,
    raw_tx.CUSTOMER_ID,
    raw_tx.TERMINAL_ID,
    raw_tx.TX_AMOUNT,
    raw_lb.TX_FRAUD
  FROM (
    SELECT
      *
    FROM
      `{BQ_DATASET}.{RAW_TABLE_TRANSACTIONS}`
    WHERE
      DATE(TX_TS) <= '{END_DATE_TRAIN}' 
    ORDER BY
      TX_TS) AS raw_tx
  LEFT JOIN 
    `{BQ_DATASET}.{RAW_TABLE_LABELS}` as raw_lb
  ON raw_tx.TX_ID = raw_lb.TX_ID),

  # query to calculate CUSTOMER spending behaviour --------------------------------------------------------------------------------
  get_customer_spending_behaviour AS (
  SELECT
    TX_TS,
    TX_ID,
    CUSTOMER_ID,
    TERMINAL_ID,
    TX_AMOUNT,
    TX_FRAUD,
    
    # calculate the number of customer transactions over windows
    COUNT(TX_FRAUD) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[0]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_NB_TX_1DAY_WINDOW,
    COUNT(TX_FRAUD) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[1]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_NB_TX_7DAY_WINDOW,
    COUNT(TX_FRAUD) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[2]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_NB_TX_14DAY_WINDOW,
      
    # calculate the customer average transactions amount over windows
    AVG(TX_AMOUNT) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[0]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW,
    AVG(TX_AMOUNT) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[1]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW,
    AVG(TX_AMOUNT) OVER (PARTITION BY CUSTOMER_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {SECONDS_WINDOWS[2]} PRECEDING
      AND CURRENT ROW ) AS CUSTOMER_ID_AVG_AMOUNT_14DAY_WINDOW,
  FROM get_raw_table),

  # query to calculate TERMINAL spending behaviour --------------------------------------------------------------------------------
  get_variables_delay_window AS (
  SELECT
    TX_TS,
    TX_ID,
    CUSTOMER_ID,
    TERMINAL_ID,
    
    # calculate total amount and the total number of trasactions over the delay period (7 days - delay)
    SUM(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_SECONDS} PRECEDING
      AND CURRENT ROW ) AS NB_FRAUD_DELAY,
    COUNT(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_SECONDS} PRECEDING
      AND CURRENT ROW ) AS NB_TX_DELAY,
      
    # calculate total amount and the total number of trasactions over the delayed window (window + 7 days - delay)
    SUM(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[0]} PRECEDING
      AND CURRENT ROW ) AS NB_FRAUD_1_DELAY_WINDOW,
    SUM(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[1]} PRECEDING
      AND CURRENT ROW ) AS NB_FRAUD_7_DELAY_WINDOW,
    SUM(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[2]} PRECEDING
      AND CURRENT ROW ) AS NB_FRAUD_14_DELAY_WINDOW,
    COUNT(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[0]} PRECEDING
      AND CURRENT ROW ) AS NB_TX_1_DELAY_WINDOW,
    COUNT(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[1]} PRECEDING
      AND CURRENT ROW ) AS NB_TX_7_DELAY_WINDOW,
    COUNT(TX_FRAUD) OVER (PARTITION BY TERMINAL_ID ORDER BY UNIX_SECONDS(TX_TS) ASC RANGE BETWEEN {DELAY_DAY_WINDOWS[2]} PRECEDING
      AND CURRENT ROW ) AS NB_TX_14_DELAY_WINDOW,
  FROM get_raw_table),

  # query to calculate TERMINAL risk factors ---------------------------------------------------------------------------------------
  get_risk_factors AS (
  SELECT
    TX_TS,
    TX_ID,
    CUSTOMER_ID,
    TERMINAL_ID,
    # calculate numerator
    NB_FRAUD_1_DELAY_WINDOW - NB_FRAUD_DELAY AS TERMINAL_ID_NB_FRAUD_1DAY_WINDOW,
    NB_FRAUD_7_DELAY_WINDOW - NB_FRAUD_DELAY AS TERMINAL_ID_NB_FRAUD_7DAY_WINDOW,
    NB_FRAUD_14_DELAY_WINDOW - NB_FRAUD_DELAY AS TERMINAL_ID_NB_FRAUD_14DAY_WINDOW,
    # calculate denominator
    NB_TX_1_DELAY_WINDOW - NB_TX_DELAY AS TERMINAL_ID_NB_TX_1DAY_WINDOW,
    NB_TX_7_DELAY_WINDOW - NB_TX_DELAY AS TERMINAL_ID_NB_TX_7DAY_WINDOW,
    NB_TX_14_DELAY_WINDOW - NB_TX_DELAY AS TERMINAL_ID_NB_TX_14DAY_WINDOW,
      FROM
    get_variables_delay_window),

  # query to calculate the TERMINAL risk index -------------------------------------------------------------------------------------
  get_risk_index AS (
    SELECT
    TX_TS,
    TX_ID,
    CUSTOMER_ID,
    TERMINAL_ID,
    TERMINAL_ID_NB_TX_1DAY_WINDOW,
    TERMINAL_ID_NB_TX_7DAY_WINDOW,
    TERMINAL_ID_NB_TX_14DAY_WINDOW,
    # calculate the risk index
    (TERMINAL_ID_NB_FRAUD_1DAY_WINDOW/(TERMINAL_ID_NB_TX_1DAY_WINDOW+0.0001)) AS TERMINAL_ID_RISK_1DAY_WINDOW,
    (TERMINAL_ID_NB_FRAUD_7DAY_WINDOW/(TERMINAL_ID_NB_TX_7DAY_WINDOW+0.0001)) AS TERMINAL_ID_RISK_7DAY_WINDOW,
    (TERMINAL_ID_NB_FRAUD_14DAY_WINDOW/(TERMINAL_ID_NB_TX_14DAY_WINDOW+0.0001)) AS TERMINAL_ID_RISK_14DAY_WINDOW
    FROM get_risk_factors 
  )

SELECT
  a.TX_TS AS tx_ts,
  a.TX_ID AS tx_id,
  a.CUSTOMER_ID AS customer_id,
  a.TERMINAL_ID AS terminal_id,
  CAST(a.TX_AMOUNT AS FLOAT64) AS tx_amount,
  CAST(a.CUSTOMER_ID_NB_TX_1DAY_WINDOW AS INT64) AS customer_id_nb_tx_1day_window,
  CAST(a.CUSTOMER_ID_NB_TX_7DAY_WINDOW AS INT64) AS customer_id_nb_tx_7day_window,
  CAST(a.CUSTOMER_ID_NB_TX_14DAY_WINDOW AS INT64) AS customer_id_nb_tx_14day_window,
  CAST(a.CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW AS FLOAT64) AS customer_id_avg_amount_1day_window,
  CAST(a.CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW AS FLOAT64) AS customer_id_avg_amount_7day_window,
  CAST(a.CUSTOMER_ID_AVG_AMOUNT_14DAY_WINDOW AS FLOAT64) AS customer_id_avg_amount_14day_window,
  CAST(b.TERMINAL_ID_NB_TX_1DAY_WINDOW AS INT64) AS terminal_id_nb_tx_1day_window,
  CAST(b.TERMINAL_ID_NB_TX_7DAY_WINDOW AS INT64) AS terminal_id_nb_tx_7day_window,
  CAST(b.TERMINAL_ID_NB_TX_14DAY_WINDOW AS INT64) AS terminal_id_nb_tx_14day_window,
  CAST(b.TERMINAL_ID_RISK_1DAY_WINDOW AS FLOAT64) AS terminal_id_risk_1day_window,
  CAST(b.TERMINAL_ID_RISK_7DAY_WINDOW AS FLOAT64) AS terminal_id_risk_7day_window,
  CAST(b.TERMINAL_ID_RISK_14DAY_WINDOW AS FLOAT64) AS terminal_id_risk_14day_window,
  a.TX_FRAUD AS tx_fraud
FROM
  get_customer_spending_behaviour AS a
INNER JOIN
  get_risk_index AS b
ON
  a.TX_TS = b.TX_TS
  AND a.TX_ID = b.TX_ID
  AND a.CUSTOMER_ID = b.CUSTOMER_ID
  AND a.TERMINAL_ID = b.TERMINAL_ID
ORDER BY
  a.TX_TS

"""

In [None]:
# Submit the feature-engineering statement to BigQuery and wait for the job to
# finish; a RuntimeError is printed instead of stopping the notebook.
try:
    bq_client = bigquery.Client(location=REGION, project=PROJECT_ID)
    job = bq_client.query(sql_query)
    _ = job.result()
except RuntimeError as exc:
    print(exc)

## Feature Store for feature management

### What is a feature store?

Vertex AI Feature Store provides a centralized repository for organizing, storing, and serving ML features. Using a central featurestore, enables an organization to efficiently share, discover, and re-use ML features at scale, which can increase the velocity of developing and deploying new ML applications.

### Why would you want to set it up?

Based on the scenario, so far you have built and stored features in BigQuery. Now, in order to predict fraud, you want to serve those features in real time, at millisecond-scale latency. In particular, when the ML gateway receives a prediction request for a specific entity (customer, terminal and transaction), the system needs to fetch the features related to that entity and pass them as inputs to the model for online prediction. And, as you can imagine, an analytical data warehouse such as BigQuery is not able to manage low-latency singleton read operations.

Last year, Google Cloud announced Vertex AI, a managed machine learning (ML) platform that allows data science teams to accelerate the deployment and maintenance of ML models. The platform is composed of several building blocks, and one of them is Vertex AI Feature Store, which provides a managed service for low-latency, scalable feature serving. It also provides a centralized feature repository with easy APIs to search and discover features, and feature monitoring capabilities to track drift and other quality issues.

Vertex AI Feature Store uses a time series data model to store a series of values for features. This model enables Vertex AI Feature Store to maintain feature values as they change over time. Vertex AI Feature Store organizes resources hierarchically in the following order: `Featurestore -> EntityType -> Feature`. You must create these resources before you can ingest data into Vertex AI Feature Store. 

Let's do that by using the **new SDK**!

### How would you set it?

Vertex AI Feature Store uses a time series data model to store a series of values for features. This model enables Vertex AI Feature Store to maintain feature values as they change over time. Vertex AI Feature Store organizes resources hierarchically in the following order: `Featurestore -> EntityType -> Feature`. 

You must create these resources before you can ingest data into Vertex AI Feature Store.

<img src="../assets/ff_data_model.png"/>

### Create featurestore, `fraud_detection`

A featurestore is the top-level container for entity types, features, and feature values. Typically, an organization creates one shared featurestore for feature ingestion, serving, and sharing across all teams in the organization.

In [None]:
# Sanity check before creating anything: enumerate the featurestores that are
# already present (for a fresh demo project none are expected).

print(f"Listing all featurestores in {PROJECT_ID}")
feature_store_list = Featurestore.list()
if len(list(feature_store_list)) != 0:
    for fs in feature_store_list:
        print("Found featurestore: {}".format(fs.resource_name))
else:
    print(f"The {PROJECT_ID} is empty!")

In [None]:
# Create the featurestore (5 fixed online-serving nodes, labelled with the
# owning team and application). On success, keep its resource name for the
# cells below; a RuntimeError is printed instead of stopping the notebook.
try:
    ff_feature_store = Featurestore.create(
        featurestore_id=FEATURESTORE_ID,
        online_store_fixed_node_count=5,
        labels={"team": "dataoffice", "app": "fraud_finder"},
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    FEATURESTORE_RESOURCE_NAME = ff_feature_store.resource_name
    print(f"Feature store created: {FEATURESTORE_RESOURCE_NAME}")

### Create the three main entity types and their features

An entity type is a collection of semantically related features. You define your own entity types, based on the concepts that are relevant to your use case. In this case, the Fraud Finder service has the entity types event, customer and terminal.

In [None]:
# Sanity check before creating entity types: report each entity type already in
# the featurestore together with its feature count (none are expected yet).

print(f"Listing all entity types and features in featurestore {FEATURESTORE_RESOURCE_NAME}")
entity_types_list = ff_feature_store.list_entity_types()
if len(list(entity_types_list)) != 0:
    for et in entity_types_list:
        et_name = et.resource_name
        et_features_list = et.list_features()
        num_features = len(list(et_features_list))
        print(f"Entity type {et_name} with {num_features} features")
else:
    print(f"The featurestore {FEATURESTORE_RESOURCE_NAME} is empty!")

#### Create entity type, ```event```

In [None]:
# Create the `event` entity type inside the featurestore and remember its
# resource name; a RuntimeError is printed instead of stopping the notebook.
try:
    event_entity_type = ff_feature_store.create_entity_type(
        entity_type_id="event",
        description="Event Entity",
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    EVENT_ENTITY_RESOURCE_NAME = event_entity_type.resource_name
    print('Entity type name is', EVENT_ENTITY_RESOURCE_NAME)

#### Create features of the ```event``` entity type

Before ingesting features, you need to define their configuration. For simplicity, I created the configuration in a declarative way. Of course, we could create a helper function to build it from the BigQuery schema. Also notice that we may want to pass some features on the fly. In this case, country, operating system and language look perfect for that.

In [None]:
# Declarative configuration of the `event` features, keyed by feature id.
# Every feature carries the same {"status": "passed"} label; a fresh label dict
# is built per feature so the entries do not share state.
event_feature_configs = {
    feature_id: {
        "value_type": value_type,
        "description": description,
        "labels": {"status": "passed"},
    }
    for feature_id, value_type, description in (
        ("customer_id", "STRING", "The identifier for the customer"),
        ("terminal_id", "STRING", "The identifier for the terminal"),
        ("tx_amount", "DOUBLE", "The amount of the transaction"),
        ("tx_fraud", "INT64", "The binary variable to indicate fradulent transaction (1=fraud)"),
    )
}

In [None]:
# Create all `event` features in one batch call, then print the resource name
# of every feature now attached to the entity type.
try:
    event_feature_ids = event_entity_type.batch_create_features(
        feature_configs=event_feature_configs,
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    for feature in event_feature_ids.list_features():
        print('')
        print(f'The resource name of {feature.name} feature is', feature.resource_name)

#### Create entity type, ```customer```

In [None]:
# Create the `customer` entity type inside the featurestore and remember its
# resource name; a RuntimeError is printed instead of stopping the notebook.
try:
    customer_entity_type = ff_feature_store.create_entity_type(
        entity_type_id="customer",
        description="Customer Entity",
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    CUSTOMER_ENTITY_RESOURCE_NAME = customer_entity_type.resource_name
    print('Entity type name is', CUSTOMER_ENTITY_RESOURCE_NAME)

#### Create features of the ```customer``` entity type

In [None]:
# Declarative configuration of the `customer` features, keyed by feature id:
# a transaction count and an average amount for each of the 1/7/14-day windows.
# A fresh label dict is built per feature so the entries do not share state.
customer_feature_configs = {
    feature_id: {
        "value_type": value_type,
        "description": description,
        "labels": {"status": "passed"},
    }
    for feature_id, value_type, description in (
        ("customer_id_nb_tx_1day_window", "INT64", "Number of transactions by the customer in the last day"),
        ("customer_id_avg_amount_1day_window", "DOUBLE", "Average spending amount in the last day"),
        ("customer_id_nb_tx_7day_window", "INT64", "Number of transactions by the customer in the last 7 days"),
        ("customer_id_avg_amount_7day_window", "DOUBLE", "Average spending amount in the last 7 days"),
        ("customer_id_nb_tx_14day_window", "INT64", "Number of transactions by the customer in the last 14 days"),
        ("customer_id_avg_amount_14day_window", "DOUBLE", "Average spending amount in the last 14 days"),
    )
}

In [None]:
# Create all `customer` features in one batch call, then print the resource
# name of every feature now attached to the entity type.
try:
    customer_feature_ids = customer_entity_type.batch_create_features(
        feature_configs=customer_feature_configs,
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    for feature in customer_feature_ids.list_features():
        print('')
        print(f'The resource name of {feature.name} feature is', feature.resource_name)

#### Create entity type, ```terminal```

In [None]:
# Create the `terminal` entity type inside the featurestore and remember its
# resource name; a RuntimeError is printed instead of stopping the notebook.
try:
    terminal_entity_type = ff_feature_store.create_entity_type(
        entity_type_id="terminal",
        description="Terminal Entity",
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    TERMINAL_ENTITY_RESOURCE_NAME = terminal_entity_type.resource_name
    print('Entity type name is', TERMINAL_ENTITY_RESOURCE_NAME)

#### Create features of the ```terminal``` entity type

In [None]:
# Declarative configuration of the `terminal` features, keyed by feature id:
# a transaction count and a risk score for each of the 1/7/14-day windows.
# A fresh label dict is built per feature so the entries do not share state.
terminal_feature_configs = {
    feature_id: {
        "value_type": value_type,
        "description": description,
        "labels": {"status": "passed"},
    }
    for feature_id, value_type, description in (
        ("terminal_id_nb_tx_1day_window", "INT64", "Number of transactions by the terminal in the last day"),
        ("terminal_id_risk_1day_window", "DOUBLE", "Risk score calculated average number of frauds on the terminal in the last day"),
        ("terminal_id_nb_tx_7day_window", "INT64", "Number of transactions by the terminal in the 7 days"),
        ("terminal_id_risk_7day_window", "DOUBLE", "Risk score calculated average number of frauds on the terminal in the last 7 days"),
        ("terminal_id_nb_tx_14day_window", "INT64", "Number of transactions by the terminal in the 14 days"),
        ("terminal_id_risk_14day_window", "DOUBLE", "Risk score calculated average number of frauds on the terminal in the last 14 day"),
    )
}

In [None]:
# Create all `terminal` features in one batch call, then print the resource
# name of every feature now attached to the entity type.
try:
    terminal_feature_ids = terminal_entity_type.batch_create_features(
        feature_configs=terminal_feature_configs,
        sync=True,
    )
except RuntimeError as exc:
    print(exc)
else:
    for feature in terminal_feature_ids.list_features():
        print('')
        print(f'The resource name of {feature.name} feature is', feature.resource_name)

Below is a view of what the Feature Store looks like once it has been created on Vertex AI.

<img src="../assets/ff_view.png"/>

### Set Feature Monitoring 

Feature [monitoring](https://cloud.google.com/vertex-ai/docs/featurestore/monitoring) is still in preview, so you need to use the v1beta1 Python client. The easiest way to set this up for now is to use the [console UI](https://console.cloud.google.com/vertex-ai/features). For completeness, below is an example of how to do this using the v1beta1 SDK.


In [None]:
from google.cloud.aiplatform_v1beta1 import \
    FeaturestoreServiceClient as v1beta1_FeaturestoreServiceClient
from google.cloud.aiplatform_v1beta1.types import \
    entity_type as v1beta1_entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_monitoring as v1beta1_featurestore_monitoring_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_service as v1beta1_featurestore_service_pb2
from google.protobuf.duration_pb2 import Duration

# Admin client for the v1beta1 Featurestore service, bound to API_ENDPOINT.
v1beta1_admin_client = v1beta1_FeaturestoreServiceClient(
    client_options=dict(api_endpoint=API_ENDPOINT)
)


In [None]:
# Attach a snapshot-analysis monitoring config (one-day interval) to every
# entity type of the featurestore through the v1beta1 admin client.
for entity in ff_feature_store.list_entity_types():
    # Full resource path of the entity type being updated.
    entity_type_name = v1beta1_admin_client.entity_type_path(
        PROJECT_ID, REGION, FEATURESTORE_ID, entity.name
    )
    snapshot_analysis = v1beta1_featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
        monitoring_interval=Duration(seconds=86400),  # 1 day
    )
    monitoring_config = v1beta1_featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
        snapshot_analysis=snapshot_analysis,
    )
    update_request = v1beta1_featurestore_service_pb2.UpdateEntityTypeRequest(
        entity_type=v1beta1_entity_type_pb2.EntityType(
            name=entity_type_name,
            monitoring_config=monitoring_config,
        ),
    )
    v1beta1_admin_client.update_entity_type(update_request)

### Search features

As we mentioned, Vertex AI Feature Store provides search capabilities. You can define queries in order to retrieve features.

In [None]:
# Search the featurestore for features whose id matches the query below.
feature_query = "feature_id:customer_id_nb_tx_7day_window"
searched_features = Feature.search(query=feature_query)
# Bare expression so the notebook displays the search result.
searched_features

## Import feature values 

You need to import feature values before you can use them for online/offline serving.

About **Source Data format and Layout**:

- BigQuery table/Avro/CSV are supported
- Each imported entity *must* have an ID
- Each entity can *optionally* have a timestamp, specifying when the feature values are generated.

In our scenario, we assume that feature values are imported from BigQuery tables called events, customers and terminals.

### Prepare tables to import

In [None]:
# Accumulates the CREATE TABLE statements for the per-entity source tables.
sql_query_feature_store = []

In [None]:
# Define one CREATE OR REPLACE TABLE statement per entity type (event,
# customer, terminal). Each one selects that entity's columns from the wide
# features table and keeps only the rows of the training period.
#
# The period filter compares DATE(tx_ts) rather than tx_ts itself. tx_ts is a
# timestamp, and comparing it directly with a "YYYY-MM-DD" literal stops at
# midnight at the start of END_DATE_TRAIN, which silently drops the whole last
# day. Filtering on the date keeps START_DATE_TRAIN..END_DATE_TRAIN inclusive,
# consistent with the DATE(TX_TS) filter used to build the wide table.

event_sql_query = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{BQ_DATASET}.{EVENTS_TABLE_NAME}` AS
SELECT tx_ts, tx_id, customer_id,
terminal_id, tx_amount, tx_fraud
FROM `{PROJECT_ID}.{BQ_DATASET}.{FEATURES_TABLE}`
WHERE
  DATE(tx_ts) BETWEEN "{START_DATE_TRAIN}" AND "{END_DATE_TRAIN}"
ORDER BY tx_ts
"""

customers_sql_query = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{BQ_DATASET}.{CUSTOMERS_TABLE_NAME}` AS
SELECT tx_ts, customer_id, customer_id_nb_tx_1day_window,
customer_id_avg_amount_1day_window, customer_id_nb_tx_7day_window,
customer_id_avg_amount_7day_window, customer_id_nb_tx_14day_window,
customer_id_avg_amount_14day_window
FROM `{PROJECT_ID}.{BQ_DATASET}.{FEATURES_TABLE}`
WHERE
  DATE(tx_ts) BETWEEN "{START_DATE_TRAIN}" AND "{END_DATE_TRAIN}"
ORDER BY tx_ts
"""

terminals_sql_query = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{BQ_DATASET}.{TERMINALS_TABLE_NAME}` AS
SELECT tx_ts, terminal_id, terminal_id_nb_tx_1day_window,
terminal_id_risk_1day_window, terminal_id_nb_tx_7day_window,
terminal_id_risk_7day_window, terminal_id_nb_tx_14day_window,
terminal_id_risk_14day_window
FROM `{PROJECT_ID}.{BQ_DATASET}.{FEATURES_TABLE}`
WHERE
  DATE(tx_ts) BETWEEN "{START_DATE_TRAIN}" AND "{END_DATE_TRAIN}"
ORDER BY tx_ts
"""

# Assign the list in one go (instead of appending) so that re-running this cell
# does not queue each statement more than once.
sql_query_feature_store = [
    event_sql_query,
    customers_sql_query,
    terminals_sql_query,
]

In [None]:
# Run the queued statements one after another, waiting for each BigQuery job to
# finish; a RuntimeError is printed and the loop moves on to the next one.
for sq in sql_query_feature_store:
    try:
        job = bq_client.query(sq)
        _ = job.result()
    except RuntimeError as exc:
        print(exc)

### Import events

In [None]:
# Name of the source column passed as `feature_time` to the ingestion calls.
FEATURE_TIME = "tx_ts"

In [None]:
# Ingestion settings for the `event` entity type.
# Feature ids are read back from the entity type so they match what was created.
EVENT_FEATURES_IDS = [feature.name for feature in event_feature_ids.list_features()]
# Build the source URI from EVENTS_TABLE_NAME (the same constant used by the
# CREATE TABLE statement) so the URI cannot drift from the table actually created.
EVENT_BQ_SOURCE_URI = f"bq://{PROJECT_ID}.{BQ_DATASET}.{EVENTS_TABLE_NAME}"
# Column holding the entity id of each row.
EVENT_ENTITY_ID_FIELD = "tx_id"

In [None]:
# Start the import of `event` feature values from BigQuery.
# sync=False is passed, so the call presumably returns before the import job
# finishes — confirm against the SDK docs before relying on the data.
try:
    event_entity_type.ingest_from_bq(
        feature_ids=EVENT_FEATURES_IDS,
        feature_time=FEATURE_TIME,
        bq_source_uri=EVENT_BQ_SOURCE_URI,
        entity_id_field=EVENT_ENTITY_ID_FIELD,
        disable_online_serving=False,
        worker_count=10,
        sync=False,
    )
except RuntimeError as exc:
    print(exc)

### Import customers

In [None]:
# Ingestion settings for the `customer` entity type.
# Feature ids are read back from the entity type so they match what was created.
CUSTOMERS_FEATURES_IDS = [feature.name for feature in customer_feature_ids.list_features()]
# Build the source URI from CUSTOMERS_TABLE_NAME (the same constant used by the
# CREATE TABLE statement) so the URI cannot drift from the table actually created.
CUSTOMER_BQ_SOURCE_URI = f"bq://{PROJECT_ID}.{BQ_DATASET}.{CUSTOMERS_TABLE_NAME}"
# Column holding the entity id of each row.
CUSTOMER_ENTITY_ID_FIELD = "customer_id"

In [None]:
# Start the import of `customer` feature values from BigQuery.
# sync=False is passed, so the call presumably returns before the import job
# finishes — confirm against the SDK docs before relying on the data.
try:
    customer_entity_type.ingest_from_bq(
        feature_ids=CUSTOMERS_FEATURES_IDS,
        feature_time=FEATURE_TIME,
        bq_source_uri=CUSTOMER_BQ_SOURCE_URI,
        entity_id_field=CUSTOMER_ENTITY_ID_FIELD,
        disable_online_serving=False,
        worker_count=10,
        sync=False,
    )
except RuntimeError as exc:
    print(exc)

### Import terminals

In [None]:
# Ingestion settings for the `terminal` entity type.
# Feature ids are read back from the entity type so they match what was created.
TERMINALS_FEATURES_IDS = [feature.name for feature in terminal_feature_ids.list_features()]
# Build the source URI from TERMINALS_TABLE_NAME (the same constant used by the
# CREATE TABLE statement) so the URI cannot drift from the table actually created.
TERMINALS_BQ_SOURCE_URI = f"bq://{PROJECT_ID}.{BQ_DATASET}.{TERMINALS_TABLE_NAME}"
# Column holding the entity id of each row.
TERMINALS_ENTITY_ID_FIELD = "terminal_id"

In [None]:
# Start the import of `terminal` feature values from BigQuery.
# sync=False is passed, so the call presumably returns before the import job
# finishes — confirm against the SDK docs before relying on the data.
try:
    terminal_entity_type.ingest_from_bq(
        feature_ids=TERMINALS_FEATURES_IDS,
        feature_time=FEATURE_TIME,
        bq_source_uri=TERMINALS_BQ_SOURCE_URI,
        entity_id_field=TERMINALS_ENTITY_ID_FIELD,
        disable_online_serving=False,
        worker_count=10,
        sync=False,
    )
except RuntimeError as exc:
    print(exc)

On the Cloud Console, open the Vertex->Features->Ingestion Jobs page to monitor the progress of the three ingestion jobs you just started. They should take several minutes each to complete, but because they don't operate on the same entities they can run in parallel.

## Search for feature values 

In [None]:
# Read the customer features for three sample customer ids from the
# `customer` entity type.
customer_aggregated_features = customer_entity_type.read(
    feature_ids=CUSTOMERS_FEATURES_IDS,
    entity_ids=[
        "5830444124423549",
        "5469689693941771",
        "1361459972478769",
    ],
)

In [None]:
# Bare expression so the notebook displays the values read above.
customer_aggregated_features

## (DO NOT RUN) Delete Feature store

In [None]:
# ff_feature_store.delete(sync=True, force=True)