<a href="https://www.nvidia.com/dli"> <img src="images/DLI_Header.png" alt="Header" style="width: 400px;"/> </a>

# Op Optimization

Now that our pipeline is working well, let's learn to scale it with multiple GPUs.

<b>Learning Objectives</b>:
* Learn how to use a LocalCUDACluster to utilize multiple GPUs.
* Learn how to [Rename](https://github.com/NVIDIA/NVTabular/blob/main/nvtabular/ops/rename.py) a column.
* Learn how to export NVTabular to Dask.

### 1. NVTabular with Multi-GPU support by LocalCUDACluster

So far, we have used the simplest way to apply an NVTabular workflow which utilizes only a single GPU. NVTabular can easily be scaled to multiple GPUs by initializing a [LocalCUDACluster](https://dask-cuda.readthedocs.io/en/latest/ucx.html?highlight=localcudacluster#localcudacluster) first. We will begin by importing all of the libraries we need for this lab.

In [1]:
import os
import glob 

import cudf
import rmm
import nvtabular as nvt
import numpy as np
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

from nvtabular.utils import device_mem_size, get_rmm_size

If it is not running already, we highly recommend setting up a terminal on the side to monitor our GPUs with the following command:

`watch -n0.1 nvidia-smi --query-gpu=index,memory.used,memory.total,utilization.gpu --format=csv`

With this command, we can see that we have 4 GPUs. So far, we've only been using one GPU (`0`), which is using the most memory. The others are barely using anything. Let's put them to work!

To do this, we will be using a [LocalCUDACluster](https://dask-cuda.readthedocs.io/en/latest/ucx.html?highlight=localcudacluster#localcudacluster). Here are some of the key parameters:

* `protocol`: Protocol to use for communication.
  * `tcp` is the default communication.
  * `ucx` enables NVIDIA's [NVLink](https://www.nvidia.com/en-us/data-center/nvlink/) technology, which lets GPUs communicate directly with each other for higher throughput. It requires `protocol="ucx"`, `enable_tcp_over_ucx=True` and `enable_nvlink=True`.
* `CUDA_VISIBLE_DEVICES`: Defines which GPU devices are visible to the LocalCUDACluster.
  * e.g. `0,1,3` for GPUs 0, 1 and 3.
* `local_directory`: Defines the directory used to buffer data.
* `device_memory_limit`: Sets the device memory limit for each worker in the cluster.
  * This setting may need to be much lower than the actual memory capacity of your device.
  
Rather than manually entering our memory limit, NVTabular has a number of [utils](https://github.com/NVIDIA/NVTabular/blob/main/nvtabular/utils.py) to help us figure out our capacity programmatically. Let's use 90% of our total capacity. If we use all 100%, there will be no room for anything else, which can lead to memory errors.

**TODO**: We've created four variables to use as parameters to `LocalCUDACluster`:
* device_memory_limit
* temporary_data_directory
* protocol
* visible_devices

Match one of each to the FIXMEs below.

**Note**: If you need to rerun this cell, please restart the kernel.

In [2]:
# Use a fraction of capacity to prevent memory errors
device_memory_limit = device_mem_size(kind="total") * .9 
temporary_data_directory = '/tmp/'
protocol = "tcp"             # "tcp" or "ucx"
visible_devices = "0,1,2,3"  # Select devices to place workers

# Deploy a Single-Machine Multi-GPU Cluster
cluster = LocalCUDACluster(
    protocol = protocol,
    CUDA_VISIBLE_DEVICES = visible_devices,
    local_directory = temporary_data_directory,
    device_memory_limit = device_memory_limit
)

# Create the distributed client
client = Client(cluster)
client

Client
  Scheduler: tcp://127.0.0.1:36212
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 4
  Cores: 4
  Memory: 186.82 GiB


#### Initializing Memory Pools

Since allocating memory is often a performance bottleneck, it is usually a good idea [to initialize](https://docs.rapids.ai/api/rmm/stable/basics.html) a memory pool on each of our workers. When using a distributed cluster, we must use the `client.run` utility to make sure a function is executed on all available workers.

In [3]:
# Initialize an RMM memory pool on ALL workers
def _rmm_pool():
    # pool_allocator=True is what actually enables the memory pool
    rmm.reinitialize(pool_allocator=True)

client.run(_rmm_pool)

{'tcp://127.0.0.1:33075': None,
 'tcp://127.0.0.1:34696': None,
 'tcp://127.0.0.1:40218': None,
 'tcp://127.0.0.1:44853': None}

**Done! Congratulations, you are now using NVTabular with multi-GPU support.**

That's it. Once the LocalCUDACluster is initialized, we can use NVTabular as usual, and it will run on multiple GPUs.

### Rename

Let's put these GPUs to work with an NVTabular workflow. In our dataset, we have a column called `HourlyWindDirection`. It is measured in degrees, with both 0 degrees and 360 degrees pointing to true north.

In [4]:
df = cudf.read_parquet("data/transformed_out/*.parquet")
columns = df.columns.to_list()
df.head()

Unnamed: 0,HourlyWindSpeed_difference_lag_1,HourlyWindDirection,HourlyRelativeHumidity,HourlyDewPointTemperature,HourlyDryBulbTemperature,HourlyWetBulbTemperature,HourlyWindSpeed_difference_lag_1_filled,HourlyWindDirection_filled,HourlyRelativeHumidity_filled,HourlyDewPointTemperature_filled,HourlyDryBulbTemperature_filled,HourlyWetBulbTemperature_filled,STATION,DATE
0,-5.0,320.0,50.0,12.222222,0.497725,0.159465,False,False,False,False,False,True,72058700184,2012-04-22T11:35:00
1,0.0,0.0,70.0,21.111111,0.974457,0.159465,False,False,False,False,False,True,72058700184,2011-06-05T23:15:00
2,6.0,310.0,44.0,11.111111,0.616908,0.159465,False,False,False,False,False,True,72058700184,2012-04-22T12:00:00
3,-5.0,310.0,44.0,12.222222,0.736091,0.159465,False,False,False,False,False,True,72058700184,2012-04-22T12:15:00
4,8.0,240.0,66.0,21.111111,1.034049,0.159465,False,False,False,False,False,True,72058700184,2011-06-05T23:35:00


With degrees like this, 350 degrees and 10 degrees seem far away, when physically, they're close. Let's make a pipeline to convert our `HourlyWindDirection` into Latitude and Longitude components.
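To make the problem concrete, here is a minimal sketch in plain Python (standard library only, no GPU required). The helper names `direction_components` and `component_distance` are hypothetical and exist only for this illustration. The sketch applies the same cosine and sine mapping to single values and compares distances in component space.

```python
import math


def direction_components(degrees):
    """Return the (cos, sin) components of a direction given in degrees.

    Hypothetical helper, for illustration only.
    """
    radians = math.radians(degrees)
    return math.cos(radians), math.sin(radians)


def component_distance(deg_a, deg_b):
    """Euclidean distance between two directions in component space."""
    return math.dist(direction_components(deg_a), direction_components(deg_b))


# 350 and 10 degrees differ by 340 as raw numbers,
# but they are only 20 degrees apart on the compass.
raw_gap = abs(350 - 10)
near = component_distance(350, 10)
far = component_distance(350, 170)  # directly opposite directions

print(f"raw gap: {raw_gap} degrees")
print(f"component distance, 350 vs 10:  {near:.3f}")
print(f"component distance, 350 vs 170: {far:.3f}")
```

In component space, directions that are physically close end up numerically close, and 0 and 360 degrees map to the same point.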

Since our output cannot have two columns with the same name, we will use the [Rename](https://github.com/NVIDIA/NVTabular/blob/main/nvtabular/ops/rename.py) Op to make this possible. We will add a `postfix` to define which output is the latitude component and which output is the longitude component.

**TODO**: We've provided an example of `Rename` below for the latitude component. Replace the FIXMEs below to add a `_long` tag to the longitude component.

In [33]:
def lat_speed(col):
    # North-south component of the wind direction
    return np.cos(np.radians(col))


def long_speed(col):
    # East-west component of the wind direction
    return np.sin(np.radians(col))


wind_lat = (
    ["HourlyWindDirection"]
    >> nvt.ops.LambdaOp(lat_speed)
    >> nvt.ops.Rename(postfix="_lat")
)
wind_long = (
    ["HourlyWindDirection"]
    >> nvt.ops.LambdaOp(long_speed)
    >> nvt.ops.Rename(postfix="_long")
)

# Combine all columns
all_cols = columns + wind_lat + wind_long

# Define workflow
files = glob.glob("data/transformed_out/*.parquet")
workflow = nvt.Workflow(all_cols)
dataset = nvt.Dataset(files, engine="parquet")

Let's verify that the pipeline is set up correctly. Confirm the following:
* There is a `HourlyWindDirection_lat` column
* There is a `HourlyWindDirection_long` column
* The above two columns have different values
* **All GPUs are used when running the workflow**

In [34]:
!rm -rf data/renamed_out

In [35]:
# The workflow selects the columns found in data/transformed_out,
# so it must be applied to the dataset built from those files.
workflow.transform(dataset).to_parquet(
    output_path="data/renamed_out/", out_files_per_proc=4
)

# Read the result back to check the new columns
df_out = cudf.read_parquet("data/renamed_out/*.parquet")
df_out.head()

Congratulations, all done!

To shut down our client and LocalCUDACluster, we can use their `close` methods.

In [36]:
client.close()
cluster.close()

### Dask Integration and Other Tools

NVTabular is tightly integrated with [Dask-CuDF](https://github.com/rapidsai/dask-cudf), and we can convert an NVTabular Dataset to a Dask DataFrame. While we're at it, let's take a closer look at [NVTabular datasets](https://nvidia.github.io/NVTabular/main/api/dataset.html#).

`??nvt.Dataset`

```
 Parameters
    -----------
    path_or_source : str, list of str, or <dask.dataframe|cudf|pd>.DataFrame
        Dataset path (or list of paths), or a DataFrame. If string,
        should specify a specific file or directory path. If this is a
        directory path, the directory structure must be flat (nested
        directories are not yet supported).
    engine : str or DatasetEngine
        DatasetEngine object or string identifier of engine. Current
        string options include: ("parquet", "csv", "avro"). This argument
        is ignored if path_or_source is a DataFrame type.
    part_size : str or int
        Desired size (in bytes) of each Dask partition.
        If None, part_mem_fraction will be used to calculate the
        partition size.  Note that the underlying engine may allow
        other custom kwargs to override this argument. This argument
        is ignored if path_or_source is a DataFrame type.
    part_mem_fraction : float (default 0.125)
        Fractional size of desired Dask partitions (relative
        to GPU memory capacity). Ignored if part_size is passed
        directly. Note that the underlying engine may allow other
        custom kwargs to override this argument. This argument
        is ignored if path_or_source is a DataFrame type.
    storage_options: None or dict
        Further parameters to pass to the bytes backend. This argument
        is ignored if path_or_source is a DataFrame type.

```

First, `path_or_source` defines our dataset source. For example, we can directly initialize a `nvt.Dataset` from a dask.dataframe.DataFrame or cudf.DataFrame, as below: 

In [9]:
# creates 1 partition
df = cudf.DataFrame({'col1': [0,1,2,3,4], 'col2': ['a', 'b', 'c', 'd', 'e']})
dataset = nvt.Dataset(df)

If we read data from disk, `path_or_source` is a string or a list of strings giving the dataset path. The `engine` parameter defines the file type; `nvt.Dataset` supports the `parquet`, `csv` and `avro` file formats.

**The parameters `part_size` or `part_mem_fraction` are important when we read data from disk. They define the size of the chunks we read in memory.** 

Only one of the parameters is used. If `part_size` is defined, then `part_mem_fraction` is ignored. By default, both parquet and csv-based data will be converted to a Dask-DataFrame collection with a maximum partition size of roughly 12.5 percent of the total memory on a single device.  The partition size can be changed to a different fraction of total memory on a single device with the `part_mem_fraction` argument.

* `part_size` defines the size of each Dask partition in bytes. A good rule of thumb is `128MB` or `256MB`.
* `part_mem_fraction` defines fractional size of desired Dask partitions relative to GPU memory.

Example:
* if `part_size='100MB'`, then each Dask partition is of 100MB (or smaller)
* if `part_mem_fraction=0.1`, then 10% of the single GPU memory (or smaller) is used for the Dask partitions
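The precedence between the two parameters can be sketched in plain Python. This is only an illustration of the arithmetic described above, not NVTabular's implementation: the helpers `parse_size` and `partition_bytes` are hypothetical, the 16 GB device is an assumed example value, and decimal units (1 MB = 10**6 bytes) are assumed.

```python
def parse_size(size):
    """Convert a size such as '100MB' (or an int) into bytes.

    Hypothetical helper; assumes decimal units (1 MB = 10**6 bytes).
    """
    if isinstance(size, int):
        return size
    units = {"KB": 10**3, "MB": 10**6, "GB": 10**9}
    text = size.strip().upper()
    for suffix, factor in units.items():
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)
    return int(text)


def partition_bytes(device_memory, part_size=None, part_mem_fraction=0.125):
    """Pick the target partition size: part_size wins, else use the fraction."""
    if part_size is not None:
        return parse_size(part_size)
    return int(device_memory * part_mem_fraction)


device_memory = 16 * 10**9  # assumed 16 GB GPU, for illustration only

explicit = partition_bytes(device_memory, part_size="100MB")
fraction = partition_bytes(device_memory, part_mem_fraction=0.1)
default = partition_bytes(device_memory)

print(explicit)  # part_size given, so the fraction is ignored
print(fraction)  # 10% of the assumed device memory
print(default)   # default fraction of 12.5%
```

Under these assumptions, an explicit `part_size` always takes priority, and the default fraction yields partitions of 2 GB on a 16 GB device.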

**TODO**: Use [glob](https://docs.python.org/3/library/glob.html) to find all of the `.parquet` files in one of `data/parquet_out`, `data/renamed_out` or `data/transformed_out`.

In [39]:
# Collect the parquet files and read them in partitions of at most 100MB
files = glob.glob("data/transformed_out/*.parquet")
print(files)
dataset = nvt.Dataset(files, engine='parquet', part_size='100MB')

['data/transformed_out/2.919c533fafbf446cb635196a8e3165d8.parquet', 'data/transformed_out/3.a92bcfaf689040c6969d62a294d582a1.parquet', 'data/transformed_out/1.edfc22382da74215ab591337ffe3f6a8.parquet', 'data/transformed_out/0.7f1090bc95984029ade469df888e79a7.parquet']


We can then convert this dataset to a Dask DataFrame using `to_ddf`.

In [40]:
ddf = dataset.to_ddf()
ddf

Dask DataFrame Structure (npartitions=4):
HourlyWindSpeed_difference_lag_1           float64
HourlyWindDirection                        float64
HourlyRelativeHumidity                     float64
HourlyDewPointTemperature                  float64
HourlyDryBulbTemperature                   float32
HourlyWetBulbTemperature                   float32
HourlyWindSpeed_difference_lag_1_filled    bool
HourlyWindDirection_filled                 bool
HourlyRelativeHumidity_filled              bool
HourlyDewPointTemperature_filled           bool
HourlyDryBulbTemperature_filled            bool
HourlyWetBulbTemperature_filled            bool
STATION                                    int64
DATE                                       object


We can check the memory usage per column with [memory_usage](https://docs.rapids.ai/api/cudf/legacy/api.html#cudf.core.dataframe.DataFrame.memory_usage) and per partition with `memory_usage_per_partition`.

In [41]:
ddf.memory_usage()

<dask_cudf.Series | 17 tasks | 1 npartitions>

In [42]:
ddf.memory_usage_per_partition()

Dask Series Structure:
npartitions=4
    int64
      ...
      ...
      ...
      ...
dtype: int64
Dask Name: total_mem_usage, 8 tasks

However, since Dask is lazy, and nothing is computed yet, we're only given information about the graph nodes. To force a result, we can use `compute`. The memory usage is in bytes.

In [43]:
ddf.memory_usage().compute()

DATE                                       11171484
HourlyWindSpeed_difference_lag_1            3885728
HourlyDryBulbTemperature                    1942864
HourlyWindSpeed_difference_lag_1_filled      485716
HourlyRelativeHumidity                      3885728
HourlyWindDirection                         3885728
HourlyDryBulbTemperature_filled              485716
Index                                             0
HourlyDewPointTemperature_filled             485716
HourlyWetBulbTemperature_filled              485716
HourlyWindDirection_filled                   485716
HourlyDewPointTemperature                   3885728
HourlyRelativeHumidity_filled                485716
STATION                                     3885728
HourlyWetBulbTemperature                    1942864
dtype: int64
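These byte counts are easier to interpret with a little arithmetic. The sketch below is plain Python that copies the numbers from the output above. It assumes that boolean columns take one byte per row, which is consistent with the float64 and float32 columns taking exactly 8 and 4 times as much.

```python
# Per-column memory usage in bytes, copied from the computed output
memory_usage = {
    "DATE": 11171484,
    "HourlyWindSpeed_difference_lag_1": 3885728,
    "HourlyDryBulbTemperature": 1942864,
    "HourlyWindSpeed_difference_lag_1_filled": 485716,
    "HourlyRelativeHumidity": 3885728,
    "HourlyWindDirection": 3885728,
    "HourlyDryBulbTemperature_filled": 485716,
    "Index": 0,
    "HourlyDewPointTemperature_filled": 485716,
    "HourlyWetBulbTemperature_filled": 485716,
    "HourlyWindDirection_filled": 485716,
    "HourlyDewPointTemperature": 3885728,
    "HourlyRelativeHumidity_filled": 485716,
    "STATION": 3885728,
    "HourlyWetBulbTemperature": 1942864,
}

total_bytes = sum(memory_usage.values())

# Assumption: a bool column uses 1 byte per row, so its size is the row count
rows = memory_usage["HourlyWindDirection_filled"]

# Bytes per row for a float64 and a float32 column
float64_width = memory_usage["HourlyWindDirection"] // rows
float32_width = memory_usage["HourlyDryBulbTemperature"] // rows

print(f"total: {total_bytes} bytes ({total_bytes / 10**6:.1f} MB)")
print(f"rows: {rows}")
print(f"float64 bytes per row: {float64_width}")
print(f"float32 bytes per row: {float32_width}")
```

The string column `DATE` is by far the largest, since each timestamp is stored as text rather than as a fixed-width number.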

Congratulations on getting through this set of courses! If you would like to learn more about NVTabular and tools for Retail Big Data, these notebooks were created with help from the [NVIDIA Merlin](https://developer.nvidia.com/nvidia-merlin) team. Merlin is a framework used to handle industry scale Recommender Systems.

There are four main components:

<center><img src='https://developer.nvidia.com/sites/default/files/akamai/merlin/recommender-systems-dev-web-850.svg' width='80%'></center>

* [NVTabular](https://nvidia.github.io/NVTabular/main/Introduction.html): Feature engineering and preprocessing library designed to quickly and easily manipulate terabytes of tabular data
* NVTabular dataloader: Highly optimized dataloaders to accelerate TensorFlow and PyTorch pipelines
* [HugeCTR](https://github.com/NVIDIA/HugeCTR): Highly efficient Python and C++ GPU framework and reference design dedicated for recommendation workload training
* [Triton](https://developer.nvidia.com/nvidia-triton-inference-server): Production inference on GPUs for feature transforms and neural network execution.

See you in the next lab!

If you would like to see the GPU numbers drop to zero one more time, feel free to run the code below:

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
