In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fraudfinder - Feature engineering (streaming)

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?name=Model%20Monitoring&download_url=https%3A%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmaster%2Fnotebooks%2Fcommunity%2Fmodel_monitoring%2Fmodel_monitoring_feature_attribs.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/community/model_monitoring/model_monitoring_feature_attribs.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/community/model_monitoring/model_monitoring_feature_attribs.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model.

### Objective

In order to calculate very recent customer and terminal activity (i.e. within the last hour), computation has to be done on real-time streaming data, rather than via batch-based feature engineering.

This notebook shows a step-by-step guide to create real-time data pipelines in order to:

- calculate customer spending features (last 15-mins, 30-mins, and 60-mins)
- calculate terminal activity features (last 15-mins, 30-mins, and 60-mins)

by pulling the streaming data from a Pub/Sub topic and ingesting the features directly into Vertex AI Feature Store using Dataflow. 

In the following notebook, you will learn to:

- Create features using window and aggregation functions in an Apache Beam pipeline
- Deploy the Apache Beam pipeline to Dataflow
- Ingest engineered features to Vertex AI Feature Store

Note: Because deploying Apache Beam pipelines to Dataflow works better when the job is submitted from a Python script, we write the pipeline code into a Python script instead of running it directly in the notebook.

### Import libraries

Create the `beam_pipeline/` directory and write the imports required by the pipeline script to `beam_pipeline/main.py`.

In [26]:
!mkdir beam_pipeline/

mkdir: cannot create directory ‘beam_pipeline/’: File exists


In [27]:
%%writefile beam_pipeline/main.py

import json
import time

from typing import Tuple, Any, List

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn
    
from google.cloud import aiplatform
from google.cloud import aiplatform_v1beta1

Overwriting beam_pipeline/main.py


### Load configuration settings from the setup notebook

We need to set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [28]:
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
exec(config.n)

### Defining an auxiliary magic function

The built-in `%%writefile` magic can only write a cell's contents as-is; it cannot substitute Python variables. Hence, we create an auxiliary magic function, `%%writetemplate`, that fills `{variable}` placeholders with the notebook's Python variables and appends the result to a file.

In [29]:
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def writetemplate(line, cell=None):
    """Append a cell to a file after substituting notebook globals into it.

    Unlike ``%%writefile``, every ``{name}`` placeholder in the cell body is
    replaced with the value of the notebook global ``name`` via ``str.format``,
    so literal braces in the template must be doubled (``{{`` / ``}}``).
    The target file is always opened in append mode.

    Args:
        line: Path of the file to append to (the text after ``%%writetemplate``).
        cell: Cell body used as the template; ``None`` when invoked as a line magic.

    Raises:
        ValueError: If no path is given, or the magic is used without a cell body.
        KeyError: If the template references a name that is not a notebook global.
    """
    path = line.strip()
    if not path:
        raise ValueError("usage: %%writetemplate <path>")
    if cell is None:
        # Registered as a line/cell magic, but only the cell form carries a template.
        raise ValueError("writetemplate must be used as a cell magic: %%writetemplate <path>")
    with open(path, 'a', encoding='utf-8') as f:
        f.write(cell.format(**globals()))

Write the project variables to the Python script with the new magic function.

In [30]:
# Config lines to embed in main.py (the fetched file minus its first and last line),
# followed by an explicit PROJECT_ID assignment.
# NOTE(review): PROJECT is assumed to be defined by the exec'd notebook_env.py, and the
# first/last lines are assumed not to be needed in main.py; confirm both.
project_variables = "\n".join(config[1:-1]) + f'\nPROJECT_ID = "{PROJECT}"'

In [31]:
%%writetemplate beam_pipeline/main.py

# Project variables (substituted from the notebook's config by %%writetemplate)
{project_variables}

### Define constant variables

Constant variables used by the pipeline are defined here.

In [32]:
%%writefile -a beam_pipeline/main.py

# Pub/Sub variables: the subscription the pipeline reads transactions from
SUBSCRIPTION_NAME = "ff-txlabels-sub"
SUBSCRIPTION_PATH = "projects/{}/subscriptions/{}".format(PROJECT_ID, SUBSCRIPTION_NAME)

# Dataflow variables
MINUTES_WINDOWS = [15, 30, 60]
# Window lengths in seconds, shortest first; the pipeline treats index 0 and 1
# as the 15- and 30-minute windows.
SECONDS_WINDOWS = sorted(minutes * 60 for minutes in MINUTES_WINDOWS)
WINDOW_SIZE = SECONDS_WINDOWS[-1]  # the largest window, in seconds
WINDOW_PERIOD = 60  # sliding-window period: 1 minute, in seconds

Appending to beam_pipeline/main.py


### Defining auxiliary functions and classes

Here we define auxiliary functions and classes that will be used in building our real-time feature engineering and ingestion pipeline.

In [33]:
%%writefile -a beam_pipeline/main.py
def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:
    """Converts a time string into Unix time (whole seconds since the epoch).

    The string is interpreted as UTC, independent of the machine's local
    timezone, which keeps event timestamps consistent with the UTC window-end
    timestamps used to build the composite keys.
    NOTE(review): assumes TX_TS values are UTC; confirm against the producer.

    Args:
        time_str: Timestamp text, e.g. "2021-01-01 00:00:00".
        time_format: ``time.strptime`` format of ``time_str``.

    Returns:
        Seconds since 1970-01-01T00:00:00Z as an int.

    Raises:
        ValueError: If ``time_str`` does not match ``time_format``.
    """
    # Local imports, as in the original; presumably so the function does not rely on
    # module globals when it runs on Dataflow workers.
    import calendar
    import time
    time_tuple = time.strptime(time_str, time_format)
    # calendar.timegm treats the struct_time as UTC; time.mktime would use local time.
    return calendar.timegm(time_tuple)
    
    
class AddAddtionalInfo(beam.DoFn):
    """Attach per-window composite keys and the element's offset from the window end.

    NOTE(review): ``element`` is annotated as ``Tuple`` but is read by string keys,
    i.e. it is the dict decoded from the Pub/Sub JSON message.
    """
    def process(self, element: Tuple, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam) -> Tuple:
        """Return a one-element tuple holding the enriched transaction dict.

        Composite keys are ``<id>_<window end formatted as %Y%m%d%H%M%S in UTC>`` so rows
        of the same entity and window share a key; ``TS_DIFF`` is ``window.end - timestamp``.
        """
        window_end_dt = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        passthrough = ('TX_ID', 'TX_TS', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_AMOUNT', 'TX_FRAUD')
        enriched = {name: element[name] for name in passthrough}
        enriched['CUSTOMER_ID_COMPOSITE_KEY'] = f"{element['CUSTOMER_ID']}_{window_end_dt}"
        enriched['TERMINAL_ID_COMPOSITE_KEY'] = f"{element['TERMINAL_ID']}_{window_end_dt}"
        # How far before the window end this element occurred; downstream steps compare
        # it against WINDOW_PERIOD and SECONDS_WINDOWS.
        enriched['TS_DIFF'] = window.end - timestamp
        return (enriched,)

    
class WriteFeatures(beam.DoFn):
    """Write windowed customer or terminal features to Vertex AI Feature Store.

    Input elements are ``(composite_key, {'new_records': [...], 'aggregated': [...]})``
    pairs as produced by ``CoGroupByKey``. One ``WriteFeatureValuesPayload`` is sent
    per new record, carrying the aggregate values of that record's key.
    """

    # Region used when it cannot be parsed from the resource name; this matches the
    # endpoint region that used to be hard-coded.
    _DEFAULT_LOCATION = "us-central1"

    def __init__(self, resource_name: str):
        # Entity type resource name; its last path segment ("customer" or "terminal")
        # selects which feature set is written.
        self.resource_name = resource_name

    @staticmethod
    def _safe_avg(total, count) -> float:
        """Return ``total / count``, or 0.0 when ``count`` is 0 (no transactions in the window)."""
        return total / count if count else 0.0

    def _populate_payload(self, new_records, aggregated, id_field: str) -> List[Any]:
        """Build one payload per new record for the features derived from ``id_field``.

        Feature ids are ``<id_field lowercased>_...`` and the aggregate fields read from
        ``aggregated`` are ``<id_field>_...``; e.g. for ``id_field="CUSTOMER_ID"`` the
        feature ``customer_id_nb_tx_15min_window`` comes from ``CUSTOMER_ID_NB_TX_15MIN_WINDOW``.
        """
        prefix = id_field.lower()

        def field(suffix):
            return getattr(aggregated, f"{id_field}_{suffix}")

        # The aggregates are identical for every record of this key, so compute them once.
        nb_tx_15 = field("NB_TX_15MIN_WINDOW")
        nb_tx_30 = field("NB_TX_30MIN_WINDOW")
        nb_tx_60 = field("NB_TX_60MIN_WINDOW")
        avg_15 = self._safe_avg(field("SUM_AMOUNT_15MIN_WINDOW"), nb_tx_15)
        avg_30 = self._safe_avg(field("SUM_AMOUNT_30MIN_WINDOW"), nb_tx_30)
        avg_60 = field("AVG_AMOUNT_60MIN_WINDOW")

        payloads = []
        for row in new_records:
            payload = aiplatform_v1beta1.WriteFeatureValuesPayload()
            payload.entity_id = getattr(row, id_field)
            # Values are required to be of FeatureValue type.
            payload.feature_values = {
                f"{prefix}_nb_tx_15min_window": aiplatform_v1beta1.FeatureValue(int64_value=nb_tx_15),
                f"{prefix}_nb_tx_30min_window": aiplatform_v1beta1.FeatureValue(int64_value=nb_tx_30),
                f"{prefix}_nb_tx_60min_window": aiplatform_v1beta1.FeatureValue(int64_value=nb_tx_60),
                f"{prefix}_avg_amount_15min_window": aiplatform_v1beta1.FeatureValue(double_value=avg_15),
                f"{prefix}_avg_amount_30min_window": aiplatform_v1beta1.FeatureValue(double_value=avg_30),
                f"{prefix}_avg_amount_60min_window": aiplatform_v1beta1.FeatureValue(double_value=avg_60),
            }
            payloads.append(payload)
        return payloads

    def populate_customer_payload(self, new_records, aggregated) -> List[Any]:
        """
        Prepare payloads for customer related features to be written
        to the Vertex AI feature store (one payload per new record).
        """
        return self._populate_payload(new_records, aggregated, "CUSTOMER_ID")

    def populate_terminal_payload(self, new_records, aggregated) -> List[Any]:
        """
        Prepare payloads for terminal related features to be written
        to the Vertex AI feature store (one payload per new record).
        """
        return self._populate_payload(new_records, aggregated, "TERMINAL_ID")

    def _api_endpoint(self, resource_name: str) -> str:
        """Regional API endpoint taken from the ``locations/<region>`` segment of ``resource_name``."""
        parts = resource_name.split("/")
        location = self._DEFAULT_LOCATION
        if "locations" in parts:
            index = parts.index("locations") + 1
            if index < len(parts) and parts[index]:
                location = parts[index]
        return f"{location}-aiplatform.googleapis.com"

    def _get_client(self, api_endpoint: str):
        """Return a cached online serving client for ``api_endpoint``, creating it on first use.

        The cache is created lazily so no client exists when the DoFn is pickled at
        pipeline construction time.
        """
        clients = getattr(self, "_clients", None)
        if clients is None:
            clients = self._clients = {}
        client = clients.get(api_endpoint)
        if client is None:
            client = aiplatform_v1beta1.FeaturestoreOnlineServingServiceClient(
                client_options={"api_endpoint": api_endpoint}
            )
            clients[api_endpoint] = client
        return client

    def send_request_to_feature_store(self, resource_name: str, payloads: List[Any]):
        """
        Send a write feature values request for ``payloads`` to the entity type
        ``resource_name``, using an online serving client for the entity type's
        region that is reused across calls instead of being created per element.

        Returns:
            The response returned by ``write_feature_values``.
        """
        request = aiplatform_v1beta1.WriteFeatureValuesRequest(
            entity_type=resource_name,
            payloads=payloads,
        )
        client = self._get_client(self._api_endpoint(resource_name))
        return client.write_feature_values(request=request)

    def process(self, element: Tuple) -> Tuple:
        """
        Select the feature set from the entity id at the end of ``resource_name``
        and write the respective features to Vertex AI feature store.

        Raises:
            ValueError: If the entity type is neither "customer" nor "terminal".
        """
        new_records = element[1]['new_records']
        # Expected to hold a single aggregate row for this composite key.
        aggregated = element[1]['aggregated'][0]

        entity = self.resource_name.split("/")[-1]
        if entity == "customer":
            payloads = self.populate_customer_payload(new_records, aggregated)
        elif entity == "terminal":
            payloads = self.populate_terminal_payload(new_records, aggregated)
        else:
            # An empty request writes nothing; fail loudly on a misconfigured entity type.
            raise ValueError(f"Unsupported entity type {entity!r} in {self.resource_name!r}")

        response = self.send_request_to_feature_store(self.resource_name, payloads)
        yield (response,)


Appending to beam_pipeline/main.py


### Building the pipeline

Now we are ready to build the pipeline using the classes and functions defined above. Once the pipeline is ready, we wrap everything into a main function and submit it to Dataflow.

In [34]:
%%writefile -a beam_pipeline/main.py

def _add_entity_feature_branch(new_records, enriched_source, id_column: str, label: str, resource_name: str):
    """Aggregate windowed features for one entity and write them to the feature store.

    Args:
        new_records: Windowed rows whose TS_DIFF is within WINDOW_PERIOD of the window end.
        enriched_source: All windowed rows (beam.Row built from AddAddtionalInfo output).
        id_column: Entity id column, e.g. "CUSTOMER_ID". The composite key column is
            f"{id_column}_COMPOSITE_KEY" and aggregate fields are prefixed with it.
        label: Entity name used in transform labels, e.g. "customer id".
        resource_name: Feature store entity type resource name passed to WriteFeatures.

    Returns:
        The PCollection produced by the WriteFeatures step.
    """
    key_column = f"{id_column}_COMPOSITE_KEY"

    keyed_new_records = (
        new_records
        | f'Assign {key_column} as key' >> beam.WithKeys(lambda row: getattr(row, key_column))
    )

    aggregated = (
        enriched_source
        | f'Group by {label} composite key column' >> beam.GroupBy(**{key_column: key_column})
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, f'{id_column}_NB_TX_15MIN_WINDOW')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, f'{id_column}_NB_TX_30MIN_WINDOW')
            .aggregate_field('TX_ID', CountCombineFn(), f'{id_column}_NB_TX_60MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, f'{id_column}_SUM_AMOUNT_15MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, f'{id_column}_SUM_AMOUNT_30MIN_WINDOW')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), f'{id_column}_AVG_AMOUNT_60MIN_WINDOW')
        | f'Assign key for aggregated results ({label})' >> beam.WithKeys(lambda row: getattr(row, key_column))
    )

    return (
        {
            'new_records': keyed_new_records,
            'aggregated': aggregated
        }
        | f'Merge pcollections ({label})' >> beam.CoGroupByKey()
        # Only keys with at least one new record in this window trigger a write.
        | f'Filter empty rows ({label})' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | f'Write to feature store ({label})' >> beam.ParDo(WriteFeatures(resource_name))
    )


def main():
    """Build the streaming feature-engineering pipeline and submit it to Dataflow.

    The pipeline reads JSON transactions from Pub/Sub, assigns sliding windows,
    and writes customer and terminal features to Vertex AI Feature Store.
    ``pipeline.run()`` is not waited on, so this returns once the job is submitted.
    """
    import os

    # Initialize google ai platform
    aiplatform.init(
        project=PROJECT_ID,
        location=REGION
    )

    # Get entity types for customer and terminal
    fs = aiplatform.featurestore.Featurestore(
        featurestore_name=FEATURESTORE_ID
    )
    customer_entity_type = fs.get_entity_type("customer")
    terminal_entity_type = fs.get_entity_type("terminal")

    # requirements.txt is written next to this script (beam_pipeline/), but the notebook
    # launches the script from its parent directory, so a bare "requirements.txt" would be
    # resolved against the wrong working directory.
    requirements_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "requirements.txt")

    # Setup pipeline options for deploying to dataflow
    pipeline_options = PipelineOptions(streaming=True,
                                       save_main_session=True,
                                       runner="DataflowRunner",
                                       project=PROJECT_ID,
                                       region=REGION,
                                       temp_location=f"gs://{BUCKET_NAME}/dataflow/tmp",
                                       requirements_file=requirements_file,
                                       max_num_workers=2)

    # Build pipeline and transformation steps
    pipeline = beam.Pipeline(options=pipeline_options)

    source = (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION_PATH)
        | 'Decode byte array to json dict' >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
    )

    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )

    # Rows that arrived within the last WINDOW_PERIOD of their window.
    new_records = (
        enriched_source
        | 'Filter only new rows' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)
    )

    # Engineer customer and terminal features (same transform labels as before).
    _add_entity_feature_branch(new_records, enriched_source, "CUSTOMER_ID", "customer id",
                               customer_entity_type.resource_name)
    _add_entity_feature_branch(new_records, enriched_source, "TERMINAL_ID", "terminal id",
                               terminal_entity_type.resource_name)

    # Run the pipeline (async)
    pipeline.run()


if __name__ == "__main__":
    main()

Appending to beam_pipeline/main.py


### Creating `requirements.txt` for Dataflow workers

As we are using the `google-cloud-aiplatform` package, we need to pass `requirements.txt` to the Dataflow workers so that they install the package.

In [35]:
%%writefile beam_pipeline/requirements.txt

# Packages installed on each Dataflow worker (passed via requirements_file in main.py).
# NOTE(review): google-cloud-aiplatform is unpinned, so workers may get a different
# version than this notebook; consider pinning it for reproducible runs.
google-cloud-aiplatform
google-apitools==0.5.32

Overwriting beam_pipeline/requirements.txt


### Deploying the pipeline

Now we are ready to deploy this pipeline to Dataflow.

In [38]:
!python3 beam_pipeline/main.py