# Multi-GPU Scaling in NVTabular with Dask


## NVTabular + Dask Integration

NVTabular>=0.2.0 enables the use of [Dask](https://dask.org/) for multi-GPU parallelism.  To this end, the following classes are now tightly integrated with the [RAPIDS](https://rapids.ai/) `dask_cudf` library:

- **nvtabular.Dataset**: Most NVTabular functionality requires the raw data to be converted to a `Dataset` object. This step *should* be very inexpensive, as it requires minimal IO (if any at all).  A `Dataset` can be initialized using file/directory paths ("csv" or "parquet"), a `pyarrow.Table`, a pandas/cudf `DataFrame`, or a pandas/cudf-based *Dask* `DataFrame`.  The purpose of this "wrapper" class is to provide other NVTabular components with reliable mechanisms to (1) translate the target data into a dask collection, and to (2) iterate over the target data in small-enough chunks to fit comfortably in GPU memory.
- **nvtabular.Workflow**: This is the central class used in NVTabular to compose a GPU-accelerated preprocessing pipeline.  The Workflow class now tracks the state of the underlying data by applying all operations to an internal `dask_cudf.DataFrame` object (`ddf`).
- **nvtabular.ops.StatOperator**: All "statistics-gathering" operations must be designed to operate directly on the `Workflow` object's internal `ddf`.  This requirement facilitates the ability of NVTabular to handle the calculation of global statistics in a scalable way.

**Big Picture**:  NVTabular is tightly integrated with `dask_cudf`.  By representing the underlying dataset as a (lazily-evaluated) collection of cudf DataFrame objects (i.e. a single `dask_cudf.DataFrame`), we can seamlessly scale our preprocessing workflow to multiple GPUs.

## Simple Multi-GPU Toy Example
In order to illustrate the `dask_cudf`-based functionality of NVTabular, we will walk through a simple preprocessing example using *toy* data.


### Step 1:  Import Libraries and Cleanup Working Directories

In [1]:
# Standard Libraries
import os
import glob
import shutil

# External Dependencies
import cupy as cp
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.utils import parse_bytes

# NVTabular
import nvtabular as nvt
import nvtabular.ops as ops
from nvtabular.io import Shuffle

Note that it is often a good idea to set aside (fast) dedicated disk space for Dask "workers" to spill data and write logging information.  To make things simple, we will perform all IO within a single `BASE_DIR` for this example.  Set the `BASE_DIR` environment variable to point at the location you want to use.

In [2]:
# Choose a "fast" root directory for this example
BASE_DIR = os.environ.get("BASE_DIR", "./basedir")

# Define and clean our worker/output directories
dask_workdir = os.path.join(BASE_DIR, "workdir")
demo_output_path = os.path.join(BASE_DIR, "demo_output")
demo_dataset_path = os.path.join(BASE_DIR, "demo_dataset.parquet")

# Ensure BASE_DIR exists
if not os.path.isdir(BASE_DIR):
    os.mkdir(BASE_DIR)

# Make sure we have a clean worker space for Dask
if os.path.isdir(dask_workdir):
    shutil.rmtree(dask_workdir)
os.mkdir(dask_workdir)

# Make sure we have a clean output path
if os.path.isdir(demo_output_path):
    shutil.rmtree(demo_output_path)
os.mkdir(demo_output_path)

### Step 2: Deploy a Distributed-Dask Cluster

Before we walk through the rest of this multi-GPU preprocessing example, it is important to reiterate that `dask_cudf` is used extensively within NVTabular.  This essentially means that you do **not** need to do anything special to *use* Dask here.  With that said, the default behavior of NVTabular is to utilize Dask's ["synchronous"](https://docs.dask.org/en/latest/scheduling.html) task scheduler, which precludes distributed processing.  In order to properly utilize a multi-GPU system, you need to deploy a `dask.distributed` *cluster*.

There are many different ways to create a distributed Dask cluster.  In this notebook, we will focus only on the `LocalCUDACluster` API (which is provided by the RAPIDS [`dask_cuda`](https://github.com/rapidsai/dask-cuda) library). We also recommend that you check out [this blog article](https://blog.dask.org/2020/07/23/current-state-of-distributed-dask-clusters) to see a high-level summary of the (many) other cluster-deployment utilities.

For this example, we will assume that you want to perform preprocessing on a single machine with multiple GPUs. In this case, we can use `dask_cuda.LocalCUDACluster` to deploy a distributed cluster with each worker process being pinned to a distinct GPU.  This class also provides our workers with mechanisms for device-host memory spilling, and (optionally) enables the use of NVLink and InfiniBand-based inter-process communication via UCX...

In [3]:
# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"                     # "tcp" or "ucx"
visible_devices = "0,1,2,3,4,5,6,7"  # Select devices to place workers
device_memory_limit = "28GB"         # Spill device mem to host at this limit
memory_limit = "96GB"                # Spill host mem to disk near this limit
cluster = None                       # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol = protocol,
        CUDA_VISIBLE_DEVICES = visible_devices,
        local_directory = dask_workdir,
        device_memory_limit = parse_bytes(device_memory_limit),
        memory_limit = parse_bytes(memory_limit),
    )

# Create the distributed client
client = Client(cluster)
client

| Client | Cluster |
| --- | --- |
| Scheduler: tcp://127.0.0.1:34305 | Workers: 8 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 8 |
| | Memory: 768.00 GB |
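
The `Memory: 768.00 GB` figure reported for the cluster is consistent with the per-worker `memory_limit` being applied to each of the eight workers. A minimal sketch of that arithmetic, assuming decimal units (1 GB = 10^9 bytes) and one worker per visible device; the variable names below are illustrative and not part of the `dask_cuda` API:

```python
# Sketch: aggregate memory budgets implied by the per-worker settings.
# Assumes decimal gigabytes (1 GB = 10**9 bytes) and one worker per visible GPU.
GB = 10**9

visible_devices = "0,1,2,3,4,5,6,7"
n_workers = len(visible_devices.split(","))

memory_limit_per_worker = 96 * GB         # host-memory spill threshold per worker
device_memory_limit_per_worker = 28 * GB  # device-memory spill threshold per worker

total_host_budget = n_workers * memory_limit_per_worker
total_device_budget = n_workers * device_memory_limit_per_worker

print(f"workers: {n_workers}")
print(f"total host-memory budget: {total_host_budget / GB:.2f} GB")
print(f"total device-memory budget before spilling: {total_device_budget / GB:.2f} GB")
```

If your machine has fewer GPUs or less host memory, scale `visible_devices` and `memory_limit` so that the aggregate host budget stays below the physical memory available.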


Since allocating memory is often a performance bottleneck, it is usually a good idea to initialize a memory pool on each of our workers. When using a distributed cluster, we must use the `client.run` utility to make sure a function is executed on all available workers...

In [4]:
# Initialize RMM pool on ALL workers
def _rmm_pool():
    cudf.set_allocator(
        pool=True,
        initial_pool_size=None, # Use default size
        allocator="default",
    )

client.run(_rmm_pool)

{'tcp://127.0.0.1:32939': None,
 'tcp://127.0.0.1:34515': None,
 'tcp://127.0.0.1:34827': None,
 'tcp://127.0.0.1:35537': None,
 'tcp://127.0.0.1:36285': None,
 'tcp://127.0.0.1:36409': None,
 'tcp://127.0.0.1:38957': None,
 'tcp://127.0.0.1:39155': None}

**Note**: If you have problems with this, it *may* be a `numba-0.51` problem. Try: `conda install -c conda-forge numba=0.50`


### Step 3: Create a "Toy" Parquet Dataset
In order to illustrate the power of multi-GPU scaling, without requiring an excessive runtime, we can use the `cudf.datasets.timeseries` API to generate a 20GB toy dataset.  Note that there are many ways that we can use Dask/dask_cudf to generate a large dataset in parallel.  However, for this example we will just use cudf on a single process...

In [5]:
# Write a "largish" dataset (~20GB)...
# Change `write_count` and/or `freq` for larger or smaller dataset
write_count = 25
freq = "1s"
if not os.path.exists(demo_dataset_path):
    pw = cudf.io.parquet.ParquetWriter(demo_dataset_path)
    for i in range(write_count):
        df = cudf.datasets.timeseries(start="2000-01-01", end="2000-12-31", freq=freq, seed=i).reset_index(drop=False)
        df["name"] = df["name"].astype("object")
        df["label"] = cp.random.choice(cp.array([0, 1], dtype="uint8"), len(df))
        pw.write_table(df)
    pw.close()
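
To get a feel for how `write_count` and `freq` control the dataset size, the row count can be estimated without a GPU. The sketch below assumes that each generated chunk contains one row per timestamp between `start` and `end`, with both endpoints included; the helper `rows_per_chunk` is hypothetical and only mirrors that assumption:

```python
from datetime import datetime, timedelta

def rows_per_chunk(start, end, freq_seconds):
    """Count timestamps from start to end (both assumed inclusive) at a fixed step."""
    span = datetime.fromisoformat(end) - datetime.fromisoformat(start)
    return span // timedelta(seconds=freq_seconds) + 1

write_count = 25
per_chunk = rows_per_chunk("2000-01-01", "2000-12-31", freq_seconds=1)
total_rows = write_count * per_chunk

print(f"rows per chunk: {per_chunk:,}")
print(f"total rows:     {total_rows:,}")

# Halving the sampling rate (freq="2s") roughly halves the row count per chunk
print(f"rows per chunk at 2s: {rows_per_chunk('2000-01-01', '2000-12-31', 2):,}")
```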

### Step 4: Create an NVTabular Dataset object

As discussed above, the `nvt.Workflow` class requires data to be represented as an `nvt.Dataset`. This convention allows NVTabular to abstract away the raw format of the data, and convert everything to a consistent `dask_cudf.DataFrame` representation. Since the `Dataset` API effectively wraps functions like `dask_cudf.read_csv`, the syntax is very simple and the computational cost is minimal.

**Important Dataset Considerations**:

- Can be initialized with the following objects:
    - 1+ file/directory paths. An `engine` argument is required to specify the file format (unless file names are appended with `csv` or `parquet`)
    - `cudf.DataFrame`. Internal `ddf` will have 1 partition.
    - `pandas.DataFrame`. Internal `ddf` will have 1 partition.
    - `pyarrow.Table`. Internal `ddf` will have 1 partition.
    - `dask_cudf.DataFrame`. Internal `ddf` will be a shallow copy of the input.
    - `dask.dataframe.DataFrame`. Internal `ddf` will be a direct pandas->cudf conversion of the input.
- For file-based data initialization, the size of the internal `ddf` partitions will be chosen according to the following arguments (in order of precedence):
    - `part_size`: Desired maximum size of each partition **in bytes**.  Note that you can pass a string here, like `"2GB"`.
    - `part_mem_fraction`: Desired maximum size of each partition as a **fraction of total GPU memory**.
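
The precedence between `part_size` and `part_mem_fraction` can be sketched as follows. This is not NVTabular's implementation: `parse_size` and `resolve_part_size` are hypothetical helpers, the total GPU memory is passed in by hand, and decimal units are assumed.

```python
def parse_size(size):
    """Convert an int or a string such as "2GB" into bytes (decimal units assumed)."""
    if isinstance(size, int):
        return size
    units = {"KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}
    text = size.strip().upper()
    for suffix, factor in units.items():
        if text.endswith(suffix):
            return round(float(text[: -len(suffix)]) * factor)
    return int(text)  # plain number of bytes

def resolve_part_size(total_gpu_bytes, part_size=None, part_mem_fraction=None,
                      default_fraction=0.125):
    """Pick a partition size: part_size wins, then part_mem_fraction, then the default."""
    if part_size is not None:
        return parse_size(part_size)
    fraction = default_fraction if part_mem_fraction is None else part_mem_fraction
    return round(total_gpu_bytes * fraction)

# Assume a GPU with 32 * 10**9 bytes of memory for illustration
gpu_bytes = 32 * 10**9
print(resolve_part_size(gpu_bytes, part_size="1.2GB"))      # explicit byte size
print(resolve_part_size(gpu_bytes, part_mem_fraction=0.1))  # fraction of GPU memory
print(resolve_part_size(gpu_bytes))                         # default fraction
```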

In [6]:
%%time
# Create a Dataset
# (`engine` argument optional if file names appended with `csv` or `parquet`)
ds = nvt.Dataset(demo_dataset_path, engine="parquet", part_size="1.2GB")

CPU times: user 934 ms, sys: 423 ms, total: 1.36 s
Wall time: 1.3 s


Once your data is converted to a `Dataset` object, it can be converted to a `dask_cudf.DataFrame` using the `to_ddf` method.  The wonderful thing about this `DataFrame` object is that you are free to operate on it using a familiar `cudf`/`pandas` API...

In [7]:
ds.to_ddf().head()

Unnamed: 0,timestamp,id,name,x,y,label
0,2000-01-01 00:00:00,1019,Michael,0.168205,-0.54723,1
1,2000-01-01 00:00:01,984,Patricia,-0.145077,-0.240521,0
2,2000-01-01 00:00:02,935,Victor,0.557024,-0.098855,1
3,2000-01-01 00:00:03,970,Alice,0.527366,-0.632569,0
4,2000-01-01 00:00:04,997,Dan,0.309193,0.704845,0


Note that the output of a Dataset (a `ddf`) can be used to initialize a new Dataset.  This means we can use `dask_cudf` to perform complex ETL on our data before we process it in a `Workflow`. For example, although NVTabular does not support global shuffling transformations (yet), these operations **can** be performed before (and/or after) a Workflow.  The catch here is that operations requiring the global movement of data between partitions can require more device memory than available.

In [8]:
# Example of global shuffling outside an NVT Workflow
ddf = ds.to_ddf().shuffle("id", ignore_index=True)
ds = nvt.Dataset(ddf)
ds.to_ddf()

Unnamed: 0_level_0,timestamp,id,name,x,y,label
npartitions=29,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,datetime64[us],int64,object,float64,float64,uint8
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


Since global shuffling operations can lead to significant GPU-memory pressure, we will start with a simpler `Dataset` definition for this example...

In [9]:
del ds
del ddf

dataset = nvt.Dataset(demo_dataset_path, part_mem_fraction=0.1)

Note that the default value for `part_mem_fraction` (0.125) is usually safe, but we will use a slightly smaller partition size for this example to be conservative.

### Step 5: Define our NVTabular Workflow

Now that we have our Dask cluster up and running, we can use the NVTabular API as usual...

In [10]:
# Define categorical, continuous, and label columns
cat_names = ["name", "id"]
cont_names = ["x", "y", "timestamp"]
label_name = ["label"]

# Initialize our Workflow
workflow = nvt.Workflow(cat_names=cat_names, cont_names=cont_names, label_name=label_name, client=client)

# Add Continuous Operation(s)
workflow.add_preprocess(ops.Normalize(columns=["x", "y"]))

# Add Categorical Operation(s)
workflow.add_preprocess(
    ops.Categorify(
        out_path=demo_output_path,  # Path to write unique values used for encoding
    )
)

# Finalize the Workflow
workflow.finalize()
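
Both `Normalize` and `Categorify` depend on global statistics (mean and standard deviation, or the set of unique values), which is why statistics are gathered across all partitions before the data is transformed. The pure-Python sketch below illustrates that two-phase pattern on plain lists. It is a conceptual model, not NVTabular code: all function names are hypothetical, the use of the population standard deviation is an assumption, and so is reserving code `0` for unseen values.

```python
import math

def partition_stats(values):
    """Phase 1 (per partition): partial count, sum, and sum of squares."""
    return len(values), sum(values), sum(v * v for v in values)

def combine_stats(partials):
    """Phase 1 (reduce): merge the partials into a global mean and std."""
    n = sum(p[0] for p in partials)
    mean = sum(p[1] for p in partials) / n
    var = sum(p[2] for p in partials) / n - mean * mean  # population variance (assumed)
    return mean, math.sqrt(max(var, 0.0))

def normalize(values, mean, std):
    """Phase 2 (per partition): standardize with the global statistics."""
    return [(v - mean) / std for v in values]

def global_vocab(partitions):
    """Phase 1 for categoricals: sorted unique values across all partitions."""
    seen = set()
    for part in partitions:
        seen.update(part)
    return sorted(seen)

def categorify(values, vocab):
    """Phase 2 for categoricals: map each value to an integer code (0 = unseen, assumed)."""
    codes = {v: i + 1 for i, v in enumerate(vocab)}
    return [codes.get(v, 0) for v in values]

x_parts = [[1.0, 2.0], [3.0, 4.0]]
name_parts = [["Dan", "Alice"], ["Victor", "Alice"]]

mean, std = combine_stats([partition_stats(p) for p in x_parts])
x_normalized = [normalize(p, mean, std) for p in x_parts]

vocab = global_vocab(name_parts)
name_codes = [categorify(p, vocab) for p in name_parts]

print(mean, std)
print(x_normalized)
print(vocab, name_codes)
```

Because phase 1 only exchanges small summaries (three numbers per partition, or a set of unique values), each partition can be processed independently by whichever worker holds it.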

### Step 6: Apply our Workflow

In [11]:
%%time
workflow.apply(
    dataset,
    output_format="parquet",
    output_path=os.path.join(demo_output_path,"processed"),
    shuffle=Shuffle.PER_WORKER,
    out_files_per_proc=8,
)

CPU times: user 1.4 s, sys: 140 ms, total: 1.54 s
Wall time: 13.4 s


For this (modestly sized) toy dataset, moving from 1 to 2 V100 GPUs nearly halves the wall time (50.6 s to 26.3 s in the example results below).  The parallel efficiency drops noticeably at 8 GPUs (15 s, roughly a 3.4x speedup over a single GPU).  However, when processing larger datasets, NVTabular/dask_cudf can scale well beyond a single DGX-1 (or DGX-A100).  It is also important to recognize that multi-GPU and multi-node scaling is typically much more successful with UCX support (enabling both NVLink and InfiniBand communication).

**Example Results**:

**1 x 32GB V100 GPU**
```
CPU times: user 1.31 s, sys: 368 ms, total: 1.68 s
Wall time: 50.6 s
```

**2 x 32GB V100 GPUs**
```
CPU times: user 1.18 s, sys: 226 ms, total: 1.41 s
Wall time: 26.3 s
```

**8 x 32GB V100 GPUs**
```
CPU times: user 1.34 s, sys: 309 ms, total: 1.64 s
Wall time: 15 s
```
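
The speedup and parallel efficiency implied by these wall times can be computed directly. A short sketch using only the three measurements listed above (the `results` dictionary is just an illustrative container):

```python
# Wall times (seconds) from the example results, keyed by number of GPUs
wall_times = {1: 50.6, 2: 26.3, 8: 15.0}

baseline = wall_times[1]
results = {}
for n_gpus, seconds in wall_times.items():
    speedup = baseline / seconds    # how much faster than a single GPU
    efficiency = speedup / n_gpus   # fraction of ideal linear scaling
    results[n_gpus] = (speedup, efficiency)
    print(f"{n_gpus} GPU(s): speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```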

Now that we are done executing our Workflow, we can check the output data to confirm that everything is looking good...

In [12]:
dask_cudf.read_parquet(os.path.join(demo_output_path,"processed")).head()

Unnamed: 0,x,y,timestamp,name,id,label
0,1.293507,0.13515,2000-07-12 03:10:16,14,226,0
1,-1.068263,-0.173809,2000-01-16 09:39:08,2,195,0
2,1.36477,-0.438126,2000-01-18 11:12:11,4,187,0
3,-1.491342,-0.804333,2000-01-15 02:43:34,17,163,0
4,-0.744622,-1.32145,2000-06-08 09:12:03,15,206,0


### Step 7: (Optional) Follow-up Processing/Writing with `dask_cudf`

Instead of using an NVTabular Workflow to persist your processed dataset to disk, it is also possible to extract the internal `ddf` from the workflow, and perform follow-up operations with the `dask_cudf` API.  For example, if you want to convert the entire dataset into a `groupby` aggregation, you could do something like this...

In [13]:
%%time
ddf = workflow.get_ddf()
ddf = ddf.groupby(["name"]).max() # Optional follow-up processing
ddf.to_parquet(os.path.join(demo_output_path, "dask_output"), write_index=False)

CPU times: user 192 ms, sys: 6.97 ms, total: 198 ms
Wall time: 1.77 s


As always, we can use either `nvt.Dataset` or `dask_cudf` directly to read back our data...

In [14]:
dask_cudf.read_parquet(os.path.join(demo_output_path, "dask_output")).compute()

Unnamed: 0,x,y,timestamp,id,label
0,1.732032,1.731952,2000-12-31 00:00:00,354,1
1,1.732032,1.731952,2000-12-30 23:59:58,364,1
2,1.732032,1.731953,2000-12-31 00:00:00,354,1
3,1.732032,1.731952,2000-12-31 00:00:00,359,1
4,1.732032,1.731952,2000-12-31 00:00:00,357,1
5,1.732032,1.731953,2000-12-31 00:00:00,349,1
6,1.732032,1.731953,2000-12-30 23:59:59,356,1
7,1.732032,1.731952,2000-12-30 23:59:58,362,1
8,1.732032,1.731953,2000-12-31 00:00:00,361,1
9,1.732032,1.731952,2000-12-31 00:00:00,352,1


## Notes on Shuffling

NVTabular currently supports two shuffling options when writing output to disk (with an additional `FULL` option planned for a future release):

- `nvt.io.Shuffle.PER_PARTITION`
- `nvt.io.Shuffle.PER_WORKER`

In both cases, the partitions of the underlying dataset/ddf are randomly ordered before any processing is performed. If `PER_PARTITION` is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a number (`out_files_per_proc`) of output files. Output files are distinctly mapped to each worker process. If `PER_WORKER` is specified, each worker will follow the same procedure as `PER_PARTITION`, but will re-shuffle each file after all data is persisted.  This results in a full shuffle of the data processed by each worker.  To improve performance, this option currently uses host-memory `BytesIO` objects for the intermediate persist stage. The general `PER_WORKER` algorithm is illustrated here:

![image.png](../_images/per_worker_shuffle.png)
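
The two options can also be modeled in a few lines of pure Python for a single worker. This is a conceptual sketch of the procedure described above, not NVTabular's implementation: the function names are hypothetical, rows are plain integers, and splitting each partition into strided slices is an assumption about how rows are distributed across the output files.

```python
import random

def per_partition_shuffle(partitions, out_files_per_proc, rng):
    """Shuffle partition order and rows within each partition, then append to files."""
    order = list(partitions)
    rng.shuffle(order)                      # random ordering of partitions
    files = [[] for _ in range(out_files_per_proc)]
    for part in order:
        rows = list(part)
        rng.shuffle(rows)                   # shuffle rows within the partition
        for i, out_file in enumerate(files):
            # Split the shuffled partition and append one slice to each file
            out_file.extend(rows[i::out_files_per_proc])
    return files

def per_worker_shuffle(partitions, out_files_per_proc, rng):
    """Same as PER_PARTITION, followed by a re-shuffle of each completed file."""
    files = per_partition_shuffle(partitions, out_files_per_proc, rng)
    for out_file in files:
        rng.shuffle(out_file)               # full shuffle within each output file
    return files

# Toy data: three partitions of five rows each, handled by one worker
partitions = [list(range(0, 5)), list(range(5, 10)), list(range(10, 15))]

files_pp = per_partition_shuffle(partitions, out_files_per_proc=2, rng=random.Random(0))
files_pw = per_worker_shuffle(partitions, out_files_per_proc=2, rng=random.Random(0))

print("PER_PARTITION:", files_pp)
print("PER_WORKER:   ", files_pw)
```

In the `PER_PARTITION` output, rows from the same partition remain grouped together within each file, while the `PER_WORKER` output mixes rows from all partitions handled by that worker.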
