In [1]:
import numpy as np
import pandas as pd
import cudf as gd
from numba import cuda,jit,float32
import math
import time
from scipy import stats
from cudf_workaround import TPB,compute_mean_kernel,compute_std_kernel,compute_skew_kernel,compute_kurtosis_kernel
from cudf_workaround import cudf_groupby_aggs

# sanity check of correctness

In [2]:
# One single-element float array per statistic; these are handed to the
# kernels below as their result argument.
mean, std, skew, kurtosis = (np.zeros(1) for _ in range(4))

In [3]:
# To make a point, we examine a large dataframe with many small groups.
# Seed NumPy's global RNG first so the generated data -- and every statistic
# printed further down -- is the same on each Restart & Run All.
SEED = 42
np.random.seed(SEED)
N = 100000000  # total number of rows
array = np.random.rand(N)  # uniform samples in [0, 1)
df = pd.DataFrame({'a':array})
# Ids are drawn from N//100 possible values, so group size is about 100.
df['group_id'] = np.random.randint(0,N//100,N)

In [4]:
%%time
# TPB: threads per block
# Copy the input to the GPU once and reuse the device array for all four
# launches. Passing the host NumPy array directly makes Numba transfer the
# whole array to the device and back again on every launch.
# NOTE(review): assumes the kernels only read their first argument -- confirm
# in cudf_workaround before relying on `array` being unchanged.
d_array = cuda.to_device(array)
compute_mean_kernel[1,TPB](d_array,mean) # 1 thread block per kernel
compute_std_kernel[1,TPB](d_array,std)
compute_skew_kernel[1,TPB](d_array,skew)
compute_kurtosis_kernel[1,TPB](d_array,kurtosis)
# The result buffers are still host arrays, so Numba copies them back after
# each launch. Drop the device reference so its memory can be reclaimed.
del d_array

CPU times: user 4.15 s, sys: 1.63 s, total: 5.78 s
Wall time: 5.83 s


In [5]:
%%time
# Print each kernel result next to the value pandas computes for the same
# column, one statistic per line.
col_a = df['a']
for gpu_value, stat_name in ((mean,'mean'),(std,'std'),(skew,'skew'),(kurtosis,'kurtosis')):
    print(gpu_value, getattr(col_a, stat_name)())

[0.49999949] 0.49999326897828683
[0.28843188] 0.28866113502333707
[-7.64954748e-05] -1.1624572086195845e-05
[-1.19952583] -1.1998401667470764
CPU times: user 1min 5s, sys: 15.5 s, total: 1min 21s
Wall time: 3.18 s


The GPU code is not faster here because each kernel launch uses only 1 thread block of 32 threads. However, the same setup can lead to a significant speedup when it is used for groupby aggregation, especially when there are a large number of small groups.

# Groupby aggregation example

In [6]:
%%time
# Convert the pandas dataframe `df` into a cudf dataframe; the groupby cells
# below run on `gdf`.
gdf = gd.DataFrame.from_pandas(df)

CPU times: user 728 ms, sys: 480 ms, total: 1.21 s
Wall time: 1.21 s


In [7]:
%%time
# GPU path: groupby aggregation through cudf's own API.
# cudf-0.4 supports only ['mean','max','min','sum','count'] here.
aggs = dict(a=['mean','max','min','sum','count'])
grouped_gpu = gdf.groupby('group_id')
gdf_res = grouped_gpu.agg(aggs)

CPU times: user 2.9 s, sys: 1.26 s, total: 4.16 s
Wall time: 4.16 s


In [8]:
%%time
# GPU path again, this time through the workaround helper.
# Functions that cudf does not support yet are built from cudf's primitives:
# ['std','var','skew','kurtosis'] come from the cudf_workaround module.
# The helper cudf_groupby_aggs handles every function name requested below.
"""
def cudf_groupby_aggs(df,group_id_col,aggs):
    
    Parameters
    ----------
    df : cudf dataframe
        dataframe to be grouped
    group_id_col : string
        name of the column which is used as the key of the group
    aggs : dictionary
        key is the name of column for which aggregation is calculated
        values is the name of function for aggregation
    Returns
    -------
    dg : cudf dataframe
        result of groupby aggregation
"""
native_stats = ['mean','max','min','sum','count']
workaround_stats = ['std','var','skew','kurtosis']
aggs = {'a': native_stats + workaround_stats}
gdf_res = cudf_groupby_aggs(gdf, group_id_col='group_id', aggs=aggs)

1:float64
2:int64
3:float64
1:float64
1:float64
2:int64
3:float64
1:float64
1:float64
1:float64
2:int64
3:float64
1:float64
1:float64
1:float64
1:float64
2:int64
3:int64
1:float64
1:float64
1:float64
1:float64
1:int64
2:int64
3:float32
1:float64
1:float64
1:float64
1:float64
1:int64
1:float32
2:int64
3:float32
1:float64
1:float64
1:float64
1:float64
1:int64
1:float32
1:float32
2:int64
3:float32
1:float64
1:float64
1:float64
1:float64
1:int64
1:float32
1:float32
1:float32
2:int64
3:float32
CPU times: user 7.61 s, sys: 2.6 s, total: 10.2 s
Wall time: 10.2 s


In [9]:
%%time
# CPU baseline: the same groupby aggregation with pandas.
# 'kurtosis' is left out because pandas' agg does not support it by default.
cpu_stats = ['mean','max','min','sum','count','std','var','skew']
aggs = {'a': cpu_stats}
df_res = df.groupby('group_id').agg(aggs)

CPU times: user 3min 41s, sys: 8.38 s, total: 3min 49s
Wall time: 3min 20s


In [10]:
%%time
# Flatten the two-level (column, function) header produced by agg() into
# single names of the form '<function>_<column>' (e.g. 'mean_a'), then turn
# the group_id index into a regular column.
# The MultiIndex guard makes the cell safe to re-run: once the header is flat,
# get_level_values(1) would raise, so the flattening steps are skipped.
if isinstance(df_res.columns, pd.MultiIndex):
    l1 = df_res.columns.get_level_values(0).values  # source column names
    l2 = df_res.columns.get_level_values(1).values  # aggregation function names
    df_res.columns = ["%s_%s"%(j,i) for i,j in zip(l1,l2)]
    df_res = df_res.reset_index()
# Sort by group_id so rows can be compared positionally with the GPU result.
df_res = df_res.sort_values('group_id')

CPU times: user 3.01 s, sys: 96 ms, total: 3.11 s
Wall time: 77.7 ms


In [11]:
# Show the first rows of the pandas (CPU) result.
df_res.head()

Unnamed: 0,group_id,mean_a,max_a,min_a,sum_a,count_a,std_a,var_a,skew_a
0,0,0.504853,0.996404,0.002265,52.504705,104,0.301911,0.09115,-0.06236
1,1,0.489854,0.988171,0.009025,51.924545,106,0.268501,0.072093,0.143553
2,2,0.459487,0.981849,0.00061,44.570211,97,0.263458,0.06941,0.09725
3,3,0.476552,0.983754,0.011389,50.991013,107,0.27792,0.077239,0.142719
4,4,0.472198,0.99498,0.006389,42.497792,90,0.300784,0.090471,0.158511


In [12]:
%%time
# Sort by group_id so the rows line up with df_res, then convert the result
# to a pandas dataframe.
# The hasattr guard keeps the cell re-runnable: after the first run gdf_res is
# already a pandas dataframe, which has no to_pandas() method.
gdf_res = gdf_res.sort_values(by='group_id')
if hasattr(gdf_res, 'to_pandas'):
    gdf_res = gdf_res.to_pandas()

CPU times: user 5.41 s, sys: 276 ms, total: 5.69 s
Wall time: 297 ms


In [13]:
# Show the first rows of the GPU result (a pandas dataframe after the
# to_pandas() conversion above).
gdf_res.head()

Unnamed: 0,group_id,mean_a,max_a,min_a,sum_a,count_a,std_a,var_a,skew_a,kurtosis_a
5120,0,0.504853,0.996404,0.002265,52.504705,104,0.301911,0.09115,-0.06236,-1.360296
5121,1,0.489854,0.988171,0.009025,51.924545,106,0.268501,0.072093,0.143553,-1.061151
5122,2,0.459487,0.981849,0.00061,44.570211,97,0.263458,0.06941,0.097251,-0.918109
5123,3,0.476552,0.983754,0.011389,50.991013,107,0.27792,0.077239,0.142719,-1.082183
5124,4,0.472198,0.99498,0.006389,42.497792,90,0.300784,0.090471,0.158511,-1.226775


In [14]:
def rmse(a,b):
    """Return the root-mean-square error between `a` and `b`.

    Parameters
    ----------
    a, b : array-like
        Values to compare; they must broadcast against each other.

    Returns
    -------
    numpy.float64
        sqrt(mean((a - b)**2)).

    Notes
    -----
    Both inputs are converted to float64 before subtracting. This accepts
    plain lists and prevents integer columns (counts, ids) from wrapping
    around in the subtraction or the squaring, which would under-report the
    error.
    """
    diff = np.asarray(a, dtype=np.float64) - np.asarray(b, dtype=np.float64)
    return np.mean(diff**2)**0.5

# Report the RMSE between the CPU and GPU results for every column that both
# frames have, in df_res column order. Values are compared by position.
shared_cols = [name for name in df_res.columns if name in gdf_res.columns]
for name in shared_cols:
    err = rmse(df_res[name].values, gdf_res[name].values)
    print("%s, %.6f"%(name, err))

group_id, 0.000000
mean_a, 0.000000
max_a, 0.000000
min_a, 0.000000
sum_a, 0.000000
count_a, 0.000000
std_a, 0.000000
var_a, 0.000000
skew_a, 0.000000
