In [1]:
import csv

import pandas as pd
import vaex as vx
from kedro.io import PickleLocalDataSet

from twitter_bot_detection.helpers import log_running_time
from twitter_bot_detection.io.vaex_hdf5 import VaexHDF5DataSet

In [None]:
# Open the intermediate tweets table with vaex (path is relative to the working directory).
tweets = vx.open('data/02_intermediate/tweets.hdf5')

In [3]:
def get_dominating_hour_tweets(df):
    """Return, per user, the tweet count of that user's busiest hour.

    ``df`` is first grouped by ``(user_id, hour)`` with a ``'count'``
    aggregation; the maximum of that ``count`` is then taken per ``user_id``.

    Returns:
        The frame produced by ``to_pandas_df()``, indexed by ``user_id``,
        with the per-user maximum stored in the ``max_hour`` column.
    """
    hourly_counts = df.groupby(["user_id", "hour"], agg="count")
    peak_spec = {"max_hour": vx.agg.max("count")}
    peaks = hourly_counts.groupby(["user_id"], agg=peak_spec).to_pandas_df()
    return peaks.set_index("user_id")

In [4]:
def get_dominating_dayofweek_tweets(df):
    """Return, per user, the tweet count of that user's busiest day of week.

    ``df`` is first grouped by ``(user_id, dayofweek)`` with a ``'count'``
    aggregation; the maximum of that ``count`` is then taken per ``user_id``.

    Returns:
        The frame produced by ``to_pandas_df()``, indexed by ``user_id``,
        with the per-user maximum stored in the ``max_dow`` column.
    """
    weekday_counts = df.groupby(["user_id", "dayofweek"], agg="count")
    busiest = weekday_counts.groupby(
        ["user_id"],
        agg={"max_dow": vx.agg.max("count")},
    )
    return busiest.to_pandas_df().set_index("user_id")

In [5]:
def get_dominating_day_of_week(df):
    """Return the day of week on which each user tweeted most often.

    Counts rows per ``(user_id, dayofweek)``, converts the result with
    ``to_pandas_df()`` and, for every user, keeps the row holding the largest
    ``count`` (``idxmax`` keeps the first such row when counts tie).

    Returns:
        A pandas DataFrame indexed by ``user_id`` with the ``count`` column
        removed.
    """
    per_user_day = df.groupby(["user_id", "dayofweek"], agg="count").to_pandas_df()
    # Row label of the highest count within each user's group.
    top_rows = per_user_day.groupby("user_id")["count"].idxmax()
    dominant = per_user_day.loc[top_rows].drop(columns="count")
    return dominant.set_index("user_id")

In [6]:
def get_dominating_hour(df):
    """Return the hour of day in which each user tweeted most often.

    Counts rows per ``(user_id, hour)``, converts the result with
    ``to_pandas_df()`` and, for every user, keeps the row holding the largest
    ``count`` (``idxmax`` keeps the first such row when counts tie).

    Returns:
        A pandas DataFrame indexed by ``user_id`` with the ``count`` column
        removed.
    """
    per_user_hour = df.groupby(["user_id", "hour"], agg="count").to_pandas_df()
    # Row label of the highest count within each user's group.
    top_rows = per_user_hour.groupby("user_id")["count"].idxmax()
    dominant = per_user_hour.loc[top_rows].drop(columns="count")
    return dominant.set_index("user_id")
    

In [4]:
# Column names and dtypes of the opened tweets table.
tweets.dtypes

id                         <class 'str'>
user_id                    <class 'str'>
created_at                datetime64[ns]
source                     <class 'str'>
is_reply                            bool
is_quote                            bool
hashtags_count                      int8
mentions_count                      int8
urls_count                          int8
symbols_count                       int8
sensitive                           bool
truncated                           bool
lang                       <class 'str'>
is_retweet                          bool
text                       <class 'str'>
retweeted_author           <class 'str'>
media_count                         int8
reply_to                   <class 'str'>
quote_of                   <class 'str'>
retweet_count                      int32
favorite_count                     int32
year                               int64
month                              int64
day                                int64
hour            

In [9]:
@log_running_time
def extract_tweets_features(tweets: VaexHDF5DataSet) -> PickleLocalDataSet:
    """Aggregate tweet-level columns into one feature row per user.

    ``tweets`` is grouped by its ``user_id`` column. Boolean flags are
    averaged, the ``*_count`` columns get mean / max / std, and ``source`` and
    ``lang`` get distinct-value counts. The grouped result is converted with
    ``to_pandas_df()``, indexed by ``user_id`` and returned without the helper
    ``tweets`` column (the per-user count of ``id``).

    In the recorded output, list-valued entries of the aggregation spec show
    up as columns suffixed with the aggregator name (``hashtags_mean``,
    ``hashtags_max``, ``langs_nunique``, ...).

    NOTE(review): the annotations name dataset classes, while the body calls
    ``groupby`` on ``tweets`` and returns what ``to_pandas_df()`` produced --
    confirm the intended parameter and return types.
    """

    def share(column):
        # Single-aggregator list: mean of the column.
        return [vx.agg.mean(column)]

    def spread(column):
        # Mean, max and standard deviation of the column.
        return [vx.agg.mean(column), vx.agg.max(column), vx.agg.std(column)]

    aggregations = {
        "tweets": vx.agg.count("id"),
        "unique_sources": vx.agg.nunique("source"),
        "replies": share("is_reply"),
        "quotes": share("is_quote"),
        "hashtags": spread("hashtags_count"),
        "mentions": spread("mentions_count"),
        "urls": spread("urls_count"),
        "symbols": spread("symbols_count"),
        "sensitive": share("sensitive"),
        "truncated": share("truncated"),
        "langs": [vx.agg.nunique("lang")],
        "is_retweet": share("is_retweet"),
        # retweeted_author is not aggregated.
        "media": spread("media_count"),
        # Disabled aggregations, kept for reference:
        #   "retweeted_by": mean / max / std of "retweet_count"
        #   "favorited_by": mean / max / std of "favorite_count"
        #   "reply_to_tweet_ratio": vx.agg.sum("is_reply") / vx.agg.count("is_reply")
        #   "reply_to_unique": vx.agg.nunique("reply_to")
        #   "quotes_count": vx.agg.sum("is_quote")
    }

    per_user = tweets.groupby(by=tweets["user_id"], agg=aggregations).to_pandas_df()
    per_user = per_user.set_index("user_id")

    # Disabled derived features, kept for reference:
    #   per_user["dominating_hour"] = get_dominating_hour(tweets)
    #   per_user["dominating_day_of_week"] = get_dominating_day_of_week(tweets)
    #   per_user["dominating_hour_share"] = get_dominating_hour_tweets(tweets) / per_user["tweets"]
    #   per_user["dominating_dayofweek_share"] = get_dominating_dayofweek_tweets(tweets) / per_user["tweets"]
    return per_user.drop(columns=["tweets"])

In [10]:
# Build the per-user feature table; the decorator logs the running time (20.80 s in the recorded run).
features = extract_tweets_features(tweets)

2020-03-10 22:39:43,766 - twitter_bot_detection.helpers - INFO - Running 'extract_tweets_features' took 20.80 seconds


In [15]:
# Group tweets into weekly bins of `created_at`.
# NOTE(review): the recorded run of this cell raised
# "RuntimeError: Aggregation tasks started but nothing to do"; no `agg` is
# passed to groupby here -- TODO confirm whether an aggregation is required.
tweets.groupby(vx.BinnerTime.per_week(tweets['created_at']))

2020-03-10 22:41:58,347 - vaex.execution - ERROR - error in task, flush task queue
Traceback (most recent call last):
  File "/home/eugene/anaconda3/envs/twitter-bot-detection/lib/python3.7/site-packages/vaex/execution.py", line 172, in execute
    task.check()
  File "/home/eugene/anaconda3/envs/twitter-bot-detection/lib/python3.7/site-packages/vaex/tasks.py", line 519, in check
    raise RuntimeError('Aggregation tasks started but nothing to do, maybe adding operations failed?')
RuntimeError: Aggregation tasks started but nothing to do, maybe adding operations failed?


RuntimeError: Aggregation tasks started but nothing to do, maybe adding operations failed?

In [15]:
# Largest number of tweets each user posted on any single day of the week (column `max_dow`).
x = get_dominating_dayofweek_tweets(tweets)
x

Unnamed: 0_level_0,max_dow
user_id,Unnamed: 1_level_1
254341885,575
41304837,594
2880027272,632
136745811,505
18222378,657
...,...
22498583,232
139631247,386
281160191,365
83490960,16


In [40]:
# Spot-check a single user.
# NOTE(review): the recorded output has an `hour` column, so `x` presumably
# held the get_dominating_hour() result when this cell ran.
x.loc[["4031355440"]]

Unnamed: 0_level_0,hour
user_id,Unnamed: 1_level_1
4031355440,2


In [33]:
# Attach each user's most frequent posting hour (aligned on the user_id index).
# The column is computed here rather than read from the scratch variable `x`,
# which is rebound to several different tables elsewhere in the notebook.
features["dominating_hour"] = get_dominating_hour(tweets)["hour"]
features

Unnamed: 0_level_0,tweets,unique_sources,replies_mean,quotes_mean,hashtags_mean,hashtags_max,hashtags_std,mentions_mean,mentions_max,mentions_std,...,symbols_max,symbols_std,sensitive_mean,truncated_mean,langs_nunique,is_retweet_mean,media_mean,media_max,media_std,dominating_hour
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
358957543,2233,1,0.000000,0.061352,1.307210,17,2.074006,1.407524,13,1.055375,...,10,0.326737,0.538737,0.000000,31,1.000000,0.159875,1,0.366490,4
3126352533,1865,3,0.001072,0.054155,1.243432,14,1.940880,1.435389,8,1.051608,...,10,0.455835,0.548525,0.000000,27,0.997855,0.162466,1,0.368878,19
138200739,1611,6,0.016139,0.077592,1.278709,18,2.135372,1.338920,12,1.043721,...,8,0.346887,0.507759,0.001241,26,0.949721,0.147114,1,0.354219,13
292716510,2315,1,0.000000,0.061339,1.291577,14,1.966697,1.460043,10,1.087739,...,10,0.422828,0.542981,0.000000,30,1.000000,0.151188,1,0.358232,4
4209284126,1420,3,0.001408,0.066901,1.283803,14,2.078838,1.389437,11,1.053379,...,8,0.417084,0.551408,0.000000,26,0.980986,0.159859,1,0.366475,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4031355440,382,3,0.646597,0.214660,0.188482,5,0.588790,1.324607,6,1.015261,...,0,0.000000,0.269634,0.130890,5,0.154450,0.028796,1,0.167232,2
2908651951,674,2,0.000000,0.068249,1.120178,12,1.711050,1.397626,9,1.040312,...,8,0.341430,0.560831,0.000000,23,0.980712,0.182493,1,0.386250,19
428161313,1162,6,0.008606,0.001721,0.005164,1,0.071672,1.141136,7,0.604668,...,0,0.000000,0.989673,0.006885,16,0.003442,0.002582,1,0.050745,17
174074593,3078,1,0.120858,0.181936,0.049383,17,0.400006,0.449318,5,0.592237,...,0,0.000000,0.293697,0.108512,18,0.312216,0.116309,1,0.320595,17


In [22]:
# Most frequent posting hour per user, indexed by user_id.
x = get_dominating_hour(tweets)
x

Unnamed: 0_level_0,hour
user_id,Unnamed: 1_level_1
1000029438233448448,9
1000197919,3
100024370,15
1000351128389865477,11
1000410554064830470,0
...,...
999632047122591744,12
999647972626452480,14
999730854,16
999776965,4


In [11]:
# Per-(user, day of week) tweet counts as a pandas frame. They are rebuilt from
# `tweets` so the cell can be re-run: the previous `x = x.to_pandas_df()` only
# worked while `x` happened to hold a vaex groupby result.
x = tweets.groupby(['user_id', 'dayofweek'], 'count').to_pandas_df()
# For every user keep the row with the highest count.
x.loc[x.groupby(['user_id'])['count'].idxmax()]

Unnamed: 0,user_id,dayofweek,count
225955,1000029438233448448,1,87
185903,1000197919,4,43
76950,100024370,3,751
52643,1000351128389865477,0,586
138304,1000410554064830470,3,233
...,...,...,...
13045,999632047122591744,3,43
186156,999647972626452480,2,242
59660,999730854,4,26
44681,999776965,1,207


In [17]:
%%time
# List all rows of `x` for one user; the recorded output shows one
# (dayofweek, count) row per weekday for this user.
# get_dominating_hour_tweets(tweets)
x[x["user_id"] == "1000029438233448448"]

CPU times: user 13.9 ms, sys: 61 µs, total: 14 ms
Wall time: 13.4 ms


Unnamed: 0,user_id,dayofweek,count
225952,1000029438233448448,4,72
225953,1000029438233448448,3,68
225954,1000029438233448448,2,83
225955,1000029438233448448,1,87
225956,1000029438233448448,0,48
225957,1000029438233448448,6,26
225958,1000029438233448448,5,84


In [19]:
# Highest `count` per user, stored as `max_hour`.
# NOTE(review): this uses the groupby(..., agg=...) call form with a vaex
# aggregator, so `x` presumably has to be a vaex frame with a `count` column
# here; it is rebound to other objects elsewhere in the notebook -- confirm
# the run order.
x.groupby(['user_id'], agg={
    "max_hour": vx.agg.max("count")
})

#,user_id,max_hour
0,19095849,330
1,49114382,279
2,370842296,249
3,945405725999632384,127
4,2892652353,49
...,...,...
36795,34692933,288
36796,108618776,134
36797,1151975221,162
36798,908724269567365120,240


In [25]:
# Spot check: all rows of `x` for one user (the recorded output lists hour and count).
x[x["user_id"] == "2926129144"]

#,user_id,hour,count
0,2926129144,9,9
1,2926129144,8,1
2,2926129144,7,1
3,2926129144,6,2
4,2926129144,5,2
...,...,...,...
19,2926129144,13,286
20,2926129144,12,270
21,2926129144,11,98
22,2926129144,10,28


In [None]:
# Recompute the per-user feature table under a separate name.
tweets_features = extract_tweets_features(tweets)
# Unfinished draft of a `tweets_per_lang` column (right-hand side incomplete):
# tweets_features["tweets_per_lang"] = tweets_features[]

In [None]:
# Display the feature table.
tweets_features

In [29]:
# Spot check: reply vs. non-reply tweet counts for a single user.
tweets[tweets["user_id"] == "2530039118"]["is_reply"].value_counts()

False    5480
True      738
dtype: int64

In [2]:
# Column names supplied as `names=` / `fieldnames=` when the raw CSV files are read.
names = [
    "ID",
    "userID",
    "createdAt",
    "source",
    "isReply",
    "isQuote",
    "hashtags",
    "hashtagsCount",
    "mentions",
    "mentionsCount",
    "urls",
    "urlsCount",
    "symbols",
    "symbolsCount",
    "sensitive",
    "truncated",
    "lang",
    "isRetweet",
    "text",
    "retweetedAuthor",
    "mediaCount",
    "pollsCount",
    "replyTo",
    "quoteOf",
    "quoteCount",
    "replyCount",
    "retweetCount",
    "favoriteCount",
]

In [7]:
%%time
tweets = pd.read_csv('data/01_raw/tweets.csv', names=names, dtype={
    "userID": str,
    },
    usecols=[1, 2],
    parse_dates=["createdAt"],
    quoting=1,
    chunksize=1000000
)

CPU times: user 2.4 ms, sys: 0 ns, total: 2.4 ms
Wall time: 1.25 ms


In [None]:
# Print the first row of every chunk.
# NOTE(review): `tweets` comes from read_csv(..., chunksize=...), so it is
# presumably a one-shot chunk iterator that this loop exhausts -- confirm
# before reusing it in later cells.
for chunk in tweets:
    print(chunk.head(1))

In [20]:
%%time
len(tweets["userID"].unique())

CPU times: user 387 µs, sys: 8 µs, total: 395 µs
Wall time: 372 µs


0

In [10]:
from dask import dataframe as dd
import csv

2020-03-07 20:35:05,626 - numexpr.utils - INFO - NumExpr defaulting to 8 threads.


In [11]:
# NOTE: `progress` is imported but not used in this cell.
from dask.distributed import Client, progress
# Start a dask.distributed client with default arguments and display its summary.
client = Client()
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: tcp://127.0.0.1:33691  Dashboard: http://127.0.0.1:41087/status,Cluster  Workers: 4  Cores: 8  Memory: 16.71 GB


In [95]:
%%time
dfd = dd.read_json(
    'data/01_raw/tweets.csv',
    names=names,
    dtype={
        "userID": str,
    },
#     usecols=[1, 2],
    quoting=csv.QUOTE_ALL,
    parse_dates=["createdAt"],
    lines=True
#     blocksize=128000000 # = 64 Mb chunks
).head(2000000)

ParserError: Error tokenizing data. C error: EOF inside string starting at row 189069

In [94]:
%%time
# Display the frame returned by the dask load and `.head(...)` call.
dfd

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 13.1 µs


Unnamed: 0,userID,createdAt
0,2718436417,2018-04-24 16:23:41+00:00
1,2718436417,2018-04-07 21:28:08+00:00
2,2718436417,2018-03-10 17:17:31+00:00
3,2718436417,2018-02-13 17:09:54+00:00
4,2718436417,2017-12-04 04:19:02+00:00
...,...,...
373601,24603089,2020-02-13 12:08:21+00:00
373602,24603089,2020-02-13 12:07:24+00:00
373603,24603089,2020-02-13 12:07:08+00:00
373604,24603089,2020-02-13 12:05:59+00:00


In [86]:
%%time
import csv
x = set()
i = 0
y = None
try:
    with open('data/01_raw/test.csv', mode='rU', encoding='utf-16', newline='',) as csvfile:
        reader = csv.DictReader(csvfile, fieldnames=names, dialect='unix')
        for row in reader:
            y = row
            i += 1
            x.add(row["userID"])
except Exception as e:
    print(e)
    print(row)
len(x)

UTF-16 stream does not start with BOM
OrderedDict([('ID', '175638790180835328'), ('userID', '155659213'), ('createdAt', 'Fri Mar 02 17:48:45 +0000 2012'), ('source', 'WhoSay'), ('isReply', 'True'), ('isQuote', 'False'), ('hashtags', '[]'), ('hashtagsCount', '0'), ('mentions', '[]'), ('mentionsCount', '0'), ('urls', "['http://t.co/ZQdVV6Aq']"), ('urlsCount', '1'), ('symbols', '[]'), ('symbolsCount', '0'), ('sensitive', 'True'), ('truncated', 'False'), ('lang', 'en'), ('isRetweet', 'False'), ('text', "My music/rhythm game Cristiano Ronaldo Freestyle is now on Android phones! Who's going to download it right now? … http://t.co/ZQdVV6Aq"), ('retweetedAuthor', ''), ('mediaCount', '0'), ('pollsCount', '0'), ('replyTo', ''), ('quoteOf', ''), ('quoteCount', '0'), ('replyCount', '0'), ('retweetCount', '556'), ('favoriteCount', '373')])
CPU times: user 0 ns, sys: 1.16 ms, total: 1.16 ms
Wall time: 715 µs


  


0