In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# later in this notebook all live under that mount point (Colab-only API).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import torch
import transformers
from torch.utils.data import Dataset, DataLoader
from torch import cuda
import sys
from sklearn.metrics import f1_score
from transformers import AutoTokenizer, AutoModel



In [None]:
# Hugging Face Hub identifier passed to both AutoTokenizer and AutoModel below.
model_name = 'allenai/scibert_scivocab_uncased'

In [None]:
# Fetch the tokenizer and the pretrained encoder that belong to `model_name`.
LMTokenizer = AutoTokenizer.from_pretrained(model_name)
LMModel = AutoModel.from_pretrained(model_name)

# Prefer the GPU whenever the runtime exposes one.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/228k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/442M [00:00<?, ?B/s]

In [None]:
# Load the train / validation splits. Column names are supplied explicitly.
# NOTE(review): with `names=` and no `header=` argument pandas treats the first
# CSV row as data — confirm the files do not start with a header row.
train_dataset = pd.read_csv(
    '/content/drive/MyDrive/3c-citation-text-classification/Task 2/train.csv',
    sep=',',
    names=['CGT', 'CDT', 'CC', 'label'],
)
testing_dataset = pd.read_csv(
    '/content/drive/MyDrive/3c-citation-text-classification/Task 2/validation.csv',
    sep=',',
    names=['CGT', 'CDT', 'CC', 'label'],
)

# Hyper-parameters shared by the dataset, model and training loop.
MAX_LEN = 512             # token length every text is padded / truncated to
TRAIN_BATCH_SIZE = 4
VALID_BATCH_SIZE = 4
LEARNING_RATE = 1e-05     # Adam step size
drop_out = 0.1            # dropout probability used in the classification head
EPOCHS = 10               # upper bound; early stopping may end training sooner
tokenizer = LMTokenizer

In [None]:
# Pick the log file name: when a 6th command-line argument exists it prefixes a
# name built from the run configuration, otherwise fall back to "output.txt".
if len(sys.argv) > 5:
    output_file_name = f"{sys.argv[5]}_{model_name.split('/')[-1]}_{TRAIN_BATCH_SIZE}_{LEARNING_RATE}_{drop_out}.txt"
else:
    output_file_name = "output.txt"

# Opened once here; the training / validation functions write to this handle.
file = open(output_file_name, 'w')


In [None]:

class Triage(Dataset):
    """Dataset that tokenizes the three text columns of a citation dataframe.

    Each item is a dict with, for every column in ``TEXT_COLUMNS``, a
    ``'<col>_ids'`` and ``'<col>_mask'`` long tensor of length ``max_len``,
    plus a ``'targets'`` long tensor holding the row's ``label``.

    Args:
        dataframe: pandas DataFrame with columns ``CGT``, ``CDT``, ``CC`` and
            ``label``.
        tokenizer: callable tokenizer returning a mapping with ``'input_ids'``
            and ``'attention_mask'`` (e.g. a Hugging Face tokenizer).
        max_len: length every encoded text is padded / truncated to.
    """

    # Text columns to encode, in the order their keys appear in each item.
    TEXT_COLUMNS = ('CGT', 'CDT', 'CC')

    def __init__(self, dataframe, tokenizer, max_len):
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len

    def _encode(self, text):
        """Collapse whitespace in ``text`` and return ``(input_ids, attention_mask)``."""
        # str() keeps non-string cells usable; note a missing value (NaN)
        # therefore becomes the literal text "nan".
        normalized = " ".join(str(text).split())
        encoded = self.tokenizer(
            normalized,
            add_special_tokens=True,
            max_length=self.max_len,
            # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`.
            padding='max_length',
            truncation=True,
        )
        return encoded['input_ids'], encoded['attention_mask']

    def __getitem__(self, index):
        """Return the encoded texts and label of the row at *position* ``index``."""
        item = {}
        for column in self.TEXT_COLUMNS:
            # Positional lookup: works even when the dataframe index is not
            # 0..n-1 (e.g. after filtering or a train/validation split).
            ids, mask = self._encode(self.data[column].iloc[index])
            item[f'{column}_ids'] = torch.tensor(ids, dtype=torch.long)
            item[f'{column}_mask'] = torch.tensor(mask, dtype=torch.long)

        item['targets'] = torch.tensor(self.data['label'].iloc[index], dtype=torch.long)
        return item

    def __len__(self):
        """Number of rows captured when the dataset was constructed."""
        return self.len


In [None]:

# Wrap both dataframes in the tokenizing dataset.
training_set = Triage(train_dataset, tokenizer, MAX_LEN)
testing_set = Triage(testing_dataset, tokenizer, MAX_LEN)

# DataLoader settings; batches are built in the main process (num_workers=0).
train_params = dict(batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=0)
test_params = dict(batch_size=VALID_BATCH_SIZE, shuffle=True, num_workers=0)

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)



In [None]:

class LMClass(torch.nn.Module):
    """Two-input classifier built on the shared pretrained encoder ``LMModel``.

    The ``CC`` and ``CDT`` texts of a batch are each passed through the
    encoder; the first-token hidden states of both are concatenated and fed
    through a linear + ReLU + dropout head ending in a 2-way classifier.

    Note: ``self.l1`` is the module-level ``LMModel`` object itself, so every
    ``LMClass`` instance shares (and fine-tunes) the same encoder weights.
    """

    def __init__(self):
        super().__init__()
        self.l1 = LMModel
        # Size the head from the encoder config when available instead of
        # hard-coding 768 (the fallback keeps the original value).
        hidden_size = getattr(getattr(self.l1, 'config', None), 'hidden_size', 768)
        self.pre_classifier = torch.nn.Linear(hidden_size * 2, hidden_size)
        self.dropout = torch.nn.Dropout(drop_out)
        self.classifier = torch.nn.Linear(hidden_size, 2)

    def _first_token_state(self, data, prefix, target_device):
        """Encode ``data['<prefix>_ids']`` and return the hidden state at position 0."""
        input_ids = data[f'{prefix}_ids'].to(target_device, dtype=torch.long)
        attention_mask = data[f'{prefix}_mask'].to(target_device, dtype=torch.long)
        encoded = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        # encoded[0] holds the per-token hidden states; [:, 0] picks the first token.
        return encoded[0][:, 0]

    def forward(self, data):
        """Compute class logits for a batch.

        Args:
            data: mapping with ``'CC_ids'``, ``'CC_mask'``, ``'CDT_ids'`` and
                ``'CDT_mask'`` tensors (as produced by the dataset above).

        Returns:
            Tensor of logits with one row per example and 2 columns.
        """
        # Follow the module's own placement rather than a global `device`, so
        # inputs always land where the weights are (e.g. after `.to('cpu')`).
        module_device = next(self.parameters()).device

        cc_state = self._first_token_state(data, 'CC', module_device)
        cdt_state = self._first_token_state(data, 'CDT', module_device)

        pooled = torch.cat((cc_state, cdt_state), 1)
        pooled = self.pre_classifier(pooled)
        pooled = torch.relu(pooled)
        pooled = self.dropout(pooled)
        return self.classifier(pooled)

In [None]:
# Instantiate the classifier on the selected device (Module.to returns the module itself).
model = LMClass().to(device)

# Cross-entropy over the class logits, optimised with Adam on every parameter.
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)


In [None]:

def calcuate_accu(big_idx, targets):
    """Return how many entries of ``big_idx`` equal the matching entry of ``targets``."""
    hits = big_idx == targets
    return hits.sum().item()


def train(epoch):
    """Run one optimisation pass over ``training_loader`` and log the results.

    Uses the module-level ``model``, ``loss_function``, ``optimizer`` and
    ``device``; writes the epoch's mean loss and accuracy (in percent) to the
    module-level ``file`` handle.

    Args:
        epoch: epoch number, used only in the log line.
    """
    model.train()
    loss_sum = 0
    correct = 0
    steps = 0
    seen = 0

    for batch in training_loader:
        targets = batch['targets'].to(device, dtype=torch.long)

        logits = model(batch)
        loss = loss_function(logits, targets)
        loss_sum += loss.item()

        # Index of the largest logit per row is the predicted class.
        _, predicted = torch.max(logits.data, dim=1)
        correct += calcuate_accu(predicted, targets)

        steps += 1
        seen += targets.size(0)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    epoch_accu = (correct * 100) / seen
    epoch_loss = loss_sum / steps
    file.write(f'The Total Accuracy for Epoch {epoch}: {epoch_accu}\n')
    file.write(f"Training Loss Epoch: {epoch_loss}\n")
    file.write(f"Training Accuracy Epoch: {epoch_accu}\n")
    file.write("\n")
    return


In [None]:

def valid(model, testing_loader):
    """Evaluate ``model`` on ``testing_loader`` and log loss, accuracy and macro F1.

    Uses the module-level ``device``, ``loss_function`` and ``file`` handle.

    Args:
        model: module called as ``model(batch)`` that returns class logits.
        testing_loader: iterable of batches; each batch has a ``'targets'`` tensor.

    Returns:
        Tuple ``(macro_f1, accuracy_percent)``.
    """
    model.eval()
    n_correct = 0
    tr_loss = 0
    nb_tr_steps = 0
    nb_tr_examples = 0
    pred = []
    act = []
    with torch.no_grad():
        for data in testing_loader:
            targets = data['targets'].to(device, dtype=torch.long)
            # Do not squeeze the logits: a final batch holding a single example
            # would lose its batch dimension, and both the loss and
            # `torch.max(..., dim=1)` below need 2-D (batch, classes) input.
            outputs = model(data)
            loss = loss_function(outputs, targets)
            tr_loss += loss.item()
            _, big_idx = torch.max(outputs.data, dim=1)
            n_correct += calcuate_accu(big_idx, targets)
            pred.extend(big_idx.tolist())
            act.extend(targets.tolist())
            nb_tr_steps += 1
            nb_tr_examples += targets.size(0)

    epoch_loss = tr_loss / nb_tr_steps
    epoch_accu = (n_correct * 100) / nb_tr_examples
    file.write(f"Validation Loss Epoch: {epoch_loss}\n")
    file.write(f"Validation Accuracy Epoch: {epoch_accu}\n")
    mf1 = f1_score(act, pred, average='macro')
    file.write(f"Validation Macro F1: {mf1}\n")
    return mf1, epoch_accu

In [None]:
# Where the best checkpoint (a state_dict) is written by EarlyStopping and later reloaded from.
best_model_path = '/content/drive/MyDrive/3c-citation-text-classification/Task 2/best_model.pt'

In [None]:
class EarlyStopping:
    """Stop training once the monitored score has not improved for ``patience`` calls.

    Every strict improvement saves the model's ``state_dict`` and records the
    score and 1-based epoch in ``best_acc`` / ``best_epoch``.

    Args:
        patience: number of consecutive non-improving calls before
            ``early_stop`` is set.
        verbose: when true, print the counter on every non-improving call.
        path: checkpoint destination. ``None`` (default) uses the module-level
            ``best_model_path``.
    """

    def __init__(self, patience=5, verbose=False, path=None):
        self.patience = patience
        self.verbose = verbose
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.best_acc = 0
        self.best_epoch = 0

    def __call__(self, epoch_acc, model, epoch):
        """Record ``epoch_acc`` for 0-based ``epoch`` and update the stopping state."""
        score = epoch_acc

        if self.best_score is None or score > self.best_score:
            # The very first score counts as the best so far too; previously it
            # was checkpointed but never recorded in best_acc / best_epoch.
            self.best_score = score
            self.best_acc = score
            self.best_epoch = epoch + 1
            self.counter = 0
            self.save_checkpoint(model)
            return

        self.counter += 1
        if self.verbose:
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, model):
        """Write ``model.state_dict()`` to the configured checkpoint path."""
        # Resolve the global lazily so the default behaves exactly as before.
        target = self.path if self.path is not None else best_model_path
        torch.save(model.state_dict(), target)


In [None]:
# Train for up to EPOCHS epochs, validating after each one; stop early when the
# validation accuracy has not improved for 5 consecutive epochs.
early_stopping = EarlyStopping(patience=5, verbose=True)

# Highest validation macro F1 seen in any epoch (tracked independently of the
# accuracy-based early stopping, so it may come from a different epoch).
best_mf1 = 0

try:
    for epoch in range(EPOCHS):
        train(epoch)
        mf1, acc = valid(model, testing_loader)
        if mf1 > best_mf1:
            best_mf1 = mf1

        early_stopping(acc, model, epoch)

        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            break

    file.write("Best \nAccuracy: {0} \nMacro F1 Score: {1}\nAt Epoch: {2}\n".format(early_stopping.best_acc, best_mf1, early_stopping.best_epoch))
finally:
    # Always flush and release the log, even when training raises part-way
    # (e.g. out-of-memory), so the epochs logged so far are not lost.
    file.close()



EarlyStopping counter: 1 out of 5




EarlyStopping counter: 2 out of 5




EarlyStopping counter: 3 out of 5




EarlyStopping counter: 4 out of 5




EarlyStopping counter: 5 out of 5
Early stopping at epoch 7


In [None]:
# Reload the best checkpoint for later use (if needed). `map_location` remaps the
# stored tensors onto the current device, so a checkpoint written during a GPU
# session still loads when the runtime only has a CPU.
model.load_state_dict(torch.load(best_model_path, map_location=device))

  model.load_state_dict(torch.load(best_model_path))


<All keys matched successfully>

Predict


In [None]:
class LMClassPredictor:
    """Inference wrapper around a trained ``LMClass`` checkpoint.

    Args:
        model_path: path to a ``state_dict`` saved from an ``LMClass`` model.
        tokenizer_name: Hugging Face identifier of the tokenizer to load.
        device: device the model and the encoded inputs are placed on.
    """

    # Token length every input is padded / truncated to.
    max_length = 512

    def __init__(self, model_path, tokenizer_name, device='cpu'):
        self.device = device
        self.model = LMClass()
        self.model.load_state_dict(torch.load(model_path, map_location=device))
        self.model.to(device)
        self.model.eval()
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    def _encode(self, text):
        """Tokenize ``text`` and return ``(input_ids, attention_mask)`` with a batch dim of 1."""
        encoded = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`.
            padding='max_length',
            truncation=True,
        )
        input_ids = torch.tensor(encoded['input_ids'], dtype=torch.long).unsqueeze(0).to(self.device)
        attention_mask = torch.tensor(encoded['attention_mask'], dtype=torch.long).unsqueeze(0).to(self.device)
        return input_ids, attention_mask

    @staticmethod
    def _split_sentences(paragraph):
        """Split ``paragraph`` after '.', '!' or '?' followed by whitespace."""
        import re  # local import: the notebook's import cell does not provide `re`

        pieces = re.split(r'(?<=[.!?])\s+', paragraph.strip())
        return [piece for piece in pieces if piece]

    def predict_sentence(self, sentence, cited_title=""):
        """Classify one citation-context sentence.

        Args:
            sentence: text supplied to the model as its ``CC`` input.
            cited_title: text supplied as the ``CDT`` input. The model's
                forward pass reads both inputs, so an empty title is encoded
                when none is given; pass the real title when it is known.

        Returns:
            Tuple ``(predicted_label, probabilities)`` where ``probabilities``
            is the softmax over the model's logits for this single example.
        """
        cc_ids, cc_mask = self._encode(sentence)
        cdt_ids, cdt_mask = self._encode(cited_title)

        with torch.no_grad():
            # The model looks up both the CC and the CDT keys; omitting the
            # CDT pair raised KeyError.
            data = {
                'CC_ids': cc_ids,
                'CC_mask': cc_mask,
                'CDT_ids': cdt_ids,
                'CDT_mask': cdt_mask,
            }
            outputs = self.model(data)
            probabilities = torch.softmax(outputs, dim=1)
            predicted_label = torch.argmax(probabilities, dim=1).item()

        return predicted_label, probabilities

    def predict_paragraph(self, paragraph, output_file, cited_title="", sentence_splitter=None):
        """Classify every sentence of ``paragraph`` and write the results to ``output_file``.

        Args:
            paragraph: text to split into sentences.
            output_file: path of the text report to (over)write.
            cited_title: forwarded to ``predict_sentence`` for every sentence.
            sentence_splitter: optional callable ``str -> list[str]`` (for
                example NLTK's ``sent_tokenize``). Defaults to a simple
                punctuation-based splitter.

        Returns:
            List of ``(sentence, predicted_label, probabilities)`` tuples.
        """
        splitter = sentence_splitter if sentence_splitter is not None else self._split_sentences
        sentences = splitter(paragraph)
        results = []

        with open(output_file, 'w', encoding='utf-8') as report:
            for sentence in sentences:
                predicted_label, probabilities = self.predict_sentence(sentence, cited_title)
                results.append((sentence, predicted_label, probabilities))
                report.write(f"Sentence: {sentence}\n")
                report.write(f"Predicted label: {predicted_label}, Probabilities: {probabilities}\n")
                report.write("\n")

        return results


In [None]:
# Build the predictor from the best checkpoint saved during training and the
# tokenizer matching the fine-tuned encoder. (`model_path` / `tokenizer_name`
# were never defined in this notebook, which raised NameError.)
predictor = LMClassPredictor(model_path=best_model_path, tokenizer_name=model_name, device=device)

output_file = '/content/drive/MyDrive/3c-citation-text-classification/Task 2/predictions.txt'

paragraph = "This is the first sentence. Here is another one."

results = predictor.predict_paragraph(paragraph, output_file)

# Print the results to the screen.
for sentence, predicted_label, probabilities in results:
    print(f"Sentence: {sentence}")
    print(f"Predicted label: {predicted_label}, Probabilities: {probabilities}")
    print("\n")

# Stand-alone usage example: predict the label of each sentence in a paragraph.
#
# if __name__ == "__main__":
#     model_path = "path/to/saved/model.pth"  # replace with the path to the saved model
#     tokenizer_name = "bert-base-uncased"  # replace with the name of your tokenizer
#     predictor = LMClassPredictor(model_path=model_path, tokenizer_name=tokenizer_name, device=device)
#
#     paragraph = "Your input paragraph goes here."
#     output_file = "prediction.txt"
#
#     results = predictor.predict_paragraph(paragraph, output_file)
#
#     # Print the results to the screen
#     for sentence, predicted_label, probabilities in results:
#         print(f"Sentence: {sentence}")
#         print(f"Predicted label: {predicted_label}, Probabilities: {probabilities}")
#         print("\n")

NameError: name 'model_path' is not defined