In [3]:
import cupy as cp
x_gpu = cp.array([90,75,80])
x_gpu.dtype

dtype('int64')

In [5]:
# With default arguments, norm() returns the L2 (Euclidean) norm, not the L1 norm
l2_gpu = cp.linalg.norm(x_gpu)

In [7]:
cp.cuda.get_device_id()

0

In [10]:
cp.asnumpy(x_gpu)

array([90, 75, 80])

In [11]:
x_gpu.get()

array([90, 75, 80])

In [None]:
import numpy as np
xC = np.array([5,79,80])
yK = np.array([2,8,57])
xC+yK

In [15]:
xG = cp.asarray(xC)
xG + yK  # fails: a CuPy array cannot be combined directly with a NumPy array

TypeError: Unsupported type <class 'numpy.ndarray'>

CuPy supports three types of user-defined CUDA kernels: elementwise kernels, reduction kernels, and raw kernels.
Their details are not covered in this notebook.

We will write user-defined functions (UDFs) for the following data structures:

- Series
- DataFrame
- Rolling Window Series
- GroupBy DataFrames
- CuPy NDArrays
- Numba DeviceNDArrays

Exploring CUDA and GPU architecture in-depth is out of scope for this guide. At a high level:

- Compute is spread across multiple "blocks", which have access to both global memory and their own block-local memory.
- Within each block, many "threads" operate independently and simultaneously access their block-specific shared memory with low latency.
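
To make the block and thread picture concrete, here is a minimal CPU-only sketch of how a one-dimensional launch assigns a global index to each thread. It assumes the standard CUDA formula `blockIdx.x * blockDim.x + threadIdx.x`, which is the value Numba's `cuda.grid(1)` returns. The function name `simulate_launch` and the launch sizes are hypothetical and chosen only for illustration.

```python
def simulate_launch(n_items, threads_per_block):
    """Emulate on the CPU which global indices a 1-D kernel launch would visit."""
    # Round up so that every item is covered by some thread.
    n_blocks = (n_items + threads_per_block - 1) // threads_per_block
    visited = []
    skipped = 0
    for block_idx in range(n_blocks):
        for thread_idx in range(threads_per_block):
            # Same formula that cuda.grid(1) evaluates on the device.
            i = block_idx * threads_per_block + thread_idx
            if i < n_items:  # boundary guard, as in a real kernel
                visited.append(i)
            else:
                skipped += 1  # surplus thread in the last block does nothing
    return n_blocks, visited, skipped


n_blocks, visited, skipped = simulate_launch(n_items=10, threads_per_block=4)
print(n_blocks, visited, skipped)
```

On a GPU the two loops do not run sequentially: all threads of a block execute concurrently, which is why each thread must compute its own index and check it against the array size.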

In [16]:
import numpy as np

import cudf
from cudf.datasets import randomdata 

df = randomdata(nrows=10, dtypes={'a':float, 
                                  'b':bool, 
                                  'c':str}, seed=12)
df.head()

           a      b       c
0  -0.691674   True     Dan
1   0.480099  False     Bob
2   -0.47337   True  Xavier
3   0.067479   True   Alice
4   -0.97085  False   Sarah


In [19]:
from numba import cuda

In [17]:
def udf(x):
    if x > 0:
        return x + 5
    else:
        return x - 5

In [21]:
# applymap applies the UDF element-wise to every column of the DataFrame it is called on
df[['a']].applymap(udf)

           a
0  -5.691674
1   5.480099
2   -5.47337
3   5.067479
4   -5.97085
5   5.837494
6    5.80143
7  -5.933157
8   5.913899
9  -5.725581


In [26]:
try:
    df.applymap(udf)

except Exception as e:
    print('failed as there is Boolean column',e.args)

failed as there is Boolean column ('user defined function compilation failed.',)


In [27]:
@cuda.jit
def multiply(in_col, out_col, multiplier):
    i = cuda.grid(1)
    if i < in_col.size: # boundary guard
        out_col[i] = in_col[i] * multiplier

In [28]:
size = len(df['a'])

df['e'] = 0.0

multiply.forall(size)(df['a'], 
                      df['e'], 
                      10.0)



In [30]:
df.head(2)

           a      b    c          e
0  -0.691674   True  Dan  -6.916743
1   0.480099  False  Bob   4.800994


We could apply a UDF to a whole DataFrame the same way we used forall above: write a kernel that expects multiple inputs, and pass multiple Series as arguments when we launch it.

Because this is fairly common and can be difficult to manage, cuDF provides two APIs to streamline this: apply_rows and apply_chunks. Below, we walk through an example of using apply_rows. apply_chunks works in a similar way, but also offers more control over low-level kernel behavior.

In [31]:
def conditional_add(x, y, out):
    for i, (a, e) in enumerate(zip(x, y)):
        if a > 0:
            out[i] = a + e
        else:
            out[i] = a

In [32]:
df = df.apply_rows(conditional_add, 
                   incols={'a':'x', 'e':'y'},
                   outcols={'out': np.float64},
                   kwargs={}
                  )
df.head()

           a      b       c          e        out
0  -0.691674   True     Dan  -6.916743  -0.691674
1   0.480099  False     Bob   4.800994   5.281093
2   -0.47337   True  Xavier    -4.7337   -0.47337
3   0.067479   True   Alice   0.674788   0.742267
4   -0.97085  False   Sarah  -9.708501   -0.97085
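
As a sanity check, the same row-wise logic can be reproduced on the CPU with NumPy. This is a minimal sketch that copies the five rows of `a` and `e` shown in the output above and assumes that `np.where` is an acceptable vectorized stand-in for the loop in `conditional_add`. It is not how cuDF executes the UDF.

```python
import numpy as np

# Values copied from the first five rows of the DataFrame output above.
a = np.array([-0.691674, 0.480099, -0.47337, 0.067479, -0.97085])
e = np.array([-6.916743, 4.800994, -4.7337, 0.674788, -9.708501])

# Vectorized equivalent of conditional_add: a + e where a > 0, otherwise a.
out = np.where(a > 0, a + e, a)
print(out)
```

The printed values should agree with the `out` column above up to the rounding of the displayed inputs.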


In [33]:
ser = cudf.Series([16, 25, 36, 49, 64, 81], dtype='float64')
ser

0    16.0
1    25.0
2    36.0
3    49.0
4    64.0
5    81.0
dtype: float64

In [34]:
rolling = ser.rolling(window=3, min_periods=3, center=False)
rolling

Rolling [window=3,min_periods=3,center=False]

In [35]:
import math

def example_func(window):
    b = 0
    for a in window:
        b = max(b, math.sqrt(a))
    if b == 8:
        return 100    
    return b

In [36]:
rolling.apply(example_func)

0     <NA>
1     <NA>
2      6.0
3      7.0
4    100.0
5      9.0
dtype: float64
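
The rolling result can be cross-checked with a small pure-Python reference. This sketch assumes trailing windows with `min_periods` equal to the window size, as in the cell above; the helper name `rolling_apply` is hypothetical, and missing entries are represented by `None` rather than `<NA>`.

```python
import math


def example_func(window):
    # Same logic as the UDF above: largest square root in the window,
    # with the special case 8 -> 100.
    b = 0
    for a in window:
        b = max(b, math.sqrt(a))
    if b == 8:
        return 100
    return b


def rolling_apply(values, window, func):
    """CPU reference: apply func to each full trailing window, None otherwise."""
    result = []
    for end in range(1, len(values) + 1):
        if end < window:
            result.append(None)  # fewer than `window` observations so far
        else:
            result.append(func(values[end - window:end]))
    return result


expected = rolling_apply([16, 25, 36, 49, 64, 81], window=3, func=example_func)
print(expected)
```

The window ending at 64 has a largest square root of exactly 8, which is why that position holds 100 instead of 8.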

In [38]:
grouped = df.groupby(['b'])

In [39]:
grouped

<cudf.core.groupby.groupby.DataFrameGroupBy at 0x7fca3157f970>

In [40]:
def rolling_avg(e, rolling_avg_e):
    win_size = 3
    for i in range(cuda.threadIdx.x, len(e), cuda.blockDim.x):
        if i < win_size - 1:
            # If there is not enough data to fill the window,
            # take the average to be NaN
            rolling_avg_e[i] = np.nan
        else:
            total = 0
            for j in range(i - win_size + 1, i + 1):
                total += e[j]
            rolling_avg_e[i] = total / win_size

In [41]:
results = grouped.apply_grouped(rolling_avg,
                               incols=['e'],
                               outcols=dict(rolling_avg_e=np.float64))
results

           a      b       c          e        out  rolling_avg_e
1   0.480099  False     Bob   4.800994   5.281093            NaN
4   -0.97085  False   Sarah  -9.708501   -0.97085            NaN
6    0.80143  False   Sarah   8.014297   8.815727       1.035597
7  -0.933157  False   Quinn  -9.331571  -0.933157      -3.675258
0  -0.691674   True     Dan  -6.916743  -0.691674            NaN
2   -0.47337   True  Xavier    -4.7337   -0.47337            NaN
3   0.067479   True   Alice   0.674788   0.742267      -3.658552
5   0.837494   True   Wendy    8.37494   9.212434       1.438676
8   0.913899   True  Ursula   9.138987  10.052885       6.062905
9  -0.725581   True  George  -7.255814  -0.725581       3.419371
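
The loop `range(cuda.threadIdx.x, len(e), cuda.blockDim.x)` in `rolling_avg` is a block-stride loop: each thread starts at its own index and jumps ahead by the block size. The sketch below emulates that pattern on the CPU for a single group. It assumes each group is handled by one block of threads, which is what the use of `threadIdx.x` and `blockDim.x` suggests; the function name `rolling_avg_cpu` and the block size of 2 are hypothetical.

```python
import math


def rolling_avg_cpu(e, block_dim, win_size=3):
    """Emulate the kernel for one group: each simulated thread strides by block_dim."""
    out = [None] * len(e)
    for thread_idx in range(block_dim):  # these run in parallel on a GPU
        for i in range(thread_idx, len(e), block_dim):
            if i < win_size - 1:
                out[i] = math.nan  # window not yet full
            else:
                out[i] = sum(e[i - win_size + 1:i + 1]) / win_size
    return out


# Column `e` for the group where b is False, copied from the output above.
e_false = [4.800994, -9.708501, 8.014297, -9.331571]
result = rolling_avg_cpu(e_false, block_dim=2)
print(result)
```

Every index is written exactly once regardless of the block size, so the result does not depend on how many threads the block has.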


If your UDFs will read or write any column containing nulls, you should read this section carefully.

Writing UDFs that can handle null values is complicated by the fact that a separate bitmask is used to identify when a value is valid and when it's null. By default, DataFrame methods for applying UDFs, such as apply_rows, handle nulls pessimistically: all rows with a null value will be removed from the output if they are used in the kernel.
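
To illustrate what a separate validity bitmask looks like, here is a minimal NumPy sketch. It assumes one bit per element packed least-significant bit first (the Apache Arrow convention); the helper `is_valid` and the sample values are hypothetical, and the code only mimics the idea on the CPU rather than reading a real cuDF column.

```python
import numpy as np

values = np.array([10.0, 20.0, 30.0, 40.0])
valid = np.array([True, False, True, True])  # element 1 is null

# Pack one validity bit per element, least-significant bit first
# (assumed layout, following the Arrow convention).
bitmask = np.packbits(valid, bitorder="little")


def is_valid(bitmask, i):
    """Return True if bit i of the packed validity mask is set."""
    return bool((bitmask[i // 8] >> (i % 8)) & 1)


# A null-aware function: only touch entries whose validity bit is set.
doubled = [values[i] * 2 if is_valid(bitmask, i) else None
           for i in range(len(values))]
print(bitmask, doubled)
```

The data buffer still holds a number at the null position, so a UDF that ignores the mask would silently compute on a meaningless value. That is the reason null handling needs explicit care.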