# Tweet Turing Test: Detecting Disinformation on Twitter  

|          | Group #2 - Disinformation Detectors                     |
|---------:|---------------------------------------------------------|
| Members  | John Johnson, Katy Matulay, Justin Minnion, Jared Rubin |
| Notebook | `02_preprocess.ipynb`                                   |
| Purpose  | Apply a pre-processing pipeline to merged data.         |

> (TODO - write more explaining notebook)

# 1 - Setup

In [1]:
# imports from Python standard library
from typing import Optional

# imports requiring installation
#   connection to Google Cloud Storage
from google.cloud import storage            # pip install google-cloud-storage
from google.oauth2 import service_account   # pip install google-auth

#  data science packages
import pandas as pd                         # pip install pandas

In [2]:
# imports from tweet_turing.py
from tweet_turing import get_json_files, load_local_json, get_gcp_storage_client, get_gcp_bucket, \
    list_gcp_objects, get_gcp_object_as_json, get_gcp_object_as_text, set_gcp_object_from_json, \
    is_retweet, has_url, convert_emoji_list, emoji_count, load_local_json_parquet, \
    authentic_df_eda_dtype_mapping, get_gcp_object_from_parq_as_df, set_gcp_object_from_df_as_parq

# imports from tweet_turing_paths.py
from tweet_turing_paths import local_data_paths, local_snapshot_paths, gcp_data_paths, \
    gcp_snapshot_paths, gcp_project_name, gcp_bucket_name, gcp_key_file

## Local or Cloud?

Decide here whether to run notebook with local data or GCP bucket data
 - if the working directory of this notebook has a "../data/" folder with data loaded (e.g. working on local computer or have data files loaded to a cloud VM) then use the "local files" option and comment out the "gcp bucket files" option
 - if this notebook is being run from a GCP VM (preferably in the `us-central1` location) then use the "gcp bucket files" option and comment out the "local files" option

In [3]:
# option: local files
local_or_cloud: str = "local"   # comment/uncomment this line or next

# option: gcp bucket files
#local_or_cloud: str = "cloud"   # comment/uncomment this line or previous

# don't comment/uncomment for remainder of cell
# reject anything other than the two supported modes up front;
#   subsequent cells will not repeat this check
if local_or_cloud not in ("local", "cloud"):
    raise ValueError("Variable 'local_or_cloud' can only take on one of two values, 'local' or 'cloud'.")

# pick the path dictionaries that match the selected mode
data_paths = local_data_paths if (local_or_cloud == "local") else gcp_data_paths
snapshot_paths = local_snapshot_paths if (local_or_cloud == "local") else gcp_snapshot_paths

In [4]:
# GCP handles are only created when local_or_cloud=="cloud";
#   in "local" mode both names are simply bound to None, so running this cell is harmless
gcp_storage_client: storage.Client = (
    get_gcp_storage_client(project_name=gcp_project_name, key_file=gcp_key_file)
    if (local_or_cloud == "cloud")
    else None
)
gcp_bucket: storage.Bucket = (
    get_gcp_bucket(storage_client=gcp_storage_client, bucket_name=gcp_bucket_name)
    if (local_or_cloud == "cloud")
    else None
)

# 2 - Troll Tweets (CSV) Pre-processing

## 2.1 Load CSV Snapshot (from prior merge step)

In [5]:
# locate the merged troll tweet snapshot produced by the prior merge step
csv_filename: str = "csv_snapshot.parquet.snappy"
csv_path: str = "{}{}".format(snapshot_paths['csv_snapshot'], csv_filename)

# read from the GCP bucket or from local disk depending on the selected mode;
#   any other mode leaves an empty dataframe
if (local_or_cloud == "cloud"):
    troll_df_raw: pd.DataFrame = get_gcp_object_from_parq_as_df(bucket=gcp_bucket, object_name=csv_path)
elif (local_or_cloud == "local"):
    troll_df_raw: pd.DataFrame = pd.read_parquet(csv_path, engine='pyarrow')
else:
    troll_df_raw: pd.DataFrame = pd.DataFrame()

## 2.2 Filter for *"Only English language tweets"*

In [6]:
# keep only rows whose `language` column equals 'English'
mask_lang_en: pd.Series = troll_df_raw['language'].eq('English')

troll_df = troll_df_raw.loc[mask_lang_en]

In [7]:
# debug: show the column names available after the language filter
list(troll_df.columns)

['external_author_id',
 'author',
 'content',
 'region',
 'language',
 'publish_date',
 'harvested_date',
 'following',
 'followers',
 'updates',
 'post_type',
 'account_type',
 'retweet',
 'account_category',
 'new_june_2018',
 'alt_external_id',
 'tweet_id',
 'article_url',
 'tco1_step1',
 'tco2_step1',
 'tco3_step1']

## 2.3 Extract columns of interest

In [8]:
# extract only the columns we will use for later steps
cols_to_keep = [
    'external_author_id',
    'author',
    'content',
    'region',
    'language',
    'publish_date',
    'following',
    'followers',
    'updates',
    'post_type',
    'retweet',
    'account_category',
    'tweet_id',
    'tco1_step1'
    ]

# take an explicit copy: `troll_df` was produced by slicing `troll_df_raw`, and later
#   cells add a column to it. Without the copy pandas cannot tell whether that write
#   targets a temporary slice and emits SettingWithCopyWarning.
troll_df = troll_df[cols_to_keep].copy()

print("Troll Dataframe Shape (rows, cols):", troll_df.shape)

Troll Dataframe Shape (rows, cols): (2116867, 14)


## 2.4 Derive new feature: `data_source`

This feature is set up as a constant value __"Troll"__ for this subset of the dataset to indicate that the data originates from the troll tweets CSV snapshot file. The tweets obtained from Twitter API (in JSON files) have the same feature added by the `01_merge` notebook, but their values are either __"verified_user"__ or __"verified_random"__.

In [9]:
# build the constant label as an explicit pd.Series so its dtype can be pinned to 'string',
#   then attach it as the new `data_source` column
troll_source_label = pd.Series('Troll', index=troll_df.index, dtype='string')
troll_df['data_source'] = troll_source_label

## 2.5 Align column names

In [10]:
# rename troll columns so they line up with the authentic-tweet column names
#   key = old column name; value = new column name
col_name_mapping = {"retweet": "is_retweet"}

troll_df = troll_df.rename(columns=col_name_mapping)

# 3 - Authentic Tweets (JSON) Pre-processing

## 3.1 Load JSON Snapshot (from prior merge step)

In [11]:
# locate the merged authentic tweet snapshot file (from the prior merge step)
json_filename: str = "json_snapshot.parquet.snappy"
json_path: str = "{}{}".format(snapshot_paths['json_snapshot'], json_filename)

# cloud loading is disabled: "this kills the cloud VM"
#   - crashes when loading ~3.6GB (uncompressed) from parquet file to pandas dataframe
#   - commenting out until workaround determined
#json_df = get_gcp_object_from_parq_as_df(bucket=gcp_bucket, object_name=json_path)
# so only "local" mode loads data; in any other mode `json_df` stays None
json_df: pd.DataFrame = load_local_json_parquet(json_path) if (local_or_cloud == "local") else None

## 3.2 Extract columns of interest (1 of 2)

We'll first extract columns from the `json_df` dataframe into a copy named `authentic_df_eda`. This copy will fork off into a stand-alone snapshot file for use with EDA of the authentic tweets. Before forking off, though, we'll add in our derived features.

Later we'll drop columns from `authentic_df_eda` to produce `authentic_df`, keeping only the columns we intend to merge with the `troll_df` dataframe.

In [12]:
# columns retained for the authentic-tweet-specific EDA fork
#   (a larger subset than what is later merged with the troll data)
cols_keep_EDA = [
    # tweet-level fields
    'author_id',
    'created_at',
    'id',
    'text',
    'lang',
    'referenced_tweets',
    # tweet engagement counts
    'public_metrics.retweet_count',
    'public_metrics.reply_count',
    'public_metrics.like_count',
    'public_metrics.quote_count',
    # author fields
    'author.location',
    'author.name',
    'author.username',
    'author.public_metrics.followers_count',
    'author.public_metrics.following_count',
    'author.entities.url.urls',
    'author.created_at',
    'author.verified',
    # annotations and entities
    'context_annotations',
    'entities.annotations',
    'entities.mentions',
    'entities.hashtags',
    'entities.urls',
    # provenance label
    'data_source',
]

# independent copy so later column additions do not touch `json_df`
authentic_df_eda = json_df.loc[:, cols_keep_EDA].copy()

## 3.3 Set data types

This helps our pandas dataframes run more smoothly and predictably, and sets up for an accurate type mapping when we save to a parquet file.

In [13]:
# apply the dtype mapping imported from `tweet_turing.py`
authentic_df_eda = authentic_df_eda.astype(dtype=authentic_df_eda_dtype_mapping)

## 3.4 Derive new features

### 3.4.1 Derive new feature: `is_retweet`

A tweet is flagged as a retweet if field `referenced_tweets` meets all the following criteria:
 - is not `NaN`
 - contains a list with at least one element
 - the first (index=0) element is a dict where key `type` has value "`retweeted`" or has value "`quoted`"

In [26]:
# make a mask of non-NaN `referenced_tweets` rows
notna_mask = authentic_df_eda['referenced_tweets'].notna()

# start from an all-zero uint8 Series so rows without `referenced_tweets` default to 0
#   and the finished column keeps the uint8 dtype. Assigning through a row mask into a
#   brand-new dataframe column would first fill it with NaN, which upcasts to float and
#   discards the uint8 cast.
is_retweet_flags = pd.Series(0, index=authentic_df_eda.index, dtype='uint8')

# apply `is_retweet` only to the rows that reference another tweet, outputting 1 or 0
#   (skipped when no row qualifies, since there is nothing to apply the function to)
if notna_mask.any():
    new_column = authentic_df_eda.loc[notna_mask].apply(is_retweet, axis='columns')
    is_retweet_flags.loc[notna_mask] = new_column.astype('uint8')

authentic_df_eda['is_retweet'] = is_retweet_flags

### 3.4.2 Derive new feature: `updates`

This feature is derived by adding together four public metrics for a given tweet. This matches up with the feature definition from the troll dataset.

In [29]:
# the four per-tweet public metrics that are added together to form `updates`
update_cols = [
    'public_metrics.retweet_count',
    'public_metrics.reply_count',
    'public_metrics.like_count',
    'public_metrics.quote_count',
]

# row-wise total, stored as an unsigned 64-bit integer
metric_totals = authentic_df_eda.loc[:, update_cols].sum(axis=1)
authentic_df_eda['updates'] = metric_totals.astype('uint64')

### 3.4.3 Derive new feature: `account_category`

In [30]:
# label for each value of `author.verified`
account_category_mapping = {True: "Verified_User", False: "Unknown"}

# strict lookup per row: a value that is not a key of the mapping raises KeyError
verified_flags = authentic_df_eda['author.verified']
category_labels = verified_flags.apply(account_category_mapping.__getitem__)
authentic_df_eda['account_category'] = category_labels.astype('string')

### 3.4.4 Derive new feature: `tco1_step1`

In [31]:
# create mask for tweets where `entities.urls` has a non-null value
has_url_mask = authentic_df_eda['entities.urls'].notna()


def _first_url(entities_urls_list):
    """Return the first URL entry's `unwound_url`, or its `expanded_url` when `unwound_url` is None."""
    url_element = entities_urls_list[0]
    unwound_url = url_element['unwound_url']
    if unwound_url is None:
        return url_element['expanded_url']
    return unwound_url


# only the masked rows receive a value; all other rows are left without one
first_urls = authentic_df_eda.loc[has_url_mask, 'entities.urls'].apply(_first_url)
authentic_df_eda.loc[has_url_mask, 'tco1_step1'] = first_urls.astype('string')

### 3.4.5 Derive new feature: `post_type`

A variation of `is_retweet` but with an additional dimension: differentiates retweet tweets between plain retweets (reposts original tweet with no comment added) and quote tweets (reposts original tweet with comment added). This mimics a feature by the same name from the troll dataset.

In [46]:
# function to be moved to `tweet_turing.py` after testing
def get_post_type(tweet_series: pd.Series) -> Optional[str]:
    """Return the `type` of the first referenced tweet, or None if there is none.

    Parameters
    ----------
    tweet_series : pd.Series
        One tweet (one dataframe row); must provide a `referenced_tweets` entry.

    Returns
    -------
    Optional[str]
        The `type` value of the first element of `referenced_tweets` (the notebook
        output shows values such as "retweeted", "quoted" and "replied_to"), or None
        when `referenced_tweets` is missing (None / NaN / pd.NA) or an empty sequence.
    """
    ref_twt = tweet_series['referenced_tweets']

    # missing value: None, or a scalar NaN/NA left behind by pandas
    if ref_twt is None or (pd.api.types.is_scalar(ref_twt) and pd.isna(ref_twt)):
        return None

    # use len() rather than truthiness so numpy arrays are handled as well as lists
    if len(ref_twt) == 0:
        return None

    return ref_twt[0]['type']

In [47]:
# evaluate `get_post_type` once per row, then store the result with pandas' 'string' dtype
post_types = authentic_df_eda.apply(get_post_type, axis=1)
authentic_df_eda['post_type'] = post_types.astype('string')

In [49]:
# tally each post type, counting missing values as well
authentic_df_eda.loc[:, 'post_type'].value_counts(dropna=False)

<NA>          819915
replied_to    334749
retweeted     315175
quoted         38189
Name: post_type, dtype: Int64

## 3.5 Align column names 

In [32]:
# rename authentic-tweet columns to the names used by the troll dataframe
#   key = old column name; value = new column name
col_name_mapping = {
    # tweet fields
    "id": "tweet_id",
    "text": "content",
    "lang": "language",
    "created_at": "publish_date",
    # author fields
    "author_id": "external_author_id",
    "author.username": "author",
    "author.name": "full_name",
    "author.location": "region",
    "author.public_metrics.followers_count": "followers",
    "author.public_metrics.following_count": "following",
}

authentic_df_eda = authentic_df_eda.rename(columns=col_name_mapping)

## 3.6 Save EDA Snapshot

As a follow-up to section 3.2 above, save a snapshot of the dataset intended for authentic-specific EDA. This dataset will have additional columns beyond the troll dataset.

In [33]:
# save `authentic_df_eda` snapshot as parquet
#   (requires package `pyarrow` to be installed in environment)
# NOTE(review): this cell reads key 'parq_snapshot' while the other snapshot cells read
#   'json_snapshot' - confirm the key exists in `snapshot_paths`
parq_filename: str = "authentic_df_eda.parquet.snappy"
parq_path: str = "{}{}".format(snapshot_paths['parq_snapshot'], parq_filename)

# only "local" mode writes a file; "cloud" mode writes nothing here
if (local_or_cloud == "local"):
    authentic_df_eda.to_parquet(path=parq_path, engine='pyarrow', index=False)

## 3.7 Drop columns not needed for merge

In [34]:
# EDA-only columns that are removed before merging with the troll dataframe
cols_to_drop = [
    # raw engagement counts
    'public_metrics.retweet_count',
    'public_metrics.reply_count',
    'public_metrics.like_count',
    'public_metrics.quote_count',
    # author details
    'author.entities.url.urls',
    'author.created_at',
    'author.verified',
    'full_name',
    # annotations, entities and references
    'context_annotations',
    'entities.annotations',
    'entities.mentions',
    'entities.hashtags',
    'entities.urls',
    'referenced_tweets',
]

# `drop` returns a new dataframe; `authentic_df_eda` itself keeps all of its columns
authentic_df = authentic_df_eda.drop(cols_to_drop, axis='columns')

# 4 - Merge (Partially) Pre-processed Tweets

At this stage, the two separate datasets can be merged. Additional pre-processing will still be performed but can be applied to the entire dataset.

## 4.1 Merge

In [35]:
# stack troll rows on top of authentic rows and renumber the index from zero
frames_to_merge = (troll_df, authentic_df)
merged_df = pd.concat(frames_to_merge, axis=0, ignore_index=True)

print(f"Merged dataframe shape: {merged_df.shape}")

Merged dataframe shape: (3624895, 15)


## 4.2 Save Snapshot

In [36]:
# save `merged_df` snapshot as parquet
#   (requires package `pyarrow` to be installed in environment)
parq_filename: str = "merged_df.parquet.snappy"
parq_path: str = "{}{}".format(snapshot_paths['json_snapshot'], parq_filename)

# only "local" mode writes a file; "cloud" mode writes nothing here
if (local_or_cloud == "local"):
    merged_df.to_parquet(path=parq_path, engine='pyarrow', index=False)

# 5 - Merged DF Pre-Processing

Below are preprocessing steps intended for the full, merged dataframe.

## 5.1 (Optional) Load Snapshot of `merged_df`

Optional cell to load snapshot (parquet file) saved during prior step (4.2).

In [37]:
# reload the `merged_df` snapshot from parquet
#   (requires package `pyarrow` to be installed in environment)
parq_filename: str = "merged_df.parquet.snappy"
parq_path: str = "{}{}".format(snapshot_paths['json_snapshot'], parq_filename)

# only "local" mode reads the file; "cloud" mode leaves `merged_df` untouched
if (local_or_cloud == "local"):
    merged_df = pd.read_parquet(path=parq_path, engine='pyarrow')

## 5.2 Remove tweets with null `content` field

In [38]:
# discard every row whose `content` value is missing
merged_df = merged_df.dropna(subset=['content'])

## 5.3 Derive new feature: `has_url`

In [39]:
# run the `has_url` helper on every row and store its result as a column
has_url_flags = merged_df.apply(has_url, axis=1)
merged_df.loc[:, 'has_url'] = has_url_flags

## 5.4 Emojis

### 5.4.1 Derive new feature: `emoji_text`

In [40]:
# apply convert_emoji_list
new_column = merged_df.apply(convert_emoji_list, axis='columns')
merged_df.loc[:, 'emoji_text'] = new_column

### 5.4.2 Derive new feature: `emoji_count`

In [41]:
# count emoji list
new_column = merged_df.apply(emoji_count, axis='columns')
merged_df.loc[:, 'emoji_count'] = new_column

## 5.5 Convert `publish_date` to `datetime` format

In [42]:
# Parse `publish_date` into a single timezone-aware (UTC) datetime column.
# The two data sources use different string formats, so each subset is parsed
#   separately and written into one pre-allocated tz-aware Series. Building the
#   Series first fixes the final dtype up front instead of relying on how pandas
#   types a new dataframe column that is only partially filled.
publish_date_parsed = pd.Series(pd.NaT, index=merged_df.index, dtype='datetime64[ns, UTC]')

# dates from Twitter API, using ISO 8601 format
#   (`infer_datetime_format` is deprecated since pandas 2.0 and is not needed for ISO 8601 input)
verified_mask = merged_df['data_source'].isin(['verified_random', 'verified_user'])
publish_date_parsed.loc[verified_mask] = pd.to_datetime(merged_df.loc[verified_mask, 'publish_date'], utc=True)

# dates from Troll dataset, using a defined format
#   unclear if times are in UTC or their local time zone
troll_mask = (merged_df['data_source'] == 'Troll')
troll_format = r"%m/%d/%Y %H:%M"  # example: 10/1/2017 22:43
publish_date_parsed.loc[troll_mask] = pd.to_datetime(merged_df.loc[troll_mask, 'publish_date'], format=troll_format, utc=True)

# swap the string column for the parsed one; the parsed column is appended last,
#   rows belonging to neither source keep NaT
merged_df = merged_df.drop(columns=['publish_date']).assign(publish_date=publish_date_parsed)

In [43]:
# number of missing values per column
merged_df.isna().sum()

external_author_id          0
author                      0
content                     0
region                 180192
language                    0
following                   0
followers                   0
updates                     0
post_type             2772358
is_retweet                  0
account_category            0
tweet_id                    0
tco1_step1            1427061
data_source                 0
has_url                     0
emoji_text                  0
emoji_count                 0
publish_date                0
dtype: int64

# 6 - Save Preprocessed Data

In [37]:
# save the preprocessed `merged_df` snapshot as parquet
#   (requires package `pyarrow` to be installed in environment)
parq_filename: str = "merged_df_preprocessed.parquet.snappy"
parq_path: str = "{}{}".format(snapshot_paths['json_snapshot'], parq_filename)

# only "local" mode writes a file; "cloud" mode writes nothing here
if (local_or_cloud == "local"):
    merged_df.to_parquet(path=parq_path, engine='pyarrow', index=False)