# Prepare material

In [1]:
import torch
import torch.nn as nn
import numpy as np
from tqdm.auto import tqdm
from transformers import AutoModelForTokenClassification, AutoTokenizer
import torch
from torch.utils.data import Dataset, DataLoader

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
MAX_LEN = 256
TEST_PATH = 'data/span_detection_datasets_split_word_IOB/test.jsonl'

In [3]:
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'
print(device)

cuda


In [4]:
# function read jsonl file as dataframe
import pandas as pd
import json

def read_jsonl_to_dataframe(file_path):
    data = []

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                json_obj = json.loads(line)
                data.append(json_obj)
            except json.JSONDecodeError as e:
                print(f"Skipping invalid JSON: {e}")

    df = pd.DataFrame(data)

    return df

In [29]:
import json

# load tag_to_id
with open('data/tag_to_id_bert.json', 'r') as f:
    tag_to_id = json.load((f))

# load sentiment_to_id
with open('data/sentiment_to_id.json', 'r') as f:
    sentiment_to_id = json.load((f))

# convert tag_to_id to id_to_tag
id_to_tag = {v: k for k, v in tag_to_id.items()}

# convert sentiment_to_id to id_to_sentiment
id_to_sentiment = {v: k for k, v in sentiment_to_id.items()}

In [None]:
# load test data

df_test = read_jsonl_to_dataframe(TEST_PATH)

df_test.text = df_test.text.apply(lambda x: " ".join(x))

# Model Span Detection

In [6]:
env = ".env"

try:
    with open(env, "r") as file:
        AUTH_TOKEN = file.read()
except FileNotFoundError:
    print(f"The file {env} does not exist.")
except Exception as e:
    print(f"An error occurred: {e}")


MRC_PATH = 'nguyenvulebinh/vi-mrc-base'
PRETRAINED_PATH = 'model/span_detection_bert_base'

In [7]:
# load pretrained model
tokenizer = AutoTokenizer.from_pretrained(MRC_PATH)
# load model
model = AutoModelForTokenClassification.from_pretrained(PRETRAINED_PATH,
                                                   num_labels=len(tag_to_id),
                                                   id2label=id_to_tag,
                                                   label2id=tag_to_id)

In [8]:
model.eval()
model.to(device)
print('Model loaded')

Model loaded


In [9]:
def num_params(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def model_architecture(model):
    # print all layers and number of parameters
    for name, param in model.named_parameters():
        print(f"Layer: {name} | Size: {param.size()} | Values : {param[:2]} \n")

print(f"Number of parameters: {num_params(model):,}")

Number of parameters: 277,469,205


## Define data

In [10]:
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score

def accuracy_f1(logits, targets, masks):
    """
    outputs: (batch_size, seq_len, num_labels)
    targets: (batch_size, seq_len)
    masks: (batch_size, seq_len)
    """
    # reshape to (batch_size * seq_len, num_labels)
    logits = logits.view(-1, logits.shape[-1])
    targets = targets.view(-1)

    # ignore padded tokens
    masks = masks.view(-1)
    logits = logits[masks == 1]
    targets = targets[masks == 1]

    # compute accuracy
    preds = torch.argmax(logits, dim=1)
    acc = accuracy_score(targets.cpu().numpy(), preds.cpu().numpy())

    # compute f1 score
    f1 = f1_score(targets.cpu().numpy(), preds.cpu().numpy(), average='macro')

    return acc, f1

def tokenize_and_preserve_labels(sentence, text_labels, tokenizer):
    """
    Word piece tokenization makes it difficult to match word labels
    back up with individual word pieces. This function tokenizes each
    word one at a time so that it is easier to preserve the correct
    label for each subword. It is, of course, a bit slower in processing
    time, but it will help our model achieve higher accuracy.
    """

    tokenized_sentence = []
    labels = []

    sentence = sentence.strip()

    for word, label in zip(sentence.split(), text_labels):

        # Tokenize the word and count # of subwords the word is broken into
        tokenized_word = tokenizer.tokenize(word)
        n_subwords = len(tokenized_word)

        # Add the tokenized word to the final tokenized word list
        tokenized_sentence.extend(tokenized_word)

        # Add the same label to the new list of labels `n_subwords` times
        labels.extend([label] * n_subwords)

    return tokenized_sentence, labels

In [11]:
# define special tokens
pad_token = tokenizer.pad_token
sep_token = tokenizer.sep_token
cls_token = tokenizer.cls_token

In [12]:
class SpanDetectionDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len=MAX_LEN):
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        
    def __getitem__(self, index):
        # step 1: tokenize (and adapt corresponding labels)
        sentence = self.data.text[index]  
        word_labels = self.data.labels[index]  
        tokenized_sentence, labels = tokenize_and_preserve_labels(sentence, word_labels, self.tokenizer)
        
        # step 2: add special tokens (and corresponding labels)
        tokenized_sentence = [cls_token] + tokenized_sentence + [sep_token] # add special tokens
        labels.insert(0, "O") # add outside label for [CLS] token
        labels.insert(-1, "O") # add outside label for [SEP] token

        # step 3: truncating/padding
        maxlen = self.max_len

        if (len(tokenized_sentence) > maxlen):
          # truncate
          tokenized_sentence = tokenized_sentence[:maxlen]
          labels = labels[:maxlen]
        else:
          # pad
          tokenized_sentence = tokenized_sentence + [pad_token for _ in range(maxlen - len(tokenized_sentence))]
          labels = labels + ["O" for _ in range(maxlen - len(labels))]

        # step 4: obtain the attention mask
        attn_mask = [1 if tok != pad_token else 0 for tok in tokenized_sentence]
        
        # step 5: convert tokens to input ids
        ids = self.tokenizer.convert_tokens_to_ids(tokenized_sentence)

        label_ids = [tag_to_id[label] for label in labels]
        # the following line is deprecated
        #label_ids = [label if label != 0 else -100 for label in label_ids]
        
        return {
              'ids': torch.tensor(ids, dtype=torch.long),
              'mask': torch.tensor(attn_mask, dtype=torch.long),
              #'token_type_ids': torch.tensor(token_ids, dtype=torch.long),
              'targets': torch.tensor(label_ids, dtype=torch.long)
        } 
    
    def __len__(self):
        return self.len

In [13]:
test_dataset = SpanDetectionDataset(df_test, tokenizer, max_len=MAX_LEN)
# function to create dataloader
def create_data_loader(datasets, params):

    return DataLoader(
        datasets,
        **params
    )

test_params = {
    'batch_size': 16,
    'shuffle': False,
    'num_workers': 0
}

test_loader = create_data_loader(test_dataset, test_params)

NameError: name 'df_test' is not defined

In [36]:
def test(model, loader):
    with torch.no_grad():
        # model.eval()
        steps = len(loader)
        loss = 0
        acc = 0
        f1 = 0
        for step, batch in tqdm(enumerate(loader), total=steps):

            ids = batch['ids'].to(device, dtype=torch.long)
            mask = batch['mask'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.long)

            # forward pass
            outputs = model(ids, mask)
            logits = outputs.logits

            # # compute loss
            # batch_loss = loss_fn(logits, targets, mask)

            # compute accuracy and f1 score
            batch_acc, batch_f1 = accuracy_f1(logits, targets, mask)

            # loss += batch_loss.item()
            acc += batch_acc.item()
            f1 += batch_f1    
    return loss / steps, acc / steps, f1 / steps

In [37]:
_, test_acc, test_f1 = test(model, test_loader)
print(f"Test accuracy: {test_acc:.3f} | Test f1 score: {test_f1:.3f}")

100%|██████████| 137/137 [07:05<00:00,  3.10s/it]

Test accuracy: 0.757 | Test f1 score: 0.665





# Test model sentiment analysis

In [14]:
from transformers import AutoModelForSequenceClassification

MAX_LEN_SA = 128
TEST_PATH_SA = 'data/sentiment_analysis_data/test.jsonl'

In [15]:
# MRC_PATH = 'nguyenvulebinh/vi-mrc-base'
PHOBERT_PATH = 'vinai/phobert-base'
PRETRAINED_PATH_SA = 'model/sentiment_analysis_bert_base'

In [8]:
df_test_sa = read_jsonl_to_dataframe(TEST_PATH_SA)

In [16]:
# load tokenizer
tokenizer_sa = AutoTokenizer.from_pretrained(PHOBERT_PATH, use_auth_token=AUTH_TOKEN)



In [17]:
class SentimentAnalysisDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len=MAX_LEN_SA):
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        
    def __getitem__(self, index):
        text = str(self.data.text[index])
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            truncation=True,
            padding='max_length',
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]
        label = self.data.sentiment[index]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(label, dtype=torch.long)
        }
        
    
    def __len__(self):
        return self.len

In [13]:
test_params_sa = {
    'batch_size': 16,
    'shuffle': False,
    'num_workers': 0
}

test_dataset_sa = SentimentAnalysisDataset(df_test_sa, tokenizer_sa)
test_loader_sa = create_data_loader(test_dataset_sa, test_params_sa)

In [18]:
model_sa = AutoModelForSequenceClassification.from_pretrained(PRETRAINED_PATH_SA)

model_sa.to(device)
model_sa.eval()

RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(64001, 768, padding_idx=1)
      (position_embeddings): Embedding(258, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
             

In [18]:
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score


def loss_fn(logits, targets):
    """
    outputs: (batch_size, num_labels)
    targets: (batch_size,)
    """


    # reshape
    logits = logits.view(-1, logits.shape[-1])
    targets = targets.view(-1)

    # compute cross entropy loss
    return nn.CrossEntropyLoss()(logits, targets)

def accuracy_f1(logits, targets):
    """
    outputs: (batch_size, num_labels)
    targets: (batch_size, 1)
    """
    # reshape to (batch_size * seq_len, num_labels)
    logits = logits.view(-1, logits.shape[-1])
    targets = targets.view(-1)

    # compute accuracy
    preds = torch.argmax(logits, dim=1)
    acc = accuracy_score(targets.cpu().numpy(), preds.cpu().numpy())

    # compute f1 score
    f1 = f1_score(targets.cpu().numpy(), preds.cpu().numpy(), average='macro')

    return acc, f1

In [19]:
def test_sa(model, loader):

    with torch.no_grad():
        loss, accuracy, f1_score = 0, 0, 0
        steps = len(loader)
        model.eval()
        for step, batch in tqdm(enumerate(loader), total=steps):

            ids = batch['ids'].to(device, dtype=torch.long)
            mask = batch['mask'].to(device, dtype=torch.long)
            token_type_ids = batch['token_type_ids'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.long)

            # forward pass
            outputs = model(ids, mask, token_type_ids=token_type_ids)
            logits = outputs.logits

            # compute accuracy and f1 score
            batch_acc, batch_f1 = accuracy_f1(logits, targets)
            
            accuracy += batch_acc.item()
            f1_score += batch_f1     

    return loss / steps, accuracy / steps, f1_score / steps

In [20]:
# test model
_, test_acc_sa, test_f1_sa = test_sa(model_sa, test_loader_sa)
print(f"Test accuracy: {test_acc_sa:.3f} | Test f1 score: {test_f1_sa:.3f}")

100%|██████████| 441/441 [10:33<00:00,  1.44s/it]

Test accuracy: 0.942 | Test f1 score: 0.859





# End to end model prediction

## Span detection

In [19]:
from underthesea import word_tokenize
import regex as re

def tokenize(text):
    return word_tokenize(text, format="text")

def preprocess(text):
    text = text.lower()
    text = re.sub(r"[^\w\s]", " ", text) # remove punctuation
    text = re.sub(r"\s+", " ", text) # remove extra space
    text = text.strip()
    return text

def post_process(text):

    text = tokenize(text)
    text = preprocess(text)

    return text

In [20]:
# end to end prediction function
def e2e_span_detection(sentence):

    # preprocess sentence
    processed_sentence = post_process(sentence)

    # tokenize sentence
    tokenized_sentence = tokenizer.tokenize(processed_sentence)

    # add special tokens
    tokenized_sentence = [cls_token] + tokenized_sentence + [sep_token]

    # convert tokens to input ids
    ids = tokenizer.convert_tokens_to_ids(tokenized_sentence)
    mask = [1 if tok != pad_token else 0 for tok in tokenized_sentence]

    # convert to tensor
    ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0)
    mask = torch.tensor(mask, dtype=torch.long).unsqueeze(0)

    # move to device
    ids = ids.to(device)
    mask = mask.to(device)

    # forward pass
    outputs = model(ids, mask)
    logits = outputs.logits

    # get predictions
    preds = torch.argmax(logits, dim=2).squeeze(0)

    # convert to numpy array
    preds = preds.detach().cpu().numpy()

    # convert to tags
    preds = [id_to_tag[pred] for pred in preds]

    # get entities
    entities = []

    for i, pred in enumerate(preds):
        if pred != 'O':
            if pred.startswith('B-'):
                entity = [i, i]
                entity.append(pred.split('-')[1])
                entities.append(entity)
            elif pred.startswith('I-'):
                entities[-1][1] = i
            else:
                print("Something wrong")

    # get entities text

    entities_text = []

    for entity in entities:
        start, end, tag = entity
        entity_text = tokenized_sentence[start: end+1]
        entity_text = tokenizer.convert_tokens_to_string(entity_text)
        entities_text.append(entity_text)

    return entities_text, entities
    

In [21]:
def e2e_sentiment_analysis(span):
    # tokenize
    inputs = tokenizer_sa.encode_plus(
        span,
        None,
        add_special_tokens=True,
        max_length=MAX_LEN_SA,
        truncation=True,
        padding='max_length',
        return_token_type_ids=True
    )

    ids = inputs['input_ids']
    mask = inputs['attention_mask']
    token_type_ids = inputs["token_type_ids"]

    # convert to tensor
    ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0).to(device)
    mask = torch.tensor(mask, dtype=torch.long).unsqueeze(0).to(device)
    token_type_ids = torch.tensor(token_type_ids, dtype=torch.long).unsqueeze(0).to(device)

    # forward pass
    outputs = model_sa(ids, mask, token_type_ids=token_type_ids)
    logits = outputs.logits

    # compute probability
    probs = nn.functional.softmax(logits, dim=1).squeeze(0)

    # get label
    label = torch.argmax(probs).item()

    return label

In [36]:
def e2e_system(sentence):

    # span detection
    entities_text, entities = e2e_span_detection(sentence)

    # sentiment analysis
    sentiments = []
    for span in entities_text:
        sentiment = e2e_sentiment_analysis(span)
        sentiments.append(sentiment)

    # combine entities and sentiments
    entities_sentiments = []
    for entity, sentiment in zip(entities, sentiments):
        entities_sentiments.append(f"{entity[2]}#{id_to_sentiment[sentiment]}")

    # remove duplicate
    entities_sentiments = list(set(entities_sentiments))

    return entities_sentiments

In [37]:
sample = "Sp ổn, mỗi tội vân tay lúc nhận lúc không, nhân viên nhiệt tình, pin trâu, cả đêm tụt 1%"

spans, aspects = e2e_span_detection(sample)

for span, aspect in zip(spans, aspects):
    print(f"Span: {span} - Aspect: {aspect[2]}")

Span: sp ổn - Aspect: GENERAL
Span: mỗi tội - Aspect: FEATURES
Span: vân_tay lúc nhận lúc không - Aspect: FEATURES
Span: nhân - Aspect: SER&ACC
Span: _ - Aspect: SER&ACC
Span: vi - Aspect: SER&ACC
Span: ên nhiệt_tình - Aspect: SER&ACC
Span: pin trâu cả đêm tụt 1</s> - Aspect: BATTERY


In [38]:
# end to end sample for sample
e2e_system(sample)

['BATTERY#POSITIVE',
 'FEATURES#NEGATIVE',
 'SER&ACC#NEGATIVE',
 'GENERAL#POSITIVE',
 'SER&ACC#POSITIVE']

# End