In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os, sys
# sys.path.append('../')
# os.chdir('../')

import random
import numpy as np
import pandas as pd
import torch
from torch import optim
from tqdm import tqdm

from transformers import BertConfig, BertTokenizer
from nltk.tokenize import word_tokenize

In [None]:
###
# common functions
###
def set_seed(seed):
    """Seed Python's, NumPy's and PyTorch's (CPU and CUDA) random generators with ``seed``."""
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)
    
def count_param(module, trainable=False):
    """Return the total number of scalar parameters in ``module``.

    When ``trainable`` is true, only parameters with ``requires_grad`` set are counted.
    """
    params = module.parameters()
    if trainable:
        params = (p for p in params if p.requires_grad)
    return sum(p.numel() for p in params)
    
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group (``None`` if it has none)."""
    first_group = next(iter(optimizer.param_groups), None)
    return None if first_group is None else first_group['lr']

def metrics_to_string(metric_dict):
    """Render a metric dict as space-separated ``name:value`` pairs with two decimals."""
    return ' '.join('{}:{:.2f}'.format(name, score) for name, score in metric_dict.items())

In [None]:
# Set random seed
set_seed(26092020)  # original seed
# set_seed(42)

In [None]:
import sys
import re
from collections import defaultdict, namedtuple

# Immutable record of chunk-level scores: true positives, false positives,
# false negatives, then precision, recall and F-score.
Metrics = namedtuple('Metrics', 'tp fp fn prec rec fscore')

class EvalCounts(object):
    """Mutable accumulator for the counters gathered during chunk-level evaluation."""

    def __init__(self):
        # Corpus-wide counters, all starting at zero:
        #   correct_chunk - correctly identified chunks
        #   correct_tags  - tokens whose tag and type both match
        #   found_correct - chunks in the gold data
        #   found_guessed - chunks in the predictions
        #   token_counter - tokens processed (sentence breaks ignored)
        for counter_name in ('correct_chunk', 'correct_tags', 'found_correct',
                             'found_guessed', 'token_counter'):
            setattr(self, counter_name, 0)

        # Same chunk counters, broken down by entity type.
        for counter_name in ('t_correct_chunk', 't_found_correct', 't_found_guessed'):
            setattr(self, counter_name, defaultdict(int))

###
# Evaluate Function
###        
def parse_tag(t):
    """Split a tag such as ``'B-PERSON'`` at its first hyphen into ``(chunk_tag, type)``.

    A tag without a hyphen (e.g. ``'O'``) is returned as ``(t, '')``.
    """
    matched = re.match(r'^([^-]*)-(.*)$', t)
    if matched is None:
        return (t, '')
    return matched.groups()

def start_of_chunk(prev_tag, tag, prev_type, type_):
    """Tell whether a chunk starts between the previous and the current token.

    Arguments are the previous and current chunk tags followed by the previous
    and current entity types.
    """
    # 'B' and 'S' always open a chunk; '[' and ']' are single-token chunks.
    if tag in ('B', 'S', '[', ']'):
        return True

    # A continuation/end tag with nothing open before it implicitly opens a chunk.
    if tag in ('E', 'I') and prev_tag in ('E', 'S', 'O'):
        return True

    # A change of entity type on a non-outside token starts a new chunk.
    return tag != 'O' and tag != '.' and prev_type != type_

def end_of_chunk(prev_tag, tag, prev_type, type_):
    """Tell whether a chunk ends between the previous and the current token.

    Arguments are the previous and current chunk tags followed by the previous
    and current entity types.
    """
    # 'E' and 'S' always close a chunk; '[' and ']' are single-token chunks.
    if prev_tag in ('E', 'S', ']', '['):
        return True

    # An open chunk is closed by a new beginning, a single-token chunk or an outside tag.
    if prev_tag in ('B', 'I') and tag in ('B', 'S', 'O'):
        return True

    # A change of entity type after a non-outside token closes the chunk.
    return prev_tag != 'O' and prev_tag != '.' and prev_type != type_

def evaluate_fn(guessed, correct, last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts):
    """Advance the chunk-matching state by one token and update ``counts``.

    Args:
        guessed: Predicted tag string for the current token (e.g. ``'B-PERSON'`` or ``'O'``).
        correct: Gold tag string for the current token.
        last_correct, last_correct_type: Gold chunk tag and entity type of the previous token.
        last_guessed, last_guessed_type: Predicted chunk tag and entity type of the previous token.
        in_correct: True while the chunk currently being read has matched in both
            the gold and the predicted sequence so far.
        counts: ``EvalCounts`` instance; mutated in place.

    Returns:
        Tuple ``(last_correct, last_correct_type, last_guessed, last_guessed_type,
        in_correct, counts)`` describing the state to pass in for the next token.
    """
    # Split both tags into (chunk tag, entity type).
    guessed, guessed_type = parse_tag(guessed)
    correct, correct_type = parse_tag(correct)

    end_correct = end_of_chunk(last_correct, correct,
                               last_correct_type, correct_type)
    end_guessed = end_of_chunk(last_guessed, guessed,
                               last_guessed_type, guessed_type)
    start_correct = start_of_chunk(last_correct, correct,
                                   last_correct_type, correct_type)
    start_guessed = start_of_chunk(last_guessed, guessed,
                                   last_guessed_type, guessed_type)

    if in_correct:
        # A matching chunk is confirmed only if gold and prediction end it
        # at the same position with the same type.
        if (end_correct and end_guessed and
            last_guessed_type == last_correct_type):
            in_correct = False
            counts.correct_chunk += 1
            counts.t_correct_chunk[last_correct_type] += 1
        elif (end_correct != end_guessed or guessed_type != correct_type):
            # The two sequences diverged inside the chunk.
            in_correct = False

    # A new candidate match begins when both sequences open a same-typed chunk here.
    if start_correct and start_guessed and guessed_type == correct_type:
        in_correct = True

    if start_correct:
        counts.found_correct += 1
        counts.t_found_correct[correct_type] += 1
    if start_guessed:
        counts.found_guessed += 1
        counts.t_found_guessed[guessed_type] += 1
    # Token-level bookkeeping: every call counts as one token.
    if correct == guessed and guessed_type == correct_type:
        counts.correct_tags += 1
    counts.token_counter += 1

    last_guessed = guessed
    last_correct = correct
    last_guessed_type = guessed_type
    last_correct_type = correct_type
    
    return last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts
    
def evaluate(hyps_list, labels_list):
    """Accumulate chunk- and token-level counts over a corpus of tagged sentences.

    Args:
        hyps_list: List of sentences, each a list of predicted tag strings.
        labels_list: List of sentences, each a list of gold tag strings,
            aligned with ``hyps_list``.

    Returns:
        The populated ``EvalCounts`` instance.
    """
    counts = EvalCounts()
    in_correct = False        # chunk currently being read has matched so far
    last_correct = 'O'        # previous gold chunk tag
    last_correct_type = ''    # entity type of the previous gold tag
    last_guessed = 'O'        # previous predicted chunk tag
    last_guessed_type = ''    # entity type of the previous predicted tag

    for hyps, labels in zip(hyps_list, labels_list):
        for hyp, label in zip(hyps, labels):
            step_result = evaluate_fn(hyp, label, last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts)
            last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts = step_result

        # Sentence boundary: feed an 'O'/'O' sentinel so chunks never span sentences.
        step_result = evaluate_fn('O', 'O', last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts)
        last_correct, last_correct_type, last_guessed, last_guessed_type, in_correct, counts = step_result

        # The sentinel is not a real token, but evaluate_fn counted it as one
        # correctly tagged token. Undo that so token accuracy is not inflated
        # by one free hit per sentence.
        counts.correct_tags -= 1
        counts.token_counter -= 1

    # Close a chunk that is still open and matching after the last token.
    if in_correct:
        counts.correct_chunk += 1
        counts.t_correct_chunk[last_correct_type] += 1

    return counts

###
# Calculate Metrics Function
###
def uniq(iterable):
    """Return the items of ``iterable`` as a list, keeping only the first occurrence of each."""
    seen = set()
    ordered = []
    for item in iterable:
        if item not in seen:
            seen.add(item)
            ordered.append(item)
    return ordered

def calculate_metrics(correct, guessed, total):
    """Build a ``Metrics`` record from chunk counts.

    ``correct`` is the number of correctly predicted chunks, ``guessed`` the
    number of predicted chunks and ``total`` the number of gold chunks.
    Precision, recall and F-score fall back to 0 when their denominator is 0.
    """
    tp = correct
    fp = guessed - correct
    fn = total - correct

    predicted, actual = tp + fp, tp + fn
    p = 1.*tp / predicted if predicted != 0 else 0
    r = 1.*tp / actual if actual != 0 else 0
    f = (2 * p * r) / (p + r) if p + r != 0 else 0
    return Metrics(tp, fp, fn, p, r, f)

def eval_metrics(counts):
    """Turn accumulated counts into overall and per-entity-type metrics.

    Args:
        counts: ``EvalCounts`` instance filled by ``evaluate``.

    Returns:
        Tuple ``(overall, by_type)`` where ``overall`` is a ``Metrics`` record
        and ``by_type`` maps each entity type seen in the gold or predicted
        chunks to its own ``Metrics`` record.
    """
    c = counts
    overall = calculate_metrics(
        c.correct_chunk, c.found_guessed, c.found_correct
    )
    by_type = {}
    # Cover types that occur only in the gold data or only in the predictions.
    for t in uniq(list(c.t_found_correct.keys()) + list(c.t_found_guessed.keys())):
        by_type[t] = calculate_metrics(
            c.t_correct_chunk[t], c.t_found_guessed[t], c.t_found_correct[t]
        )
    return overall, by_type

###
# Main Function
###
def conll_evaluation(hyps_list, labels_list):
    """Compute token accuracy plus micro and type-macro chunk metrics.

    Args:
        hyps_list: List of sentences, each a list of predicted tag strings.
        labels_list: List of sentences, each a list of gold tag strings.

    Returns:
        Tuple ``(acc, pre, rec, f1, type_macro_pre, type_macro_rec, type_macro_f1)``.
        ``acc`` is token-level accuracy; ``pre``/``rec``/``f1`` are computed over
        all chunks; the ``type_macro_*`` values are unweighted means over entity
        types. Values whose denominator would be zero (no tokens, or no entity
        type found at all) are reported as ``0.0`` instead of raising.
    """
    counts = evaluate(hyps_list, labels_list)
    overall, by_type = eval_metrics(counts)

    # Guard against empty input.
    acc = counts.correct_tags / counts.token_counter if counts.token_counter else 0.0
    pre = overall.prec
    rec = overall.rec
    f1 = overall.fscore

    num_types = len(by_type)
    if num_types:
        type_macro_pre = sum((m.prec for m in by_type.values()), 0.0) / float(num_types)
        type_macro_rec = sum((m.rec for m in by_type.values()), 0.0) / float(num_types)
        type_macro_f1 = sum((m.fscore for m in by_type.values()), 0.0) / float(num_types)
    else:
        # Neither gold nor predictions contain any chunk: nothing to average.
        type_macro_pre = type_macro_rec = type_macro_f1 = 0.0

    return (acc, pre, rec, f1, type_macro_pre, type_macro_rec, type_macro_f1)

In [None]:
import numpy as np
import pandas as pd
import string
import torch
import re
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer
from tqdm import tqdm


class NerGritDataset(Dataset):
    """Word-level NER dataset read from a tab-separated ``token<TAB>label`` file.

    Sentences are separated by blank lines. Each item yields subword ids with
    CLS/SEP added, the subword-to-word alignment, the per-word label ids and
    the raw word list.
    """

    # Static constant variable
    LABEL2INDEX = {'I-PERSON': 0, 'B-ORGANISATION': 1, 'I-ORGANISATION': 2, 'B-PLACE': 3, 'I-PLACE': 4, 'O': 5, 'B-PERSON': 6}
    INDEX2LABEL = {0: 'I-PERSON', 1: 'B-ORGANISATION', 2: 'I-ORGANISATION', 3: 'B-PLACE', 4: 'I-PLACE', 5: 'O', 6: 'B-PERSON'}
    NUM_LABELS = 7

    def load_dataset(self, path):
        """Parse ``path`` into a list of ``{'sentence': [...], 'seq_label': [...]}`` dicts.

        Raises:
            KeyError: if a label is not in ``LABEL2INDEX``.
            ValueError: if a non-blank line is not exactly ``token<TAB>label``.
        """
        dataset = []
        sentence = []
        seq_label = []
        # Context manager closes the handle; UTF-8 is explicit so parsing does
        # not depend on the platform's default encoding.
        with open(path, 'r', encoding='utf-8') as file:
            for line in file:
                if len(line.strip()) > 0:
                    # Strip only the line terminator, so the last character of
                    # the label survives when the final line has no newline.
                    token, label = line.rstrip('\r\n').split('\t')
                    sentence.append(token)
                    seq_label.append(self.LABEL2INDEX[label])
                else:
                    dataset.append({
                        'sentence': sentence,
                        'seq_label': seq_label
                    })
                    sentence = []
                    seq_label = []

        # Keep the last sentence when the file does not end with a blank line.
        if sentence:
            dataset.append({
                'sentence': sentence,
                'seq_label': seq_label
            })
        return dataset

    def __init__(self, dataset_path, tokenizer, *args, **kwargs):
        """Load the file at ``dataset_path``; extra positional/keyword arguments are ignored."""
        self.data = self.load_dataset(dataset_path)
        self.tokenizer = tokenizer

    def __getitem__(self, index):
        """Return ``(subword_ids, subword_to_word_indices, seq_label, words)`` for one sentence."""
        data = self.data[index]
        sentence, seq_label = data['sentence'], data['seq_label']

        # Add CLS token; special tokens map to word index -1.
        subwords = [self.tokenizer.cls_token_id]
        subword_to_word_indices = [-1]

        # Add subwords, remembering which word each one came from.
        for word_idx, word in enumerate(sentence):
            subword_list = self.tokenizer.encode(word, add_special_tokens=False)
            subword_to_word_indices += [word_idx] * len(subword_list)
            subwords += subword_list

        # Add last SEP token
        subwords += [self.tokenizer.sep_token_id]
        subword_to_word_indices += [-1]

        return np.array(subwords), np.array(subword_to_word_indices), np.array(seq_label), data['sentence']

    def __len__(self):
        return len(self.data)
        
class NerDataLoader(DataLoader):
    """``DataLoader`` that pads NER samples into fixed-width NumPy batches."""

    def __init__(self, max_seq_len=512, *args, **kwargs):
        super(NerDataLoader, self).__init__(*args, **kwargs)
        self.max_seq_len = max_seq_len
        # Always collate with the padding routine below.
        self.collate_fn = self._collate_fn

    def _collate_fn(self, batch):
        """Pad a list of ``(subwords, subword_to_word_indices, seq_label, raw_seq)`` samples.

        Subword inputs are cut to at most ``self.max_seq_len`` columns; labels
        are padded with -100 to the longest label sequence in the batch.
        """
        n_rows = len(batch)
        longest_input = max(len(sample[0]) for sample in batch)
        width = min(self.max_seq_len, longest_input)
        label_width = max(len(sample[2]) for sample in batch)

        subword_batch = np.zeros((n_rows, width), dtype=np.int64)
        mask_batch = np.zeros((n_rows, width), dtype=np.float32)
        subword_to_word_indices_batch = np.full((n_rows, width), -1, dtype=np.int64)
        seq_label_batch = np.full((n_rows, label_width), -100, dtype=np.int64)

        seq_list = []
        for row, (subwords, word_ids, seq_label, raw_seq) in enumerate(batch):
            clipped_subwords = subwords[:width]
            clipped_word_ids = word_ids[:width]
            used = len(clipped_subwords)

            subword_batch[row, :used] = clipped_subwords
            mask_batch[row, :used] = 1
            subword_to_word_indices_batch[row, :used] = clipped_word_ids
            seq_label_batch[row, :len(seq_label)] = seq_label

            seq_list.append(raw_seq)

        return subword_batch, mask_batch, subword_to_word_indices_batch, seq_label_batch, seq_list

In [None]:
import torch

# Forward function for word classification
def forward_word_classification(model, batch_data, i2w, is_test=False, device='cpu', **kwargs):
    """Run one batch through a word-classification model and decode its predictions.

    Args:
        model: Callable invoked as ``model(subwords, subword_to_word_ids,
            attention_mask=..., token_type_ids=..., labels=...)`` whose first two
            outputs are ``(loss, logits)``.
        batch_data: Either ``(subwords, mask, subword_to_word_ids, labels)`` or
            ``(subwords, mask, token_type_ids, subword_to_word_ids, labels)``.
        i2w: Mapping from label index to label string.
        is_test: Unused; kept for interface compatibility.
        device: Any device accepted by ``Tensor.to`` (``'cpu'``, ``'cuda'``,
            ``'cuda:1'``, a ``torch.device`` ...).

    Returns:
        ``(loss, list_hyps, list_labels)`` where the two lists hold, per
        sentence, the predicted and gold label strings up to the first padded
        (-100) label position.

    Raises:
        ValueError: if ``batch_data`` has neither 4 nor 5 elements.
    """
    # Unpack batch data
    if len(batch_data) == 4:
        (subword_batch, mask_batch, subword_to_word_indices_batch, label_batch) = batch_data
        token_type_batch = None
    elif len(batch_data) == 5:
        (subword_batch, mask_batch, token_type_batch, subword_to_word_indices_batch, label_batch) = batch_data
    else:
        raise ValueError(
            'batch_data must contain 4 or 5 elements, got {}'.format(len(batch_data))
        )

    # Prepare input & label on the requested device
    subword_batch = torch.LongTensor(subword_batch).to(device)
    mask_batch = torch.FloatTensor(mask_batch).to(device)
    if token_type_batch is not None:
        token_type_batch = torch.LongTensor(token_type_batch).to(device)
    subword_to_word_indices_batch = torch.LongTensor(subword_to_word_indices_batch).to(device)
    label_batch = torch.LongTensor(label_batch).to(device)

    # Forward model
    outputs = model(subword_batch, subword_to_word_indices_batch, attention_mask=mask_batch, token_type_ids=token_type_batch, labels=label_batch)
    loss, logits = outputs[:2]

    # generate prediction & label list
    list_hyps = []
    list_labels = []
    hyps_list = torch.topk(logits, k=1, dim=-1)[1].squeeze(dim=-1)
    for i in range(len(hyps_list)):
        hyps, labels = hyps_list[i].tolist(), label_batch[i].tolist()
        list_hyp, list_label = [], []
        for j in range(len(hyps)):
            # -100 marks label padding: everything after it belongs to no word.
            if labels[j] == -100:
                break
            list_hyp.append(i2w[hyps[j]])
            list_label.append(i2w[labels[j]])
        list_hyps.append(list_hyp)
        list_labels.append(list_label)

    return loss, list_hyps, list_labels

In [None]:
import itertools
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

def ner_metrics_fn(list_hyp, list_label):
    """Return token accuracy and the type-macro F1 / recall / precision as a dict.

    Keys are ``ACC``, ``F1``, ``REC`` and ``PRE``; the overall (micro) chunk
    scores returned by ``conll_evaluation`` are not reported.
    """
    acc, _pre, _rec, _f1, tm_pre, tm_rec, tm_f1 = conll_evaluation(list_hyp, list_label)
    return {"ACC": acc, "F1": tm_f1, "REC": tm_rec, "PRE": tm_pre}

In [None]:
import logging
import math
import os

import torch
from torch import nn
from torch.nn import CrossEntropyLoss, MSELoss

from transformers import AlbertPreTrainedModel, BertPreTrainedModel, AlbertModel, BertModel, BertConfig, XLMModel, XLMConfig, XLMRobertaModel, XLMRobertaConfig
from transformers import AutoTokenizer, AutoConfig

class BertForWordClassification(BertPreTrainedModel):
    """BERT encoder with a linear head that classifies words rather than subwords.

    Subword hidden states are mean-pooled per word, using ``subword_to_word_ids``
    to map each subword position to its word index, before dropout and the
    classification layer are applied.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        self.init_weights()

    def forward(
        self,
        input_ids=None,
        subword_to_word_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        labels=None,
    ):
        r"""
        Args:
            input_ids, attention_mask, token_type_ids, position_ids, head_mask,
            inputs_embeds: forwarded unchanged to the underlying ``BertModel``.
            subword_to_word_ids (:obj:`torch.LongTensor`, same leading shape as ``input_ids``):
                Word index of every subword position. Positions whose value is
                negative (the notebook uses -1 for special tokens and padding)
                match no word and are left out of the pooling.
            labels (:obj:`torch.LongTensor`, `optional`, defaults to :obj:`None`):
                Word-level labels for computing the classification loss.
                Indices should be in ``[0, ..., config.num_labels - 1]``;
                ``CrossEntropyLoss`` ignores positions set to -100 by default.

        Returns:
            :obj:`tuple` of
            loss (`optional`, returned when ``labels`` is provided): classification loss.
            scores: classification scores (before SoftMax), one row per word index
                in ``[0, subword_to_word_ids.max()]``.
            followed by whatever extra outputs (hidden states, attentions) the
            encoder returned beyond its first two.
        """

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
        )

        sequence_output = outputs[0]

        # average the token-level outputs to compute word-level representations
        num_words = int(subword_to_word_ids.max()) + 1
        word_latents = []
        for word_idx in range(num_words):
            mask = (subword_to_word_ids == word_idx).unsqueeze(dim=-1)
            # Count subwords per sample, not across the whole batch, so the
            # result is a true mean regardless of batch composition. Samples
            # without this word get a count of 1 and therefore a zero vector.
            subword_count = mask.sum(dim=1).clamp(min=1)
            word_latents.append((sequence_output * mask).sum(dim=1) / subword_count)
        word_batch = torch.stack(word_latents, dim=1)

        sequence_output = self.dropout(word_batch)
        logits = self.classifier(sequence_output)

        outputs = (logits,) + outputs[2:]  # add hidden states and attention if they are here
        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            outputs = (loss,) + outputs

        return outputs  # (loss), scores, (hidden_states), (attentions)

# Load IndoBert Model

In [None]:
# Load Tokenizer and Config
tokenizer = BertTokenizer.from_pretrained('indobenchmark/indobert-base-p2')
config = BertConfig.from_pretrained('indobenchmark/indobert-base-p2')
config.num_labels = NerGritDataset.NUM_LABELS

# Instantiate model
model = BertForWordClassification.from_pretrained('indobenchmark/indobert-base-p2', config=config)

# Snapshot the freshly loaded model weights and config (no fine-tuning has run at this point)
model_checkpoint_path = 'model-bert.pt'
tokenizer_directory = 'tokenizer_directory'

# Create the directory if it does not exist yet
os.makedirs(tokenizer_directory, exist_ok=True)

# Save the model
torch.save({
            'model_state_dict': model.state_dict(),
            'config': config,
            }, model_checkpoint_path)

# Save the tokenizer
tokenizer.save_pretrained(tokenizer_directory)

In [None]:
# Show the model's repr as the cell output
model

In [None]:
# Total number of parameters (trainable or not)
count_param(model)

# Prepare Named Entity Recognition Dataset (NERGrit)

In [None]:
# Train / validation / test split files under the Kaggle input directory
train_dataset_path = '/kaggle/input/nergritdata/train_preprocess.txt'
valid_dataset_path = '/kaggle/input/nergritdata/valid_preprocess.txt'
test_dataset_path = '/kaggle/input/nergritdata/test_preprocess.txt'

In [None]:
import matplotlib.pyplot as plt
from collections import Counter

# Load and process the data
file_path = train_dataset_path
entities = []

with open(file_path, 'r', encoding='utf-8') as file:
    for line in file:
        # Each non-blank line is "token<TAB>label"; blank lines and malformed
        # lines do not split into two parts and are skipped.
        parts = line.strip().split('\t')
        if len(parts) == 2:
            _, label = parts
            if label != 'O':  # Only consider labeled entities
                entities.append(label.split('-')[1])  # Extract entity type (e.g., PERSON, PLACE)

# Count occurrences of each entity type (counted per token, B- and I- tags alike)
entity_counts = Counter(entities)

# Plot bar chart
plt.figure(figsize=(10, 5))
plt.bar(entity_counts.keys(), entity_counts.values(), color=['skyblue', 'salmon', 'lightgreen'])
plt.xlabel('Entity Type')
plt.ylabel('Count')
plt.title('Distribution of Entity Types in Dataset')
plt.show()

# Plot pie chart
plt.figure(figsize=(7, 7))
plt.pie(entity_counts.values(), labels=entity_counts.keys(), autopct='%1.1f%%', colors=['skyblue', 'salmon', 'lightgreen'])
plt.title('Entity Types Proportion in Dataset')
plt.show()

In [None]:
# Report how the samples are distributed across the three dataset splits
def dataset_split_ratio(train_dataset, valid_dataset, test_dataset):
    """Print the total sample count and each split's size with its percentage share."""
    # Sizes of the individual splits and of the whole collection
    sizes = [len(train_dataset), len(valid_dataset), len(test_dataset)]
    total_size = sizes[0] + sizes[1] + sizes[2]

    # Percentage share of each split (computed before anything is printed)
    ratios = [(size / total_size) * 100 for size in sizes]

    # Show the result
    print(f"Total samples: {total_size}")
    for split_name, size, ratio in zip(("Train", "Valid", "Test"), sizes, ratios):
        print(f"{split_name} samples: {size} ({ratio:.2f}%)")

In [None]:
# NOTE(review): `lowercase=True` is absorbed by NerGritDataset.__init__'s **kwargs
# and is not used by the dataset itself.
train_dataset = NerGritDataset(train_dataset_path, tokenizer, lowercase=True)
valid_dataset = NerGritDataset(valid_dataset_path, tokenizer, lowercase=True)
test_dataset = NerGritDataset(test_dataset_path, tokenizer, lowercase=True)

batch_size=8
# num_workers=16  # original value
num_workers=4
max_seq_len=512  # original value

# Only the training loader shuffles; validation and test keep file order.
train_loader = NerDataLoader(dataset=train_dataset, max_seq_len=max_seq_len, batch_size=batch_size, num_workers=num_workers, shuffle=True)  
valid_loader = NerDataLoader(dataset=valid_dataset, max_seq_len=max_seq_len, batch_size=batch_size, num_workers=num_workers, shuffle=False)  
test_loader = NerDataLoader(dataset=test_dataset, max_seq_len=max_seq_len, batch_size=batch_size, num_workers=num_workers, shuffle=False)

dataset_split_ratio(train_dataset, valid_dataset, test_dataset)

In [None]:
# Label string -> index and index -> label string lookups taken from the dataset class
w2i, i2w = NerGritDataset.LABEL2INDEX, NerGritDataset.INDEX2LABEL
print(w2i)
print(i2w)

In [None]:
# Pull a single training batch and show its padded label matrix (-100 marks padding)
batch = next(iter(train_loader))
subword_batch, mask_batch, subword_to_word_indices_batch, seq_label_batch, seq_list = batch
seq_label_batch

# Test model on sample sentences

In [None]:
def word_subword_tokenize(sentence, tokenizer):
    """Encode a list of words into subword ids plus a subword-to-word alignment.

    Returns ``(subwords, subword_to_word_indices)``: the ids wrapped in CLS/SEP,
    and for every id the index of the word it came from (-1 for CLS and SEP).
    """
    # Start with the CLS token, which belongs to no word
    subwords = [tokenizer.cls_token_id]
    subword_to_word_indices = [-1]

    # Append each word's subword pieces and tag them with the word's position
    for word_idx, word in enumerate(sentence):
        pieces = tokenizer.encode(word, add_special_tokens=False)
        subwords.extend(pieces)
        subword_to_word_indices.extend([word_idx] * len(pieces))

    # Close with the SEP token
    subwords.append(tokenizer.sep_token_id)
    subword_to_word_indices.append(-1)

    return subwords, subword_to_word_indices

In [None]:
# Run one sample sentence through the model before fine-tuning
text = word_tokenize('Bung Tomo adalah pahlawan nasional Republik Indonesia')
subwords, subword_to_word_indices = word_subword_tokenize(text, tokenizer)

subwords = torch.LongTensor(subwords).view(1, -1).to(model.device)
subword_to_word_indices = torch.LongTensor(subword_to_word_indices).view(1, -1).to(model.device)
logits = model(subwords, subword_to_word_indices)[0]

# .cpu() keeps the cell working if it is re-run after the model has moved to the GPU
preds = torch.topk(logits, k=1, dim=-1)[1].squeeze().cpu().numpy()
labels = [i2w[preds[i]] for i in range(len(preds))]

# Number of subword ids fed to the model (dimension 1; dimension 0 is the batch of one)
print(subwords.shape[1])

# Keep the table as the last expression so the notebook displays it
pd.DataFrame({'words': text, 'label': labels})

# Fine Tuning & Evaluation

In [None]:
lr=5e-6

# Move the model to the GPU first, then build the optimizer from its parameters
# (the order recommended by the torch.optim documentation).
model = model.cuda()
optimizer = optim.AdamW(model.parameters(), lr=lr)

In [None]:
!pip install xlsxwriter

import torch
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import pandas as pd
from tqdm import tqdm
import numpy as np

# Initialize lists to store losses, accuracy, precision, recall, and F1 score
# (one entry is appended per epoch)
train_losses = []
valid_losses = []
train_accuracies = []
valid_accuracies = []
train_metrics = []
valid_metrics = []
confusion_matrices = []

# Hyperparameters
n_epochs = 10

# NER label mapping
# NOTE: this rebinds the notebook-level `i2w` to shortened label names
# (PER/ORG/LOC); the indices match NerGritDataset.INDEX2LABEL.
i2w = {
    0: 'I-PER',
    1: 'B-ORG',
    2: 'I-ORG',
    3: 'B-LOC',
    4: 'I-LOC',
    5: 'O',
    6: 'B-PER'
}

# Build the output file name from lr and batch_size. Dots are replaced only in
# the stem so that the '.xlsx' extension is preserved.
file_name = f'training_lr{lr}_bs{batch_size}_augmented'.replace('.', '_') + '.xlsx'

for epoch in range(n_epochs):
    # ---- Training pass ----
    model.train()
    torch.set_grad_enabled(True)
    
    total_train_loss = 0
    list_hyp, list_label = [], []

    train_pbar = tqdm(train_loader, leave=True, total=len(train_loader))
    for i, batch_data in enumerate(train_pbar):
        # Forward model (the last batch element is the raw word list, not a model input)
        loss, batch_hyp, batch_label = forward_word_classification(model, batch_data[:-1], i2w=i2w, device='cuda')

        # Update model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        tr_loss = loss.item()
        total_train_loss += tr_loss

        # Collect predictions and labels
        list_hyp += batch_hyp
        list_label += batch_label

        train_pbar.set_description(f"(Epoch {epoch+1}) TRAIN LOSS:{total_train_loss/(i+1):.4f} LR:{get_lr(optimizer):.8f}")

    # Calculate train metric using your ner_metrics_fn
    train_metric = ner_metrics_fn(list_hyp, list_label)
    train_accuracies.append(train_metric['F1'])
    train_metrics.append(train_metric)

    # Store training loss
    train_losses.append(total_train_loss / len(train_loader))

    # ---- Validation pass ----
    model.eval()
    torch.set_grad_enabled(False)
    
    total_loss = 0
    list_hyp, list_label = [], []

    pbar = tqdm(valid_loader, leave=True, total=len(valid_loader))
    for i, batch_data in enumerate(pbar):
        loss, batch_hyp, batch_label = forward_word_classification(model, batch_data[:-1], i2w=i2w, device='cuda')
        
        # Calculate total loss
        valid_loss = loss.item()
        total_loss += valid_loss

        # Collect predictions and labels
        list_hyp += batch_hyp
        list_label += batch_label
        pbar.set_description(f"VALID LOSS:{total_loss/(i+1):.4f}")

    # Calculate validation metric using your ner_metrics_fn
    valid_metric = ner_metrics_fn(list_hyp, list_label)
    valid_accuracies.append(valid_metric['F1'])
    valid_metrics.append(valid_metric)

    # Store validation loss
    valid_losses.append(total_loss / len(valid_loader))

    # Flatten predictions and true labels
    flat_list_hyp = list(itertools.chain(*list_hyp))
    flat_list_label = list(itertools.chain(*list_label))

    # Calculate confusion matrix for the current epoch.
    # Passing `labels` fixes the row/column order to the i2w index order and
    # guarantees a full-size matrix even if a class is absent this epoch;
    # without it sklearn sorts the labels that happen to occur alphabetically.
    cm = confusion_matrix(flat_list_label, flat_list_hyp,
                          labels=[i2w[label_idx] for label_idx in range(len(i2w))])
    confusion_matrices.append(cm)

    # Print metrics for the current epoch
    print(f"(Epoch {epoch+1}) TRAIN METRICS: {train_metric}")
    print(f"(Epoch {epoch+1}) VALID METRICS: {valid_metric}")
    # print(f"Confusion Matrix for Epoch {epoch+1}:\n", cm)

# Save results to Excel with dynamic file name
with pd.ExcelWriter(file_name, engine='xlsxwriter') as writer:
    # Write accuracy, precision, recall, F1, and loss to Excel (one row per epoch)
    df_acc_loss = pd.DataFrame({
        'Epoch': range(1, n_epochs+1),
        'Train Accuracy': [m['ACC'] for m in train_metrics],
        'Validation Accuracy': [m['ACC'] for m in valid_metrics],
        'Train Precision': [m['PRE'] for m in train_metrics],
        'Validation Precision': [m['PRE'] for m in valid_metrics],
        'Train Recall': [m['REC'] for m in train_metrics],
        'Validation Recall': [m['REC'] for m in valid_metrics],
        'Train F1': [m['F1'] for m in train_metrics],
        'Validation F1': [m['F1'] for m in valid_metrics],
        'Train Loss': train_losses,
        'Validation Loss': valid_losses
    })
    df_acc_loss.to_excel(writer, sheet_name='Metrics', index=False)

    # Save confusion matrices to a separate sheet (one sheet per epoch);
    # rows and columns are labelled in i2w index order
    for i, cm in enumerate(confusion_matrices):
        df_cm = pd.DataFrame(cm, 
                           index=[i2w[i] for i in range(len(i2w))],
                           columns=[i2w[i] for i in range(len(i2w))])
        df_cm.to_excel(writer, sheet_name=f'Confusion_Matrix_Epoch_{i+1}')
    
    # Save confusion matrix plot as an image in the Excel file
    workbook = writer.book
    worksheet = workbook.add_worksheet('Confusion_Matrix_Plots')
    
    # Plot and save the confusion matrix of the last epoch
    plt.figure(figsize=(12, 9))
    
    # Create confusion matrix plot with proper labels
    last_cm_df = pd.DataFrame(confusion_matrices[-1],
                            index=[i2w[i] for i in range(len(i2w))],
                            columns=[i2w[i] for i in range(len(i2w))])
    
    # Plot heatmap with proper font sizes and rotation
    sns.heatmap(last_cm_df, annot=True, fmt="d", cmap="Blues",
                xticklabels=True, yticklabels=True)
    
    plt.xlabel("Predicted", fontsize=12)
    plt.ylabel("True", fontsize=12)
    plt.title(f"Confusion Matrix (Epoch {n_epochs})", fontsize=14, pad=20)
    
    # Rotate x-axis labels for better readability
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    
    # Adjust layout to prevent label cutoff
    plt.tight_layout()
    
    # Save plot as an image
    plt.savefig('confusion_matrix_last_epoch.png', bbox_inches='tight', dpi=300)
    
    # Insert the image into the Excel worksheet
    worksheet.insert_image('A1', 'confusion_matrix_last_epoch.png')

    # Save the F1 and loss curves to the Excel file as well
    # (left panel: F1 per epoch, right panel: loss per epoch)
    plt.figure(figsize=(14, 7))
    plt.subplot(1, 2, 1)
    plt.plot(range(1, n_epochs+1), [m['F1'] for m in train_metrics], label="Training F1-SCORE")
    plt.plot(range(1, n_epochs+1), [m['F1'] for m in valid_metrics], label="Validation F1-SCORE")
    plt.legend(loc="lower right")
    plt.title("Training and Validation F1-Score")

    plt.subplot(1, 2, 2)
    plt.plot(range(1, n_epochs+1), train_losses, label="Training Loss")
    plt.plot(range(1, n_epochs+1), valid_losses, label="Validation Loss")
    plt.legend(loc="upper right")
    plt.title("Training and Validation Loss")
    
    plt.savefig('accuracy_loss_plot.png')
    worksheet.insert_image('A20', 'accuracy_loss_plot.png')

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import pandas as pd
import itertools
import seaborn as sns
import matplotlib.pyplot as plt

# Model Evaluation on Test Data
model.eval()
torch.set_grad_enabled(False)

list_hyp, list_label = [], []

test_pbar = tqdm(test_loader, leave=True, total=len(test_loader))
for i, batch_data in enumerate(test_pbar):
    # Forward pass (the last batch element is the raw word list, not a model input)
    _, batch_hyp, batch_label = forward_word_classification(model, batch_data[:-1], i2w=i2w, device='cuda')

    # Collect predictions and labels
    list_hyp += batch_hyp
    list_label += batch_label

# Flatten predictions and true labels
flat_list_hyp = list(itertools.chain(*list_hyp))
flat_list_label = list(itertools.chain(*list_label))

# Convert label strings to numeric indices with a single inverse lookup table
label_to_index = {label: index for index, label in i2w.items()}
flat_list_hyp_indices = [label_to_index[label] for label in flat_list_hyp]
flat_list_label_indices = [label_to_index[label] for label in flat_list_label]

# Fixed class order shared by the confusion matrix, the plot and the report.
# Passing it explicitly keeps classes that never occur in the test set as
# all-zero rows/columns, so no artificial samples have to be added.
label_ids = list(i2w.keys())
label_names = [i2w[label_id] for label_id in label_ids]

# Calculate confusion matrix
cm = confusion_matrix(flat_list_label_indices, flat_list_hyp_indices, labels=label_ids)

# Save predictions and true labels to a CSV file
df_test_result = pd.DataFrame({'True Labels': flat_list_label, 'Predicted Labels': flat_list_hyp})
df_test_result.to_csv('test_predictions.csv', index=False)

# Plot confusion matrix
plt.figure(figsize=(12, 9))
df_cm = pd.DataFrame(cm, index=label_names, columns=label_names)
sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")

plt.title('Confusion Matrix on Test Data')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('confusion_matrix_test.png')
plt.show()

# Print classification report (classes without samples score 0 instead of warning)
print("Classification Report:")
print(classification_report(flat_list_label_indices, flat_list_hyp_indices,
                            labels=label_ids, target_names=label_names, zero_division=0))


In [None]:
# # Evaluate on test
# model.eval()
# torch.set_grad_enabled(False)

# total_loss, total_correct, total_labels = 0, 0, 0
# list_hyp, list_label = [], []

# pbar = tqdm(test_loader, leave=True, total=len(test_loader))
# for i, batch_data in enumerate(pbar):
#     _, batch_hyp, _ = forward_word_classification(model, batch_data[:-1], i2w=i2w, device='cuda')
#     list_hyp += batch_hyp

# # Save prediction
# df = pd.DataFrame({'label':list_hyp}).reset_index()
# df.to_csv('pred.txt', index=False)

# print(df)

In [None]:
import os

# After training, bundle the model weights, the optimizer state and the config
# into a single checkpoint file.
model_checkpoint_path = '/kaggle/working/fine_tuned_model_test1.pt'
torch.save(
    dict(
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        config=config,
    ),
    model_checkpoint_path,
)

# Write the tokenizer files into their own directory, creating it when needed.
tokenizer_directory = '/kaggle/working/tokenizer_directory1'
os.makedirs(tokenizer_directory, exist_ok=True)
tokenizer.save_pretrained(tokenizer_directory)

# Test fine-tuned model with sample sentences

In [None]:
# Split the sample sentence into words, then into subword ids plus the
# subword-to-word index list returned by word_subword_tokenize.
text = word_tokenize('Bung Tomo adalah pahlawan nasional Republik Indonesia')
subwords, subword_to_word_indices = word_subword_tokenize(text, tokenizer)

# Reshape both id lists to a batch of one and move them to the model's device.
subwords = torch.LongTensor(subwords).view(1, -1).to(model.device)
subword_to_word_indices = torch.LongTensor(subword_to_word_indices).view(1, -1).to(model.device)

# Inference only: no_grad() avoids building an autograd graph for the forward pass.
with torch.no_grad():
    logits = model(subwords, subword_to_word_indices)[0]

# Flatten explicitly instead of squeeze(): squeeze() would turn the result for a
# one-word sentence into a 0-d array, and len()/iteration over it would fail.
preds = torch.topk(logits, k=1, dim=-1)[1].reshape(-1).cpu().numpy()
labels = [i2w[int(pred)] for pred in preds]

pd.DataFrame({'words': text, 'label': labels})

In [None]:
# Split the sample sentence into words, then into subword ids plus the
# subword-to-word index list returned by word_subword_tokenize.
text = word_tokenize('Alex berada di Gelora Bung Karno')
subwords, subword_to_word_indices = word_subword_tokenize(text, tokenizer)

# Reshape both id lists to a batch of one and move them to the model's device.
subwords = torch.LongTensor(subwords).view(1, -1).to(model.device)
subword_to_word_indices = torch.LongTensor(subword_to_word_indices).view(1, -1).to(model.device)

# Inference only: no_grad() avoids building an autograd graph for the forward pass.
with torch.no_grad():
    logits = model(subwords, subword_to_word_indices)[0]

# Flatten explicitly instead of squeeze(): squeeze() would turn the result for a
# one-word sentence into a 0-d array, and len()/iteration over it would fail.
preds = torch.topk(logits, k=1, dim=-1)[1].reshape(-1).cpu().numpy()
labels = [i2w[int(pred)] for pred in preds]

pd.DataFrame({'words': text, 'label': labels})

In [None]:
import torch
import pandas as pd
from transformers import BertTokenizer
from nltk.tokenize import word_tokenize

# Load the fine-tuned model and its tokenizer from disk
def load_model_and_tokenizer(model_path, tokenizer_path):
    """Rebuild the word-classification model and tokenizer from saved files.

    Args:
        model_path: Path to a checkpoint readable by ``torch.load`` that holds
            the keys ``'config'`` and ``'model_state_dict'``.
        tokenizer_path: Directory passed to ``BertTokenizer.from_pretrained``.

    Returns:
        Tuple ``(model, tokenizer)``; the model has the checkpoint weights
        loaded and is switched to eval mode.
    """
    # map_location='cpu' lets a checkpoint saved from GPU tensors load on a
    # machine without CUDA; the weights are copied into the model afterwards.
    # NOTE(security): the checkpoint carries a non-tensor object under 'config',
    # so it has to be unpickled with weights_only=False (newer PyTorch releases
    # default to weights_only=True and would refuse it). Unpickling can execute
    # arbitrary code, so only load checkpoints you created or trust.
    checkpoint = torch.load(model_path, map_location='cpu', weights_only=False)
    model = BertForWordClassification.from_pretrained('indobenchmark/indobert-base-p2', config=checkpoint['config'])
    model.load_state_dict(checkpoint['model_state_dict'])
    # The model is returned for inference, so make eval mode explicit.
    model.eval()
    tokenizer = BertTokenizer.from_pretrained(tokenizer_path)
    return model, tokenizer

# Predict a label for every word of a sentence
def predict_labels(text, model, tokenizer, i2w):
    """Tag each word of ``text`` with its predicted label.

    Args:
        text: Raw sentence; it is split into words with ``word_tokenize``.
        model: Called as ``model(input_ids=..., subword_to_word_ids=...)``;
            the first element of its output is used as the logits.
        tokenizer: Forwarded to ``word_subword_tokenize``.
        i2w: Mapping from predicted class index to label string.

    Returns:
        A DataFrame with columns ``'words'`` and ``'label'``, one row per word.
        Empty when ``text`` contains no words.
    """
    # Tokenise the text into words
    words = word_tokenize(text)
    if not words:
        # Nothing to classify: return an empty frame with the usual columns
        # instead of running the model on a sentence with no words.
        return pd.DataFrame({'words': [], 'label': []})

    subwords, subword_to_word_indices = word_subword_tokenize(words, tokenizer)

    # Convert the ids to batch-of-one tensors on the same device as the model,
    # so the call also works when the model has been moved to a GPU.
    device = next(model.parameters()).device
    subwords_tensor = torch.LongTensor(subwords).view(1, -1).to(device)
    subword_to_word_indices = torch.LongTensor(subword_to_word_indices).view(1, -1).to(device)

    # Run inference without building an autograd graph
    with torch.no_grad():
        outputs = model(input_ids=subwords_tensor, subword_to_word_ids=subword_to_word_indices)

    # Take the predicted class per word. reshape(-1) always yields a list from
    # tolist(); squeeze() returned a bare int for a one-word sentence, which
    # made the comprehension below fail.
    logits = outputs[0]
    preds = torch.argmax(logits, dim=2).reshape(-1).tolist()
    labels = [i2w[pred] for pred in preds]

    # Build a dataframe to present the result
    df = pd.DataFrame({'words': words, 'label': labels})

    return df

# Restore the fine-tuned model and its tokenizer from the saved artefacts
model, tokenizer = load_model_and_tokenizer(
    model_path='/kaggle/working/fine_tuned_model_test1.pt',
    tokenizer_path='/kaggle/working/tokenizer_directory1',
)

# Index-to-label dictionary; adjust it to match your own label set.
# NOTE(review): this rebinds the name i2w used by earlier cells - confirm the
# order matches the label indices the model was trained with.
i2w = dict(enumerate([
    'I-PERSON',
    'B-ORGANISATION',
    'I-ORGANISATION',
    'B-PLACE',
    'I-PLACE',
    'O',
    'B-PERSON',
]))

# Ask the user for the text to label
user_input = input("Masukkan teks yang ingin Anda prediksi labelnya: ")

# Predict the labels
predicted_labels_df = predict_labels(text=user_input, model=model, tokenizer=tokenizer, i2w=i2w)

# Show the result
print(predicted_labels_df)

In [None]:
# !rm -rf /kaggle/working