Reference: [Text Extraction with BERT (Keras example)](https://keras.io/examples/nlp/text_extraction_with_bert/)

In [None]:
import matplotlib.pyplot as plt
import os
import re
import string
import numpy as np
import pandas as pd
import tensorflow as tf
import transformers
from tensorflow import keras
from tensorflow.keras import layers
from transformers import BertTokenizer, TFBertModel, BertConfig, BertTokenizerFast 
print(tf.__version__)

from learning_checks import hist_graph
from learning_checks import text_check
from learning_checks import check_metrics

In [None]:
# Technique label names. `labels` is not referenced by the other visible
# cells; presumably these are the values found in the "technique" column of
# the dataset — TODO confirm.
labels = [
    "Reductio ad hitlerum",
    "Whataboutism",
    "Presenting Irrelevant Data (Red Herring)",
    "Doubt",
    "Slogans",
    "Appeal to fear/prejudice",
    "Obfuscation, Intentional vagueness, Confusion",
    "Misrepresentation of Someone's Position (Straw Man)",
    "Glittering generalities (Virtue)",
    "Appeal to authority",
    "Repetition",
    "Bandwagon",
    "Causal Oversimplification",
    "Name calling/Labeling",
    "Thought-terminating cliché",
    "Flag-waving",
    "Exaggeration/Minimisation",
    "Smears",
    "Loaded Language",
    "Black-and-white Fallacy/Dictatorship",
]

In [None]:
# Load the dataset and keep a random 20-row subset. No seed is set, so the
# subset differs between runs, and `sample(20)` requires at least 20 rows.
data_pd = pd.read_csv("data.csv").sample(20)
# Renumber the rows 0..n-1. `drop=True` discards the old index directly, so a
# CSV column that happens to be called "index" is left intact (the previous
# reset_index() + drop(columns=["index"]) would have removed that column and
# left a stray "level_0" one behind).
data_pd = data_pd.reset_index(drop=True)

In [None]:
from googletrans import Translator

# Display the sampled frame for a quick visual check.
data_pd

In [None]:
def make_list(x):
    """Parse a stringified list of ints such as ``"[0, 1, 1]"`` into a list.

    Args:
        x: Either the bracketed, comma-separated text form of the list, or an
            already-parsed sequence of numbers (so re-running the parsing
            cell on an already converted column is harmless).

    Returns:
        list[int]: The parsed values; ``"[]"`` yields an empty list.

    Raises:
        ValueError: If an element is not an integer literal.
    """
    if not isinstance(x, str):
        # Already a sequence: just normalise the element type.
        return [int(value) for value in x]

    # Drop the surrounding brackets, then split on bare commas so that both
    # "1, 2" and "1,2" are accepted; int() tolerates surrounding whitespace.
    inner = x.strip()[1:-1]
    return [int(piece) for piece in inner.split(",") if piece.strip()]

# Turn every stringified mask in the "prop_mask" column into a list of ints.
data_pd["prop_mask"] = data_pd["prop_mask"].apply(make_list)

In [None]:
from transformers import AutoTokenizer, AutoModel

# Hub identifier of the tokenizer used to encode both texts and techniques.
# NOTE(review): a later cell loads a checkpoint whose path contains "rubert";
# the tokenizer vocabulary has to match the checkpoint that is actually
# trained — confirm which checkpoint is intended.
TOKENIZER_NAME = "SpanBERT/spanbert-base-cased"

tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)

In [None]:
def preprocessing(data, max_len=250):
    """Encode every (text, technique) row as fixed-length model inputs.

    Each row becomes the text encoding followed by the technique encoding
    without its first token (for a BERT-style tokenizer:
    ``[CLS] text [SEP] technique [SEP]``), right-padded with zeros to
    ``max_len``. Encoding uses the module-level ``tokenizer``, which must be
    able to return character offsets (``return_offsets_mapping=True``).

    The technique is encoded first and the text is truncated to whatever room
    remains, so the combined sequence never exceeds ``max_len`` tokens.

    Args:
        data: Table with the columns ``"text"`` (str), ``"technique"`` (str)
            and ``"prop_mask"`` (sequence of ints indexed by character
            position in the text). Rows are processed in order, whatever the
            index labels are.
        max_len: Length of every returned sequence.

    Returns:
        Four parallel lists holding one ``max_len``-long list per row:
        input ids, attention mask (1 for real tokens, 0 for padding), token
        type ids (0 for the text segment, 1 for the technique segment, 0 for
        padding) and the per-token target mask (1 where the token's character
        span covers a positive ``prop_mask`` entry).

    Raises:
        ValueError: If the technique leaves fewer than two token slots for
            the text.
    """
    col_input_ids = []
    col_attention_mask = []
    col_token_type_ids = []
    col_token_prop_mask = []

    # Iterate over the column values directly instead of looking rows up by
    # label, so the function also works when the index is not 0..n-1.
    rows = zip(data["text"], data["technique"], data["prop_mask"])
    for text, technique, char_mask in rows:
        # Encode the technique first so its length is known; its first token
        # is dropped because the text encoding already starts the sequence.
        technique_ids = tokenizer.encode_plus(
            technique,
            max_length=max_len,
            truncation=True,
        ).input_ids[1:]

        # Whatever the technique leaves over is the budget for the text.
        # Two slots is the minimum assumed here: the special tokens a
        # BERT-style tokenizer puts around the text.
        text_budget = max_len - len(technique_ids)
        if text_budget < 2:
            raise ValueError(
                f"max_len {max_len} leaves no room for the text after "
                f"{len(technique_ids)} technique tokens"
            )

        token_text = tokenizer.encode_plus(
            text,
            return_offsets_mapping=True,
            max_length=text_budget,
            truncation=True,
        )
        text_ids = token_text.input_ids

        # Project the character-level mask onto tokens: a token is positive
        # when any character it covers is. Tokens with an empty span
        # (start == end) slice to nothing and therefore stay 0.
        token_prop_mask = [0] * max_len
        for j, (ind_s, ind_e) in enumerate(token_text.offset_mapping):
            if sum(char_mask[ind_s:ind_e]) > 0:
                token_prop_mask[j] = 1
        col_token_prop_mask.append(token_prop_mask)

        input_ids = text_ids + technique_ids
        token_type_ids = [0] * len(text_ids) + [1] * len(technique_ids)
        len_input_ids = len(input_ids)
        attention_mask = [1] * len_input_ids

        # An exact fit is valid (it simply needs no padding).
        assert max_len >= len_input_ids, f"max_len {max_len} < len_input_ids {len_input_ids}"

        padding = [0] * (max_len - len_input_ids)
        col_input_ids.append(input_ids + padding)
        col_attention_mask.append(attention_mask + padding)
        col_token_type_ids.append(token_type_ids + padding)

    return col_input_ids, col_attention_mask, col_token_type_ids, col_token_prop_mask

In [None]:
# Encode every row once ...
(
    col_input_ids,
    col_attention_mask,
    col_token_type_ids,
    col_token_prop_mask,
) = preprocessing(data_pd)

# ... then store each encoded field as its own column of the frame.
for column_name, column_values in (
    ("col_input_ids", col_input_ids),
    ("col_attention_mask", col_attention_mask),
    ("col_token_type_ids", col_token_type_ids),
    ("col_token_prop_mask", col_token_prop_mask),
):
    data_pd[column_name] = column_values

In [None]:
# Display the frame again, now including the four encoded columns.
data_pd

In [None]:
FRACTION = 0.1  # share of the rows held out as the dev split
data_num = int(len(data_pd) * FRACTION)

# Shuffle all rows (unseeded, so the split changes between runs) and renumber.
data_pd = data_pd.sample(frac=1).reset_index(drop=True)

# The first `data_num` shuffled rows form the dev set, the rest the train set.
sep_dev_pd = data_pd.iloc[:data_num]
sep_train_pd = data_pd.iloc[data_num:].reset_index(drop=True)

In [None]:
# Display the held-out dev rows.
sep_dev_pd

In [None]:
import torch.nn as nn

class Model(transformers.BertPreTrainedModel):
    """BERT encoder followed by a per-token sigmoid scoring head.

    For every input position the model outputs one value in (0, 1): the
    encoder's first output (one hidden vector per token) goes through a
    linear layer down to a single value per token, the trailing singleton
    dimension is flattened away and a sigmoid is applied.
    """

    def __init__(self, config, PATH):
        """Build the model around a pretrained encoder.

        Args:
            config: Configuration object handed to ``BertPreTrainedModel``.
            PATH: Checkpoint location passed to ``BertModel.from_pretrained``.
        """
        super().__init__(config)
        self.bert = transformers.BertModel.from_pretrained(PATH)
        # Size the head from the encoder that was actually loaded instead of
        # hard-coding 1024, so 768-wide and 1024-wide checkpoints both work.
        self.linear = nn.Linear(self.bert.config.hidden_size, 1)
        self.flatten = nn.Flatten()
        self.sigm = nn.Sigmoid()

    def forward(self, ids, mask, token_type_ids):
        """Score every token of every sequence in the batch.

        Args:
            ids: Token ids passed to the encoder as its input ids.
            mask: Attention mask passed to the encoder.
            token_type_ids: Segment ids passed to the encoder.

        Returns:
            Tensor with one sigmoid output per token; with ``(batch, seq)``
            inputs the flatten step yields a ``(batch, seq)`` result.
        """
        # [0] selects the encoder's first output: the per-token hidden states.
        embedding = self.bert(
            ids,
            attention_mask=mask,
            token_type_ids=token_type_ids
        )[0]
        logits = self.linear(embedding)
        # Collapse the trailing size-1 dimension produced by the linear layer.
        logits = self.flatten(logits)
        result = self.sigm(logits)
        return result

In [None]:
PATH = 'tune_model/rubert_cased_L-12_H-768_A-12_v2.tar.gz'

# Loaded to obtain the checkpoint's configuration for `Model` below (`Model`
# loads the encoder weights from PATH again on its own).
bert = transformers.BertModel.from_pretrained(PATH)

# Every later cell (optimizer, device move, training, saving) uses `my_model`,
# which was previously only created in a commented-out cell; build it here so
# the notebook runs top to bottom on a fresh kernel.
# NOTE(review): the path name suggests a 768-wide encoder; the linear head in
# `Model` has to match the encoder's hidden size.
my_model = Model(bert.config, PATH)

In [None]:
#PATH = 'tune_model/squad2'

#bert = transformers.BertModel.from_pretrained(PATH)
#my_model = Model(bert.config, PATH)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

LEARNING_RATE = 1e-2

# Binary cross-entropy on probabilities: the model already applies a sigmoid.
criterion = nn.BCELoss()
# Plain SGD over every parameter of the model.
optimizer = optim.SGD(my_model.parameters(), lr=LEARNING_RATE)

In [None]:
import torch

# Prefer the first CUDA device when one is available; otherwise use the CPU.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"

dev

In [None]:
# Move the model's parameters and buffers to the selected device.
# As the last expression of the cell this also displays the module's repr.
my_model.to(dev)

In [None]:
# Display the dev rows again (same frame as shown right after the split).
sep_dev_pd

In [None]:
# Spot-check the model on one dev row before training.
# NOTE(review): `text_check` comes from the local `learning_checks` module;
# the arguments look like (model, row position, decision threshold, frame,
# tokenizer) — confirm against its definition.
i = 0
text_check(my_model, i, 0.5, sep_dev_pd, tokenizer)

In [None]:
from tqdm.notebook import trange, tqdm

EPOCHS = 4
batch_size = 5
dev_batch_size = 10
N = len(sep_train_pd)
M = len(sep_dev_pd)
data = sep_train_pd.values

# These lists are never filled in this cell; they are kept so that the same
# module-level names exist afterwards.
precision = []
recall = []
accuracy = []
# NOTE: this rebinds `F`, which an earlier cell imported as torch.nn.functional.
F = []


def _batch_tensors(frame, start, end):
    """Build the input and target tensors for rows ``start:end`` of ``frame`` on ``dev``."""
    rows = frame.iloc[start:end]
    ids = torch.tensor(list(rows["col_input_ids"])).to(dev)
    attention_mask = torch.tensor(list(rows["col_attention_mask"])).to(dev)
    type_ids = torch.tensor(list(rows["col_token_type_ids"])).to(dev)
    target = torch.tensor(list(rows["col_token_prop_mask"]), dtype=torch.float).to(dev)
    return ids, attention_mask, type_ids, target


for epoch in range(EPOCHS):

    # ---- training pass -----------------------------------------------------
    my_model.train()
    train_loss = 0.0
    for i in tqdm(range(0, N, batch_size), leave=False):
        start = i
        end = min(i + batch_size, N)
        ids, attention_mask, type_ids, target = _batch_tensors(sep_train_pd, start, end)

        optimizer.zero_grad()
        output = my_model(ids, attention_mask, type_ids)
        output = torch.squeeze(output, dim=1)
        # Targets are scaled by 0.99 before the loss, as before — presumably a
        # mild label-smoothing trick; confirm the intent.
        loss = criterion(output, target * 0.99)
        # Accumulate a plain float: adding the tensor itself would keep every
        # batch's autograd graph alive for the whole epoch.
        train_loss += loss.item()
        loss.backward()
        optimizer.step()

    # ---- evaluation pass on the dev split ----------------------------------
    dev_loss = 0.0
    my_model.eval()
    with torch.no_grad():
        for i in range(0, M, dev_batch_size):
            start = i
            end = min(i + dev_batch_size, M)
            ids, attention_mask, type_ids, target = _batch_tensors(sep_dev_pd, start, end)

            output = my_model(ids, attention_mask, type_ids)
            output = torch.squeeze(output, dim=1)
            loss = criterion(output, target * 0.99)
            dev_loss += loss.item()
        # Both values are sums of per-batch losses, not averages.
        print(f"\nepoch {epoch}: \ntrain_loss = {train_loss}, \ndev_loss = {dev_loss}")
    check_metrics(my_model, sep_dev_pd, 0.5)

In [None]:
# Final evaluation on the dev split, this time without the explicit 0.5 third
# argument used inside the training loop — presumably `check_metrics` falls
# back to a default value; confirm in `learning_checks`.
check_metrics(my_model, sep_dev_pd)

In [None]:
# Persist only the weights (state dict). The target directory is created
# first, because saving into a missing directory fails.
os.makedirs("models", exist_ok=True)
torch.save(my_model.state_dict(), "models/model.pth")