In [3]:
import os
# Expose only GPU 0 to this process (relevant on multi-GPU machines).
# This runs before torch is imported in the next cell.
os.environ.update({"CUDA_VISIBLE_DEVICES": "0"})

In [4]:
import numpy as np
import pandas as pd
from collections import deque

import torch
from sklearn.model_selection import train_test_split
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, SequentialSampler
from transformers import BertModel, BertForPreTraining, AutoTokenizer
import json
from tqdm import tqdm, trange
import emoji

# Fix the PyTorch RNG seed so repeated runs start from the same state.
random_seed = 0
torch.manual_seed(seed=random_seed)


<torch._C.Generator at 0x7f01e39a3050>

# Input

In [1]:
TASK = 2  # sub-task to run; must be 1, 2 or 3 (any other value raises when the model is loaded)
# Task descriptions can be found here: https://multimediaeval.github.io/editions/2021/tasks/fakenews/
# The model weights have to be downloaded from https://mediaeval-fakenews.tools.eurecom.fr/ and put in the folder ../models/
# (the loading cell reads ../models/task1.pth, ../models/task2.pth or ../models/task3.pth).

In [2]:
# Tweets to classify: replace these examples with your own list of strings.
your_tweets_as_list = [
    "This is outrageous! This politician lied about the vaccine, as it contains 5G and has been made to control population!",
    "My colleague think Covid is a hoax and has been staged, how do I prove him wrong?",
    "The vaccine contains the mark of the beast! It is the devil's work! Also the deep state is holding all together, the new world order is upon us",
    "What about climate change? Ice is melting and temperatures keep getting warmer. I hope we find a durable solution to all this"
    ]

# Some utility functions

In [5]:
def extract_emojis(tw, emoji_table=None, max_l=7):
    """Return the emojis found in a tweet, in order of appearance.

    The tweet is scanned from its end towards its start. At every position
    the longest candidate (up to ``max_l`` characters) is tested first, so
    an emoji made of several code points is reported as one item rather
    than as its individual parts.

    Unlike the previous implementation, tweets shorter than ``max_l``
    characters are scanned too instead of always yielding an empty list.

    Args:
        tw: The tweet text.
        emoji_table: Container whose keys are the known emoji strings.
            Defaults to ``emoji.UNICODE_EMOJI``.
        max_l: Maximum number of characters a single emoji may span.

    Returns:
        A list of emoji strings (possibly empty); repeated emojis appear
        once per occurrence.
    """
    if emoji_table is None:
        # NOTE(review): presumably written against an `emoji` release where
        # UNICODE_EMOJI is a flat {emoji: ':name:'} mapping -- confirm
        # against the installed version.
        emoji_table = emoji.UNICODE_EMOJI

    emojis = []
    end = len(tw)  # exclusive end of the part of the tweet still to scan
    while end > 0:
        # Longest candidate first, so combined emojis win over their parts.
        for length in range(min(max_l, end), 0, -1):
            candidate = tw[end - length:end]
            if candidate in emoji_table:
                emojis.append(candidate)
                end -= length  # jump over the characters just consumed
                break
        else:
            end -= 1  # nothing ends here; move one character to the left

    # The scan ran right-to-left; restore reading order.
    emojis.reverse()
    return emojis

def remove_hashtags(tweets):
    """Return a new list with every '#' character stripped from each tweet.

    Only the '#' sign is dropped; the hashtag's word stays in the text.
    """
    cleaned = []
    for tweet in tweets:
        # Splitting on '#' and gluing the pieces back drops every '#'.
        cleaned.append(''.join(tweet.split('#')))
    return cleaned

def replace_emojis(tweets):
    """Replace every emoji in each tweet with its textual description.

    The description is the ``emoji.UNICODE_EMOJI`` entry for the emoji with
    underscores turned into spaces and colons removed.

    Emojis are replaced longest-first. Replacing in extraction order could
    corrupt a combined emoji: when a tweet holds both a base emoji and a
    longer sequence starting with it, replacing the base first also rewrote
    the start of the longer sequence and left its remaining code points
    behind.

    Args:
        tweets: Iterable of tweet strings.

    Returns:
        A new list of tweets with emojis replaced by text.
    """
    tweets_no_emojis = []
    for tw in tweets:
        # dict.fromkeys de-duplicates while keeping first-seen order; the
        # stable sort then only moves longer sequences to the front.
        found = sorted(dict.fromkeys(extract_emojis(tw)), key=len, reverse=True)
        for e in found:
            e_text = emoji.UNICODE_EMOJI[e].replace('_',' ').replace(':', '')
            tw = tw.replace(e, e_text)
        tweets_no_emojis.append(tw)

    return tweets_no_emojis
    
        

In [6]:
# Pre-processing switches applied to the tweets before tokenization.
remove_hashtags_flag = True
replace_emojis_flag = True
bw_flag = False  # not read anywhere in the visible part of this notebook

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
n_gpu = torch.cuda.device_count()
# Only query the GPU name when a GPU exists: get_device_name(0) raises on
# CPU-only machines, which used to break the cell despite the CPU fallback.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu"

'Tesla K80'

# Models

In [7]:
# Per-class weights handed to the task-1 cross-entropy loss (all equal here).
weights_task1 = torch.tensor([0.1, 0.1, 0.1]).to(device)
class CovidTwitterBertClassifier_task1(nn.Module):
    """COVID-Twitter-BERT with its seq_relationship head replaced by a linear classifier.

    Used for task 1 (single-label classification over ``n_classes`` classes).
    The attribute layout is deliberately left untouched so that existing
    checkpoints keep matching in ``load_state_dict``.
    """
    
    def __init__(self, n_classes):
        """Build the network.

        Args:
            n_classes: Number of output units of the classification head.
                With more than one class the loss is a weighted
                cross-entropy, otherwise a mean-squared error.
        """
        super().__init__()
        self.n_classes = n_classes
        self.bert = BertForPreTraining.from_pretrained('digitalepidemiologylab/covid-twitter-bert-v2')    
        # Swap the pre-training head for a fresh classifier.
        # 1024 presumably matches the encoder's hidden size -- TODO confirm.
        self.bert.cls.seq_relationship = nn.Linear(1024, n_classes)
        
        if n_classes >1:
            self.criterion = nn.CrossEntropyLoss(weight=weights_task1)
        else:
            self.criterion = nn.MSELoss()
        
    def forward(self, input_ids, token_type_ids, input_mask, labels=None):
        """Run the model and, when labels are given, compute the loss.

        Args:
            input_ids: Token ids of the batch.
            token_type_ids: Segment ids of the batch.
            input_mask: Attention mask of the batch.
            labels: Target values for the loss. Optional: pass ``None``
                (the default) at inference time to skip the loss.

        Returns:
            ``(loss, logits)``; ``loss`` is ``None`` when ``labels`` is ``None``.
        """
        outputs = self.bert(input_ids = input_ids, token_type_ids = token_type_ids, attention_mask = input_mask)
        
        # Second element of the model output: the result of the replaced
        # seq_relationship head.
        logits = outputs[1]
        
        if labels is None:
            return None, logits

        loss = self.criterion(logits, labels)
        return loss, logits

# Per-class weights multiplied into the element-wise task-2 loss (all equal here).
weights_task2 = torch.tensor([0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]).to(device)
class CovidTwitterBertClassifier_task2(nn.Module):
    """COVID-Twitter-BERT with a sigmoid-activated linear head.

    Used for task 2: each of the ``n_classes`` outputs is squashed
    independently by a sigmoid, so several classes can be active for one
    tweet. The attribute layout is deliberately left untouched so that
    existing checkpoints keep matching in ``load_state_dict``.
    """
    
    def __init__(self, n_classes):
        """Build the network.

        Args:
            n_classes: Number of output units of the classification head.
                With more than one class the loss is an unreduced binary
                cross-entropy, otherwise a mean-squared error.
        """
        super().__init__()
        self.n_classes = n_classes
        self.bert = BertForPreTraining.from_pretrained('digitalepidemiologylab/covid-twitter-bert-v2')    
        self.sigmoid = nn.Sigmoid()
        # Swap the pre-training head for a fresh classifier.
        # 1024 presumably matches the encoder's hidden size -- TODO confirm.
        self.bert.cls.seq_relationship = nn.Linear(1024, n_classes)
        
        if n_classes >1:
            # reduction='none' keeps one loss value per class so that the
            # class weights can be applied in forward().
            self.criterion = nn.BCELoss(reduction='none')
        else:
            self.criterion = nn.MSELoss()
        
    def forward(self, input_ids, token_type_ids, input_mask, labels=None):
        """Run the model and, when labels are given, compute the weighted loss.

        Args:
            input_ids: Token ids of the batch.
            token_type_ids: Segment ids of the batch.
            input_mask: Attention mask of the batch.
            labels: Target values for the loss. Optional: pass ``None``
                (the default) at inference time to skip the loss.

        Returns:
            ``(loss, logits)``. Despite the name, ``logits`` holds the
            sigmoid outputs. ``loss`` is ``None`` when ``labels`` is ``None``.
        """
        outputs = self.bert(input_ids = input_ids, token_type_ids = token_type_ids, attention_mask = input_mask)
        # Second element of the model output: the result of the replaced
        # seq_relationship head, squashed by the sigmoid.
        logits = self.sigmoid(outputs[1])
        
        if labels is None:
            return None, logits

        loss = self.criterion(logits, labels)
        
        # Weight each class's loss, then average over everything.
        loss = (loss * weights_task2).mean()
        
        return loss, logits
    
# One row of class weights per head (9 heads x 3 classes, all equal here).
weights_intra_conspiracy = torch.tensor([[0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1],
                                         [0.1, 0.1, 0.1]]).to(device)

class CovidTwitterBertClassifier_task3(nn.Module):
    """COVID-Twitter-BERT with one linear head read as nine 3-way classifiers.

    Used for task 3: the ``n_classes`` outputs are consumed in nine
    consecutive groups of three, and every group has its own weighted
    cross-entropy loss. The attribute layout (plain list of criterions,
    unused ``criterion``) is deliberately left untouched so that existing
    checkpoints keep matching in ``load_state_dict``.
    """
    
    def __init__(self, n_classes):
        """Build the network.

        Args:
            n_classes: Number of output units of the head; ``forward``
                reads the first 27 of them as 9 groups of 3.
        """
        super().__init__()
        self.n_classes = n_classes
        self.bert = BertForPreTraining.from_pretrained('digitalepidemiologylab/covid-twitter-bert-v2')    
        # Swap the pre-training head for a fresh classifier.
        # 1024 presumably matches the encoder's hidden size -- TODO confirm.
        self.bert.cls.seq_relationship = nn.Linear(1024, n_classes)

        # One weighted cross-entropy per group of three outputs.
        self.criterions = []
        for i in range(0, 9):
            self.criterions.append(nn.CrossEntropyLoss(weight = weights_intra_conspiracy[i]))

        # Not used by forward(); kept for compatibility.
        self.criterion = nn.CrossEntropyLoss()
            
        
    def forward(self, input_ids, token_type_ids, input_mask, labels=None):
        """Run the model and, when labels are given, compute the mean per-head loss.

        Args:
            input_ids: Token ids of the batch.
            token_type_ids: Segment ids of the batch.
            input_mask: Attention mask of the batch.
            labels: Targets indexed as ``labels[:, i]`` for head ``i`` and
                cast to long. Optional: pass ``None`` (the default) at
                inference time to skip the loss.

        Returns:
            ``(loss, logits)``; ``loss`` is the average of the nine head
            losses, or ``None`` when ``labels`` is ``None``.
        """
        outputs = self.bert(input_ids = input_ids, token_type_ids = token_type_ids, attention_mask = input_mask)

        # Second element of the model output: the result of the replaced
        # seq_relationship head.
        logits = outputs[1]

        if labels is None:
            return None, logits

        loss = 0
        # Each head's loss is evaluated exactly once (it used to be computed
        # twice, once into a list that was never read).
        for i, criterion in enumerate(self.criterions):
            logits_i = logits[:,3*i:3*i+3]
            label_i = labels[:, i].long()
            loss += criterion(logits_i, label_i)
        loss = loss/len(self.criterions)
        
        return loss, logits
    

In [8]:
# For every supported task: (model class, number of outputs, checkpoint path).
_TASK_MODELS = {
    1: (CovidTwitterBertClassifier_task1, 3, '../models/task1.pth'),
    2: (CovidTwitterBertClassifier_task2, 9, '../models/task2.pth'),
    3: (CovidTwitterBertClassifier_task3, 9*3, '../models/task3.pth'),
}

if TASK in _TASK_MODELS:
    _model_cls, _n_outputs, _checkpoint = _TASK_MODELS[TASK]
    model = _model_cls(_n_outputs)
    model.to(device)
    # map_location lets a checkpoint saved on a GPU be loaded on whatever
    # device was selected above, including a CPU-only machine.
    model.load_state_dict(torch.load(_checkpoint, map_location=device))
    # Inference only: switch to evaluation mode.
    model.eval()
else:
    raise Exception("Task must be 1, 2 or 3") 

In [9]:
# Optional clean-up of the raw tweets, controlled by the two flags:
# first drop the '#' characters, then spell out the emojis.
text = remove_hashtags(your_tweets_as_list) if remove_hashtags_flag else your_tweets_as_list
text = replace_emojis(text) if replace_emojis_flag else text



In [10]:
MAX_LEN = 128

# NOTE(review): the tokenizer comes from 'covid-twitter-bert' while the
# models load 'covid-twitter-bert-v2' -- confirm both share one vocabulary.
tokenizer = AutoTokenizer.from_pretrained('digitalepidemiologylab/covid-twitter-bert')

# Every tweet is padded / truncated to exactly MAX_LEN tokens.
tokenized_input = tokenizer(text, truncation=True, padding='max_length', max_length=MAX_LEN)

input_ids = tokenized_input['input_ids']
token_type_ids = tokenized_input['token_type_ids']
attention_mask = tokenized_input['attention_mask']

# The models' forward() is called with labels, so dummy ones are built here;
# their values do not matter for inference.
n_tweets = len(text)
if TASK==1:
    labels = [1] * n_tweets
elif TASK==2 or TASK==3:
    labels = [[1.0] * 9 for _ in range(n_tweets)]


# Turn the plain Python lists into tensors for the DataLoader.
input_ids = torch.tensor(input_ids)
token_type_ids = torch.tensor(token_type_ids)
attention_mask = torch.tensor(attention_mask)
labels = torch.tensor(labels)


# Inference

In [12]:
batch_size = 64

dataset = TensorDataset(input_ids, token_type_ids, attention_mask, labels)
# Sequential sampling keeps predictions aligned with the order of the tweets.
sampler = SequentialSampler(dataset)
# Use the batch_size defined above (it used to be hard-coded a second time).
dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)

# One prediction per tweet: an int for task 1, a list of 9 ints for tasks 2 and 3.
output = []

for b in tqdm(dataloader):

    batch = tuple(t.to(device) for t in b)

    b_input_ids, b_token_type_ids, b_attention_mask, b_labels = batch

    with torch.no_grad():
        
        # The model returns (loss, logits); only the logits are needed.
        logits = model(b_input_ids, b_token_type_ids, b_attention_mask, b_labels)[1]
        if TASK==1:
            # Most likely class per tweet.
            output += logits.argmax(dim=1).cpu().numpy().tolist()
        elif TASK==2:
            # Threshold every class at 0.5. No squeeze(): on a batch holding
            # a single tweet it removed the batch dimension, so `output`
            # received 9 separate ints instead of one row.
            output += (logits>0.5).int().cpu().numpy().tolist()
        elif TASK==3:
            # Nine groups of three scores; keep the arg-max of every group.
            for p in logits:
                output+=[[p[3*i: 3*i+3].argmax().item() for i in range(0,9)]]


100%|█████████████████████████████████████████████████| 1/1 [00:00<00:00,  3.59it/s]


In [13]:
# Conspiracy categories; entry i is paired with position i of a prediction
# vector by the pretty-printers below.
conspiracies = [
    'Suppressed Cures', 'Behaviour and Mind Control', 'Antivax',
    'Fake virus', 'Intentional Pandemic', 'Harmful Radiation/ Influence',
    'Population reduction', 'New World Order', 'Satanism',
]

# Stance labels, looked up by predicted class id.
conspiracy_levels = ["No Conspiracy", "Discussing", "Supporting"]

def pretty_print_task1(output, tweets=None):
    """Print every tweet followed by its task-1 prediction.

    Args:
        output: One class id per tweet, used as an index into
            ``conspiracy_levels``.
        tweets: Tweets to print next to the predictions. Defaults to the
            notebook-level ``your_tweets_as_list``. Previously the count
            came from the global ``text`` and the content from
            ``your_tweets_as_list``.

    Raises:
        IndexError: If ``output`` has fewer entries than ``tweets``.
    """
    if tweets is None:
        tweets = your_tweets_as_list

    for i, t in enumerate(tweets):
        output_i = output[i]

        print(t)

        result = conspiracy_levels[output_i]
        # Class 0 already reads as a full sentence; the others need an object.
        if output_i != 0:
            result += ' any conspiracy theory'
        print("---> " + result)
        print('-----'*15)

def pretty_print_task2(output, tweets=None):
    """Print every tweet followed by the conspiracies flagged for it (task 2).

    Args:
        output: One sequence of 0/1 flags per tweet; flag ``k`` refers to
            ``conspiracies[k]``.
        tweets: Tweets to print next to the predictions. Defaults to the
            notebook-level ``your_tweets_as_list``. Previously the count
            came from the global ``text`` and the content from
            ``your_tweets_as_list``.

    Raises:
        IndexError: If ``output`` has fewer entries than ``tweets``.
    """
    if tweets is None:
        tweets = your_tweets_as_list

    for i, t in enumerate(tweets):
        output_i = output[i]

        print(t)

        if sum(output_i) == 0:
            result = 'No conspiracy'
        else:
            # Pairing names with flags avoids the hard-coded 9 and the inner
            # loop variable that used to shadow the outer index.
            flagged = [name for name, flag in zip(conspiracies, output_i) if flag]
            result = 'Discussing or Supporting ' + ', '.join(flagged)

        print("---> " + result)
        print('-----'*15)

        
def pretty_print_task3(output, tweets=None):
    """Print every tweet followed by its stance on each flagged conspiracy (task 3).

    Args:
        output: One sequence of class ids per tweet; entry ``k`` is the
            stance (index into ``conspiracy_levels``) towards
            ``conspiracies[k]``, 0 meaning the conspiracy is not mentioned.
        tweets: Tweets to print next to the predictions. Defaults to the
            notebook-level ``your_tweets_as_list``, like the task-1 and
            task-2 printers. Previously this printer showed the
            pre-processed ``text`` instead.

    Raises:
        IndexError: If ``output`` has fewer entries than ``tweets``.
    """
    if tweets is None:
        tweets = your_tweets_as_list

    for i, t in enumerate(tweets):
        output_i = output[i]

        print(t)

        if sum(output_i) == 0:
            result = 'No conspiracy'
        else:
            # Pairing names with stances avoids the hard-coded 9 and the
            # inner loop variable that used to shadow the outer index.
            stances = [
                conspiracy_levels[level] + ' ' + name
                for name, level in zip(conspiracies, output_i)
                if level
            ]
            result = ', '.join(stances)

        print("---> " + result)
        print('-----'*15)



# Print

In [14]:
# Pick the printer that matches the selected task; an unknown task prints nothing.
_printer = {
    1: pretty_print_task1,
    2: pretty_print_task2,
    3: pretty_print_task3,
}.get(TASK)
if _printer is not None:
    _printer(output)

This is outrageous! This politician lied about the vaccine, as it contains 5G and has been made to control population!
---> Discussing or Supporting Behaviour and Mind Control, Antivax, Harmful Radiation/ Influence, Population reduction
---------------------------------------------------------------------------
My colleague think Covid is a hoax and has been staged, how do I prove him wrong?
---> Discussing or Supporting Fake virus
---------------------------------------------------------------------------
The vaccine contains the mark of the beast! It is the devil's work! Also the deep state is holding all together, the new world order is upon us
---> Discussing or Supporting New World Order, Satanism
---------------------------------------------------------------------------
What about climate change? Ice is melting and temperatures keep getting warmer. I hope we find a durable solution to all this
---> No conspiracy
-------------------------------------------------------------------

# Save as CSV

In [15]:
# Pair every original tweet with its prediction in one table.
df = pd.DataFrame({'text': your_tweets_as_list, 'label': output})
# Uncomment and set a path to write the table to disk.
#df.to_csv('./path/to/save.csv', index=False)
df


Unnamed: 0,text,label
0,This is outrageous! This politician lied about...,"[0, 1, 1, 0, 0, 1, 1, 0, 0]"
1,My colleague think Covid is a hoax and has bee...,"[0, 0, 0, 1, 0, 0, 0, 0, 0]"
2,The vaccine contains the mark of the beast! It...,"[0, 0, 0, 0, 0, 0, 0, 1, 1]"
3,What about climate change? Ice is melting and ...,"[0, 0, 0, 0, 0, 0, 0, 0, 0]"
