In [None]:
#%pip install --upgrade networkx[default]
#%pip install pyinform

In [None]:
# required libraries

import glob
import re
import os
import time
import datetime

import pandas as pd
import numpy as np
import networkx as nx
import pyinform

# local functions import

import transfer_entropy_functions

In [None]:
# s3 specific libraries: s3fs for globbing/reading input files, boto3 for uploading results
import s3fs
import boto3
# Authenticated (anon=False) S3 filesystem handle; read_csv_data globs S3 inputs through it.
# No credentials are set in this notebook — presumably s3fs picks up the ambient AWS config.
s3 = s3fs.S3FileSystem(anon=False)

In [None]:
# Are we running inside AWS with S3 access?
# True -> all inputs below come from the S3_* locations; False -> from the local PC_* paths.
RUNNING_IN_S3 = True

In [None]:
# Directories
# Local folder for every CSV this notebook writes; its *.csv files are uploaded to S3 later.
OUTPUT_DIR = './OUTPUT'

# Raw Brandwatch export directory (local PC path or S3 prefix, chosen by RUNNING_IN_S3).
#PC_INPUT_DIR = 'C:/STUFF/RESEARCH/Brandwatch/TEST'
PC_INPUT_DIR = 'C:/STUFF/RESEARCH/Brandwatch/DATA/MainQuery/All'
S3_INPUT_DIR = 's3://mips-main/initial_data_collection/raw_data/brandwatch'
INPUT_DIR = S3_INPUT_DIR if RUNNING_IN_S3 else PC_INPUT_DIR

# News-outlet list; its 'news outlets' column drives the URL -> news_domain matching.
S3_NEWS_DOMAINS_CSV_FILE = 's3://mips-main/initial_data_collection/news_outlets_v2.csv'
PC_NEWS_DOMAINS_CSV_FILE = 'C:/STUFF/RESEARCH/Brandwatch/DATA/news_outlets_v2.csv'
NEWS_DOMAINS_CSV_FILE = S3_NEWS_DOMAINS_CSV_FILE if RUNNING_IN_S3 else PC_NEWS_DOMAINS_CSV_FILE

# Per-news-domain TUFM class table (its 'Domain' and 'tufm_class' columns are used).
S3_NEWS_DOMAIN_TUFM_FILE = 's3://mips-main/initial_data_collection/processed_data/news_guard/news_table-v1-UT60-FM5.csv'
PC_NEWS_DOMAIN_TUFM_FILE = 'C:/STUFF/RESEARCH/Brandwatch/DATA/news_table-v1-UT60-FM5.csv'
NEWS_DOMAIN_TUFM_FILE = S3_NEWS_DOMAIN_TUFM_FILE if RUNNING_IN_S3 else PC_NEWS_DOMAIN_TUFM_FILE

In [None]:
# Parameters
# Passed as `freq` to transfer_entropy_functions.generate_te_edge_list; presumably a
# pandas offset alias ('D' = daily bins) — confirm against that module.
SAMPLE_FREQUENCY = 'D'
# Domain-name parts too generic to identify an outlet on their own (TLDs, 'news', ...);
# find_patterns_for_domains never turns these parts into standalone match patterns.
skip_strings = {'add', 'al', 'au', 'ca', 'com', 'es', 'in', 'is', 'it', 'ms', 'my', 'net', 'news', 'org', 'rs', 'st', 'tv', 'uk', 'us', 'co'}
# Louvain communities need MORE than this many users to become actors (strict '>').
MIN_COMM_SIZE = 500
# Platforms need MORE than this many messages to become platform actors.
MIN_PLAT_SIZE = 50
# Only messages whose 'Domain' (platform) is in this set are kept when reading.
KNOWN_PLATFORMS = {'twitter.com', 'tumblr.com', 'youtube.com', 'reddit.com', '4chan.org', 'facebook.com', 'gab.com'}

In [None]:
def read_csv_data(data_directory, filter_platform_domain_set, is_using_s3, output_dir=None):
    """
    Reads and combines the data from all the csv files in the given directory.

    Each file is read with its first 6 lines skipped, its columns are renamed to the
    internal schema ('datetime', 'source_user_id', 'content', 'title',
    'parent_source_msg_id', 'parent_source_user_id', 'platform', 'article_url'),
    rows sharing the same 'Url' are de-duplicated (first kept), and only rows whose
    platform is in ``filter_platform_domain_set`` are returned.

    Side effect: the per-platform row counts (before platform filtering) are written to
    ``<output_dir>/platform_counts_info.csv``; ``output_dir`` is created if needed.

    :param data_directory: Local directory or s3:// prefix that contains the csv files
    :type data_directory: str
    :param filter_platform_domain_set: Platform domains to keep (e.g. 'twitter.com')
    :type filter_platform_domain_set: set[str]
    :param is_using_s3: True to glob the files through the module-level ``s3`` filesystem
    :type is_using_s3: bool
    :param output_dir: Directory for platform_counts_info.csv; defaults to OUTPUT_DIR
    :type output_dir: str | None
    :return: pandas Dataframe with the de-duplicated, platform-filtered rows (0..n-1 index)
    :rtype: pd.DataFrame
    :raises ValueError: if no csv files are found in ``data_directory``
    """
    if output_dir is None:
        output_dir = OUTPUT_DIR

    if is_using_s3:
        # S3 keys always use '/'; os.path.join would insert '\\' when run on Windows.
        data_files = s3.glob(data_directory.rstrip('/') + "/*.csv*")
        # The globbed paths come back without the protocol, so it is re-added for pandas.
        prefix_path = 's3://'
    else:
        data_files = glob.glob(os.path.join(data_directory, "*.csv*"))
        prefix_path = ''
    print(data_files)
    if not data_files:
        # Fail early with a clear message instead of pd.concat's "No objects to concatenate".
        raise ValueError(f"No csv files found in {data_directory}")

    df_list = []
    for idx, file in enumerate(data_files):
        print(f"Reading {idx + 1} of {len(data_files)} files.\nFile name: {file}")
        # The first 6 lines of each file are skipped; the next line is the header row.
        df = pd.read_csv(prefix_path + file, skiprows=6, parse_dates=['Date'],
                         dtype={'Twitter Author ID': str, 'Author':str,
                                'Full Text':str, 'Title':str,
                                'Thread Id':str, 'Thread Author':str,
                                'Domain':str, 'Expanded URLs':str,
                                'Avatar':str, 'Parent Blog Name':str, 'Root Blog Name':str})
        df = df.rename(columns={'Date':'datetime', 'Author': 'source_user_id',
                                'Full Text':'content', 'Title':'title',
                                'Thread Id': 'parent_source_msg_id', 'Thread Author': 'parent_source_user_id',
                                'Domain':'platform', 'Expanded URLs':'article_url'})
        df_list.append(df)

    start_time = time.time()
    result_df = pd.concat(df_list)
    end_time = time.time()
    print(f"{(end_time - start_time)/60} mins for concat dataframes")

    start_time = time.time()
    # Rows sharing a 'Url' are treated as the same message; keep the first occurrence.
    result_df = result_df.drop_duplicates(subset='Url', keep="first")
    end_time = time.time()
    print(f"{(end_time - start_time)/60} mins for drop duplicates")

    start_time = time.time()
    os.makedirs(output_dir, exist_ok=True)
    # NOTE: these are row (message) counts per platform, despite the 'users_count' label.
    result_df['platform'].value_counts().rename('users_count').rename_axis('platform').to_csv(output_dir + "/platform_counts_info.csv")
    result_df = result_df[result_df['platform'].isin(filter_platform_domain_set)]
    end_time = time.time()
    print(f"{(end_time - start_time)/60} mins for filtering platforms")

    result_df = result_df.reset_index(drop=True)
    print(result_df.shape)
    return result_df

In [None]:
%%time
# read data and filter for only the KNOWN_PLATFORMS
# (rows are de-duplicated by 'Url' and columns renamed inside read_csv_data)
all_osn_msgs_df = read_csv_data(INPUT_DIR, KNOWN_PLATFORMS, RUNNING_IN_S3)
all_osn_msgs_df

In [None]:
# NOTE(review): repeats the previous cell's display; safe to delete.
all_osn_msgs_df

In [None]:
%%time
# add user_id column and parent_user_id column

def generate_users_dict(osn_msgs_df):
    """
    Assign a dense integer id to every (platform, user) pair in the messages.

    For each row, in order, the author ('source_user_id') and then the parent author
    ('parent_source_user_id') are registered, so ids are 0, 1, 2, ... in order of
    first appearance.

    :param osn_msgs_df: messages with 'platform', 'source_user_id' and
        'parent_source_user_id' columns
    :type osn_msgs_df: pd.DataFrame
    :return: mapping (platform, user name) -> integer user id
    :rtype: dict
    """
    users_data = {}
    for platform, user, parent_user in zip(osn_msgs_df['platform'],
                                           osn_msgs_df['source_user_id'],
                                           osn_msgs_df['parent_source_user_id']):
        # setdefault(..., len(users_data)) hands out the next unused id only for unseen
        # keys, replacing the old module-level `global next_user_id` counter.
        # NOTE(review): a missing parent author registers a (platform, NaN) key — confirm intended.
        users_data.setdefault((platform, user), len(users_data))
        users_data.setdefault((platform, parent_user), len(users_data))
    print(len(users_data))
    return users_data

users_data = generate_users_dict(all_osn_msgs_df)

# users table: one row per (platform, source_user_id) with its integer user_id
users_df = pd.Series(users_data).rename_axis(['platform','source_user_id']).rename('user_id').reset_index()
print(users_df.shape)
print(users_df)
users_df.to_csv(OUTPUT_DIR + "/users.csv",index=False)

# add user_id column and parent_user_id column
# Plain dict lookups over zipped columns instead of a row-wise apply that builds a
# pd.Series per row (very slow on large frames); the resulting ids are the same.
all_osn_msgs_df['user_id'] = [
    users_data[key]
    for key in zip(all_osn_msgs_df['platform'], all_osn_msgs_df['source_user_id'])
]
all_osn_msgs_df['parent_user_id'] = [
    users_data[key]
    for key in zip(all_osn_msgs_df['platform'], all_osn_msgs_df['parent_source_user_id'])
]

In [None]:
%%time
# Add news_domains column

def find_patterns_for_domains(in_news_domains_csv_file, in_skip_strings):
    """
    Build regex patterns that identify news domains inside URL strings.

    For every outlet in the csv's 'news outlets' column two kinds of patterns map to it:
    the full domain matched literally (e.g. 'foxnews.com'), and each dot-separated part
    longer than 2 characters that is not in ``in_skip_strings`` (e.g. 'foxnews').

    :param in_news_domains_csv_file: path (or buffer) of a csv with a 'news outlets' column
    :param in_skip_strings: domain parts too generic to be used as patterns (e.g. 'com')
    :return: dict compiled pattern -> news domain name
    :rtype: dict
    """
    news_domains = pd.read_csv(in_news_domains_csv_file)['news outlets']
    # dropna: blank cells are NaN floats and would crash .split below.
    # sorted: set iteration order of strings is hash-randomised per process, which made the
    # resulting dict (and which outlet wins a shared part) differ between runs.
    news_domains_list = sorted(set(news_domains.dropna().astype(str)))
    skip = set(in_skip_strings)
    # re.escape: an unescaped '.' in 'cnn.com' would match any character.
    pattern_to_news_domain_name = {re.compile(re.escape(nd)): nd for nd in news_domains_list}
    for nd in news_domains_list:
        for sp in sorted(set(nd.split('.')) - skip):
            if len(sp) > 2:
                # NOTE(review): a part shared by several outlets maps only to the last of
                # them in sorted order — consider dropping ambiguous parts instead.
                pattern_to_news_domain_name[re.compile(re.escape(sp))] = nd
    return pattern_to_news_domain_name

def search_domain_in_string(in_expanded_url, in_news_domains_names):
    """
    Return the news domain whose pattern gives the longest match in a string.

    :param in_expanded_url: text to search (typically an expanded article URL)
    :param in_news_domains_names: dict compiled pattern -> news domain name
    :return: domain of the longest non-empty match (the earliest pattern wins ties),
        or None when no pattern matches a non-empty span
    """
    # (pattern, match length) for every pattern that matches a non-empty span
    candidates = []
    for pattern in in_news_domains_names:
        found = pattern.search(in_expanded_url)
        if found and found.end() > found.start():
            candidates.append((pattern, found.end() - found.start()))
    if not candidates:
        return None
    # max() keeps the first of several equally long matches
    best_pattern, _ = max(candidates, key=lambda item: item[1])
    return in_news_domains_names[best_pattern]


def calculate_news_domain_series(in_string_series, in_skip_strings, in_news_domains_csv_file=None):
    """
    Map each string in a series to the news domain it mentions (or None).

    :param in_string_series: series of (typically) expanded article URLs; non-str
        values such as NaN map to None
    :param in_skip_strings: domain parts too generic to match on (see find_patterns_for_domains)
    :param in_news_domains_csv_file: csv with a 'news outlets' column; defaults to
        NEWS_DOMAINS_CSV_FILE
    :return: series aligned with ``in_string_series`` holding domain names or None
    """
    if in_news_domains_csv_file is None:
        in_news_domains_csv_file = NEWS_DOMAINS_CSV_FILE
    # Patterns are built once and reused for every element.
    news_domains_names = find_patterns_for_domains(in_news_domains_csv_file, in_skip_strings)
    return in_string_series.apply(
        lambda x: search_domain_in_string(x, news_domains_names) if type(x) is str else None)

all_osn_msgs_df['news_domain'] = calculate_news_domain_series(all_osn_msgs_df['article_url'], skip_strings)
# Keep only messages linking to a known news domain. .copy() makes the result an
# independent frame, so later column assignments (e.g. 'class') don't act on a slice.
all_osn_msgs_df = all_osn_msgs_df.loc[all_osn_msgs_df['news_domain'].notnull(), :].copy()

In [None]:
# Messages remaining after the news-domain filter.
all_osn_msgs_df

In [None]:
# read TUFM class values of news_domains
# (the table's 'Domain' column is renamed to 'news_domain' to match the messages table)
tufmdf = pd.read_csv(NEWS_DOMAIN_TUFM_FILE)
tufmdf.rename(columns={'Domain':'news_domain'}, inplace=True)
print("raw: " ,tufmdf['tufm_class'].unique())
def get_clearn_tufm_class(row):
    """
    Normalise the raw TUFM class of one table row.

    The string label '0' becomes 'UF'; every other value is returned unchanged.
    """
    raw_class = row['tufm_class']
    return 'UF' if raw_class == '0' else raw_class
    
# Replace the raw '0' class with 'UF' (via get_clearn_tufm_class) and keep only the
# domain -> class columns.
tufmdf['clean_tufm_class'] = tufmdf.apply(lambda row: get_clearn_tufm_class(row), axis=1)
tufmdf = tufmdf[['news_domain','clean_tufm_class']].rename(columns={'clean_tufm_class':'class'})
print("cleaned:", tufmdf['class'].unique())
print(tufmdf)
# Lookup table news_domain -> class, used to label every message in the next cell.
# NOTE(review): if a domain appears in several rows, the last row's class wins.
news_domain_to_tufm_class = tufmdf.set_index('news_domain')['class'].to_dict()
del tufmdf
news_domain_to_tufm_class

In [None]:
# add TUFMClass to osn messages table
# Fail loudly, listing every news domain that has no class in the TUFM table, instead
# of a bare KeyError on the first one encountered.
missing_tufm_domains = set(all_osn_msgs_df['news_domain']) - set(news_domain_to_tufm_class)
if missing_tufm_domains:
    raise KeyError(f"news domains without a TUFM class: {sorted(missing_tufm_domains)}")
all_osn_msgs_df['class'] = all_osn_msgs_df['news_domain'].map(news_domain_to_tufm_class)
print(all_osn_msgs_df['class'].unique())
all_osn_msgs_df

In [None]:
#%%time
# generate retweet network
# One edge per (parent_user_id, user_id) pair; 'num_retweets' is the number of message
# rows where user_id posted with parent_user_id as the thread author (every remaining
# message contributes one count).
# NOTE(review): messages with a missing 'parent_source_user_id' likely all share one
# (platform, NaN) user id, which would become a hub node here — confirm intended.
retweet_network_edges = all_osn_msgs_df.groupby(['parent_user_id','user_id'], as_index=False).size().sort_values('size', ascending=False).rename(columns={'size':'num_retweets'})
print(retweet_network_edges)
retweet_network_edges.to_csv(OUTPUT_DIR + "/retweet_network.csv", index=False)

In [None]:
#%%time
# generate community partition on retweet network
G = nx.from_pandas_edgelist(retweet_network_edges,'parent_user_id','user_id',['num_retweets'])
print(G)
# NOTE(review): per the networkx docs, louvain_communities reads edge weights from the
# attribute named by `weight` (default 'weight'); these edges only carry 'num_retweets',
# so retweet counts likely do not influence the partition. Pass weight='num_retweets'
# if a weighted partition is intended.
lc = nx.algorithms.community.louvain_communities(G, resolution=1, seed=235246345345)
print(len(lc))
user_comm_sizes = pd.Series([len(c) for c in lc]).value_counts().rename_axis('community_size').rename('num_of_communities').reset_index().sort_values('community_size', ascending=False).reset_index(drop=True).rename_axis('size_rank')
user_comm_sizes.to_csv(OUTPUT_DIR + "/all_user_comm_sizes.csv")
# Use the configured threshold (was a duplicated literal 500).
print(user_comm_sizes[user_comm_sizes['community_size'] > MIN_COMM_SIZE])
filtered_comms = [c for c in lc if len(c) > MIN_COMM_SIZE]
# The filter is strict '>', so the message says "more than", not "min of".
print(f"Number of communities with more than {MIN_COMM_SIZE} users : {len(filtered_comms)}")

In [None]:
#%%time
# generate actor tables
# Actor ids are consecutive: platforms first, then top individual users, then communities.

# generate platform actors table
# NOTE: 'users_count' is the number of MESSAGES per platform (value_counts over message
# rows), not distinct users; the column name is kept for downstream compatibility.
platform_to_user_count = all_osn_msgs_df['platform'].value_counts().to_dict()
platform_actors_df = pd.Series(platform_to_user_count).rename('users_count').rename_axis('platform').reset_index()
platform_actors_df = platform_actors_df[(platform_actors_df['users_count'] > MIN_PLAT_SIZE) & (platform_actors_df['platform'].isin(KNOWN_PLATFORMS))].reset_index(drop=True).rename_axis('actor_id')
# Ids are 0..n-1 after reset_index; len() also works for an empty table, where
# index.max() + 1 would be NaN.
next_actor_id = len(platform_actors_df)
print(next_actor_id)
print("plat_actors:\n", platform_actors_df, "\n")
platform_actors_df.to_csv(OUTPUT_DIR + "/plat_actors.csv")

# generate individual actors table
# The N_TOP_INDIVIDUALS users that most often appear as parent (thread) author.
N_TOP_INDIVIDUALS = 100
ind_actors_df = all_osn_msgs_df['parent_user_id'].value_counts().iloc[:N_TOP_INDIVIDUALS].rename('received_share_count').rename_axis('user_id').reset_index().rename_axis('actor_id')
ind_actors_df.index = ind_actors_df.index + next_actor_id
ind_actors_df = ind_actors_df.reset_index()
next_actor_id = next_actor_id + len(ind_actors_df)
print(next_actor_id)
print("indv_actors:\n", ind_actors_df, "\n")
ind_actors_df.to_csv(OUTPUT_DIR + "/indv_actors.csv", index=False)

# generate community actors table (largest community gets the lowest actor id)
comm_info_df = pd.DataFrame([[len(c), c] for c in filtered_comms], columns=['users_count', 'users_set']).sort_values('users_count', ascending=False).reset_index(drop=True).rename_axis('comm_id')
comm_info_df['actor_id'] = comm_info_df.index + next_actor_id
comm_info_df = comm_info_df.reset_index().set_index('actor_id')
comm_info_df.to_csv(OUTPUT_DIR + "/comm_info.csv")
print("Community info :\n", comm_info_df)
# one row per (user, community) membership
comm_actors_data = [
    [uid, comm_id, actid]
    for actid, comm_id, users_set in zip(comm_info_df.index, comm_info_df['comm_id'], comm_info_df['users_set'])
    for uid in users_set
]
comm_actors_df = pd.DataFrame(comm_actors_data, columns=['user_id','comm_id','actor_id'])
print("comm_actors:\n", comm_actors_df, "\n")
comm_actors_df.to_csv(OUTPUT_DIR + "/comm_actors.csv", index=False)


# generate actors table: rows of [actor_id, actor_label, actor_type, num_users]
all_actors = [
    [aid, platform, 'plat', count]
    for aid, platform, count in zip(platform_actors_df.index, platform_actors_df['platform'], platform_actors_df['users_count'])
]
# user_id -> source_user_id lookup (user ids are unique by construction), replacing a
# full boolean scan of users_df per individual actor.
user_id_to_source = users_df.set_index('user_id')['source_user_id']
all_actors += [
    [aid, user_id_to_source.loc[uid], 'indv', 1]
    for aid, uid in zip(ind_actors_df['actor_id'], ind_actors_df['user_id'])
]
all_actors += [
    [aid, comm_id, 'comm', count]
    for aid, comm_id, count in zip(comm_info_df.index, comm_info_df['comm_id'], comm_info_df['users_count'])
]
all_actors_df = pd.DataFrame(all_actors, columns=['actor_id','actor_label','actor_type','num_users']).sort_values('actor_id')
all_actors_df.to_csv(OUTPUT_DIR + "/actors.csv", index=False)
all_actors_df

In [None]:
# Persist the enriched message table (user ids, news_domain, class) next to the other outputs.
all_osn_msgs_df.to_csv(OUTPUT_DIR + "/all_osn_msgs.csv", index=False)
all_osn_msgs_df

In [None]:
# upload datafiles to s3 bucket
# `b` and `folder_name` are reused by the later TE-edges upload cell.
bucket_name = 'mips-main'
folder_name = 'initial_data_collection/processed_data/actors_and_messages/v1'
b = boto3.Session().resource('s3').Bucket(bucket_name)
# Glob OUTPUT_DIR itself (was a hardcoded "./OUTPUT"), in a stable order.
for file_name in sorted(os.path.basename(f) for f in glob.glob(os.path.join(OUTPUT_DIR, "*.csv"))):
    print(f"Uploading file : {OUTPUT_DIR}/{file_name} to {bucket_name}/{folder_name}/{file_name}")
    b.upload_file(f"{OUTPUT_DIR}/{file_name}", f"{folder_name}/{file_name}")

In [None]:
import importlib
# Re-import the local module so edits to transfer_entropy_functions are picked up
# without restarting the kernel.
importlib.reload(transfer_entropy_functions)

def compute_actor_te_network(actors_of_interest, t3_all_osn_msgs, t6_actors, t7_indv_actors, t8_comm_actors, t9_plat_actors, freq):
    """
    Build the transfer-entropy edge list for the selected actors and save it.

    Thin wrapper around ``transfer_entropy_functions.generate_te_edge_list``: prints the
    actors table, forwards every argument unchanged, writes the result to
    ``OUTPUT_DIR + '/actor_te_edges.csv'`` without its index, and returns it.

    :param actors_of_interest: actor ids to include
    :param t3_all_osn_msgs: enriched message table
    :param t6_actors: actors table (indexed by actor_id in this notebook)
    :param t7_indv_actors: individual actors table
    :param t8_comm_actors: community membership table
    :param t9_plat_actors: platform actors table
    :param freq: sampling frequency forwarded to generate_te_edge_list
    :return: the edge table from generate_te_edge_list (presumably a DataFrame)
    """
    print(t6_actors)
    te_edges = transfer_entropy_functions.generate_te_edge_list(
        actors_of_interest,
        t3_all_osn_msgs,
        t6_actors,
        t7_indv_actors,
        t8_comm_actors,
        t9_plat_actors,
        freq,
    )
    te_edges.to_csv(OUTPUT_DIR + '/actor_te_edges.csv', index=False)
    return te_edges

In [None]:
# Inspect the intermediate actor tables.
platform_actors_df

In [None]:
comm_actors_df

In [None]:
ind_actors_df

In [None]:
# NOTE(review): ad-hoc peek at one message by position; 541 has no meaning beyond this
# inspection and raises IndexError if fewer rows remain — consider removing.
all_osn_msgs_df.iloc[541].to_dict()

In [None]:
# Index the actor tables by actor_id for compute_actor_te_network.
# Guarded so re-running this cell is a no-op: after the first run 'actor_id' is no
# longer a column and an unconditional set_index raises KeyError.
if 'actor_id' in all_actors_df.columns:
    all_actors_df = all_actors_df.set_index('actor_id')
if 'actor_id' in ind_actors_df.columns:
    ind_actors_df = ind_actors_df.set_index('actor_id')
if 'actor_id' in comm_actors_df.columns:
    comm_actors_df = comm_actors_df.set_index('actor_id')

In [None]:
# Reload the local module (picks up edits), then compute TE edges between all actors.
importlib.reload(transfer_entropy_functions)
r = compute_actor_te_network(all_actors_df.index.to_list(), all_osn_msgs_df, all_actors_df, ind_actors_df, comm_actors_df, platform_actors_df,
                             SAMPLE_FREQUENCY)
r

In [None]:
# Count edges with non-positive vs positive total transfer entropy and show the positive ones.
# (rows whose total_te is NaN are counted in neither line)
print(f"num of TE <= 0 edges : {r[r['total_te'] <= 0].shape[0]}")
print(f"num of TE > 0 edges : {r[r['total_te'] > 0].shape[0]}")
r[r['total_te'] > 0]

In [None]:
# Upload the TE edge list written by compute_actor_te_network to the same S3 folder
# (reuses `b` and `folder_name` from the earlier upload cell).
b.upload_file(f"{OUTPUT_DIR}/actor_te_edges.csv", f"{folder_name}/actor_te_edges.csv")

In [None]:
# Sanity check: actor types present in the actors table ('plat', 'indv', 'comm').
all_actors_df.actor_type.unique()