In [1]:
import pandas as pd
import os
import glob
import numpy as np
import warnings
warnings.simplefilter(action='ignore', category = FutureWarning)
from tqdm import tqdm

def get_path(country, week = 'march'):
    """Build the data locations this notebook reads from / writes to.

    Parameters
    ----------
    country : str
        Country code used as a directory name (e.g. 'KE', 'SA').
    week : str, default 'march'
        Collection-period folder name; it only affects ``path_tw``.

    Returns
    -------
    tuple of str
        ``(path_tw, base, rand, baseline, agg, agg_base)``.
    """
    experiment_root = f'../../data/03-experiment/{country}/'
    treatment_root = experiment_root + 'treatment/followers/'
    randomized_file = (f'../../data/02-randomize/{country}/'
                       '04-stratification/integrate/followers_randomized.parquet')
    return (
        treatment_root + f'00-raw/tweets/{week}/',
        experiment_root,
        randomized_file,
        experiment_root + 'baseline/00-raw/followers/tweets/',
        treatment_root + '01-preprocess/',
        experiment_root + 'baseline/01-preprocess/followers/',
    )


def summ_followers2(df):
    """Aggregate post-level metrics to one row per (handle, author_id).

    Parameters
    ----------
    df : pandas.DataFrame
        Post-level frame with ``handle``, ``author_id``, ``verifiability``,
        ``true`` and any number of ``total_*`` metric columns.

    Returns
    -------
    pandas.DataFrame
        Columns ``handle``, ``author_id``, the summed ``total_*`` columns,
        ``verifiability``, ``true`` and ``n_posts`` (number of posts in the
        group). Rows whose ``handle`` or ``author_id`` is missing are dropped
        by the groupby.
    """
    keys = ['handle', 'author_id']
    metrics = [col for col in df.columns if 'total_' in col]
    cols = metrics + ['verifiability', 'true']

    grouped = df[keys + cols].groupby(keys)
    df_agg = grouped.sum()
    # Count posts over exactly the same groups as the sums. Counting per
    # author_id alone would copy a combined count onto every handle row of an
    # author_id that appears under several handles.
    df_agg['n_posts'] = grouped.size()

    return df_agg.reset_index()

def divide_and_conquer(df_final):
    """Split posts into pure retweets vs. everything else and aggregate each part.

    A post counts as a pure retweet when its text contains the literal
    ``'RT @'`` and it has ``total_comments == 0``. Every other post, including
    posts with missing text, goes to the non-RT part, so the two parts
    partition ``df_final``.

    Parameters
    ----------
    df_final : pandas.DataFrame
        Post-level frame with at least ``handle``, ``author_id``, ``text``,
        ``total_comments``, ``verifiability``, ``true`` and the ``total_*``
        metric columns used by ``summ_followers2``.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        ``(df_rt_agg, df_no_rt_agg)``: aggregates keyed by ``username`` /
        ``follower_id`` whose ``total_*``, ``verifiability``, ``true`` and
        ``n_posts`` columns carry an ``_rt`` / ``_no_rt`` suffix.
    """
    # One mask for both halves. `na=False` treats missing text as "not a
    # retweet" instead of leaving NaN in the mask (a NaN-containing mask cannot
    # be used for boolean indexing). Using `~is_rt` guarantees that every post
    # lands in exactly one part.
    is_rt = (df_final['text'].str.contains('RT @', case=True, regex=False, na=False)
             & (df_final['total_comments'] == 0))
    df_RT = df_final[is_rt].reset_index(drop=True)
    df_no_rt = df_final[~is_rt].reset_index(drop=True)

    df_rt_agg = summ_followers2(df_RT).reset_index(drop=True)
    df_no_rt_agg = summ_followers2(df_no_rt).reset_index(drop=True)

    # Value columns get the part-specific suffix; the id columns are renamed
    # to the key names used by the baseline frame.
    cols = [col for col in df_rt_agg.columns if 'total_' in col] + ['verifiability', 'true', 'n_posts']
    id_names = {'handle': 'username', 'author_id': 'follower_id'}

    df_rt_agg = df_rt_agg.rename(columns={**{col: col + '_rt' for col in cols}, **id_names})
    df_no_rt_agg = df_no_rt_agg.rename(columns={**{col: col + '_no_rt' for col in cols}, **id_names})

    return df_rt_agg, df_no_rt_agg


# Change Country here
country = 'KE'
# Number of predicted parquet chunks to load: N_ARCHS for the March files,
# N_ARCHS1 / N_ARCHS2 for the later loads (April onward).
N_ARCHS, N_ARCHS1, N_ARCHS2 = (25, 10, 10) if country == 'SA' else (58, 21, 21)

In [2]:
# Load the baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

## Aggregate 1st month

Dates (each window includes its start and excludes its end):

- start = '2023-03-12T00:00:00Z'
- end = '2023-04-09T00:00:00Z'

So:

- weeks 1-2 (stage 1) = '2023-03-13T00:00:00Z' to '2023-03-27T00:00:00Z'
- weeks 3-4 (stage 2) = '2023-03-27T00:00:00Z' to '2023-04-09T00:00:00Z'

In [3]:
# Read the predicted March chunks and stack them into one frame.
path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'march')

# Build the list first and concatenate once: concatenating inside the loop
# re-copies the growing frame on every iteration (quadratic in total rows).
df_final = pd.concat(
    [pd.read_parquet(f'{agg}predicted/march_{i}.parquet.gzip') for i in range(N_ARCHS)],
    ignore_index=True,
)
df_final.info()

KeyboardInterrupt: 

### Only run for KE (re-derives `total_shares` and `total_reactions` for `omurung2`)
# KE only: for `omurung2`, take `total_shares` from the raw retweet count and
# recompute `total_reactions` as likes + quotes + shares. The guard keeps other
# countries from reading the KE file and from having `agg` (where every later
# cell writes) repointed at the KE directory.
if country == 'KE':
    path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'march')
    df_omu = (
        pd.read_parquet(f'{base}treatment/followers/00-raw/tweets/march/omurung2.parquet')
        [['id', 'public_metrics.retweet_count']]
        .rename(columns={'public_metrics.retweet_count': 'total_shares'})
        # A duplicated `id` here would duplicate posts in the left merge below.
        .drop_duplicates(subset='id')
    )

    df_check = (
        df_final[df_final['handle'] == 'omurung2']
        .drop(['total_reactions', 'total_shares'], axis=1)
        .merge(df_omu, on='id', how='left')
    )
    df_check['total_reactions'] = (df_check['public_metrics.like_count'] +
                                   df_check['public_metrics.quote_count'] +
                                   df_check['total_shares'])

    df_final1 = df_final[df_final['handle'] != 'omurung2']
    df_final = pd.concat([df_final1, df_check]).reset_index(drop=True)

In [20]:
# Label each March post with its stage using the date windows described above
# (start inclusive, end exclusive). Posts outside both windows (e.g. from
# 2023-03-12) get stage 0, so the stage 1 / stage 2 filters exclude them;
# otherwise they would be counted as weeks 3-4.
# NOTE(review): assumes `created_at` holds ISO-8601 strings (object dtype, as in
# the later exports), so plain date prefixes compare correctly as bounds.
STAGE1_START = '2023-03-13'
STAGE2_START = '2023-03-27'
STAGE2_END = '2023-04-09'

created = df_final['created_at']
df_final['stage'] = np.select(
    [(created >= STAGE1_START) & (created < STAGE2_START),
     (created >= STAGE2_START) & (created < STAGE2_END)],
    [1, 2],
    default=0,
)
metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

In [21]:
# Stage 1: per-follower totals over all posts, plus the RT / non-RT split.
stage1 = df_final.loc[df_final['stage'].eq(1)]
stage1_agg = summ_followers2(stage1).rename(
    columns={'handle': 'username', 'author_id': 'follower_id'})
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage1)

In [24]:
# Attach the RT, non-RT and overall stage-1 aggregates to every baseline follower.
base1 = (base1
         .merge(df_rt_agg, on=['follower_id', 'username'], how='left')
         .merge(df_no_rt_agg, on=['follower_id', 'username'], how='left')
         .merge(stage1_agg, on=['follower_id', 'username'], how='left'))

In [26]:
# Followers absent from a stage aggregate come out of the left merges as NaN;
# set those metrics to 0. '_rt' already matches every '_no_rt' column, and
# dict.fromkeys drops any duplicate names while keeping first-seen order.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))

base1[cols_base] = base1[cols_base].fillna(0)

In [29]:
# Persist the stage-1 follower panel (baseline + stage-1 aggregates).
base1.to_parquet(f'{agg}aggregated/stage1_rt.parquet', index=False)

In [30]:
# Reload a fresh baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

In [31]:
# Stage 2: same aggregation as stage 1, restricted to weeks 3-4.
stage2 = df_final.loc[df_final['stage'].eq(2)]
stage2_agg = summ_followers2(stage2).rename(
    columns={'handle': 'username', 'author_id': 'follower_id'})
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage2)

In [32]:
# Attach the RT, non-RT and overall stage-2 aggregates to every baseline follower.
base1 = (base1
         .merge(df_rt_agg, on=['follower_id', 'username'], how='left')
         .merge(df_no_rt_agg, on=['follower_id', 'username'], how='left')
         .merge(stage2_agg, on=['follower_id', 'username'], how='left'))

In [33]:
# Recompute the fill list from the current base1 instead of reusing the list
# built in the stage-1 cell, so this cell does not depend on that cell's state.
# '_rt' already matches every '_no_rt' column; dict.fromkeys drops duplicates.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))

# Followers absent from a stage aggregate are NaN after the left merges -> 0.
base1[cols_base] = base1[cols_base].fillna(0)

In [35]:
# Persist the stage-2 follower panel (baseline + stage-2 aggregates).
base1.to_parquet(f'{agg}aggregated/stage2_rt.parquet', index=False)

### Week 5-6

In [12]:
# Reload a fresh baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

In [13]:
path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'april')

# Read all chunks, then concatenate once (concat inside the loop is quadratic).
df_final = pd.concat(
    [pd.read_parquet(f'{agg}predicted/april1_good{i}.parquet.gzip') for i in range(N_ARCHS2)],
    ignore_index=True,
)
df_final.info()

metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5986328 entries, 0 to 5986327
Data columns (total 16 columns):
 #   Column            Dtype  
---  ------            -----  
 0   author_id         object 
 1   id                object 
 2   conversation_id   object 
 3   created_at        object 
 4   text              object 
 5   lang              object 
 6   total_shares      int32  
 7   reply_count       int32  
 8   like_count        int32  
 9   quote_count       int32  
 10  impression_count  int32  
 11  lang2             object 
 12  total_reactions   int64  
 13  total_comments    int64  
 14  verifiability     float64
 15  true              float64
dtypes: float64(2), int32(5), int64(2), object(7)
memory usage: 616.6+ MB


In [14]:
# Weeks 5-6 form a single stage, so every post belongs to stage 3.
df_final['stage'] = 3
# This export has no handle column, so the author id stands in for it; the
# resulting username is dropped and follower_id alone is the join key.
df_final['handle'] = df_final['author_id']
metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

# Every row is stage 3, so use the frame directly instead of copying it
# through a boolean filter.
stage3 = df_final
stage3_agg = (summ_followers2(stage3)
              .rename(columns={'handle': 'username', 'author_id': 'follower_id'})
              .drop(columns='username'))
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage3)
df_rt_agg = df_rt_agg.drop(columns='username')
df_no_rt_agg = df_no_rt_agg.drop(columns='username')

# Each aggregate should have one row per follower_id; validate='many_to_one'
# raises instead of silently duplicating baseline rows if that ever breaks.
base1 = (base1
         .merge(df_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(df_no_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(stage3_agg, on='follower_id', how='left', validate='many_to_one'))

# Followers absent from an aggregate are NaN after the left merges -> 0.
# '_rt' already matches every '_no_rt' column; dict.fromkeys drops duplicates.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))
base1[cols_base] = base1[cols_base].fillna(0)

base1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 102534 entries, 0 to 102533
Data columns (total 48 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   username                    102534 non-null  object 
 1   follower_id                 102534 non-null  object 
 2   ads_treatment               102534 non-null  float64
 3   strat_block1                102534 non-null  object 
 4   strat_block2                102534 non-null  object 
 5   c_t_strong_total            102534 non-null  int32  
 6   c_t_weak_total              102534 non-null  int32  
 7   c_t_neither_total           102534 non-null  int32  
 8   t_strong                    102534 non-null  float64
 9   t_weak                      102534 non-null  float64
 10  t_neither                   102534 non-null  float64
 11  total_shares_base           102534 non-null  float64
 12  total_reactions_base        102534 non-null  float64
 13  total_comments

In [15]:
# Persist the stage-3 follower panel (baseline + stage-3 aggregates).
base1.to_parquet(f'{agg}aggregated/stage3_rt.parquet', index=False)

In [17]:
# One row per follower is what makes the follower_id-only merges above safe.
assert stage3_agg['follower_id'].is_unique, 'stage3_agg has duplicate follower_id rows'
stage3_agg

Unnamed: 0,follower_id,total_shares,total_reactions,total_comments,verifiability,true,n_posts
0,1000016911374127104,1036,1059,2,0.0,0.0,15
1,100003867,85097,85100,1,2.0,0.0,12
2,1000053696665980928,51,143,3,3.0,3.0,9
3,1000058675342458880,91,112,7,2.0,0.0,21
4,1000065262878711808,434,434,0,0.0,0.0,2
...,...,...,...,...,...,...,...
62047,999936965226135552,197,487,37,8.0,4.0,97
62048,999946294771712001,2,135,65,4.0,0.0,128
62049,999948386043351040,14,40,1,1.0,1.0,2
62050,999966479574544385,1,4,0,0.0,0.0,2


### Week 7-8

In [None]:
# Reload a fresh baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

In [22]:
# Read the files
path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'april2')

# Read all chunks, then concatenate once (concat inside the loop is quadratic).
df_final = pd.concat(
    [pd.read_parquet(f'{agg}predicted/april2_{i}.parquet.gzip') for i in range(N_ARCHS1)],
    ignore_index=True,
)
df_final.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3396471 entries, 0 to 3396470
Data columns (total 15 columns):
 #   Column            Dtype  
---  ------            -----  
 0   author_id         object 
 1   id                object 
 2   conversation_id   object 
 3   created_at        object 
 4   text              object 
 5   total_shares      int32  
 6   reply_count       int32  
 7   like_count        int32  
 8   quote_count       int32  
 9   impression_count  int32  
 10  lang              object 
 11  total_reactions   int64  
 12  total_comments    int64  
 13  verifiability     float64
 14  true              float64
dtypes: float64(2), int32(5), int64(2), object(6)
memory usage: 323.9+ MB


In [27]:
# Weeks 7-8 form a single stage, so every post belongs to stage 4.
df_final['stage'] = 4
# This export has no handle column, so the author id stands in for it; the
# resulting username is dropped and follower_id alone is the join key.
df_final['handle'] = df_final['author_id']
metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

# Every row is stage 4, so use the frame directly instead of copying it
# through a boolean filter.
stage4 = df_final
stage4_agg = (summ_followers2(stage4)
              .rename(columns={'handle': 'username', 'author_id': 'follower_id'})
              .drop(columns='username'))
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage4)
df_rt_agg = df_rt_agg.drop(columns='username')
df_no_rt_agg = df_no_rt_agg.drop(columns='username')

# Each aggregate should have one row per follower_id; validate='many_to_one'
# raises instead of silently duplicating baseline rows if that ever breaks.
base1 = (base1
         .merge(df_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(df_no_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(stage4_agg, on='follower_id', how='left', validate='many_to_one'))

# Followers absent from an aggregate are NaN after the left merges -> 0.
# '_rt' already matches every '_no_rt' column; dict.fromkeys drops duplicates.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))
base1[cols_base] = base1[cols_base].fillna(0)

base1.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 44201 entries, 0 to 44200
Data columns (total 48 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   username                    44201 non-null  object 
 1   follower_id                 44201 non-null  object 
 2   ads_treatment               44201 non-null  float64
 3   strat_block1                44201 non-null  object 
 4   strat_block2                44201 non-null  object 
 5   c_t_strong_total            44201 non-null  int32  
 6   c_t_weak_total              44201 non-null  int32  
 7   c_t_neither_total           44201 non-null  int32  
 8   t_strong                    44201 non-null  float64
 9   t_weak                      44201 non-null  float64
 10  t_neither                   44201 non-null  float64
 11  total_shares_base           44201 non-null  float64
 12  total_reactions_base        44201 non-null  float64
 13  total_comments_base         442

In [28]:
# Persist the stage-4 follower panel (baseline + stage-4 aggregates).
base1.to_parquet(f'{agg}aggregated/stage4_rt.parquet', index=False)

### Post Treat 1

In [3]:
# Reload a fresh baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

In [4]:
path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'april2')

# Read all chunks, then concatenate once (concat inside the loop is quadratic).
df_final = pd.concat(
    [pd.read_parquet(f'{agg}predicted/posttreat_{i}.parquet.gzip') for i in range(N_ARCHS2)],
    ignore_index=True,
)
df_final.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3289281 entries, 0 to 3289280
Data columns (total 16 columns):
 #   Column            Dtype  
---  ------            -----  
 0   author_id         object 
 1   id                object 
 2   conversation_id   object 
 3   created_at        object 
 4   text              object 
 5   lang              object 
 6   total_shares      int32  
 7   reply_count       int32  
 8   like_count        int32  
 9   quote_count       int32  
 10  impression_count  int32  
 11  lang2             object 
 12  total_reactions   int64  
 13  total_comments    int64  
 14  verifiability     float64
 15  true              float64
dtypes: float64(2), int32(5), int64(2), object(7)
memory usage: 338.8+ MB


In [5]:
# The first post-treatment period is a single stage: every post is stage 5.
df_final['stage'] = 5
# This export has no handle column, so the author id stands in for it; the
# resulting username is dropped and follower_id alone is the join key.
df_final['handle'] = df_final['author_id']
metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

# Every row is stage 5, so use the frame directly instead of copying it
# through a boolean filter.
stage5 = df_final
stage5_agg = (summ_followers2(stage5)
              .rename(columns={'handle': 'username', 'author_id': 'follower_id'})
              .drop(columns='username'))
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage5)
df_rt_agg = df_rt_agg.drop(columns='username')
df_no_rt_agg = df_no_rt_agg.drop(columns='username')

# Each aggregate should have one row per follower_id; validate='many_to_one'
# raises instead of silently duplicating baseline rows if that ever breaks.
base1 = (base1
         .merge(df_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(df_no_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(stage5_agg, on='follower_id', how='left', validate='many_to_one'))

# Followers absent from an aggregate are NaN after the left merges -> 0.
# '_rt' already matches every '_no_rt' column; dict.fromkeys drops duplicates.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))
base1[cols_base] = base1[cols_base].fillna(0)

In [6]:
# Column dtypes and non-null counts of the stage-5 per-follower aggregate.
pd.DataFrame.info(stage5_agg)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 29367 entries, 0 to 29366
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   follower_id      29367 non-null  object 
 1   total_shares     29367 non-null  int32  
 2   total_reactions  29367 non-null  int64  
 3   total_comments   29367 non-null  int64  
 4   verifiability    29367 non-null  float64
 5   true             29367 non-null  float64
 6   n_posts          29367 non-null  int64  
dtypes: float64(2), int32(1), int64(3), object(1)
memory usage: 1.7+ MB


In [7]:
# Persist the stage-5 follower panel (baseline + stage-5 aggregates).
base1.to_parquet(f'{agg}aggregated/stage5_rt.parquet', index=False)

### Post Treat 2

In [28]:
# Reload a fresh baseline RT frame and align its metric names with the `total_*` convention.
base1 = pd.read_parquet(
    f'../../data/04-analysis/{country}/baseline_rt.parquet'
).rename(columns={
    'shares_base': 'total_shares_base',
    'reactions_base': 'total_reactions_base',
    'comments_base': 'total_comments_base',
})

In [29]:
# KE
# get_path returns six paths; unpacking them into five names raised ValueError.
path_tw, base, rand, baseline, agg, agg_base = get_path(country, 'april2')

# Read all chunks, then concatenate once (concat inside the loop is quadratic).
df_final = pd.concat(
    [pd.read_parquet(f'{agg}predicted/posttreat2_{i}.parquet.gzip') for i in range(N_ARCHS2)],
    ignore_index=True,
)
df_final.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3121093 entries, 0 to 3121092
Data columns (total 16 columns):
 #   Column            Dtype  
---  ------            -----  
 0   author_id         object 
 1   id                object 
 2   conversation_id   object 
 3   created_at        object 
 4   text              object 
 5   lang              object 
 6   total_shares      int32  
 7   reply_count       int32  
 8   like_count        int32  
 9   quote_count       int32  
 10  impression_count  int32  
 11  lang2             object 
 12  total_reactions   int64  
 13  total_comments    int64  
 14  verifiability     float64
 15  true              float64
dtypes: float64(2), int32(5), int64(2), object(7)
memory usage: 321.5+ MB


In [30]:
# The second post-treatment period is a single stage: every post is stage 6.
df_final['stage'] = 6
# This export has no handle column, so the author id stands in for it; the
# resulting username is dropped and follower_id alone is the join key.
df_final['handle'] = df_final['author_id']
metrics = [col for col in df_final.columns if 'total_' in col]
cols = metrics + ['verifiability', 'true']

# Every row is stage 6, so use the frame directly instead of copying it
# through a boolean filter.
stage6 = df_final
stage6_agg = (summ_followers2(stage6)
              .rename(columns={'handle': 'username', 'author_id': 'follower_id'})
              .drop(columns='username'))
df_rt_agg, df_no_rt_agg = divide_and_conquer(stage6)
df_rt_agg = df_rt_agg.drop(columns='username')
df_no_rt_agg = df_no_rt_agg.drop(columns='username')

# Each aggregate should have one row per follower_id; validate='many_to_one'
# raises instead of silently duplicating baseline rows if that ever breaks.
base1 = (base1
         .merge(df_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(df_no_rt_agg, on='follower_id', how='left', validate='many_to_one')
         .merge(stage6_agg, on='follower_id', how='left', validate='many_to_one'))

# Followers absent from an aggregate are NaN after the left merges -> 0.
# '_rt' already matches every '_no_rt' column; dict.fromkeys drops duplicates.
cols_base = list(dict.fromkeys(
    [col for col in base1.columns if '_rt' in col] + cols + ['n_posts']))
base1[cols_base] = base1[cols_base].fillna(0)

In [32]:
# Column dtypes and non-null counts of the stage-6 follower panel.
pd.DataFrame.info(base1)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 44201 entries, 0 to 44200
Data columns (total 48 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   username                    44201 non-null  object 
 1   follower_id                 44201 non-null  object 
 2   ads_treatment               44201 non-null  float64
 3   strat_block1                44201 non-null  object 
 4   strat_block2                44201 non-null  object 
 5   c_t_strong_total            44201 non-null  int32  
 6   c_t_weak_total              44201 non-null  int32  
 7   c_t_neither_total           44201 non-null  int32  
 8   t_strong                    44201 non-null  float64
 9   t_weak                      44201 non-null  float64
 10  t_neither                   44201 non-null  float64
 11  total_shares_base           44201 non-null  float64
 12  total_reactions_base        44201 non-null  float64
 13  total_comments_base         442

In [33]:
# Persist the stage-6 follower panel (baseline + stage-6 aggregates).
base1.to_parquet(f'{agg}aggregated/stage6_rt.parquet', index=False)