In [1]:
import cudf as gd
import pandas as pd
import numpy as np
import math
import xgboost as xgb
from functools import partial
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import os
import time
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import warnings
warnings.filterwarnings("ignore")

In [2]:
GPU_MEMORY = 32 # GB. please change it accordingly

In [3]:
TEST_ROWS = 453653104 # number of rows in test data
# no skip if your gpu has 32 GB memory
# otherwise, skip rows porportionally
SKIP_ROWS = int((1 - GPU_MEMORY/32.0)*TEST_ROWS) 

# Functions

In [4]:
def multi_weighted_logloss(y_true, y_preds, classes, class_weights):
    """
    refactor from
    @author olivier https://www.kaggle.com/ogrellier
    multi logloss for PLAsTiCC challenge
    """
    y_p = y_preds.reshape(y_true.shape[0], len(classes), order='F')
    # Trasform y_true in dummies
    y_ohe = pd.get_dummies(y_true)
    # Normalize rows and limit y_preds to 1e-15, 1-1e-15
    y_p = np.clip(a=y_p, a_min=1e-15, a_max=1 - 1e-15)
    # Transform to log
    y_p_log = np.log(y_p)
    # Get the log for ones, .values is used to drop the index of DataFrames
    # Exclude class 99 for now, since there is no class99 in the training set
    # we gave a special process for that class
    y_log_ones = np.sum(y_ohe.values * y_p_log, axis=0)
    # Get the number of positives for each class
    nb_pos = y_ohe.sum(axis=0).values.astype(float)
    # Weight average and divide by the number of positives
    class_arr = np.array([class_weights[k] for k in sorted(class_weights.keys())])
    y_w = y_log_ones * class_arr / nb_pos

    loss = - np.sum(y_w) / np.sum(class_arr)
    return loss

def xgb_multi_weighted_logloss(y_predicted, y_true, classes, class_weights):
    loss = multi_weighted_logloss(y_true.get_label(), y_predicted, 
                                  classes, class_weights)
    return 'wloss', loss

In [5]:
def ravel_column_names(cols):
    d0 = cols.get_level_values(0)
    d1 = cols.get_level_values(1)
    return ["%s_%s"%(i,j) for i,j in zip(d0,d1)]
    
def etl_cpu(df,df_meta):
    df['flux_ratio_sq'] = np.power(df['flux'] / df['flux_err'], 2.0)
    df['flux_by_flux_ratio_sq'] = df['flux'] * df['flux_ratio_sq']
    aggs = {
        'passband': ['mean'], 
        'flux': ['min', 'max', 'mean'],
        'flux_err': ['min', 'max', 'mean'],
        'detected': ['mean'],
        'mjd':['max','min'],
        'flux_ratio_sq':['sum'],
        'flux_by_flux_ratio_sq':['sum'],
    }
    agg_df = df.groupby('object_id').agg(aggs)
    agg_df.columns = ravel_column_names(agg_df.columns)
    
    agg_df['flux_diff'] = agg_df['flux_max'] - agg_df['flux_min']
    agg_df['flux_dif2'] = (agg_df['flux_max'] - agg_df['flux_min']) / agg_df['flux_mean']
    agg_df['flux_w_mean'] = agg_df['flux_by_flux_ratio_sq_sum'] / agg_df['flux_ratio_sq_sum']
    agg_df['flux_dif3'] = (agg_df['flux_max'] - agg_df['flux_min']) / agg_df['flux_w_mean']
    
    agg_df['mjd_diff'] = agg_df['mjd_max'] - agg_df['mjd_min']
    agg_df = agg_df.drop(['mjd_max','mjd_min'],axis=1)
    
    agg_df = agg_df.reset_index()
    df_meta = df_meta.drop(['ra','decl','gal_l','gal_b'],axis=1)
    df_meta = df_meta.merge(agg_df,on='object_id',how='left')
    return df_meta

In [6]:
# To save GPU memory, we drop the column as soon as it is done with groupby
# 
# this hits performance a little but avoids GPU OOM.
def groupby_aggs(df,aggs,col):
    res = None
    for i,j in aggs.items():
        for k in j:
            #print(i,k)
            tmp = df.groupby(col).agg({i:[k]})
            if res is None:
                res = tmp
            else:
                res = res.merge(tmp,on=[col],how='left')
        df.drop_column(i)
    return res

def etl_gpu(df,df_meta):
    aggs = {
        'passband': ['mean'], 
        'detected': ['mean'],
        'mjd':['max','min'],
    }
    agg_df = groupby_aggs(df,aggs,'object_id')
    # at this step, columns ['passband','detected','mjd'] are deleted 
    
    df['flux_ratio_sq'] = df['flux'] / df['flux_err']
    df['flux_ratio_sq'] = df['flux_ratio_sq'].applymap(lambda x: math.pow(x,2))
    df['flux_by_flux_ratio_sq'] = df['flux'] * df['flux_ratio_sq']
    
    aggs2 = {
        'flux_ratio_sq':['sum'],
        'flux_by_flux_ratio_sq':['sum'],
        'flux': ['min', 'max', 'mean'],
        'flux_err': ['min', 'max', 'mean'],
    }
    agg_df2 = groupby_aggs(df,aggs2,'object_id')
    agg_df = agg_df.merge(agg_df2,on=['object_id'],how='left')
    del agg_df2

    agg_df['flux_diff'] = agg_df['max_flux'] - agg_df['min_flux']
    agg_df['flux_dif2'] = (agg_df['max_flux'] - agg_df['min_flux']) / agg_df['mean_flux']
    agg_df['flux_w_mean'] = agg_df['sum_flux_by_flux_ratio_sq'] / agg_df['sum_flux_ratio_sq']
    agg_df['flux_dif3'] = (agg_df['max_flux'] - agg_df['min_flux']) / agg_df['flux_w_mean']
    
    agg_df['mjd_diff'] = agg_df['max_mjd'] - agg_df['min_mjd']
    agg_df.drop_column('max_mjd')
    agg_df.drop_column('min_mjd')
    
    for col in ['ra','decl','gal_l','gal_b']:
        df_meta.drop_column(col)
    
    df_meta = df_meta.merge(agg_df,on=['object_id'],how='left')
    return df_meta

# Load data for ETL part 1

In [7]:
# please download data from https://www.kaggle.com/c/PLAsTiCC-2018/data
PATH = '../lsst/input'

In [8]:
%%time
# read data on gpu
ts_cols = ['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected']
ts_dtypes = ['int32', 'float32', 'int32', 'float32','float32','int32']

train_gd = gd.read_csv('%s/training_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1)
test_gd = gd.read_csv('%s/test_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1+SKIP_ROWS) # skip the header

CPU times: user 13.4 s, sys: 8.29 s, total: 21.7 s
Wall time: 21.7 s


# ETL with 120x ~ 140x  speedup

In [9]:
%%time
# to save memory, we need to move dataframe to cpu and only keep the columns we need
test_gd = test_gd[['object_id','flux']]
train_gd = train_gd[['object_id','flux']]

CPU times: user 8 ms, sys: 8 ms, total: 16 ms
Wall time: 10 ms


In [10]:
%%time
test = test_gd.to_pandas()
train = train_gd.to_pandas()

CPU times: user 9.65 s, sys: 7.06 s, total: 16.7 s
Wall time: 2.3 s


In [11]:
from cudf_workaround import cudf_groupby_aggs
"""
def cudf_groupby_aggs(df,group_id_col,aggs):
    
    Parameters
    ----------
    df : cudf dataframe
        dataframe to be grouped
    group_id_col : string
        name of the column which is used as the key of the group
    aggs : dictionary
        key is the name of column for which aggregation is calculated
        values is the name of function for aggregation
    Returns
    -------
    dg : cudf dataframe
        result of groupby aggregation
"""

In [12]:
%%time
# GPU
aggs = {'flux':['skew']}
test_gd = cudf_groupby_aggs(test_gd,group_id_col='object_id',aggs=aggs)
train_gd = cudf_groupby_aggs(train_gd,group_id_col='object_id',aggs=aggs)

CPU times: user 10.3 s, sys: 1.04 s, total: 11.3 s
Wall time: 4.83 s


In [13]:
%%time
# CPU
test = test.groupby('object_id').agg(aggs)
train = train.groupby('object_id').agg(aggs)

CPU times: user 10min 22s, sys: 24.8 s, total: 10min 47s
Wall time: 10min 32s


In [42]:
speedup = (10*60 + 32)/4.83
print("we achieve %.3f speedup!"%speedup)

we achieve 130.849 speedup!


In [15]:
%%time
test_gd = test_gd.sort_values(by='object_id')
train_gd = train_gd.sort_values(by='object_id')

CPU times: user 8 ms, sys: 32 ms, total: 40 ms
Wall time: 39.8 ms


In [16]:
%%time
test.columns = ['skew_flux']
test = test.reset_index()
test = test.sort_values(by='object_id')
train.columns = ['skew_flux']
train = train.reset_index()
train = train.sort_values(by='object_id')

CPU times: user 7.02 s, sys: 24 ms, total: 7.04 s
Wall time: 176 ms


In [17]:
def rmse(a,b):
    return np.mean((a-b)**2)**0.5
print('test')
for col in test.columns:
    if col in test_gd.columns:
        print("%s, rmse %.6f"%(col,rmse(test[col].values,test_gd[col].to_pandas().values)))
print('train')
for col in train.columns:
    if col in train_gd.columns:
        print("%s, rmse %.6f"%(col,rmse(train[col].values,train_gd[col].to_pandas().values)))

test
object_id, rmse 0.000000
skew_flux, rmse 0.000000
train
object_id, rmse 0.000000
skew_flux, rmse 0.000001


In [18]:
test_flux_skew_gd = test_gd
test_flux_skew = test
train_flux_skew_gd = train_gd
train_flux_skew = train
print(len(test_gd),len(test))

3492890 3492890


# Load data with 11x speedup

In [19]:
%%time
# read data on cpu
test = pd.read_csv('%s/test_set.csv'%PATH)
test_meta = pd.read_csv('%s/test_set_metadata.csv'%PATH)

train = pd.read_csv('%s/training_set.csv'%PATH)
train_meta = pd.read_csv('%s/training_set_metadata.csv'%PATH)

CPU times: user 4min 2s, sys: 50.7 s, total: 4min 52s
Wall time: 3min 45s


In [20]:
%%time
# read data on gpu
ts_cols = ['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected']
ts_dtypes = ['int32', 'float32', 'int32', 'float32','float32','int32']

test_gd = gd.read_csv('%s/test_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1+SKIP_ROWS) # skip the header
train_gd = gd.read_csv('%s/training_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1)

cols = ['object_id', 'ra', 'decl', 'gal_l', 'gal_b', 'ddf',
       'hostgal_specz', 'hostgal_photoz', 'hostgal_photoz_err', 
       'distmod','mwebv', 'target']
dtypes = ['int32']+['float32']*4+['int32']+['float32']*5+['int32']

train_meta_gd = gd.read_csv('%s/training_set_metadata.csv'%PATH,
            names=cols,dtype=dtypes,skiprows=1)
del cols[-1],dtypes[-1]
test_meta_gd = gd.read_csv('%s/test_set_metadata.csv'%PATH,
            names=cols,dtype=dtypes,skiprows=1)

CPU times: user 19.7 s, sys: 5.46 s, total: 25.2 s
Wall time: 18.7 s


In [43]:
speedup = (3*60 + 45)/18.7
print("we achieve %.3f speedup!"%speedup)

we achieve 12.032 speedup!


# ETL with 9x ~ 12x speedup 

In [22]:
%%time
# GPU
train_final_gd = etl_gpu(train_gd,train_meta_gd)
train_final_gd = train_final_gd.merge(train_flux_skew_gd,on=['object_id'],how='left')
test_final_gd = etl_gpu(test_gd,test_meta_gd)
del test_gd,test_meta_gd
test_final_gd = test_final_gd.merge(test_flux_skew_gd,on=['object_id'],how='left')

1:int32
2:int32
3:int32
1:int32
1:int32
2:int32
3:float32
1:int32
1:int32
1:float32
2:int32
3:float32
1:float32
2:int32
3:float32
1:float32
1:float32
2:int32
3:float32
1:float32
1:float32
1:float32
2:int32
3:float32
1:float32
1:float32
1:float32
1:float32
2:int32
3:float32
1:float32
1:float32
1:float32
1:float32
1:float32
2:int32
3:float32
1:float32
1:float32
1:float32
1:float32
1:float32
1:float32
2:int32
3:float32
1:float32
1:float32
1:float32
1:float32
1:float32
1:float32
1:float32
2:int32
3:float32
1:int32
1:int32
1:float32
1:float32
2:int32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
1:int32
1:float32
1:float32
1:float32
1:float32
1:float32
1:int32
2:int32
3:int32
3:int32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
3:float32
1:int32
1:float32
1:float32
1:float32
1:float32
1:float32
1:int32
1:int32
1:int32
1:float32
1:float32
1:float32
1:float32
1:float32
1:float32
1:floa

In [23]:
%%time
#CPU
train_final = etl_cpu(train,train_meta)
train_final = train_final.merge(train_flux_skew,on=['object_id'],how='left')
test_final = etl_cpu(test,test_meta)
test_final = test_final.merge(test_flux_skew,on=['object_id'],how='left')

CPU times: user 3min 56s, sys: 1min 42s, total: 5min 39s
Wall time: 1min 57s


In [44]:
speedup = (1*60 + 57)/11.5
print("we achieve %.3f speedup!"%speedup)

we achieve 10.174 speedup!


# train and validation with 5x speedup

In [37]:
# CPU
X = train_final.drop(['object_id','target'],axis=1).values
y = train_final['target']
Xt = test_final.drop(['object_id'],axis=1).values
assert X.shape[1] == Xt.shape[1]
classes = sorted(y.unique())    
# Taken from Giba's topic : https://www.kaggle.com/titericz
# https://www.kaggle.com/c/PLAsTiCC-2018/discussion/67194
# with Kyle Boone's post https://www.kaggle.com/kyleboone
class_weights = {c: 1 for c in classes}
class_weights.update({c:2 for c in [64, 15]})

lbl = LabelEncoder()
y = lbl.fit_transform(y)
print(lbl.classes_)

X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.1,stratify=y)

[ 6 15 16 42 52 53 62 64 65 67 88 90 92 95]


In [38]:
cpu_params = {
            'objective': 'multi:softprob', 
            'tree_method': 'hist', 
            'nthread': 16, 
            'num_class':14,
            'max_depth': 7, 
            'silent':1,
            'subsample':0.7,
            'colsample_bytree': 0.7,}

In [39]:
func_loss = partial(xgb_multi_weighted_logloss, 
                        classes=classes, 
                        class_weights=class_weights)

In [40]:
%%time
dtrain = xgb.DMatrix(data=X_train, label=y_train)
dvalid = xgb.DMatrix(data=X_test, label=y_test)
dtest = xgb.DMatrix(data=Xt)
watchlist = [(dvalid, 'eval'), (dtrain, 'train')]
clf = xgb.train(cpu_params, dtrain=dtrain,
                num_boost_round=60,evals=watchlist,
                feval=func_loss,early_stopping_rounds=10,
                verbose_eval=1000)
yp = clf.predict(dvalid)
loss = multi_weighted_logloss(y_test, yp, classes, class_weights)
ysub = clf.predict(dtest)
print('validation loss %.4f'%loss)

[02:47:23] Tree method is selected to be 'hist', which uses a single updater grow_fast_histmaker.
[0]	eval-merror:0.326115	train-merror:0.272122	eval-wloss:2.03443	train-wloss:1.88621
Multiple eval metrics have been passed: 'train-wloss' will be used for early stopping.

Will train until train-wloss hasn't improved in 10 rounds.
[59]	eval-merror:0.276433	train-merror:0.001982	eval-wloss:1.33349	train-wloss:0.091674
validation loss 1.3335
CPU times: user 6min 38s, sys: 1.43 s, total: 6min 40s
Wall time: 26.5 s


In [29]:
# GPU
y = train_final_gd['target'].to_array()
y = lbl.fit_transform(y)
for col in ['object_id','target']:
    train_final_gd.drop_column(col)
for col in train_final_gd.columns:
    train_final_gd[col] = train_final_gd[col].fillna(0).astype('float32')


for col in ['object_id']:
    test_final_gd.drop_column(col)
for col in test_final_gd.columns:
    test_final_gd[col] = test_final_gd[col].fillna(0).astype('float32')

In [30]:
cols = [i for i in test_final_gd.columns]
X = train_final_gd[cols].as_matrix()
Xt = test_final_gd.as_matrix()

In [31]:
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.1,stratify=y)

In [32]:
# GPU
gpu_params = cpu_params.copy()
gpu_params.update({'objective': 'multi:softprob',
                   'tree_method': 'gpu_hist', 
                  })

In [33]:
%%time
dtrain = xgb.DMatrix(data=X_train, label=y_train)
dvalid = xgb.DMatrix(data=X_test, label=y_test)
dtest = xgb.DMatrix(data=Xt)
watchlist = [(dvalid, 'eval'), (dtrain, 'train')]
clf = xgb.train(gpu_params, dtrain=dtrain,
                num_boost_round=60,evals=watchlist,
                feval=func_loss,early_stopping_rounds=10,
                verbose_eval=1000)
yp = clf.predict(dvalid)
loss = multi_weighted_logloss(y_test, yp, classes, class_weights)
ysub = clf.predict(dtest)
print('validation loss %.4f'%loss)

[0]	eval-merror:0.338854	train-merror:0.296899	eval-wloss:1.98145	train-wloss:1.8993
Multiple eval metrics have been passed: 'train-wloss' will be used for early stopping.

Will train until train-wloss hasn't improved in 10 rounds.
[59]	eval-merror:0.26242	train-merror:0.002832	eval-wloss:1.13939	train-wloss:0.102957
validation loss 1.1394
CPU times: user 1min 16s, sys: 1.12 s, total: 1min 17s
Wall time: 5.68 s


In [41]:
speedup = 26.5/5.68
print("we achieve %.3f speedup!"%speedup)

we achieve 4.665 speedup!


In [45]:
total_speedup = (10*60 + 32 + 3*60 + 45 + 1*60 + 57 + 26.5)/(4.8+18.7+11.5+5.68)
print("we achieve %.3f speedup in total!"%total_speedup)

we achieve 24.594 speedup in total!


In [46]:
4.8+18.7+11.5+5.68

40.68