In [None]:
# Copyright 2021 NVIDIA CORPORATION
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [1]:
import glob
import pandas as pd
import numpy as np
import cudf
import cupy
import gc

from datetime import datetime

In [2]:
# Collect the paths of every parquet file in the input directory.
# NOTE(review): absolute, machine-specific path — adjust for the local environment.
files = glob.glob('/raid/recsys2021_pre_validXGB_TE/*.parquet')

In [3]:
import os, time
# Make CUDA devices 0-7 visible; this is set before the Dask CUDA cluster is created below.
os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2,3,4,5,6,7"

In [4]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import subprocess

In [5]:
# Start a local Dask CUDA cluster and connect a client to it.
# `local_directory` is where the workers keep their local files.
# NOTE(review): per the dask-cuda docs a float `device_memory_limit` is a fraction of
# device memory at which spilling to host starts — confirm for the installed version.
cluster = LocalCUDACluster(local_directory='/raid/dask219423/', device_memory_limit=0.4)
client = Client(cluster)
# Last expression of the cell: displays the client summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:41607  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 429.50 GB


In [6]:
# Lazily open all parquet files as one dask_cudf DataFrame; sorting makes the file order deterministic.
df = dask_cudf.read_parquet(sorted(files))

In [7]:
# Follower-count cut points; each row ends up in one of five groups (0-4).
# NOTE(review): presumably quantiles of `a_follower_count` — confirm where they were computed.
quantiles = [240,  588, 1331, 3996]

# Every row starts in group 0 and moves up by one for each cut point its
# follower count exceeds. The result is kept as int8 after every step.
df['group'] = 0
for i, quant in enumerate(quantiles):
    df['group'] = (
        df['group'] + (df['a_follower_count'] > quant).astype('int8')
    ).astype('int8')

In [8]:
# From here on the frame is referred to as `train`; drop the old name so only one reference remains.
train = df
del df

In [9]:
# Materialise the lazy frame on the cluster so later cells reuse the computed partitions.
train, = dask.persist(train)

In [10]:
# Report the total number of rows (forces a computation across all partitions).
print(train.shape[0].compute())

14461760


In [11]:
# Binary targets, in the order used to index labels and per-target settings later on.
label_names = ['reply', 'retweet', 'retweet_comment', 'like']

# Columns that must not be used as model features.
DONT_USE = [
    'timestamp', 'a_account_creation', 'b_account_creation', 'engage_time', 'folds',
    'fold', 'b_user_id', 'a_user_id', 'a_account_creation',
    'b_account_creation', 'elapsed_time', 'links', 'domains', 'hashtags', 'id', 'date', 'is_train',
    'tw_original_http0', 'tw_original_user0', 'tw_original_user1', 'tw_original_user2',
    'tw_rt_count_char', 'tw_rt_count_words', 'tw_rt_user0', 'tw_tweet', 'tw_word0',
    'tw_word1', 'tw_word2', 'tw_word3', 'tw_word4', 'tw_count_hash', 'dt_second',
]
# The targets themselves are never features either.
DONT_USE.extend(label_names)

# Everything that is left counts as a candidate feature.
features = [c for c in train.columns if c not in DONT_USE]

# Columns to physically drop: excluded columns that are present in the frame,
# minus the labels (still needed as targets). The set removes the duplicate
# entries that DONT_USE contains.
RMV = list({c for c in DONT_USE if c in train.columns and c not in label_names})

In [12]:
%%time

# Drop the excluded columns collected in RMV.
train = train.drop(RMV,axis=1)
# NOTE(review): the drop above is lazy and is not persisted here; the recorded ~27 ms wall
# time and the already-finished 'assign-...' futures suggest this wait only blocks on the
# previously persisted partitions — confirm whether a persist was intended before waiting.
wait(train)

CPU times: user 20.6 ms, sys: 8.09 ms, total: 28.7 ms
Wall time: 27.2 ms


DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 0)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 2)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 5)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 4)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 3)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 1)>, <Future: finished, type: cudf.DataFrame, key: ('assign-616dc237f79f71bc47c244388d8e185e', 6)>}, not_done=set())

In [13]:
# Split the label columns into their own frame and keep it resident on the cluster.
Y_train = train[label_names]
Y_train, = dask.persist(Y_train)
# Not the last expression of the cell, so this preview is not displayed.
Y_train.head()    
    
# Drop 'tweet_id' and the label columns from `train`, then persist the result.
train = train.drop(['tweet_id']+label_names,axis=1)
train, = dask.persist(train)
# Not the last expression of the cell, so this preview is not displayed.
train.head()


# Recompute the feature list from the remaining columns; print its length next to the
# frame's column count, then display the names.
features = [c for c in train.columns if c not in DONT_USE]
print('Using %i features:'%(len(features)),train.shape[1])
np.asarray(features)

Using 115 features: 115


array(['BNN_like', 'BNN_reply', 'BNN_retweet', 'BNN_retweet_comment',
       'BONN_like', 'BONN_reply', 'BONN_retweet', 'BONN_retweet_comment',
       'CE_valid_b_user_id', 'CNN_like', 'CNN_reply', 'CNN_retweet',
       'CNN_retweet_comment', 'CXGB_like', 'CXGB_reply', 'CXGB_retweet',
       'CXGB_retweet_comment', 'GXGB_like', 'GXGB_reply', 'GXGB_retweet',
       'GXGB_retweet_comment', 'TE_a_user_id_like', 'TE_a_user_id_reply',
       'TE_a_user_id_retweet', 'TE_a_user_id_retweet_comment',
       'TE_b_is_verified_tweet_type_like',
       'TE_b_is_verified_tweet_type_reply',
       'TE_b_is_verified_tweet_type_retweet',
       'TE_b_is_verified_tweet_type_retweet_comment',
       'TE_b_user_id_a_user_id_like', 'TE_b_user_id_a_user_id_reply',
       'TE_b_user_id_a_user_id_retweet',
       'TE_b_user_id_a_user_id_retweet_comment', 'TE_b_user_id_like',
       'TE_b_user_id_reply', 'TE_b_user_id_retweet',
       'TE_b_user_id_retweet_comment',
       'TE_b_user_id_tweet_type_language_li

In [14]:
import xgboost as xgb
# Record the library version in the notebook output (the recorded run used 1.1.0).
print('XGB Version',xgb.__version__)

# Booster settings shared by every target. The training loop adds a 'seed'
# entry per bag. Both the tree builder and the predictor are the GPU variants.
xgb_parms = dict(
    max_depth=8,
    learning_rate=0.1,
    subsample=0.8,
    colsample_bytree=0.3,
    eval_metric='logloss',
    objective='binary:logistic',
    tree_method='gpu_hist',
    predictor='gpu_predictor',
)

XGB Version 1.1.0


In [15]:
# Convert every boolean column to int8; all other columns are left untouched.
for col in train.columns:
    if train[col].dtype != 'bool':
        continue
    train[col] = train[col].astype('int8')

# Keep the converted frame resident on the cluster and preview the first rows.
(train,) = dask.persist(train)
train.head()

Unnamed: 0,BNN_like,BNN_reply,BNN_retweet,BNN_retweet_comment,BONN_like,BONN_reply,BONN_retweet,BONN_retweet_comment,CE_valid_b_user_id,CNN_like,...,tw_last_quest,tw_len_gif,tw_len_media,tw_len_photo,tw_len_quest,tw_len_retweet,tw_len_rt,tw_len_token,tw_len_video,tweet_type
0,0.501815,0.004607,0.002321,0.000813,0.574376,0.005486,0.005481,0.000998,3,0.354139,...,0,0,0,0,0,0,0,61,0,0
1,0.605746,0.011113,0.025695,0.00566,0.646616,0.024667,0.034379,0.006897,6,0.563894,...,0,0,1,1,0,0,0,42,0,0
2,0.380935,0.002046,0.49192,0.006244,0.350995,0.002856,0.429382,0.00705,6,0.370738,...,0,0,0,0,0,0,0,41,0,1
3,0.549288,0.128277,0.008822,0.00151,0.359111,0.0478,0.003064,0.000813,6,0.442478,...,0,0,0,0,1,0,0,44,0,0
4,0.246989,0.001345,0.068371,0.00074,0.065246,0.000971,0.034187,0.001279,5,0.402536,...,0,0,1,1,0,0,0,31,0,1


In [16]:
# Positions within `label_names` of the targets to train (all four).
models_index = list(range(4))

In [17]:
def get_colnames(train, name, targets=None):
    """Build the ordered list of feature columns for the stacked model.

    The list is assembled in four steps:

    1. Take the columns of ``train`` and drop the helper columns ``group`` and
       ``quantile`` plus every column whose name contains one of the
       substrings ``NN_``, ``XGB``, ``a_follows_b``, ``TE_valid`` or
       ``_switch_``.
    2. Append the stage-one prediction columns (``CNN_``, ``CXGB_``, ``BONN_``,
       ``XGB_``, ``BNN_``, ``GXGB_`` prefixes) for every target.
    3. Remove a fixed set of count-encoding / timestamp columns.
    4. Append a fixed set of ``TE_valid_*`` / ``TE_switch_*`` columns and
       ``CE_valid_b_user_id``.

    Parameters
    ----------
    train : object with a ``columns`` attribute
        Frame whose column names seed the list (only ``train.columns`` is read).
    name : str
        Target name. Currently unused: every target gets the same columns.
        Kept so existing callers keep working.
    targets : sequence of str, optional
        Target names used to build the stage-one prediction column names.
        Defaults to the notebook-level ``label_names``.

    Returns
    -------
    list of str
        Column names in a deterministic order.
    """
    if targets is None:
        targets = label_names

    helper_columns = ('group', 'quantile')
    banned_fragments = ('NN_', 'XGB', 'a_follows_b', 'TE_valid', '_switch_')

    # Fixed: the original tested `cols != 'quantile'` (the whole list against a
    # string, always True), so a 'quantile' column was never filtered out.
    cols = [
        col for col in train.columns
        if col not in helper_columns
        and not any(fragment in col for fragment in banned_fragments)
    ]

    # Stage-one model outputs, one column per (model family, target).
    for prefix in ('CNN_', 'CXGB_', 'BONN_', 'XGB_', 'BNN_', 'GXGB_'):
        cols = cols + [prefix + target for target in targets]

    dropped = {
        'TE_valid_tweet_id_like', 'TE_valid_tweet_id_reply',
        'TE_valid_tweet_id_retweet', 'TE_valid_tweet_id_retweet_comment',
        'CE_a_user_id', 'CE_b_user_id', 'CE_switch_a_user_id',
        'CE_switch_b_user_id', 'CE_switch_valid_a_user_id',
        'CE_switch_valid_b_user_id', 'CE_valid_a_user_id',
        'CE_valid_b_user_id', 'CE_valid_tweet_id', 'b_timestamp_-1', 'b_timestamp_1',
    }
    cols = [col for col in cols if col not in dropped]

    # These are appended explicitly so they always sit at the end, in this order.
    cols = cols + [
        'TE_valid_a_user_id_like', 'TE_valid_a_user_id_reply',
        'TE_valid_a_user_id_retweet', 'TE_valid_a_user_id_retweet_comment',
        'TE_valid_b_user_id_like', 'TE_valid_b_user_id_reply',
        'TE_valid_b_user_id_retweet', 'TE_valid_b_user_id_retweet_comment',
        'TE_switch_a_user_id_like', 'TE_switch_a_user_id_reply',
        'TE_switch_a_user_id_retweet', 'TE_switch_a_user_id_retweet_comment',
        'TE_switch_b_user_id_like', 'TE_switch_b_user_id_reply',
        'TE_switch_b_user_id_retweet', 'TE_switch_b_user_id_retweet_comment',
        'TE_valid_tweet_id_like', 'TE_valid_tweet_id_reply',
        'TE_valid_tweet_id_retweet', 'TE_valid_tweet_id_retweet_comment',
        'CE_valid_b_user_id',
    ]
    return cols

In [18]:
# Build the feature list once (here for 'retweet') so it can be inspected in the next cell.
cols = get_colnames(train, 'retweet')

In [19]:
# Display the selected feature columns.
cols

['TE_a_user_id_like',
 'TE_a_user_id_reply',
 'TE_a_user_id_retweet',
 'TE_a_user_id_retweet_comment',
 'TE_b_is_verified_tweet_type_like',
 'TE_b_is_verified_tweet_type_reply',
 'TE_b_is_verified_tweet_type_retweet',
 'TE_b_is_verified_tweet_type_retweet_comment',
 'TE_b_user_id_a_user_id_like',
 'TE_b_user_id_a_user_id_reply',
 'TE_b_user_id_a_user_id_retweet',
 'TE_b_user_id_a_user_id_retweet_comment',
 'TE_b_user_id_like',
 'TE_b_user_id_reply',
 'TE_b_user_id_retweet',
 'TE_b_user_id_retweet_comment',
 'TE_b_user_id_tweet_type_language_like',
 'TE_b_user_id_tweet_type_language_reply',
 'TE_b_user_id_tweet_type_language_retweet',
 'TE_b_user_id_tweet_type_language_retweet_comment',
 'TE_tw_original_user0_tweet_type_language_like',
 'TE_tw_original_user0_tweet_type_language_reply',
 'TE_tw_original_user0_tweet_type_language_retweet',
 'TE_tw_original_user0_tweet_type_language_retweet_comment',
 'TE_tw_original_user1_tweet_type_language_like',
 'TE_tw_original_user1_tweet_type_langua

In [20]:
!rm -r models_stacked
!mkdir models_stacked

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)


In [21]:
import pickle

In [22]:
%%time

import time

ROUNDS = [273, 436, 184, 601]
VERBOSE_EVAL = 50
ESR = 50
preds_out = []
best_trees_out = []

for i in models_index:
    name = label_names[i]
    print('#'*25);print('###',name);print('#'*25)
    cols = get_colnames(train, name)
    print(cols)
    start = time.time(); print('Creating DMatrix...')
    dtrain = xgb.dask.DaskDMatrix(client,data=train[cols].values,label=Y_train.iloc[:, i])
    start = time.time(); print('Training...')
    for bag in range(3):
        xgb_parms['seed'] = bag
        model = xgb.dask.train(client, xgb_parms, 
                               dtrain=dtrain,
                               evals=[(dtrain,'valid')],
                               num_boost_round=ROUNDS[i],
                               verbose_eval=VERBOSE_EVAL
                              )
        print('Took %.1f seconds'%(time.time()-start))
        logloss_valid = model["history"]['valid']['logloss'][::VERBOSE_EVAL]
        for ik in range(len(logloss_valid)):
            print(f"{str(VERBOSE_EVAL*ik).zfill(4)} valid-logloss:{logloss_valid[ik]:.5f}")
        pickle.dump(model, open('./models_stacked/model_' + str(name) + '_' + str(bag)+ '.pickle', 'wb'))

#########################
### reply
#########################
['TE_a_user_id_like', 'TE_a_user_id_reply', 'TE_a_user_id_retweet', 'TE_a_user_id_retweet_comment', 'TE_b_is_verified_tweet_type_like', 'TE_b_is_verified_tweet_type_reply', 'TE_b_is_verified_tweet_type_retweet', 'TE_b_is_verified_tweet_type_retweet_comment', 'TE_b_user_id_a_user_id_like', 'TE_b_user_id_a_user_id_reply', 'TE_b_user_id_a_user_id_retweet', 'TE_b_user_id_a_user_id_retweet_comment', 'TE_b_user_id_like', 'TE_b_user_id_reply', 'TE_b_user_id_retweet', 'TE_b_user_id_retweet_comment', 'TE_b_user_id_tweet_type_language_like', 'TE_b_user_id_tweet_type_language_reply', 'TE_b_user_id_tweet_type_language_retweet', 'TE_b_user_id_tweet_type_language_retweet_comment', 'TE_tw_original_user0_tweet_type_language_like', 'TE_tw_original_user0_tweet_type_language_reply', 'TE_tw_original_user0_tweet_type_language_retweet', 'TE_tw_original_user0_tweet_type_language_retweet_comment', 'TE_tw_original_user1_tweet_type_language_like', 'T

Training...
Took 26.6 seconds
0000 valid-logloss:0.60018
0050 valid-logloss:0.02926
0100 valid-logloss:0.02711
0150 valid-logloss:0.02652
Took 54.3 seconds
0000 valid-logloss:0.60018
0050 valid-logloss:0.02931
0100 valid-logloss:0.02710
0150 valid-logloss:0.02653
Took 81.1 seconds
0000 valid-logloss:0.60018
0050 valid-logloss:0.02935
0100 valid-logloss:0.02709
0150 valid-logloss:0.02650
#########################
### like
#########################
['TE_a_user_id_like', 'TE_a_user_id_reply', 'TE_a_user_id_retweet', 'TE_a_user_id_retweet_comment', 'TE_b_is_verified_tweet_type_like', 'TE_b_is_verified_tweet_type_reply', 'TE_b_is_verified_tweet_type_retweet', 'TE_b_is_verified_tweet_type_retweet_comment', 'TE_b_user_id_a_user_id_like', 'TE_b_user_id_a_user_id_reply', 'TE_b_user_id_a_user_id_retweet', 'TE_b_user_id_a_user_id_retweet_comment', 'TE_b_user_id_like', 'TE_b_user_id_reply', 'TE_b_user_id_retweet', 'TE_b_user_id_retweet_comment', 'TE_b_user_id_tweet_type_language_like', 'TE_b_user_

In [23]:
# List the pickled models written by the training cell.
!ls ./models_stacked/*

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
./models_stacked/model_like_0.pickle
./models_stacked/model_like_1.pickle
./models_stacked/model_like_2.pickle
./models_stacked/model_reply_0.pickle
./models_stacked/model_reply_1.pickle
./models_stacked/model_reply_2.pickle
./models_stacked/model_retweet_0.pickle
./models_stacked/model_retweet_1.pickle
./models_stacked/model_retweet_2.pickle
./models_stacked/model_retweet_comment_0.pickle
./models_stacked/model_retweet_comment_1.pickle
./models_stacked/model_retweet_comment_2.pickle


In [24]:
!rm models_stacked.zip

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
rm: cannot remove 'models_stacked.zip': No such file or directory


In [25]:
# Bundle the whole models_stacked directory into a single archive.
!zip -r models_stacked.zip models_stacked

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
  adding: models_stacked/ (stored 0%)
  adding: models_stacked/model_retweet_2.pickle (deflated 49%)
  adding: models_stacked/model_retweet_comment_1.pickle (deflated 50%)
  adding: models_stacked/model_like_0.pickle (deflated 49%)
  adding: models_stacked/model_retweet_0.pickle (deflated 49%)
  adding: models_stacked/model_retweet_1.pickle (deflated 49%)
  adding: models_stacked/model_like_1.pickle (deflated 49%)
  adding: models_stacked/model_retweet_comment_2.pickle (deflated 50%)
  adding: models_stacked/model_reply_1.pickle (deflated 50%)
  adding: models_stacked/model_retweet_comment_0.pickle (deflated 50%)
  adding: models_stacked/model_reply_2.pickle (deflated 50%)
  adding: models_stacked/model_like_2.pickle (deflated 49%)
  adding: models_stacked/model_reply_0.pickle (deflated 50%)


In [26]:
# Map each target name to the feature columns its stacked model was trained on.
train_features_stage2 = {}
for label in label_names:
    cols = get_colnames(train, label)
    train_features_stage2.update({label: cols})

In [27]:
# Display the per-target feature lists.
train_features_stage2

{'reply': ['TE_a_user_id_like',
  'TE_a_user_id_reply',
  'TE_a_user_id_retweet',
  'TE_a_user_id_retweet_comment',
  'TE_b_is_verified_tweet_type_like',
  'TE_b_is_verified_tweet_type_reply',
  'TE_b_is_verified_tweet_type_retweet',
  'TE_b_is_verified_tweet_type_retweet_comment',
  'TE_b_user_id_a_user_id_like',
  'TE_b_user_id_a_user_id_reply',
  'TE_b_user_id_a_user_id_retweet',
  'TE_b_user_id_a_user_id_retweet_comment',
  'TE_b_user_id_like',
  'TE_b_user_id_reply',
  'TE_b_user_id_retweet',
  'TE_b_user_id_retweet_comment',
  'TE_b_user_id_tweet_type_language_like',
  'TE_b_user_id_tweet_type_language_reply',
  'TE_b_user_id_tweet_type_language_retweet',
  'TE_b_user_id_tweet_type_language_retweet_comment',
  'TE_tw_original_user0_tweet_type_language_like',
  'TE_tw_original_user0_tweet_type_language_reply',
  'TE_tw_original_user0_tweet_type_language_retweet',
  'TE_tw_original_user0_tweet_type_language_retweet_comment',
  'TE_tw_original_user1_tweet_type_language_like',
  'TE_

In [28]:
# Disconnect the client, then shut down the cluster it was attached to. The
# cluster was created separately and passed to Client, so it has to be closed
# explicitly as well.
client.close()
cluster.close()