In [None]:
# Copyright 2021 NVIDIA CORPORATION
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [1]:
import glob
import pandas as pd
import numpy as np
import cudf
import cupy
import gc

from datetime import datetime

In [2]:
files = glob.glob('/raid/recsys2021_pre/*')

In [3]:
files

['/raid/recsys2021_pre/part-00211.parquet',
 '/raid/recsys2021_pre/part-00244.parquet',
 '/raid/recsys2021_pre/part-00114.parquet',
 '/raid/recsys2021_pre/part-00069.parquet',
 '/raid/recsys2021_pre/part-00077.parquet',
 '/raid/recsys2021_pre/part-00144.parquet',
 '/raid/recsys2021_pre/part-00079.parquet',
 '/raid/recsys2021_pre/part-00130.parquet',
 '/raid/recsys2021_pre/part-00191.parquet',
 '/raid/recsys2021_pre/part-00010.parquet',
 '/raid/recsys2021_pre/part-00005.parquet',
 '/raid/recsys2021_pre/part-00039.parquet',
 '/raid/recsys2021_pre/part-00052.parquet',
 '/raid/recsys2021_pre/part-00014.parquet',
 '/raid/recsys2021_pre/part-00140.parquet',
 '/raid/recsys2021_pre/part-00179.parquet',
 '/raid/recsys2021_pre/part-00099.parquet',
 '/raid/recsys2021_pre/part-00225.parquet',
 '/raid/recsys2021_pre/part-00209.parquet',
 '/raid/recsys2021_pre/part-00049.parquet',
 '/raid/recsys2021_pre/part-00034.parquet',
 '/raid/recsys2021_pre/part-00171.parquet',
 '/raid/recsys2021_pre/part-0004

In [4]:
import os, time
os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2,3,4,5,6,7"

In [5]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import subprocess

In [6]:
cluster = LocalCUDACluster(local_directory='/raid/dask2/', device_memory_limit=0.5)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:46095  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 429.50 GB


In [7]:
ddf = dask_cudf.read_parquet(files)

In [8]:
meta = ('time', 'datetime64[s]')

def convert_timestamp(x):
    x = cudf.to_datetime(x, unit='s')
    return(x)

ddf['date'] = ddf['timestamp'].map_partitions(convert_timestamp, meta=meta)

In [9]:
VALID_DOW = ''
if VALID_DOW!='':
    print('True')
    train = ddf[ddf['date']<cudf.to_datetime(VALID_DOW)].reset_index(drop=True)
else:
    print('False')
    train = ddf

False


In [10]:
train.head()

Unnamed: 0,b_user_id,tweet_type,b_is_verified,tw_original_user0,tw_original_user1,language,timestamp,reply,retweet,retweet_comment,like,date
0,6403276251848102467,1,0,4186399962,4186399962,1,1612712747,0,0,0,1,2021-02-07 15:45:47
1,2973183603024793551,0,0,953329835,4186399962,0,1614002023,0,0,0,0,2021-02-22 13:53:43
2,-6755659593911051842,2,0,4186399962,4186399962,0,1612452111,0,0,0,1,2021-02-04 15:21:51
3,1000370995655921600,0,0,4186399962,4186399962,0,1612980390,0,0,0,1,2021-02-10 18:06:30
4,-2379778680414312637,1,0,4186399962,4186399962,0,1612834174,0,0,0,0,2021-02-09 01:29:34


In [11]:
train.columns

Index(['b_user_id', 'tweet_type', 'b_is_verified', 'tw_original_user0',
       'tw_original_user1', 'language', 'timestamp', 'reply', 'retweet',
       'retweet_comment', 'like', 'date'],
      dtype='object')

In [12]:
# !rm -r /raid/TE_submission/
# !mkdir /raid/TE_submission/

In [13]:
means = {}
means['reply'] = 0.02846728456689906
means['like'] = 0.3968895210408169
means['retweet'] = 0.08769760903336701
means['retweet_comment'] = 0.006918407917391091
psmooth = 20

In [14]:
TE_cols = [
    ['a_user_id'],
    ['domains', 'language', 'b_follows_a', 'tweet_type', 'media', 'a_is_verified'],
    ['media', 'tweet_type', 'language'],
    ['media', 'tweet_type', 'language', 'a_is_verified', 'b_is_verified', 'b_follows_a'],
    ['b_user_id'],
    ['b_is_verified', 'tweet_type'],
    ['tweet_type'],
    ['tw_original_user0', 'tweet_type', 'language'],
    ['tw_original_user1', 'tweet_type', 'language']
]

In [15]:
%%time

for col in TE_cols:
    print(col)
    df_tmp = train[
        col + ['reply', 'retweet', 'retweet_comment', 'like']
         ].groupby(col).agg({
        'reply': ['sum', 'count'],
        'retweet': ['sum'], 
        'retweet_comment': ['sum'], 
        'like': ['sum']}
    )
    df_tmp = df_tmp.reset_index()
    df_tmp.columns = col + [
        'reply_sum',
        'reply_count',
        'retweet_sum',
        'retweet_comment_sum',
        'like_sum']
    if False:
        print('Filter>4')
        df_tmp = df_tmp[df_tmp['reply_count']>4]
        df_tmp, = dask.persist(df_tmp)
        n_len = df_tmp.shape[0].compute()
        if n_len>3000000:
            print('Filter std')
            for key in means.keys():
                df_tmp['TE_' + key] = (df_tmp[key + '_sum']+means[key]*psmooth)/(df_tmp['reply_count']+psmooth)
                df_tmp['diff_mean' + key] = (df_tmp['TE_' + key]-means[key]).abs()
            df_tmp = df_tmp[((df_tmp['diff_meanreply']>0.03)|(df_tmp['diff_meanlike']>0.075)|
                             (df_tmp['diff_meanretweet']>0.055)|(df_tmp['diff_meanretweet_comment']>0.011))]
    df_tmp = df_tmp[col + [
        'reply_sum',
        'reply_count',
        'retweet_sum',
        'retweet_comment_sum',
        'like_sum'
    ]].to_parquet('/raid/TE_submission/' + '_'.join(col) + '.parquet')

['b_user_id']


MemoryError: std::bad_alloc: CUDA error at: /opt/conda/envs/rapids/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [2]:
%%time

import glob
import pandas as pd
import numpy as np
import gc

from datetime import datetime

files = glob.glob('/raid/recsys2021_pre/*')

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, wait

client = Client(n_workers=8, 
                threads_per_worker=1,
                memory_limit='300GB')

ddf = dd.read_parquet(files, columns=['b_user_id', 'a_user_id', 'tweet_type',
                                      'language', 'timestamp'] + ['reply', 'retweet', 'retweet_comment', 'like'])

meta = ('time', 'datetime64[s]')

def convert_timestamp(x):
    x = pd.to_datetime(x, unit='s')
    return(x)

ddf['date'] = ddf['timestamp'].map_partitions(convert_timestamp)

VALID_DOW = ''
if VALID_DOW!='':
    print('True')
    train = ddf[ddf['date']<pd.to_datetime(VALID_DOW)].reset_index(drop=True)
else:
    print('False')
    train = ddf
    
means = {}
means['reply'] = 0.02846728456689906
means['like'] = 0.3968895210408169
means['retweet'] = 0.08769760903336701
means['retweet_comment'] = 0.006918407917391091
psmooth = 20

TE_cols = [
    #['b_user_id', 'tweet_type', 'language'],
    ['b_user_id', 'a_user_id']
]

for col in TE_cols:
    print(col)
    df_tmp = train[
        col + ['reply', 'retweet', 'retweet_comment', 'like']
         ].groupby(col).agg({
        'reply': ['sum', 'count'],
        'retweet': ['sum'], 
        'retweet_comment': ['sum'], 
        'like': ['sum']}
    )
    df_tmp = df_tmp.reset_index()
    df_tmp.columns = col + [
        'reply_sum',
        'reply_count',
        'retweet_sum',
        'retweet_comment_sum',
        'like_sum']
    if False:
        print('Filter>4')
        df_tmp = df_tmp[df_tmp['reply_count']>4]
        df_tmp, = dask.persist(df_tmp)
        n_len = df_tmp.shape[0].compute()
        if n_len>3000000:
            print('Filter std')
            for key in means.keys():
                df_tmp['TE_' + key] = (df_tmp[key + '_sum']+means[key]*psmooth)/(df_tmp['reply_count']+psmooth)
                df_tmp['diff_mean' + key] = (df_tmp['TE_' + key]-means[key]).abs()
            df_tmp = df_tmp[((df_tmp['diff_meanreply']>0.03)|(df_tmp['diff_meanlike']>0.075)|
                             (df_tmp['diff_meanretweet']>0.055)|(df_tmp['diff_meanretweet_comment']>0.011))]
    df_tmp = df_tmp[col + [
        'reply_sum',
        'reply_count',
        'retweet_sum',
        'retweet_comment_sum',
        'like_sum'
    ]].to_parquet('/raid/TE_submission/' + '_'.join(col) + '.parquet')

False
['b_user_id', 'a_user_id']
CPU times: user 2min 12s, sys: 45.8 s, total: 2min 58s
Wall time: 40min 10s
