# Data Preprocessing at Scale with NVIDIA Merlin NVTabular and Vertex AI

This notebook demonstrates how to preprocess data using [NVIDIA Merlin NVTabular](https://developer.nvidia.com/nvidia-merlin/nvtabular) and [Vertex AI](https://cloud.google.com/vertex-ai). The notebook covers the following:  
1. NVTabular Overview.  
2. Preprocessing Criteo Dataset.  
3. Preprocessing Pipeline on Vertex AI

## 1. Merlin NVTabular Overview

Merlin NVTabular is a feature engineering and preprocessing library designed to effectively manipulate 
large datasets and significantly reduce data preparation time. The [core features](https://github.com/NVIDIA-Merlin/NVTabular/blob/main/docs/source/core_features.md) of NVTabular include:

* Processes large datasets not bound by CPU or GPU memory.
* Accelerates data preprocessing computation on NVIDIA GPUs using the [RAPIDS cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) library.
* Supports multi-GPU and multi-node scaling with [Dask-CUDA](https://github.com/rapidsai/dask-cuda) and [dask.distributed](https://distributed.dask.org/en/latest/) parallelism.
* Supports tabular data formats, including comma-separated values (CSV) files, Apache Parquet, Apache ORC, and Apache Avro.
* Provides data loaders that are optimized for TensorFlow, PyTorch, and Merlin HugeCTR.
* Includes multi-hot categoricals and vector continuous passing support to ease feature engineering.


To preprocess the data, we need to define a transformation [`Workflow`](https://nvidia-merlin.github.io/NVTabular/main/api/workflow/workflow.html).  
Each transformation step in the transformation pipeline executes multiple calculations, called `ops`. 
NVTabular provides a [set of ops](c), which include:

 - Filtering outliers or missing values, or creating new features indicating that a value is missing;
 - Imputing and filling in missing data;
 - Discretization or bucketing of continuous features;
 - Creating features by splitting or combining existing features, for example, breaking down a date column into day-of-week, month-of-year, day-of-month features;
 - Normalizing numerical features to have zero mean and unit variance or applying transformations, for example with log transform;
 - Encoding discrete features using one-hot vectors or converting them to continuous integer indices.  

NVTabular processes a dataset, given a pre-defined workflow, in two steps:

1. The `fit` step, where NVTabular computes the statistics required for transforming the data. This step requires at most `N` passes through the data, where `N` is the number of chained operations in the workflow.
2. The `apply` step, where NVTabular uses the fitted workflow to process the data. 

NVTabular is designed to minimize the number of passes through the data. This is achieved with a lazy execution strategy. Data operations are not executed until an explicit apply phase. This allows NVTabular to optimize the workflow that requires iteration over the entire dataset.
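
The two-phase idea can be illustrated without a GPU. The following is a minimal pure-Python sketch, not the NVTabular API: the `NormalizeSketch` class and its method bodies are hypothetical stand-ins. It assumes the data arrives in chunks, gathers the statistics in a single pass, and defers the actual transformation until the result is iterated.

```python
import math


class NormalizeSketch:
    """Toy stand-in for a statistics-based op (hypothetical, not NVTabular)."""

    def fit(self, chunks):
        # One pass over the data: accumulate count, sum, and sum of squares
        # chunk by chunk, so the full column never has to be held in memory.
        count = 0
        total = 0.0
        total_sq = 0.0
        for chunk in chunks:
            for x in chunk:
                count += 1
                total += x
                total_sq += x * x
        self.mean = total / count
        self.std = math.sqrt(total_sq / count - self.mean ** 2)
        return self

    def transform(self, chunks):
        # Lazy: this is a generator, so no chunk is processed until the
        # caller starts iterating over the result.
        for chunk in chunks:
            yield [(x - self.mean) / self.std for x in chunk]


chunks = [[1.0, 2.0], [3.0, 4.0]]
op = NormalizeSketch().fit(chunks)      # statistics only
lazy_result = op.transform(chunks)      # nothing computed yet
normalized = list(lazy_result)          # work happens here
```

Keeping the statistics pass separate from the transformation is what lets the same fitted state be reused on other data splits.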



## 2. Preprocessing Criteo dataset

The Criteo dataset contains over four billion samples spanning 24 CSV files. Each record contains 40 columns: 13 numerical columns, 26 categorical columns, and 1 binary target column.  
See [00-dataset-management.ipynb](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/00-dataset-management.ipynb) for more details.
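
To make the record layout concrete, here is a small sketch that splits one tab-separated record into its three parts. It assumes the column order is target first, then the 13 numerical columns, then the 26 categorical columns, and that missing values appear as empty fields. The `parse_criteo_line` helper is hypothetical and the sample record is synthetic.

```python
def parse_criteo_line(line, sep='\t'):
    """Split one record into (label, numerical, categorical).

    Assumed layout: 1 target, 13 numerical, 26 categorical fields.
    Empty fields are treated as missing and returned as None.
    """
    fields = line.rstrip('\n').split(sep)
    if len(fields) != 40:
        raise ValueError(f'expected 40 fields, got {len(fields)}')
    label = int(fields[0])
    numerical = [int(v) if v else None for v in fields[1:14]]
    categorical = [v if v else None for v in fields[14:40]]
    return label, numerical, categorical


# Synthetic record for illustration only (not real Criteo data).
sample = '\t'.join(
    ['1']                       # target
    + ['5', '', '12'] + ['0'] * 10          # 13 numerical fields, one missing
    + ['a1b2c3d4', ''] + ['0f0f0f0f'] * 24  # 26 categorical fields, one missing
)
label, numerical, categorical = parse_criteo_line(sample)
```

The `None` entries are the gaps that the preprocessing workflow has to fill or encode.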


### NVTabular preprocessing Workflow for Criteo dataset

In this example, the preprocessing `nvt.Workflow` consists of the following operations:
 - [Categorify](https://nvidia-merlin.github.io/NVTabular/main/api/ops/categorify.html): applied to categorical columns (column names that start with C). 
 - [FillMissing](https://nvidia-merlin.github.io/NVTabular/main/api/ops/fillmissing.html): applied to continuous columns (column names that start with I).
 - [Clip](https://nvidia-merlin.github.io/NVTabular/main/api/ops/clip.html):  applied to continuous columns after FillMissing.
 - [Normalize](https://nvidia-merlin.github.io/NVTabular/main/api/ops/normalize.html): applied to continuous columns after Clip.
 
<img src="images/dag_preprocessing.png" alt="Pipeline" style="width:30%;"/>
 
The `nvt.Workflow` is created in the `create_criteo_nvt_workflow` method, which can be found in the [src/preprocessing/task.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/preprocessing/task.py) module.  
This `nvt.Workflow` serves as the recipe for calculating the necessary statistics and executing the data transformation.  
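
The assignment of operations to columns can be sketched in plain Python. This is only an illustration of the ordering described above, not the code in `task.py`: the column names (`label`, `I1`..`I13`, `C1`..`C26`), the `apply_continuous_chain` helper, and its `fill_value` and `min_value` defaults are assumptions.

```python
# Assumed column names: one target, 13 continuous (I*), 26 categorical (C*).
columns = (
    ['label']
    + [f'I{i}' for i in range(1, 14)]
    + [f'C{i}' for i in range(1, 27)]
)

continuous = [c for c in columns if c.startswith('I')]
categorical = [c for c in columns if c.startswith('C')]

# Each column maps to the ordered chain of ops applied to it.
op_chains = {c: ['FillMissing', 'Clip', 'Normalize'] for c in continuous}
op_chains.update({c: ['Categorify'] for c in categorical})


def apply_continuous_chain(value, mean, std, fill_value=0.0, min_value=0.0):
    """Apply the continuous chain to one value, in the documented order.

    mean and std stand for statistics gathered beforehand on training data;
    fill_value and min_value are assumed defaults for this sketch.
    """
    if value is None:               # FillMissing
        value = fill_value
    value = max(value, min_value)   # Clip
    return (value - mean) / std     # Normalize
```

The order matters: filling comes first so that clipping and normalization never see a missing value.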
 

### Implementing the preprocessing pipelines using KFP

[src/pipelines/preprocessing_pipelines.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/pipelines/preprocessing_pipelines.py) defines the KFP pipelines to preprocess the Criteo data. 
The `preprocessing_csv` pipeline processes the CSV data files in Cloud Storage.

A pipeline component is a self-contained set of code that performs one step in your ML workflow. The pipeline uses the following components defined in [src/pipelines/components.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/pipelines/components.py):

1. `convert_csv_to_parquet_op`: this component converts raw CSV files to Parquet files and stores them in Cloud Storage. 
2. `analyze_dataset_op`: this component creates a Criteo preprocessing `nvt.Workflow`, fits it to the training data split, and stores it in Cloud Storage.
3. `transform_dataset_op`: this component loads the fitted `nvt.Workflow` from Cloud Storage, uses it to transform an input data split, and stores the transformed data as Parquet files in Cloud Storage.

Each component is annotated with Inputs and Outputs to keep track of lineage metadata.  
The Docker image used to execute the components is defined in [Dockerfile.nvtabular](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/Dockerfile.nvtabular).  

Some steps in the pipeline are configured to submit a custom Vertex AI Training job with the required CPU, memory and GPU configurations.  
You can customize the pipeline by setting the variables in the [config.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/pipelines/config.py) module.
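
One common way for a configuration module to expose such settings is to read environment variables with fallback defaults. The sketch below shows that pattern under stated assumptions: the `load_instance_config` helper and all default values are hypothetical, and the actual `config.py` may be organized differently.

```python
import os


def load_instance_config(env=None):
    """Read instance settings from a mapping of environment variables.

    Hypothetical helper; the default values are placeholders for this sketch.
    """
    env = os.environ if env is None else env
    return {
        'gpu_limit': int(env.get('GPU_LIMIT', '1')),
        'gpu_type': env.get('GPU_TYPE', 'NVIDIA_TESLA_T4'),
        'cpu_limit': env.get('CPU_LIMIT', '8'),
        'memory_limit': env.get('MEMORY_LIMIT', '32'),
    }


# Passing a plain dict makes the behavior easy to check without touching
# the real process environment.
config = load_instance_config({'GPU_LIMIT': '2', 'GPU_TYPE': 'NVIDIA_TESLA_A100'})
```

With this pattern, any variable that is not set falls back to its default instead of raising an error.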


## Setup

In this section of the notebook you configure your environment settings, including a GCP project, a GCP compute region, a Vertex AI service account and a Vertex AI staging bucket. You also set the locations of training and validation splits in GCS.

Make sure to update the cells below with the values reflecting your environment.

In [13]:
%load_ext autoreload
%autoreload 2



In [14]:
import os
import json
from datetime import datetime
from google.cloud import aiplatform as vertex_ai
from kfp.v2 import compiler

In [15]:
# Project definitions
PROJECT_ID = 'jk-mlops-dev' # Change to your project ID.
REGION = 'us-central1' # Change to your region.

# Bucket definitions
BUCKET =  'jk-staging-us-central1' # Change to your bucket. All the files will be stored here.
VERSION = 'v03'
MODEL_DISPLAY_NAME = f'criteo-merlin-recommender-{VERSION}'
WORKSPACE = f'gs://{BUCKET}/{MODEL_DISPLAY_NAME}'

# Docker definitions
IMAGE_NAME = 'nvt_preprocessing_test'
IMAGE_URI = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}'
DOCKERNAME = 'nvtabular'

# Pipeline definitions
PREPROCESS_CSV_PIPELINE_NAME = 'nvt-csv-pipeline'
PREPROCESS_CSV_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_CSV_PIPELINE_NAME)

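# Instance configuration
# Change if you need a different instance configuration
# Assumed machine type matching 2 A100 GPUs, 24 vCPUs, and 170 GB of memory;
# adjust it to your setup.
INSTANCE_TYPE = 'a2-highgpu-2g'
GPU_LIMIT = '2'
GPU_TYPE = 'NVIDIA_TESLA_A100'
CPU_LIMIT = '24'
MEMORY_LIMIT = '170'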

### Set pipeline configurations

In [16]:
os.environ['PROJECT_ID'] = PROJECT_ID
os.environ['REGION'] = REGION
os.environ['BUCKET'] = BUCKET
os.environ['WORKSPACE'] = WORKSPACE

os.environ['NVT_IMAGE_URI'] = IMAGE_URI
os.environ['PREPROCESS_CSV_PIPELINE_NAME'] = PREPROCESS_CSV_PIPELINE_NAME
os.environ['PREPROCESS_CSV_PIPELINE_ROOT'] = PREPROCESS_CSV_PIPELINE_ROOT
os.environ['DOCKERNAME'] = DOCKERNAME

os.environ['INSTANCE_TYPE'] = INSTANCE_TYPE
os.environ['GPU_LIMIT'] = GPU_LIMIT
os.environ['GPU_TYPE'] = GPU_TYPE
os.environ['CPU_LIMIT'] = CPU_LIMIT
os.environ['MEMORY_LIMIT'] = MEMORY_LIMIT

### Initialize Vertex SDK client

In [17]:
# Initialize Vertex AI API
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=os.path.join(WORKSPACE, 'stg') 
)

### Build Container Docker Image

The following command builds the Docker container image for the NVTabular preprocessing steps of the pipeline and pushes it to the [Google Container Registry](https://cloud.google.com/container-registry). 

Note that building the Docker container image can take up to 20 minutes.

In [18]:
FILE_LOCATION = './src'
! gcloud builds submit --config src/cloudbuild.yaml --substitutions _DOCKERNAME=$DOCKERNAME,_IMAGE_URI=$IMAGE_URI,_FILE_LOCATION=$FILE_LOCATION --timeout=2h --machine-type=e2-highcpu-8

Creating temporary tarball archive of 51 file(s) totalling 5.0 MiB before compression.
Some files were not included in the source upload.

Check the gcloud log [/home/jupyter/.config/gcloud/logs/2022.02.27/18.04.48.317153.log] to see which files and the contents of the
default gcloudignore file used (see `$ gcloud topic gcloudignore` to learn
more).

Uploading tarball of [.] to [gs://jk-mlops-dev_cloudbuild/source/1645985088.676192-d56de4ccb6974facbb959ddd748e4df6.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jk-mlops-dev/locations/global/builds/e1b2c215-0846-48bf-a74f-d4dd54d12de5].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/e1b2c215-0846-48bf-a74f-d4dd54d12de5?project=895222332033].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "e1b2c215-0846-48bf-a74f-d4dd54d12de5"

FETCHSOURCE
Fetching storage object: gs://jk-mlops-dev_cloudbuild/source/1645985088.676192-d56de4ccb6974facbb959ddd748e4df6.

## 3-1. CSV Preprocessing Pipeline Execution

The CSV Criteo data preprocessing pipeline performs the following steps.  

 1. Read CSV files from Cloud Storage.
 2. Convert the CSV files to Parquet format and write them to Cloud Storage.
 3. Fit a pre-defined NVTabular workflow to the training data split to calculate transformation statistics.
 4. Transform the training and validation data splits using the fitted workflow.
 5. Output the transformed Parquet files to Cloud Storage.

<img src="./images/preprocessing_pipeline_csv.png" alt="Pipeline" style="height: 50%; width:50%;"/>

### Converting CSV files to Parquet with NVTabular

The Criteo dataset is provided in TSV format, but the recommended data format for running the NVTabular preprocessing task with the best possible performance is [Parquet](http://parquet.apache.org/documentation/latest/), a compressed, column-oriented file format. While NVTabular also supports reading from CSV files, reading Parquet files can be 2x faster than reading CSV files.  

To convert the Criteo CSV data to Parquet, the following steps are performed:

1. Create an `nvt.Dataset` object from the CSV data using the `create_csv_dataset` method in [src/preprocessing/task.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/preprocessing/task.py).
2. Convert the CSV data to Parquet and write it to Cloud Storage using the `convert_csv_to_parquet` method in [src/preprocessing/task.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/preprocessing/task.py).

The pipeline uses the `convert_csv_to_parquet_op` component, implemented in [src/pipelines/components.py](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/src/pipelines/components.py), which submits a Vertex AI training job to convert the files.

### Create pipeline parameters

NVTabular provides an option to shuffle the dataset before storing to disk.  
The uniformly shuffled dataset enables the data loader to read in contiguous chunks of data that are already randomized across the entire dataset.
NVTabular provides the option to control the number of chunks that are combined into a batch, allowing the end user flexibility when trading off between performance and true randomization.  
This mechanism is critical when dealing with datasets that exceed CPU memory and per-epoch shuffling is desired during training.  
Full shuffling of such a dataset can exceed training time for the epoch by several orders of magnitude.
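
The trade-off between performance and randomization can be sketched in plain Python. This is an illustrative approximation, not NVTabular's data loader: the `chunked_shuffle` function and its `chunks_per_batch` parameter are hypothetical names. It visits chunks in random order, merges a few of them into a buffer, and shuffles only within that buffer.

```python
import random


def chunked_shuffle(chunks, chunks_per_batch, seed=0):
    """Yield rows in approximately random order without a full shuffle.

    Only `chunks_per_batch` chunks are held in memory at a time. A larger
    value gives better randomization at the cost of more memory.
    """
    rng = random.Random(seed)
    order = list(range(len(chunks)))
    rng.shuffle(order)                      # randomize which chunks are read
    for start in range(0, len(order), chunks_per_batch):
        selected = order[start:start + chunks_per_batch]
        buffer = [row for idx in selected for row in chunks[idx]]
        rng.shuffle(buffer)                 # randomize rows inside the buffer
        yield from buffer


# Four contiguous chunks of three rows each.
chunks = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
rows = list(chunked_shuffle(chunks, chunks_per_batch=2, seed=0))
```

Setting `chunks_per_batch` to the total number of chunks degenerates into a full shuffle, while a value of 1 only randomizes rows within each chunk.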

In [19]:
# List of path(s) to criteo file(s) or folder(s) in GCS.
# Training files

CRITEO_BASE_PATH = 'gs://jk-criteo-bucket/criteo_raw_tsv'

TRAIN_PATHS = [
    f'{CRITEO_BASE_PATH}/day_{i}' for i in range(23)
]

# Validation files
VALID_PATHS = [f'{CRITEO_BASE_PATH}/day_23'] 

sep = '\t' # Separator for the CSV file.
num_output_files_train = len(TRAIN_PATHS) # Number of output files after converting CSV to Parquet
num_output_files_valid = len(VALID_PATHS) # Number of output files after converting CSV to Parquet 

In [20]:
csv_parameter_values = {
    'train_paths': json.dumps(TRAIN_PATHS),
    'valid_paths': json.dumps(VALID_PATHS),
    'sep': sep,
    'num_output_files_train': num_output_files_train,
    'num_output_files_valid': num_output_files_valid,
    'shuffle': json.dumps(None) # select PER_PARTITION, PER_WORKER, FULL, or None.
}
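
The list-valued and `None`-valued parameters above are passed as JSON strings. The short sketch below shows the round trip a receiving component could perform with `json.loads`; the bucket name is a hypothetical placeholder, and how the actual components decode these values is an assumption.

```python
import json

# Hypothetical paths for illustration only.
example_paths = [f'gs://example-bucket/criteo/day_{i}' for i in range(3)]

encoded_paths = json.dumps(example_paths)   # a single JSON string
encoded_shuffle = json.dumps(None)          # the JSON literal null

# On the receiving side, json.loads restores the original Python values.
decoded_paths = json.loads(encoded_paths)
decoded_shuffle = json.loads(encoded_shuffle)
```

Encoding this way keeps every parameter a plain string or number while still carrying lists and missing values.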

### Compile KFP pipeline

In [21]:
from src.pipelines.preprocessing_pipelines import preprocessing_csv

csv_compiled_pipeline_path = f'{PREPROCESS_CSV_PIPELINE_NAME}.json'
compiler.Compiler().compile(
    pipeline_func=preprocessing_csv,
    package_path=csv_compiled_pipeline_path,
)

### Submit job to Vertex AI Pipelines

In [22]:
job_name = f'{datetime.now().strftime("%Y%m%d%H%M%S")}_{PREPROCESS_CSV_PIPELINE_NAME}'

pipeline_job = vertex_ai.PipelineJob(
    display_name=job_name,
    template_path=csv_compiled_pipeline_path,
    enable_caching=False,
    parameter_values=csv_parameter_values,
)

pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/895222332033/locations/us-central1/pipelineJobs/nvt-csv-pipeline-20220227180940
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/895222332033/locations/us-central1/pipelineJobs/nvt-csv-pipeline-20220227180940')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/nvt-csv-pipeline-20220227180940?project=895222332033


## Next Steps
After completing this notebook, you can proceed to the [02-model-training-hugectr.ipynb](https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/02-model-training-hugectr.ipynb) notebook, which demonstrates how to train a DeepFM model using NVIDIA HugeCTR and Vertex AI.