# User defined functions with cuDF

Sometimes, the builtin methods of cudf.DataFrame don't do exactly what we want. We need to write a custom function to apply over the data frame.

cuDF’s DataFrame class has two methods that let users run custom Python functions on GPUs: `apply_rows` and `apply_chunks`. In this tutorial, we’ll walk through how to use `apply_rows` and `apply_chunks` to create your own UDFs and show how you can implement a GPU-accelerated windowing function.



## `apply_rows`

`apply_rows` processes each of the DataFrame rows independently in parallel. Under the hood, the `apply_rows` method will optimally divide the long columns into chunks, and assign chunks into different GPU blocks for parallel computation. 

In order to use `apply_rows`, we need to write a kernel function. A kernel function is a function that should should loop over the columns and set the output for each row. **The loop execution order is arbitrary, so each iteration of the loop MUST be independent of each other.** The loop in the function resembles serial code, but executes concurrently in multiple threads on the GPU. When func is invoked, the array args corresponding to the input/output are strided so as to improve GPU parallelism. The kernel function is compiled to the GPU using `numba.cuda`, so the kernel function must only use Python features/functions that are supported by numba.

A kernel function takes the form

```python
def kernel(in1, in2, in3, ..., out1, out2, ..., kwarg1, kwarg2, ...):
    for i, (x, y, z, ...) in enumerate(zip(in1, in2, in3, ...)):
        out1[i] = ...
        out2[i] = ...
```

`in1, in2, in3, ...` are the input columns. `out1, out2, ...` are the output columns. The kernel function should not return a result. Rather, output columns are passed as arguments and the result is written to them. Additional keyword arguments can be passed (`kwarg1, kwarg2, ...`). Inside the kernel function, [standard numba.cuda attributes](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html#thread-positioning) like `numba.cuda.threadIdx` can be used to access things like the thread or block indices.

`apply_rows` is called like

```python
df = df.apply_rows(kernel
                   incols=['in1', 'in2', 'in3', ...],
                   outcols={'out1': np.float64, 'out2': np.int8, ...},
                   kwargs={'kwarg1': val1, 'kwarg2': val2, ...})
```

`incols` is a list of the arguments to `kernel` that are input columns. These should have the same names as the columns in the dataframe.

`outcols` is a dictionary mapping the output column names to their dtype.

`kwargs` is a dictionary mapping the keyword argument parameters to their values. `kwargs` can be an empty dictionary if there are no keyword arguments.

After calling `apply_rows` as above, `df` will have extra columns `out1`, `out2`, ... with the output results. 

## Example: Haversine distance

In the example below, we create a dataframe representing pairs of latitude and longitude points. We use `apply_rows` to calculate the [Haversine distance](https://en.wikipedia.org/wiki/Haversine_formula) between two points in the input array and also print out the GPU block and grid allocation information.

$$\begin{align}
  d &= 2r \arcsin\left(\sqrt{\sin^2\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos(\varphi_1) \cos(\varphi_2)\sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right)}\right)
\end{align}$$

where $\varphi_1,\varphi_2$ are the latitudes and $\lambda_1,\lambda_2$ are the longitudes.

In [43]:
from math import cos, sin, asin, sqrt, pi, atan2

import time
import cudf
import numpy as np
from numba import cuda

In [44]:
np.random.seed(12)
data_length = 1000

df = cudf.DataFrame()
df['lat1'] = np.random.normal(10, 1, data_length)
df['lon1'] = np.random.normal(10, 1, data_length)
df['lat2'] = np.random.normal(10, 1, data_length)
df['lon2'] = np.random.normal(10, 1, data_length)

In [45]:
def haversine_distance_kernel(lat1, lon1, lat2, lon2, out):
    """Haversine distance formula taken from Michael Dunn's StackOverflow post:
    https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
    """
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(lat1, lon1, lat2, lon2)):
        print('thread_id:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
              'array size:', lat1.size, 'block threads:', cuda.blockDim.x)

        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        out[i] = c * r

In [46]:
df = df.apply_rows(haversine_distance_kernel,
                   incols=['lat1', 'lon1', 'lat2', 'lon2'],
                   outcols=dict(out=np.float64),
                   kwargs=dict())

In [22]:
print(df.head())

                 lat1                lon1                lat2                lon2                 out
0  10.472985831489979  11.889149236424382  10.173868999704066   9.557544882003544   257.2241270903516
1   9.318574120560555  11.733838880199368  10.237408418960612  10.941703071688115  134.06359643450295
2  10.242439496690478   9.969705766548733   8.158691857367202   8.084420608064908   310.6504166817936
3   8.299264365961669   10.28430047739249  10.376267158128183   9.831747409666285   236.2291524258287
4    10.7531428339492  11.003470626251172   9.416266513336655  12.702623243748901  238.11529748873042


Note: print statements in kernels will only appear in terminal output; Jupyter Notebooks won't display them

Sample print statement output:

```
tid: 32 bid: 0 array size: 2 block threads: 64
tid: 33 bid: 0 array size: 2 block threads: 64
...
tid: 24 bid: 6 array size: 1 block threads: 64
tid: 25 bid: 6 array size: 1 block threads: 64
...
tid: 1 bid: 5 array size: 1 block threads: 64
tid: 2 bid: 5 array size: 1 block threads: 64
...
```

In the example above, the sample output in the last cell shows that the for-loop in the kernel function is unrolled by the compiler automatically. It uses 15 CUDA blocks, each using 64 threads to do the computation. In this case, most threads in a block handle one element from the input array, but some threads have to deal with two elements, because there are 1000 rows and 960 threads (15 blocks * 64 threads per block). Additionally, due to the parallelism, the DataFrame rows are not necessarily processed in order.

## **Exercise 1**

Modify the above example to pass in the radius of the earth `r` as a keyword argument to the kernel.

<details><summary><b>Solution</b></summary>
   <pre>
def haversine_distance_kernel(lat1, lon1, lat2, lon2, out, r):
    """Haversine distance formula taken from Michael Dunn's StackOverflow post:
    https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
    """
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(lat1, lon1, lat2, lon2)):
        print('thread_id:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
              'array size:', lat1.size, 'block threads:', cuda.blockDim.x)

        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        
        out[i] = c * r
   </pre>
    <pre>
df = df.apply_rows(haversine_distance_kernel,
                   incols=['lat1', 'lon1', 'lat2', 'lon2'],
                   outcols=dict(out=np.float64),
                   kwargs=dict(r=6371))
print(df.head()
</pre>
</details>

## **Exercise 2**

Write a kernel to compute the [bearing formula](https://www.movable-type.co.uk/scripts/latlong.html):

$$\operatorname{atan2}(\sin(\lambda_2-\lambda_1)\cos(\varphi_2), \cos(\varphi_1)\sin(\varphi_2)-\sin(\varphi_1)\cos(\varphi_2)\cos(\lambda_2-\lambda_1))$$
       
where again $\varphi_1,\varphi_2$ are the latitudes and $\lambda_1,\lambda_2$ are the longitudes.

<details><summary><b>Solution</b></summary>
   <pre>
from math import atan2

def bearing_kernel(lat1, lon1, lat2, lon2, out):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(lat1, lon1, lat2, lon2)):
        print('thread_id:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
              'array size:', lat1.size, 'block threads:', cuda.blockDim.x)

        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        a = atan2(sin(dlon)*cos(x_2), cos(x_1)*sin(x_1) - sin(x_1)*cos(x_2)*cos(dlon))
        # Convert radians [-π, π] to degrees [0°, 360°]
        out[i] = (180/pi*a + 180) % 360

df = df.apply_rows(bearing_kernel,
                   incols=['lat1', 'lon1', 'lat2', 'lon2'],
                   outcols=dict(out=np.float64),
                   kwargs=dict())

print(df.head())

</pre>
</details>

## `apply_chunks`

`apply_chunks` is a more general version of `apply_rows`, that gives us control over how the data is chunked on the GPU. We can specify how to divide the long array into chunks, map each of the array chunks to different GPU blocks to process (chunks argument) and assign the number of threads per block (tpb argument). The for-loop is also no longer automatically unrolled in the kernel function, instead serving as the for-loop for that GPU thread.

`apply_chunks` kernels work the same as as `apply_rows` kernels. When we call `apply_chunks`, we must supply the chunk size `chunks` as an integer or `cudf.Series` of integer offsets, and the number of threads per block `tpb`. Note that `tpb` can be omitted, but in that case, it defaults to `1` thread per block, which is very inefficient. Therefore it is recommended to always set this argument.

Each kernel corresponds to a thread in one block, with full access to all the elements in that chunk of the array. In this example with chunks=16, we try to uniformly cut the 100 elements into chunks of size 16, resulting in six blocks with a full 16 rows and the seventh block with four rows (for a total of 100 rows). Each block has eight threads to process this length 16 subarray (or length four for the last block), since we set tpb=8.

In [33]:
# This is the exact same kernel as above. 
def haversine_distance_kernel(lat1, lon1, lat2, lon2, out):
    """Haversine distance formula taken from Michael Dunn's StackOverflow post:
    https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
    """
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(lat1, lon1, lat2, lon2)):
        print('thread_id:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
              'array size:', lat1.size, 'block threads:', cuda.blockDim.x)

        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        out[i] = c * r

In [31]:
outdf = df.apply_chunks(haversine_distance_kernel,
                        incols=['lat1', 'lon1', 'lat2', 'lon2'],
                        outcols=dict(out=np.float64),
                        kwargs=dict(),
                        chunks=16,
                        tpb=8)

In [32]:
print(outdf.head())

                 lat1                lon1                lat2                lon2                 out
0  10.472985831489979  11.889149236424382  10.173868999704066   9.557544882003544  257.22412709035154
1   9.318574120560555  11.733838880199368  10.237408418960612  10.941703071688115  134.06359643450293
2  10.242439496690478   9.969705766548733   8.158691857367202   8.084420608064908  310.65041668179356
3   8.299264365961669   10.28430047739249  10.376267158128183   9.831747409666285   236.2291524258287
4    10.7531428339492  11.003470626251172   9.416266513336655  12.702623243748901  238.11529748873042


Note: print statements in kernels will only appear in terminal output; Jupyter Notebooks won't display them

Sample print statement output:

```
tid: 0 bid: 2 array size: 16 block threads: 8
tid: 1 bid: 2 array size: 16 block threads: 8
...
tid: 6 bid: 6 array size: 4 block threads: 8
tid: 7 bid: 6 array size: 4 block threads: 8
...
tid: 6 bid: 1 array size: 16 block threads: 8
tid: 7 bid: 1 array size: 16 block threads: 8
...
```

## **Exercise**

Modify your `apply_rows` exercise functions above to work with `apply_chunks` (if you did not complete the exercises, click **solution** below each exercise to get the solution)

## Advanced `apply_chunks`: rolling window

In the financial services industry, data scientists often need to compute features from time series data. One of the most popular ways to process time series data is to compute a moving average, as if you were sliding a window across your array. In the following example, we’ll show how to utilize `apply_chunks` to speed up computing moving averages for a long array:

In the below code, we divide the array into subarrays of size `trunk_size` and send those subarrays to GPU blocks to compute the moving average with `moving_average_kernel`. However, there is no history for the elements at the beginning of the subarray. To account for this, we shift the chunk division by an offset of `average_window`. Then, we call `fill_missing_average_kernel` to compute the moving average of only those missing records.

Note that when we used `fill_missing_average_kernel`, we didn’t define the `outcols` argument for `apply_chunks`, as it would create a new GPU memory buffer and overwrite the old out array. Instead, we reuse the existing out array as input. For an array of 100 million records, cuDF takes about 0.6 seconds to do the computation using an NVIDIA Tesla V100 GPU.

In [38]:
data_length = int(1e8)
average_window = 4
threads_per_block = 128
trunk_size = 10240

In [39]:
df = cudf.DataFrame()
df['in1'] = np.arange(data_length, dtype=np.float64)

In [40]:
def moving_average_kernel(in1, out, average_length):
    # Set the first average_length-1 rows in each chunk to np.nan
    # since there's not enough history.
    for i in range(cuda.threadIdx.x, average_length-1, cuda.blockDim.x):
        out[i] = np.nan
    
    # For all other rows, compute the average of the preceding
    # average_length rows (inclusive)
    for i in range(cuda.threadIdx.x + average_length - 1, in1.size, cuda.blockDim.x):
        summ = 0.0
        
        for j in range(i - average_length + 1, i + 1):
            summ += in1[j]
        
        out[i] = summ / np.float64(average_length)

def fill_missing_average_kernel(in1, out, average_length):
    # Safeguard to make sure we're not accessing outside the subarray boundary.
    # Prevents the average for being calculated where there isn't enough history.
    if in1.size - average_length + cuda.threadIdx.x - average_length + 1 < 0:
        return
    
    # Calculate the moving average for the average_length-1 rows with np.nan values
    # at the end of each shifted chunk that need an actual value.
    for i in range(in1.size - average_length + cuda.threadIdx.x,
                   in1.size, cuda.blockDim.x):
        
        summ = 0.0
        
        for j in range(i - average_length + 1,
                       i + 1):
            summ += in1[j]
        
        out[i] = summ / np.float64(average_length)

In [41]:
start = time.time()
df = df.apply_chunks(moving_average_kernel,
                     incols=['in1'],
                     outcols=dict(out=np.float64),
                     kwargs=dict(average_length=average_window),
                     chunks=list(range(0, data_length,
                                       trunk_size))+ [data_length],
                     tpb=threads_per_block)

df = df.apply_chunks(fill_missing_average_kernel,
                     incols=['in1', 'out'],
                     outcols=dict(),
                     kwargs=dict(average_length=average_window),
                     chunks=[0]+list(range(average_window, data_length,
                                           trunk_size))+ [data_length],
                     tpb=threads_per_block)
end = time.time()
print('cuDF time', end-start)

NameError: name 'time' is not defined

In [37]:
print(df.head(10))

    in1  out
 0  0.0     
 1  1.0     
 2  2.0     
 3  3.0  1.5
 4  4.0  2.5
 5  5.0  3.5
 6  6.0  4.5
 7  7.0  5.5
 8  8.0  6.5
 9  9.0  7.5
