Copyright (c) 2020, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

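#### ~~`tweet_id` is encoded using approximate hashing, which causes hash collisions. Exact encoding of `tweet_id` is blocked by a bug in cudf. All other columns use exact encoding.~~
**Fixed:** `tweet_id` is now encoded exactly, like all other columns (see the `drop_duplicates` + `cumsum` encoding below).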

In [1]:
import sys
sys.path.append('../../..')

import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import subprocess

import dask.multiprocessing
# Make dask's multiprocessing scheduler the default. The previous call used a
# misspelled key ("schedular") and an invalid value ("process"), so it only set
# an unused config entry and had no effect.
# NOTE(review): the distributed Client created in the next cell presumably
# registers itself as the default scheduler and overrides this — confirm
# whether this line is still needed at all.
dask.config.set(scheduler='processes')

from utils.cuda_cluster import *
from utils.dataset import read_data, factorize_small_cardinality


import core.config as conf


In [2]:
# Start a dask client. `Client` is expected to come from the star-import of
# utils.cuda_cluster (presumably dask.distributed.Client — TODO confirm).
# If the default dashboard port is already taken, a warning is printed and a
# different port is used; displaying the client shows scheduler address,
# worker count, cores and memory.
client = Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40083 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://127.0.0.1:33247  Dashboard: http://127.0.0.1:40083/status,Cluster  Workers: 5  Cores: 20  Memory: 33.47 GB


In [3]:
# Load a single raw shard (part-00175) and name its columns in file order.
data_path = conf.raw_data_path + 'part-00175'
df = read_data(data_path, n_partitions=conf.n_partitions)

features = [
    # Tweet features
    'text_tokens', 'hashtags', 'tweet_id', 'media', 'links', 'domains',
    'tweet_type', 'language', 'timestamp',
    # Engaged-with (creator) user features
    'creator_user_id', 'creator_follower_count', 'creator_following_count',
    'creator_is_verified', 'creator_account_creation',
    # Engaging user features
    'engager_user_id', 'engager_follower_count', 'engager_following_count',
    'engager_is_verified', 'engager_account_creation',
    # Engagement features
    'engager_follows_creator',
    # Targets: reply, retweet, retweet with comment, like
    'reply', 'retweet', 'retweet_comment', 'like',
]
df.columns = features

# text_tokens is not used anywhere in this notebook; drop it before persisting
# so only the remaining columns are held on the workers.
df = df.drop('text_tokens', axis=1)
(df,) = dask.persist(df)
_ = wait(df)

df.head()

number of rows: 3033347


Unnamed: 0_level_0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,creator_user_id,creator_follower_count,...,engager_user_id,engager_follower_count,engager_following_count,engager_is_verified,engager_account_creation,engager_follows_creator,reply,retweet,retweet_comment,like
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,,C8F345CF8BC7A86E34572072ECFBBEC4,Photo\tPhoto,,,TopLevel,B8B04128918BBF54E2E178BFF1ABA833,1612993854,BC99C936FE4C2A1FDB0CD09295CAA53C,4753,...,411C3FA9B6AB5CA95192D875CDC22823,76,189,False,1435892882,True,0,0,0,1613048460
2,E48B050ADC7D0DE54FA50374F4C9A063\tCF4C782EBE1C...,C1E31636C343B780BA776E4B73147028,,,,Retweet,9FCF19233EAD65EA6E32C2E6DC03A444,1612886900,F4ADB8DEADF1980D0B5C2B796C108F78,110643,...,E764026AB0E38A5C2FF19921D73B6C18,260,379,False,1393057295,False,0,0,0,0
3,,B436C84E80C2430BA9DE41FDF04C73BF,,,,TopLevel,B0FA488F2911701DD8EC5B1EA5E322D8,1614019237,59EBFE44ABE4C5E31528340304F529C3,4480,...,455134BAAD3EAC4093393EC233FBAEF9,1259,868,False,1587102370,True,1614019431,0,0,0
4,,033FFA42C8AD502057AE96C8B4B812BE,,,,TopLevel,1F73BB863A39DB62B4A55B7E558DB1E8,1612779567,DF6A02AB1731A91FA46A2259F398F57B,461,...,92D70497B86CAFBA5C51E331084462AD,437,597,False,1419347918,True,0,0,0,1612780118
5,,84F2E902BA3CF3B34B8D056F6F78D488,,,,Retweet,E7F038DE3EAD397AEC9193686C911677,1613822114,A656845C3239DB662CFD45D64F2B94F5,1308,...,DC1C8A9412B9E266A4C3D4CAF6DB06CB,247,404,False,1507470713,True,0,0,0,0


In [4]:
# Show the dtype of every column as loaded (before the casting cells below).
df.dtypes

hashtags                    object
tweet_id                    object
media                       object
links                       object
domains                     object
tweet_type                  object
language                    object
timestamp                    int32
creator_user_id             object
creator_follower_count       int32
creator_following_count      int32
creator_is_verified           bool
creator_account_creation     int32
engager_user_id             object
engager_follower_count       int32
engager_following_count      int32
engager_is_verified           bool
engager_account_creation     int32
engager_follows_creator       bool
reply                        int32
retweet                      int32
retweet_comment              int32
like                         int32
dtype: object

In [5]:

%%time
df['id']   = 1
df['id']   = df['id'].cumsum()
df['id'] = df['id'].astype('int32')

df['reply']   = df['reply'].fillna(0)
df['retweet'] = df['retweet'].fillna(0)
df['retweet_comment'] = df['retweet_comment'].fillna(0)
df['like']    = df['like'].fillna(0)

df['reply']   = df['reply'].astype('int32')
df['retweet'] = df['retweet'].astype('int32')
df['retweet_comment'] = df['retweet_comment'].astype('int32')
df['like']    = df['like'].astype('int32')
df, = dask.persist(df)
_ = wait(df)

CPU times: user 664 ms, sys: 36.7 ms, total: 701 ms
Wall time: 2.06 s


In [6]:
%%time

df['timestamp']         = df['timestamp'].astype( np.int32 )
df['creator_follower_count']  = df['creator_follower_count'].astype( np.int32 )
df['creator_following_count'] = df['creator_following_count'].astype( np.int32 )
df['creator_account_creation']= df['creator_account_creation'].astype( np.int32 )
df['engager_follower_count']  = df['engager_follower_count'].astype( np.int32 )
df['engager_following_count'] = df['engager_following_count'].astype( np.int32 )
df['engager_account_creation']= df['engager_account_creation'].astype( np.int32 )

df, = dask.persist(df)
_ = wait(df)
df.head()


CPU times: user 236 ms, sys: 5.07 ms, total: 241 ms
Wall time: 817 ms


Unnamed: 0_level_0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,creator_user_id,creator_follower_count,...,engager_follower_count,engager_following_count,engager_is_verified,engager_account_creation,engager_follows_creator,reply,retweet,retweet_comment,like,id
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,,C8F345CF8BC7A86E34572072ECFBBEC4,Photo\tPhoto,,,TopLevel,B8B04128918BBF54E2E178BFF1ABA833,1612993854,BC99C936FE4C2A1FDB0CD09295CAA53C,4753,...,76,189,False,1435892882,True,0,0,0,1613048460,1
2,E48B050ADC7D0DE54FA50374F4C9A063\tCF4C782EBE1C...,C1E31636C343B780BA776E4B73147028,,,,Retweet,9FCF19233EAD65EA6E32C2E6DC03A444,1612886900,F4ADB8DEADF1980D0B5C2B796C108F78,110643,...,260,379,False,1393057295,False,0,0,0,0,2
3,,B436C84E80C2430BA9DE41FDF04C73BF,,,,TopLevel,B0FA488F2911701DD8EC5B1EA5E322D8,1614019237,59EBFE44ABE4C5E31528340304F529C3,4480,...,1259,868,False,1587102370,True,1614019431,0,0,0,3
4,,033FFA42C8AD502057AE96C8B4B812BE,,,,TopLevel,1F73BB863A39DB62B4A55B7E558DB1E8,1612779567,DF6A02AB1731A91FA46A2259F398F57B,461,...,437,597,False,1419347918,True,0,0,0,1612780118,4
5,,84F2E902BA3CF3B34B8D056F6F78D488,,,,Retweet,E7F038DE3EAD397AEC9193686C911677,1613822114,A656845C3239DB662CFD45D64F2B94F5,1308,...,247,404,False,1507470713,True,0,0,0,0,5


In [7]:
# Re-check dtypes after the casts; the new 'id' column appears as int32.
df.dtypes

hashtags                    object
tweet_id                    object
media                       object
links                       object
domains                     object
tweet_type                  object
language                    object
timestamp                    int32
creator_user_id             object
creator_follower_count       int32
creator_following_count      int32
creator_is_verified           bool
creator_account_creation     int32
engager_user_id             object
engager_follower_count       int32
engager_following_count      int32
engager_is_verified           bool
engager_account_creation     int32
engager_follows_creator       bool
reply                        int32
retweet                      int32
retweet_comment              int32
like                         int32
id                           int32
dtype: object

In [8]:
# Number of rows in the loaded shard.
train_size = len(df)

In [9]:
# First rows of the raw (still hashed-string) language column.
df['language'].head()

id
1    B8B04128918BBF54E2E178BFF1ABA833
2    9FCF19233EAD65EA6E32C2E6DC03A444
3    B0FA488F2911701DD8EC5B1EA5E322D8
4    1F73BB863A39DB62B4A55B7E558DB1E8
5    E7F038DE3EAD397AEC9193686C911677
Name: language, dtype: object

In [10]:
# Summary of the dask_cudf frame: column count and dtype breakdown.
df.info()

<class 'dask_cudf.core.DataFrame'>
Columns: 24 entries, hashtags to id
dtypes: object(9), bool(3), int32(12)

In [11]:
print(f"{df.npartitions} {len(df)}")  # partition count, row count

16 3033347


In [13]:
df = df.assign(media=df['media'].fillna(''))  # missing media -> empty string
def split_join(ds, sep, repl='_'):
    """Replace every occurrence of ``sep`` in a string Series with ``repl``.

    Used to collapse multi-valued, tab-separated fields (e.g. ``Photo<TAB>Photo``)
    into a single token (``Photo_Photo``) that can then be factorized.

    Parameters
    ----------
    ds : Series of str
        A pandas/cudf string Series (e.g. one partition of a dask Series).
    sep : str
        Separator to replace. Matched literally, not as a regular expression.
    repl : str, default '_'
        Replacement string.

    Returns
    -------
    Series
        ``ds`` with every ``sep`` replaced by ``repl``.
    """
    # Previously '\t' was hard-coded and ``sep`` was ignored. regex=False keeps
    # separators such as '.' or '|' from being interpreted as patterns.
    return ds.str.replace(sep, repl, regex=False)
# Turn tab-separated media lists into a single '_'-joined token per row.
# `meta` must describe a Series: the previous `meta=('O')` is just the string
# 'O' (the parentheses alone do not make a tuple), and dask treats a bare dtype
# as metadata for a *scalar* result. A (name, dtype) tuple declares an
# object-dtype Series named 'media'.
df['media'] = df['media'].map_partitions(split_join, '\t', meta=('media', 'object'))

df.head()

Unnamed: 0_level_0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,creator_user_id,creator_follower_count,...,engager_follower_count,engager_following_count,engager_is_verified,engager_account_creation,engager_follows_creator,reply,retweet,retweet_comment,like,id
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,,C8F345CF8BC7A86E34572072ECFBBEC4,Photo_Photo,,,TopLevel,B8B04128918BBF54E2E178BFF1ABA833,1612993854,BC99C936FE4C2A1FDB0CD09295CAA53C,4753,...,76,189,False,1435892882,True,0,0,0,1613048460,1
2,E48B050ADC7D0DE54FA50374F4C9A063\tCF4C782EBE1C...,C1E31636C343B780BA776E4B73147028,,,,Retweet,9FCF19233EAD65EA6E32C2E6DC03A444,1612886900,F4ADB8DEADF1980D0B5C2B796C108F78,110643,...,260,379,False,1393057295,False,0,0,0,0,2
3,,B436C84E80C2430BA9DE41FDF04C73BF,,,,TopLevel,B0FA488F2911701DD8EC5B1EA5E322D8,1614019237,59EBFE44ABE4C5E31528340304F529C3,4480,...,1259,868,False,1587102370,True,1614019431,0,0,0,3
4,,033FFA42C8AD502057AE96C8B4B812BE,,,,TopLevel,1F73BB863A39DB62B4A55B7E558DB1E8,1612779567,DF6A02AB1731A91FA46A2259F398F57B,461,...,437,597,False,1419347918,True,0,0,0,1612780118,4
5,,84F2E902BA3CF3B34B8D056F6F78D488,,,,Retweet,E7F038DE3EAD397AEC9193686C911677,1613822114,A656845C3239DB662CFD45D64F2B94F5,1308,...,247,404,False,1507470713,True,0,0,0,0,5


In [15]:
%%time
for col in ['language','tweet_type','media']:
    df,_ = factorize_small_cardinality(df,col)

CPU times: user 888 ms, sys: 35.1 ms, total: 924 ms
Wall time: 3.45 s


In [30]:
# Replace each raw categorical column with its encoded counterpart, keeping
# the original column name.
df = df.drop(['language', 'tweet_type', 'media'], axis=1).rename(
    columns={
        'language_encode': 'language',
        'tweet_type_encode': 'tweet_type',
        'media_encode': 'media',
    }
)

In [31]:
# Preview after replacing the raw categorical columns with their encodings.
df.head()

Unnamed: 0,hashtags,links,domains,timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_account_creation,engager_follower_count,engager_following_count,...,reply,retweet,retweet_comment,like,language,tweet_type,media,tweet_id,creator_user_id,engager_user_id
0,,,,1613273040,93498,1536,True,1278252855,669,740,...,0,0,0,1613277907,19,0,0,1552126,347802,189330
1,,,,1613825197,3191,478,False,1240809025,144,203,...,0,0,0,1613825437,61,2,0,551482,387049,88035
2,,,,1612769501,159,161,False,1598884793,295,395,...,0,0,0,0,57,2,4,1372003,404851,56471
3,,,,1612736658,2594,913,False,1329253936,746,661,...,1612737594,0,0,1612737594,46,2,0,742026,423015,187313
4,,,,1614142340,1866,4982,False,1258936374,422,507,...,0,0,0,1614162137,61,2,4,1586000,421298,158258


In [32]:
%%time
tweet = df[['tweet_id']]
tweet = tweet.drop_duplicates(split_out=16)
tweet['tweet_encode'] = 1
tweet['tweet_encode'] = tweet['tweet_encode'].cumsum()
tweet, = dask.persist(tweet)
_ = wait(tweet)
tweet.head()

CPU times: user 238 ms, sys: 12.1 ms, total: 251 ms
Wall time: 639 ms


Unnamed: 0,tweet_id,tweet_encode
42225,5,1
136360,9,2
140315,11,3
85412,54,4
111714,92,5


In [33]:
%%time
df = df.merge(tweet,on='tweet_id',how='left')
df = df.drop('tweet_id',axis=1)
df.columns = [i if i!='tweet_encode' else 'tweet_id' for i in df.columns]
df, = dask.persist(df)
wait(df)
del tweet
df.head()

CPU times: user 369 ms, sys: 17 ms, total: 386 ms
Wall time: 1.87 s


Unnamed: 0,hashtags,links,domains,timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_account_creation,engager_follower_count,engager_following_count,...,reply,retweet,retweet_comment,like,language,tweet_type,media,creator_user_id,engager_user_id,tweet_id
0,AAD42DFA66FF0130153B64969FEB3EF8,,,1613440158,7305441,91,True,1243718342,44,127,...,0,0,0,0,48,2,5,2399561,69204,2473
1,881AF0B899BA9B790DA2A4B3E643A9AF,,,1612655251,14749,985,False,1306644632,117,883,...,0,0,0,1612667012,61,1,0,1676003,80459,144178
2,,,,1613764985,79726,321,False,1318537465,435,515,...,0,0,0,0,41,1,0,1509450,29236,99914
3,,,,1612552142,356,640,False,1362098882,282,318,...,0,0,0,0,10,1,4,2156228,65255,149369
4,,,,1612939269,1316508,535,True,1341922272,5768,4505,...,0,1612960208,0,0,47,2,4,2097365,67615,70884


In [34]:
%%time
user_a = df[['creator_user_id']].drop_duplicates(split_out=16)
user_a, = dask.persist(user_a)
_ = wait(user_a)
user_b = df[['engager_user_id']].drop_duplicates(split_out=16)
user_b, = dask.persist(user_b)
wait(user_b)
print(len(user_a),len(user_b),len(df))

user_a.columns = ['user_id']
user_b.columns = ['user_id']
user_b['dummy'] = 1
user_a = user_a.merge(user_b,on='user_id',how='outer')
user_a = user_a.drop('dummy',axis=1)
user_a, = dask.persist(user_a)
wait(user_a)
print(len(user_a),len(user_b),len(df))
del user_b

user_a['user_encode'] = 1
user_a['user_encode'] = user_a['user_encode'].cumsum()
user_a, = dask.persist(user_a)
_ = wait(user_a)

1558866 2175376 3033347
3511090 2175376 3033347
CPU times: user 588 ms, sys: 22.2 ms, total: 610 ms
Wall time: 1.19 s


In [35]:
%%time
df = df.merge(user_a,left_on='creator_user_id',right_on='user_id',how='left')
df = df.drop(['creator_user_id','user_id'],axis=1)
df.columns = [i if i!='user_encode' else 'creator_user_id' for i in df.columns]
df, = dask.persist(df)
_ = wait(df)

CPU times: user 325 ms, sys: 19.8 ms, total: 345 ms
Wall time: 1.88 s


In [36]:
%%time
df = df.merge(user_a,left_on='engager_user_id',right_on='user_id',how='left')
df = df.drop(['engager_user_id','user_id'],axis=1)
df.columns = [i if i!='user_encode' else 'engager_user_id' for i in df.columns]
df, = dask.persist(df)
wait(df)
del user_a
df.head()

CPU times: user 320 ms, sys: 0 ns, total: 320 ms
Wall time: 1.8 s


Unnamed: 0,hashtags,links,domains,timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_account_creation,engager_follower_count,engager_following_count,...,reply,retweet,retweet_comment,like,language,tweet_type,media,tweet_id,creator_user_id,engager_user_id
0,,,,1613889799,4699,273,False,1560832031,187,529,...,0,0,0,1613892434,61,2,10,1627164,135209,107473
1,,,,1613873373,9720780,9283,True,1380227131,275,259,...,0,0,0,0,48,2,0,1306403,204811,55210
2,,,,1613232366,2052,485,False,1495190063,471,381,...,0,1613239338,0,0,10,2,4,1438881,185342,50889
3,1C807AD6102295FB10E404BA57DDD31D,F3CB8E1E77A8FE91EB294D7A8EB7CC94,E8AD4213C8D8247BC233D37CFFBBEE96,1612812188,327944,1903,True,1212794544,523,2267,...,0,0,0,0,19,2,0,1115580,192746,48968
4,524054DA1796A0FC7A09EDD25634E7E7\tEFF74E1B549B...,6E72F518EAFE47705C9D239D45F2B112,244B6C5815741C40EB100AB72FF93F7C,1612826291,174160,0,True,1517540495,299,267,...,0,0,0,1612831482,61,2,4,1252560,170438,64951


In [37]:
%%time
df = df.repartition(npartitions=conf.n_partitions)
df, = dask.persist(df)
_ = wait(df)

CPU times: user 0 ns, sys: 5.06 ms, total: 5.06 ms
Wall time: 4.52 ms


In [38]:
%%time
df.to_parquet(f'{conf.data_root}/dask_input/step1_output',write_index=False)

CPU times: user 39.3 ms, sys: 15.3 ms, total: 54.6 ms
Wall time: 440 ms


In [39]:
# Preview the frame that was just written to parquet.
df.head()

Unnamed: 0,hashtags,links,domains,timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_account_creation,engager_follower_count,engager_following_count,...,reply,retweet,retweet_comment,like,language,tweet_type,media,tweet_id,creator_user_id,engager_user_id
0,,,,1613889799,4699,273,False,1560832031,187,529,...,0,0,0,1613892434,61,2,10,1627164,135209,107473
1,,,,1613873373,9720780,9283,True,1380227131,275,259,...,0,0,0,0,48,2,0,1306403,204811,55210
2,,,,1613232366,2052,485,False,1495190063,471,381,...,0,1613239338,0,0,10,2,4,1438881,185342,50889
3,1C807AD6102295FB10E404BA57DDD31D,F3CB8E1E77A8FE91EB294D7A8EB7CC94,E8AD4213C8D8247BC233D37CFFBBEE96,1612812188,327944,1903,True,1212794544,523,2267,...,0,0,0,0,19,2,0,1115580,192746,48968
4,524054DA1796A0FC7A09EDD25634E7E7\tEFF74E1B549B...,6E72F518EAFE47705C9D239D45F2B112,244B6C5815741C40EB100AB72FF93F7C,1612826291,174160,0,True,1517540495,299,267,...,0,0,0,1612831482,61,2,4,1252560,170438,64951
