# Demo RAPIDS cuDF Groupby User Defined Functions
In this notebook, we create additional groupby user-defined functions (UDFs) for RAPIDS cuDF. We create fast GPU aggregations for `mean`, `standard deviation`, `count`, and `nunique`, written with Numba's CUDA support. Before calling these functions, fill all NaN values with -1.

The aggregation functions for `mean` and `std` assume that values of -1 are null and ignore them. The aggregation function for `nunique` is approximate: it hashes each value into one of 2048 slots, so it reports at most 2048 unique values and undercounts whenever distinct values land in the same slot.

There are great tutorials for creating UDFs [here][1] and [here][2].

[1]: https://rapidsai.github.io/projects/cudf/en/0.11.0/guide-to-udfs.html
[2]: https://github.com/daxiongshu/notebooks-extended/blob/kdd_plasticc/advanced_notebooks/tutorials/rapids_customized_kernels.ipynb

In [1]:
import cudf, cupy, math, warnings, time 
from numba import cuda, float32, int8
import numpy as np, pandas as pd
warnings.filterwarnings("ignore")

In [2]:
def mean2(x,y_out):
    """Write the mean of the group `x` into every row of `y_out`.

    Values equal to -1 are treated as null and left out of the mean.
    If the group has no non-null value, -1 is written instead.
    """
    tid = cuda.threadIdx.x
    stride = cuda.blockDim.x
    n = len(x)
    # SHARED ACCUMULATORS: acc[0] = TOTAL OF NON-NULL VALUES, acc[1] = NULL COUNT
    acc = cuda.shared.array(2,dtype=float32)
    acc[0] = 0
    acc[1] = 0
    cuda.syncthreads()
    # EACH THREAD FOLDS ITS STRIDED SHARE OF THE GROUP INTO THE ACCUMULATORS
    for k in range(tid,n,stride):
        if x[k]==-1:
            cuda.atomic.add(acc,1,1)
        else:
            cuda.atomic.add(acc,0,x[k])
    cuda.syncthreads()
    # WRITE THE SAME RESULT TO EVERY ROW OF THE GROUP
    for k in range(tid,n,stride):
        if (n-acc[1])>0:
            y_out[k] = acc[0]/(n-acc[1])
        else:
            y_out[k] = -1
        
def std2(x,y_out):
    """Write the sample standard deviation of the group `x` into every row of `y_out`.

    Values equal to -1 are treated as null and ignored. The deviation uses
    the (count - 1) denominator, where count is the number of non-null
    values. If the group has fewer than two non-null values, -1 is written
    instead.
    """
    tid = cuda.threadIdx.x
    stride = cuda.blockDim.x
    n = len(x)
    # SHARED ACCUMULATORS:
    #   acc[0] = TOTAL OF NON-NULL VALUES, LATER OVERWRITTEN WITH THEIR MEAN
    #   acc[1] = SUM OF SQUARED DEVIATIONS FROM THE MEAN
    #   acc[2] = NULL COUNT
    acc = cuda.shared.array(3,dtype=float32)
    for k in range(3): acc[k] = 0
    cuda.syncthreads()
    # COMPUTE TOTAL AND COUNT NULLS
    for k in range(tid,n,stride):
        if (x[k]!=-1): cuda.atomic.add(acc,0,x[k])
        else: cuda.atomic.add(acc,2,1)
    cuda.syncthreads()
    # ONE THREAD TURNS THE TOTAL INTO THE MEAN. SKIP THE DIVISION WHEN THE
    # GROUP HAS NO NON-NULL VALUE SO THE MEAN SLOT IS NEVER COMPUTED AS 0/0;
    # THE OUTPUT PASS BELOW WRITES -1 FOR SUCH GROUPS ANYWAY.
    if tid==0 and (n-acc[2])>0:
        acc[0] = acc[0]/(n-acc[2])
    cuda.syncthreads()
    # COMPUTE SUM OF SQUARES AND SKIP NULL
    for k in range(tid,n,stride):
        if (x[k]!=-1): cuda.atomic.add(acc,1,(x[k]-acc[0])**2)
    cuda.syncthreads()
    # OUTPUT STANDARD DEVIATION WITHOUT NULL
    for k in range(tid,n,stride):
        if (n-acc[2])<=1: y_out[k] = -1
        else: y_out[k] = math.sqrt( acc[1]/(n-acc[2]-1) )
            
def count2(x,y_out):
    """Write the number of rows in the group `x` into every row of `y_out`.

    Every row is counted, including rows whose value is -1.
    """
    n = len(x)
    for k in range(cuda.threadIdx.x,n,cuda.blockDim.x):
        y_out[k] = n
        
def nunique2(x,y_out):
    """Write an approximate distinct-value count of the group `x` into `y_out`.

    Each value is scaled by 1e6, truncated to an integer and reduced
    modulo 2048 to pick a slot in a shared table; the result is the number
    of occupied slots. Values that land in the same slot are counted once,
    so the result never exceeds 2048.
    """
    tid = cuda.threadIdx.x
    stride = cuda.blockDim.x
    n = len(x)
    # SHARED TABLE OF "SLOT SEEN" FLAGS, CLEARED BEFORE USE
    seen = cuda.shared.array(2048,dtype=int8)
    for slot in range(2048):
        seen[slot] = 0
    cuda.syncthreads()
    # MARK THE SLOT OF EVERY VALUE IN THE GROUP
    for k in range(tid,n,stride):
        seen[ int(x[k]*1e6)%2048 ] = 1
    cuda.syncthreads()
    # COUNT OCCUPIED SLOTS AND WRITE THE TOTAL TO EVERY ROW
    total = 0
    for slot in range(2048):
        total += seen[slot]
    for k in range(tid,n,stride):
        y_out[k] = total

In [3]:
# GROUP AGGREGATIONS
def add_feature(df1,df2,uid,col,agg,verbose=True):
    """Add one group-aggregation column to both `df1` and `df2` in place.

    The two frames are stacked, grouped by `uid`, and `col` is aggregated
    with the kernel selected by `agg` ('count', 'mean', 'std' or
    'nunique'). The per-row result is written back to each frame as a new
    column named `<uid>_<col>_<agg>`.

    Returns None. If `agg` is not one of the four supported names the
    function returns immediately without touching either frame.
    A helper column named 'idx' is added to both frames while the function
    runs and dropped again at the end.
    """
    kernels = {'count':count2, 'mean':mean2, 'std':std2, 'nunique':nunique2}
    if agg not in kernels:
        return
    kernel = kernels[agg]

    # TAG EVERY ROW WITH ITS POSITION IN THE STACKED FRAME SO THE ORIGINAL
    # ORDER CAN BE RESTORED AFTER THE GROUPBY
    n1 = len(df1)
    df1['idx'] = np.arange(n1)
    df2['idx'] = np.arange(len(df2))+n1
    keep = [uid,col,'idx']
    stacked = cudf.concat([df1[keep], df2[keep]])

    grouped = stacked.groupby(uid,method='cudf')
    out = grouped.apply_grouped(
        kernel,
        incols={col:'x'},
        outcols={'y_out':np.float32},
        tpb=32
    )
    out = out.rename({'y_out':'new'})
    out = out.sort_values('idx')

    # FIRST len(df1) ROWS BELONG TO df1, THE REST TO df2
    name = uid+'_'+col+'_'+agg
    df1[name] = out.iloc[:n1].new
    df2[name] = out.iloc[n1:].new
    if verbose:
        print(name,', ',end='')
    df1.drop_column('idx')
    df2.drop_column('idx')
    
def add_features(df1,df2,uids,cols,aggs,verbose=True):
    """Call `add_feature` for every combination of `cols`, `uids` and `aggs`.

    Combinations are visited with `cols` as the outermost loop, then
    `uids`, then `aggs`. Both frames are modified in place; returns None.
    """
    combos = ((u,c,a) for c in cols for u in uids for a in aggs)
    for u,c,a in combos:
        add_feature(df1,df2,u,c,a,verbose)

# Example Usage
To use the function `add_features()`, provide the following five arguments in the order listed. After calling the function, new columns will be appended to your train and test dataframes. One column is created for every combination of the three lists.
  
* train dataframe
* test dataframe
* list of columns to groupby
* list of columns to aggregate
* list of functions to aggregate by
  
For example, if you want to mimic adding the Pandas column `df.groupby(uid)[col].transform('mean')` where `df = pd.concat([df1,df2])`, then you would use `add_features(df1,df2,[uid],[col],['mean'])`.

In [4]:
# MAKE SAMPLE DATA
# Build a 10-row train frame: 'a' holds random integers from 0 to 4,
# 'b' holds random integers from 90 to 99, and 'c' holds normal draws
# with mean 10 and standard deviation 1.
train = cudf.DataFrame()
train['a'] = np.random.randint(0,5,10)
train['b'] = np.random.randint(90,100,10)
train['c'] = np.random.normal(10,1,10)
# Blank out one entry per column so the null handling can be demonstrated
train.loc[3,'a'] = None
train.loc[6,'b'] = None
train.loc[9,'c'] = None
train

Unnamed: 0,a,b,c
0,0.0,93.0,11.70506526
1,2.0,95.0,10.57281485
2,4.0,98.0,10.11137112
3,,94.0,10.57444393
4,0.0,96.0,9.642664485
5,3.0,94.0,9.648297971
6,4.0,,9.403795818
7,2.0,92.0,9.340346217
8,1.0,98.0,8.576498537
9,2.0,99.0,


In [5]:
# MAKE SAMPLE DATA
# Build a 10-row test frame with the same three columns as train:
# 'a' random integers from 0 to 4, 'b' random integers from 90 to 99,
# 'c' normal draws with mean 10 and standard deviation 1.
test = cudf.DataFrame()
test['a'] = np.random.randint(0,5,10)
test['b'] = np.random.randint(90,100,10)
test['c'] = np.random.normal(10,1,10)
# Blank out one entry per column so the null handling can be demonstrated
test.loc[5,'a'] = None
test.loc[3,'b'] = None
test.loc[6,'c'] = None
test

Unnamed: 0,a,b,c
0,4.0,99.0,9.269787977
1,2.0,97.0,8.122499893
2,2.0,94.0,9.058107999
3,1.0,,11.21664514
4,3.0,94.0,11.10481323
5,,93.0,9.695475504
6,0.0,94.0,
7,0.0,99.0,9.604890334
8,4.0,99.0,9.839916743
9,4.0,90.0,9.749268333


In [6]:
# YOU MUST FILL NAN WITH -1 BEFORE CALLING
# mean2 and std2 recognise nulls only as the value -1 (they skip x == -1)
train = train.fillna(-1)
test = test.fillna(-1)

In [7]:
# FEATURES GET APPENDED TO DATAFRAMES
# Group by 'a' and aggregate 'b' with count and nunique; this adds the
# columns a_b_count and a_b_nunique to both train and test.
add_features(train,test,['a'],['b'],['count','nunique'])
train

a_b_count , a_b_nunique , 

Unnamed: 0,a,b,c,a_b_count,a_b_nunique
0,0,93,11.705065,4.0,4.0
1,2,95,10.572815,5.0,5.0
2,4,98,10.111371,5.0,4.0
3,-1,94,10.574444,2.0,2.0
4,0,96,9.642664,4.0,4.0
5,3,94,9.648298,2.0,1.0
6,4,-1,9.403796,5.0,4.0
7,2,92,9.340346,5.0,5.0
8,1,98,8.576499,2.0,2.0
9,2,99,-1.0,5.0,5.0


In [8]:
# FEATURES GET APPENDED TO DATAFRAMES
# Group by 'a' and aggregate both 'b' and 'c' with mean and std; this adds
# the columns a_b_mean, a_b_std, a_c_mean and a_c_std to train and test.
add_features(train,test,['a'],['b','c'],['mean','std'])
train

a_b_mean , a_b_std , a_c_mean , a_c_std , 

Unnamed: 0,a,b,c,a_b_count,a_b_nunique,a_b_mean,a_b_std,a_c_mean,a_c_std
0,0,93,11.705065,4.0,4.0,95.5,2.645751,10.31754,1.20178
1,2,95,10.572815,5.0,5.0,95.400002,2.701851,9.273442,1.010591
2,4,98,10.111371,5.0,4.0,96.5,4.358899,9.674828,0.339441
3,-1,94,10.574444,2.0,2.0,93.5,0.707107,10.13496,0.621525
4,0,96,9.642664,4.0,4.0,95.5,2.645751,10.31754,1.20178
5,3,94,9.648298,2.0,1.0,94.0,0.0,10.376556,1.029912
6,4,-1,9.403796,5.0,4.0,96.5,4.358899,9.674828,0.339441
7,2,92,9.340346,5.0,5.0,95.400002,2.701851,9.273442,1.010591
8,1,98,8.576499,2.0,2.0,98.0,-1.0,9.896572,1.866866
9,2,99,-1.0,5.0,5.0,95.400002,2.701851,9.273442,1.010591


# Comparison to Pandas

In [11]:
# Rebuild the same features with pandas on the stacked train+test frame
pdf = cudf.concat([train[['a','b','c']],test[['a','b','c']]]).to_pandas()
# count and nunique are computed while nulls are still stored as -1, because
# count2 and nunique2 treat -1 like any other value
pdf['a_b_count'] = pdf.groupby('a')['b'].transform('count')
pdf['a_b_nunique'] = pdf.groupby('a')['b'].transform('nunique')
# Turn -1 back into NaN so the pandas mean/std skip nulls, as mean2/std2 skip -1
pdf.loc[pdf.b==-1,'b'] = np.nan; pdf.loc[pdf.c==-1,'c'] = np.nan
pdf['a_b_mean'] = pdf.groupby('a')['b'].transform('mean')
pdf['a_b_std'] = pdf.groupby('a')['b'].transform('std')
pdf['a_c_mean'] = pdf.groupby('a')['c'].transform('mean')
pdf['a_c_std'] = pdf.groupby('a')['c'].transform('std')
# The first 10 rows are the train rows (train was concatenated first)
pdf.iloc[:10]

Unnamed: 0,a,b,c,a_b_count,a_b_nunique,a_b_mean,a_b_std,a_c_mean,a_c_std
0,0,93.0,11.705065,4,4,95.5,2.645751,10.31754,1.201781
1,2,95.0,10.572815,5,5,95.4,2.701851,9.273442,1.010591
2,4,98.0,10.111371,5,4,96.5,4.358899,9.674828,0.339441
3,-1,94.0,10.574444,2,2,93.5,0.707107,10.13496,0.621525
4,0,96.0,9.642664,4,4,95.5,2.645751,10.31754,1.201781
5,3,94.0,9.648298,2,1,94.0,0.0,10.376556,1.029912
6,4,,9.403796,5,4,96.5,4.358899,9.674828,0.339441
7,2,92.0,9.340346,5,5,95.4,2.701851,9.273442,1.010591
8,1,98.0,8.576499,2,2,98.0,,9.896572,1.866866
9,2,99.0,,5,5,95.4,2.701851,9.273442,1.010591
