<a href="https://www.eventsforce.net/turingevents/frontend/reg/thome.csp?pageID=89551&eventID=249&traceRedir=2"> <img src="images/turing.png" alt="Header" style="height: 200px;" align="left"/> </a> <a href="https://www.baskerville.ac.uk/"> <img src="images/baskerville.png" alt="Header" style="height: 200px;" /> </a> <a href="https://www.nvidia.com/dli"> <img src="images/DLI_Header.png" alt="Header" style="width: 400px;" align="right"/> </a>

# Challenge Instructions
## Baskerville — Accelerate your research with GPUs 2023 

This challenge is based on part of the NVIDIA DLI course for Accelerating Data Engineering Pipelines. Please do not share this material publicly.

### Objectives

- Read data into GPU memory using the `cuDF` library
- Cleanse the data
- Use the `Dask` library to read thousands of CSV files at once
- Visualise data using the `Plotly` library

### The Dataset

For this challenge, we will be using the US <a href="https://www.noaa.gov/">NOAA's</a> Hourly Precipitation Data. It goes all the way back to 1940! We will be using an unmodified version of their data pulled straight from their database <a href="https://www.ncei.noaa.gov/data/coop-hourly-precipitation/v2/archive/">here</a>, however I have provided a `fetch_data.sh` bash script to pull the data from the website into the `data` folder in your working directory.

> **CHECK**
>
> Please run `fetch_data.sh` in a terminal, and check that your datasets are copied into your challenge folder by running the code below. This is a shell command that counts the number of CSV files contained in the `data` folder and should return the result `2010` (as of 2 Feb 2023).

In [None]:
!ls -l data | wc -l

We highly recommend setting up a terminal on the side to monitor our GPUs with the following command:

`watch -n0.1 nvidia-smi --query-gpu=index,memory.used,memory.total,utilization.gpu --format=csv`

### Configure your Baskerville environment

There are a couple of environment variables to configure in order for the `Dask` and `Plotly` packages to work on Baskerville (may not be necessary on other HPC systems). Run the following to unset `$UCX_NET_DEVICES` and add the conda environment to `$PATH`:


In [None]:
import os
del os.environ["UCX_NET_DEVICES"]
user = os.getenv('USER')
os.environ["PATH"] += os.pathsep + "/bask/projects/v/vjgo8416-training/users/{}/env_data_engineering/bin".format(user)

### cuDF

We can load the data into <a href="https://docs.rapids.ai/api/cudf/stable/">cuDF</a> in order to run some analysis on it. The API for `cuDF` is very similar to the popular `Pandas` library. Let's the compare the two on a single CSV file.

In [None]:
# Import libraries
import numpy as np
import pandas as pd
import cudf

In [None]:
%%time
# Read data into Pandas dataframe (CPU memory)
df_cpu = pd.read_csv("data/AQC00914594.csv")

In [None]:
%%time
# Read data into cuDF dataframe (GPU memory)
df_gpu = cudf.read_csv("data/AQC00914594.csv")

> **NOTE**
>
> Why, in this case, is reading data into GPU memory slower than reading into CPU memory?

 A description of the raw dataset can be found in `data/readme.txt` or from their <a href="https://www.ncei.noaa.gov/data/coop-hourly-precipitation/v2/doc/readme.csv.txt">website</a>. The key points are:

* Each row represents a day's precipitation at a Hourly Precipitation Data (HPD) Station.
* Precipitation is measured in hundredths of an inch
* Each hour is represented as a column with HR00 being the first hour of the day and HR23 being the last.
* A missing value is represented as -9999
* The `DlySum` column is the daily sum across the hours in their respective rows.

#### `describe` method

We can use the <a href="https://docs.rapids.ai/api/cudf/stable/api.html?highlight=describe#cudf.core.dataframe.DataFrame.describe">`describe`</a> method in order to quickly catch unusual data.

In [None]:
df_cpu.describe()

> **CHECK**
>
> * Where does the `-9999` show up in the summary?
> * Besides the min, what other statistics does it impact?
> * Any other unusual observations about the data?


> **ANSWERS**
> * The -9999 appears in the minimum. This should be a red flag to us because how is negative rainfall possible?
> * This strongly affects the mean as the -9999 has a much larger magnitude than the other precipitation data
> * It also strongly affects the standard deviation as it is based on the range between the minimum and maximum values
> * For some columns like latitude and longitude, the standard deviation is `0`. That means all cells in a column have the same value. If we were only working with this .csv, this would be a waste of space, but this value changes depending on the .csv, so we'll keep it as is.
> * Non-numerical columns will still have statistics computed for them, although the results don't have much meaning.

Thankfully, there is a straightforward way to fix this. As we are reading in the data, we can let `read_csv` know that we want to treat `-9999` as a missing value with the `na_values` parameter.

In [None]:
df_cpu = pd.read_csv("data/AQC00914594.csv", na_values='-9999')

In [None]:
df_cpu.describe()

#### `usecols` parameter

When working with large datasets, it's useful to specify which columns we want with the `usecols` parameter. The earlier we can filter our data, the less wasted computation we'll have.

Another parameter we might consider is `dtype`. If we don't specify this, cuDF will sample a number of rows and infer the data type. This can cause issues with anomalous values. For instance, if we have a float while most of the values in a column are integers, cuDF will throw an error about a type mismatch when it finally finds and reads in the float. cuDF supports most <a href="https://numpy.org/doc/stable/user/basics.types.html">NumPy Data Types</a> along with a few others.m

In [None]:
df_gpu = cudf.read_csv(
    "data/AQC00914594.csv",
    usecols=["STATION", "DlySum", "DATE"],
    dtype={
        "STATION": "object",
        "DlySum": np.uint32,
        "DATE": "datetime64[ns]",
    }
)
df_gpu.head()

### Dask

<a href="https://dask.org/">Dask</a> is a library to parallelize Python libraries. Even better? It's device-agnostic, meaning that it scales on GPUs just as easily as it does with CPU computing. 

Dask has a replacement library called `dask_cudf` for <a href="https://docs.rapids.ai/api/cudf/stable/">cuDF</a> with much of the same functions. It even uses the same <a href="https://docs.rapids.ai/api/cudf/stable/api.html?highlight=read_csv#cudf.io.csv.read_csv">`read_csv`</a> function. Let's give it a try reading in all 2010 of our CSV files.

Before we can use `dask_cudf`, we need to use [dask_cuda](https://docs.rapids.ai/api/dask-cuda/nightly/index.html) to distibute our GPU workloads across multiple GPUs.

In [None]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask_cuda.initialize import initialize
import rmm

visible_devices = [0,1] # maximum 2 GPUs per node
temp_data_directory = '/tmp/' # define directory to buffer data
device_memory_limit = 2**32 * .9 # use a fraction of 4GiB capacity to prevent memory errors

cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES=visible_devices,
    local_directory=temp_data_directory,
    rmm_pool_size=device_memory_limit,
    # allows direct GPU-to-GPU data transfer
    protocol='ucx', 
    enable_tcp_over_ucx=True,
    enable_nvlink=True,
    enable_infiniband=True,
)
client = Client(cluster)

#### Initializing Memory Pools

Since allocating memory is often a performance bottleneck, it is usually a good idea [to initialize](https://docs.rapids.ai/api/rmm/stable/basics.html) a memory pool on each of our workers. When using a distributed cluster, we must use the `client.run` utility to make sure a function is executed on all available workers.

In [None]:
# Initialize RMM pool on ALL workers
def _rmm_pool():
    rmm.reinitialize()
client.run(_rmm_pool)# Initialize RMM pool on ALL workers
def _rmm_pool():
    rmm.reinitialize()
client.run(_rmm_pool)

In [None]:
%%time
import numpy as np
import dask_cudf

ddf = dask_cudf.read_csv(
    "data/*.csv", # read all CSV files using the * wildcard
    usecols=["STATION", "DlySum", "DATE"],
    dtype={
        "STATION": "object",
        "DlySum": np.uint32,
        "DATE": "datetime64[ns]",
    },
    na_values=["-9999"],
)

ddf = ddf.dropna() # method to remove missing values

For us, the above cell completed in less than **1000** milliseconds. All CSV files in **1000** milliseconds? Is that true!?
Not really. What Dask is doing is building an [execution graph](https://tutorial.dask.org/01x_lazy.html).

We can think of this as building the components to an assembly line. While we've optimized the process of calculating a result, we don't have the result yet. Let's get the mean rainfall data with the `mean` method.

In [None]:
ddf["DlySum"].mean()

Hmm, no results. In order to force a result, we can use the [`compute`](https://docs.rapids.ai/api/cudf/stable/dask-cudf.html) method. The result is stored as a cuDF object as opposed to a Dask computation node.

In [None]:
%%time
my_mean = ddf["DlySum"].mean().compute()
print('The mean is {:.2f}'.format(my_mean))

Another way to force computation is [`persist`](https://docs.dask.org/en/stable/api.html#dask.persist). This keeps the results of the computation within the Dask cuDF environment. This is often used after loading data from disk memory in order to eliminate the relatively long process of reading files over and over again.

In [None]:
ddf = ddf.persist()

In [None]:
%%time
my_mean = ddf["DlySum"].mean().compute()
print('The mean is {:.2f}'.format(my_mean))

Great! Let's use this Dask dataframe to make a plot in the next section.

### Plotly

Now that we have a way to quickly read and manipulate our data, let's make an interactive graph. [Plotly](https://plotly.com/) is a popular library for graphing and data dashboards.

In order for Plotly to make a graph, data needs to be on the host, not the GPU.

Let's try plotting our precipitation data over time. Since we have data going back to 1940, let's filter for the year 2021.

In [None]:
# Filter by year
mask = (ddf['DATE'] >= pd.Timestamp(2021, 1, 1)) & (ddf['DATE'] < pd.Timestamp(2022,1,1))
ddf_year = ddf[mask]

# Compute result and send to host
df_year = ddf_year.compute().to_pandas()

df_year.head()

To plot rainfall over time we're going to use the [`Scatter Graph Object`](https://plotly.com/python-api-reference/generated/plotly.graph_objects.Scatter.html), which encompasses line charts, as well as scatter charts, text charts, and bubble charts.

In [None]:
import plotly.graph_objects as go

fig = go.Figure([go.Scatter(
    x = df_year["DATE"], 
    y = df_year["DlySum"] # hundredths of an inch

)])

fig.show()

There's also the `markers` mode and the [`update_layout`](https://plotly.com/python/reference/layout/) method which gives us even more options to style our figure. For example:

In [None]:
fig = go.Figure([go.Scatter(
    x = df_year["DATE"], 
    y = df_year["DlySum"]/100, # inches
    mode = 'markers'

)])

ft_colour = "rgb(252, 3, 3)" # define font colour

fig.update_layout(
        title = 'USA Precipitation 2021',
        font_color = ft_colour
    )

fig.show()

#### Shut down the kernel

Explicitly shut down the kernel by running the cell below  - this prevents problems with using Dask in another notebook

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

<a href="https://www.nvidia.com/dli"> <img src="images/DLI_Header.png" alt="Header" style="width: 400px;"/> </a>