In [None]:
from dask.distributed import Client 
from dask_cloudprovider.aws import EC2Cluster
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
import matplotlib.pyplot as plt
import seaborn as sns 

In [None]:
# Local connection: two workers, one thread per worker, memory_limit='1G'.
client = Client(
    n_workers=2,
    threads_per_worker=1,
    memory_limit='1G',
)
client

In [None]:
# Remote connection: build ONE EC2-backed cluster and attach the client to it.
# The cluster is created with its instance settings first and scaled afterwards,
# so the two requested workers belong to the same cluster the client talks to
# and no second, unused cluster is left running.
# NOTE: this rebinds `client`; use either the local connection cell or this one.
cluster = EC2Cluster(instance_type="t2.micro",
                     filesystem_size=8)
cluster.scale(2)
client = Client(cluster)
client

In [None]:
def load_data(data_dir='cresci_data'):
    '''
    Load the tweets and users CSV files of each account group as dask ddfs.

    Parameters
    ----------
    data_dir : str, default 'cresci_data'
        Root directory holding the genuine_accounts, fake_followers,
        social_spambots_* and traditional_spambots_* folders. With the
        default, files are read from 'cresci_data/...' relative to the
        working directory.

    Returns
    -------
    tuple
        (real, real_id, fake_follow, fake_follow_id, social_spambots,
        social_spambots_id, traditional_spambots, traditional_spambots_id).
        Names ending in _id are read from the users files, the others from
        the tweets files. Traditional spambot tweets come from
        traditional_spambots_1 only, while their users are read from every
        traditional_spambots_* folder.
    '''
    # Options shared by several reads, kept in one place so they stay in sync.
    all_dates=['created_at', 'timestamp', 'crawled_at', 'updated']
    tweet_dtypes={'geo':str, 'place':str}

    real=dd.read_csv(f'{data_dir}/genuine_accounts/tweets.csv',
                     parse_dates=all_dates,
                     dtype=tweet_dtypes)
    real_id=dd.read_csv(f'{data_dir}/genuine_accounts/users.csv',
                        parse_dates=all_dates)

    # The fake-follower files are read with fewer date columns than the others.
    fake_follow=dd.read_csv(f'{data_dir}/fake_followers/tweets.csv',
                            parse_dates=['created_at', 'timestamp'],
                            dtype=tweet_dtypes)
    fake_follow_id=dd.read_csv(f'{data_dir}/fake_followers/users.csv',
                               parse_dates=['created_at','updated'])
    # Strip the timezone from every created_at value.
    # NOTE(review): presumably so it can be subtracted from `updated` later - confirm.
    fake_follow_id['created_at']=fake_follow_id.map_partitions(
        lambda df: df.created_at.apply(
            lambda x: x.replace(tzinfo=None)))

    social_spambots=dd.read_csv(f'{data_dir}/social_spambots_*/twe*',
                                parse_dates=all_dates,
                                dtype=tweet_dtypes)
    social_spambots_id=dd.read_csv(f'{data_dir}/social_spambots_*/use*',
                                   parse_dates=all_dates)

    traditional_spambots=dd.read_csv(f'{data_dir}/traditional_spambots_1/tweets.csv',
                                     parse_dates=all_dates,
                                     dtype=tweet_dtypes)
    # These user columns are forced to str when the files are read.
    traditional_spambots_id=dd.read_csv(f'{data_dir}/traditional_spambots_*/u*',
                                        parse_dates=all_dates,
                                        dtype={'lang': str,
                                               'profile_background_color': str,
                                               'profile_background_image_url_https': str,
                                               'profile_banner_url': str,
                                               'profile_image_url_https': str,
                                               'profile_sidebar_border_color': str,
                                               'profile_sidebar_fill_color': str,
                                               'profile_link_color': str,
                                               'profile_text_color': str,
                                               'time_zone': str})

    return (real, real_id,
            fake_follow, fake_follow_id,
            social_spambots, social_spambots_id,
            traditional_spambots, traditional_spambots_id)

In [None]:
def na_filler(ddf):
    '''
    Return a new ddf in which every missing value has been replaced by 0.

    The fill is applied to each partition separately; no other
    transformation is performed.
    '''
    def _zero_fill(partition):
        return partition.fillna(0)

    return ddf.map_partitions(_zero_fill)

In [None]:
def bot_string(ddf, bot_var=1, case_sensitive=True):
    '''
    Create the bot label column and three flag columns that indicate whether
    'bot' occurs in the user name, screen name and description.
    0=no 'bot'
    1=yes 'bot'

    Parameters
    ----------
    ddf : dask ddf with the columns name, screen_name and description
    bot_var : value written to every row of the new `bot` column (default 1)
    case_sensitive : when True (default) only the lowercase substring 'bot'
        is matched; when False the text is lowercased first, so 'Bot' and
        'BOT' are matched as well.

    Returns
    -------
    ddf with the added columns bot, bot_name, bot_screen_name and
    bot_description (in that order).
    '''
    def _has_bot(value):
        # str() lets non-string cells (e.g. the 0 left by a fillna) be searched too.
        text=str(value)
        if not case_sensitive:
            text=text.lower()
        return int('bot' in text)

    def _add_flags(df):
        # One pass per partition adds the label and all three flags.
        return df.assign(bot=bot_var,
                         bot_name=df['name'].apply(_has_bot),
                         bot_screen_name=df['screen_name'].apply(_has_bot),
                         bot_description=df['description'].apply(_has_bot))

    return ddf.map_partitions(_add_flags)

In [None]:
def string_len(ddf):
    '''
    Add the length of each user's name, screen name and description.

    Every value is converted with str() before it is measured. The results
    are stored in the new columns len_name, len_screen_name and
    len_description.
    '''
    def _text_length(value):
        return len(str(value))

    def _add_lengths(partition):
        return partition.assign(
            len_name=partition['name'].apply(_text_length),
            len_screen_name=partition['screen_name'].apply(_text_length),
            len_description=partition['description'].apply(_text_length),
        )

    return ddf.map_partitions(_add_lengths)

In [None]:
def account_age(ddf, timeA, timeB):
    '''
    Calculate account age by seconds (due to bots having very short life span).

    Parameters
    ----------
    ddf : dask ddf holding both time columns
    timeA : name of the column with the earlier time
    timeB : name of the column with the later time

    Returns
    -------
    ddf with the added column age_seconds = (timeB - timeA) in seconds.
    '''
    def _add_age(df):
        # Column-wise subtraction gives the same numbers as a row-by-row apply,
        # but avoids a Python call per row and still yields a (zero-length)
        # column when a partition has no rows.
        elapsed=df[timeB]-df[timeA]
        return df.assign(age_seconds=elapsed/np.timedelta64(1, 's'))

    return ddf.map_partitions(_add_age)

In [None]:
def tweet_rate(ddf, ddf_id, column_name):
    '''
    Attach a per-user tweet rate (number of tweets / seconds) to ddf_id.

    For every user_id in ddf the rows are counted (via the id column) and the
    span between the largest and smallest value of column_name is measured in
    seconds. Zeros in either figure are replaced by 1 before the count is
    divided by the span. The resulting tweet_rate column is left-merged onto
    ddf_id (ddf_id.id == user_id) and all remaining missing values in the
    merged ddf are filled with 0.
    '''
    def _chunk_extremes(grouped):
        # Per chunk: the largest and the smallest value of every group.
        return grouped.max(), grouped.min()

    def _span_in_seconds(maxima, minima):
        # Across chunks: overall max minus overall min, expressed in seconds.
        return (maxima.max() - minima.min()) / np.timedelta64(1, 's')

    span_seconds = dd.Aggregation('custom_rate', _chunk_extremes, _span_in_seconds)

    per_user = ddf.groupby('user_id').agg({'id': 'count', column_name: span_seconds})
    per_user = per_user.replace(0, 1)
    per_user = per_user.eval(f'tweet_rate=id/{column_name}')
    per_user = per_user.drop(['id', column_name], axis=1)

    merged = ddf_id.merge(per_user, left_on='id', right_on='user_id', how='left')
    return merged.fillna(0)

In [None]:
# Read every tweets/users table; the order follows load_data's return tuple.
(real, real_id,
 fake_follow, fake_follow_id,
 social_spambots, social_spambots_id,
 traditional_spambots, traditional_spambots_id) = load_data()

In [None]:
# Replace missing values with 0 in each of the eight frames, in the same order as before.
(real, real_id,
 fake_follow, fake_follow_id,
 social_spambots, social_spambots_id,
 traditional_spambots, traditional_spambots_id) = [
    na_filler(frame)
    for frame in (real, real_id,
                  fake_follow, fake_follow_id,
                  social_spambots, social_spambots_id,
                  traditional_spambots, traditional_spambots_id)
]

In [None]:
# Genuine accounts are labelled bot=0; the three bot groups are labelled bot=1.
real_id = bot_string(real_id, bot_var=0)
fake_follow_id = bot_string(fake_follow_id, bot_var=1)
social_spambots_id = bot_string(social_spambots_id, bot_var=1)
traditional_spambots_id = bot_string(traditional_spambots_id, bot_var=1)

In [None]:
# Add len_name / len_screen_name / len_description to every users table.
real_id=string_len(real_id)
fake_follow_id=string_len(fake_follow_id)
social_spambots_id=string_len(social_spambots_id)
# Assign back to traditional_spambots_id (not a misspelled name) so the
# length columns are present in the frame the following cells keep using.
traditional_spambots_id=string_len(traditional_spambots_id)

In [None]:
# Add age_seconds = timeB - timeA; the fake-follower table starts from
# 'created_at', the other three from 'timestamp'.
real_id = account_age(real_id, timeA='timestamp', timeB='updated')
fake_follow_id = account_age(fake_follow_id, timeA='created_at', timeB='updated')
social_spambots_id = account_age(social_spambots_id, timeA='timestamp', timeB='updated')
traditional_spambots_id = account_age(traditional_spambots_id, timeA='timestamp', timeB='updated')

In [None]:
# Merge a per-user tweet_rate into each users table; the fake-follower tweets
# are measured on 'timestamp', the other three on 'updated'.
real_id = tweet_rate(ddf=real, ddf_id=real_id, column_name='updated')
fake_follow_id = tweet_rate(ddf=fake_follow, ddf_id=fake_follow_id, column_name='timestamp')
social_spambots_id = tweet_rate(ddf=social_spambots, ddf_id=social_spambots_id, column_name='updated')
traditional_spambots_id = tweet_rate(ddf=traditional_spambots, ddf_id=traditional_spambots_id,
                                     column_name='updated')

In [None]:
# Peek at the first rows of the fake-follower users table.
fake_follow_id.head(n=5)

In [None]:
# Frequency of each retweet_count value among the traditional spambot tweets.
traditional_spambots['retweet_count'].value_counts().compute()

# EDA

In [None]:
# Followers vs. friends for genuine accounts.
sns.scatterplot(
    data=real_id.compute(),
    x='followers_count',
    y='friends_count',
).set_title('Real users')

In [None]:
# Followers vs. friends for fake-follower accounts.
sns.scatterplot(
    data=fake_follow_id.compute(),
    x='followers_count',
    y='friends_count',
).set_title('Fake follower bots')

In [None]:
# Followers vs. friends for social spambot accounts.
sns.scatterplot(
    data=social_spambots_id.compute(),
    x='followers_count',
    y='friends_count',
).set_title('Social spambots')

In [None]:
# Followers vs. friends for traditional spambot accounts.
sns.scatterplot(
    data=traditional_spambots_id.compute(),
    x='followers_count',
    y='friends_count',
).set_title('Traditional spambots')