In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!nvidia-smi

In [None]:
!pip install transformers

In [None]:
!pip install -q -U watermark

In [None]:
%reload_ext watermark
%watermark -v -p numpy,pandas,torch,transformers

# Making the necessary imports

In [None]:
import transformers
from transformers import XLNetTokenizer, XLNetModel, get_linear_schedule_with_warmup
from torch.optim import AdamW
import torch

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from collections import defaultdict
from textwrap import wrap
from pylab import rcParams

from torch import nn, optim
from keras.preprocessing.sequence import pad_sequences
from torch.utils.data import TensorDataset,RandomSampler,SequentialSampler
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

In [None]:
%matplotlib inline
%config InlineBackend.figure_format='retina'

# Global seaborn theme applied to every plot in the notebook.
sns.set(style='whitegrid', palette='muted', font_scale=1.2)

# Custom colour cycle; installed below, replacing the 'muted' palette chosen above.
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]

sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

# Default figure size as (width, height).
rcParams['figure.figsize'] = 12, 8

# Seed the NumPy and PyTorch global random number generators.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

# Data Preprocessing

In [None]:
df = pd.read_csv('drive/MyDrive/Data/dataset.csv')
df.head()

In [None]:
from sklearn.utils import shuffle
# Randomly reorder the rows before the subset is taken in the next cell.
# NOTE(review): no random_state is passed here, so the resulting order presumably
# depends on the global NumPy seed set earlier -- confirm if the exact subset matters.
df = shuffle(df)
df.head(20)

In [None]:
df = df[:20000]
len(df)

In [None]:
import re
def clean_text(text):
    """Strip noise (links, markup, mentions, hashtags, '?') from one raw post.

    Args:
        text: The raw text. ``None`` and NaN are treated as missing and yield
            an empty string; any other non-string value is converted with
            ``str()`` before cleaning.

    Returns:
        str: The cleaned text with runs of whitespace collapsed to a single
        space and no leading/trailing whitespace.
    """
    # Missing cells (None, or the float NaN pandas uses) would make re.sub raise
    # TypeError; NaN is the only float that is not equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if not isinstance(text, str):
        text = str(text)

    text = re.sub(r'https?://\S+', '', text)  # links
    text = re.sub(r'<.*?>', '', text)  # HTML tags
    # Numeric (&#128512;) and named (&amp;) HTML entities. This has to run before
    # the hashtag rule below, which would otherwise eat the "#128512" part.
    text = re.sub(r'(&#\d+;|&[a-zA-Z]+;)', '', text)
    text = re.sub(r'[?]', '', text)  # question marks
    text = re.sub(r'@[\w]*', '', text)  # @mentions, including the handle
    text = re.sub(r'#[\w]*', '', text)  # #hashtags, including the tag text
    # Removing tokens leaves double spaces behind; collapse them so that
    # "excess whitespace" is really gone, not just trimmed at the ends.
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

In [None]:
df['text'] = df['text'].apply(clean_text)

In [None]:
rcParams['figure.figsize'] = 8, 6
# Pass the column by keyword: recent seaborn releases accept only `data`
# positionally, so a bare Series is no longer interpreted as the x variable.
sns.countplot(x=df.sentiment)
# The bars are sentiment classes, not review scores.
plt.xlabel('sentiment');

In [None]:
def sentiment2label(sentiment):
    """Encode a sentiment string as the integer class id used for training.

    Returns 1 for "positive", 2 for "negative" and 0 for every other value
    (the match is exact and case-sensitive).
    """
    if sentiment == "negative":
        return 2
    return 1 if sentiment == "positive" else 0

# Replace the string labels with their integer class ids, overwriting the column.
# NOTE(review): this cell is not idempotent -- running it a second time feeds the
# integers back into sentiment2label, whose else branch maps every one of them to 0.
df['sentiment'] = df['sentiment'].apply(sentiment2label)

In [None]:
df['sentiment'].value_counts()

In [None]:
class_names = ['neutral', 'positive', 'negative']

# Playing with XLNetTokenizer

In [None]:
from transformers import XLNetTokenizer, XLNetModel
PRE_TRAINED_MODEL_NAME = 'xlnet-base-cased'
tokenizer = XLNetTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

In [None]:
input_txt = "Pemerintahan Prabowo dan Gibran"
# encodings = tokenizer.encode_plus(input_txt, add_special_tokens=True, max_length=16, return_tensors='pt', return_token_type_ids=False, return_attention_mask=True, pad_to_max_length=False)
encodings = tokenizer.encode_plus(input_txt, add_special_tokens=True, max_length=16, truncation=True, return_tensors='pt', return_token_type_ids=False, return_attention_mask=True, padding='max_length')

In [None]:
print('input_ids : ',encodings['input_ids'])

In [None]:
tokenizer.convert_ids_to_tokens(encodings['input_ids'][0])

In [None]:
type(encodings['attention_mask'])

In [None]:
attention_mask = pad_sequences(encodings['attention_mask'], maxlen=512, dtype=torch.Tensor ,truncating="post",padding="post")

In [None]:
attention_mask = attention_mask.astype(dtype = 'int64')
attention_mask = torch.tensor(attention_mask)
attention_mask.flatten()

In [None]:
encodings['input_ids']

# Checking the distribution of token lengths

In [None]:
# Number of tokens per text; truncation caps each count at 512.
token_lens = [
    len(tokenizer.encode(txt, max_length=512, truncation=True))
    for txt in df['text']
]

In [None]:
sns.displot(token_lens)
plt.xlim([0, 1024]);
plt.xlabel('Token count');

In [None]:
MAX_LEN = 512

# Custom Dataset class

In [None]:
class GovermentDataset(Dataset):
    """Map-style dataset that tokenizes one text per item for sequence classification.

    Args:
        texts: Indexable collection of raw texts (each item is passed through ``str()``).
        targets: Indexable collection of integer class ids, aligned with ``texts``.
        tokenizer: Tokenizer exposing ``encode_plus`` and returning PyTorch tensors.
        max_len: Length every encoded sequence is padded or truncated to.
    """

    def __init__(self, texts, targets, tokenizer, max_len):
        self.texts = texts
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        """Return the encoded sample at position ``item``.

        Returns:
            dict with keys
            ``text`` (str), ``input_ids`` (long tensor of shape ``(1, max_len)``),
            ``attention_mask`` (long tensor of shape ``(max_len,)``) and
            ``targets`` (scalar long tensor).
        """
        text = str(self.texts[item])
        target = self.targets[item]

        # The tokenizer already pads and truncates to exactly self.max_len, so no
        # second padding pass is needed -- and the length now follows the max_len
        # given to this dataset rather than a module-level constant.
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )

        # input_ids keeps its leading singleton dimension; the training and
        # evaluation loops reshape it to (batch, seq_len) themselves.
        input_ids = encoding['input_ids'].to(torch.long)
        attention_mask = encoding['attention_mask'].to(torch.long)

        return {
            'text': text,
            'input_ids': input_ids,
            'attention_mask': attention_mask.flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }

In [None]:
# class GovermentDataset(Dataset):

#     def __init__(self, text, targets, tokenizer, max_len):
#         self.text = text
#         self.targets = targets
#         self.tokenizer = tokenizer
#         self.max_len = max_len

#     def __len__(self):
#         return len(self.text)

#     def __getitem__(self, item):
#         text = str(self.text[item])
#         target = self.targets[item]

#         encoding = self.tokenizer.encode_plus(
#         text,
#         add_special_tokens=True,
#         max_length=self.max_len,
#         return_token_type_ids=False,
#         pad_to_max_length=True, # Changed to True to enable automatic padding
#         return_attention_mask=True,
#         return_tensors='pt',
#         )

#         # Removed the manual padding since pad_to_max_length=True is used
#         # input_ids = pad_sequences(encoding['input_ids'], maxlen=MAX_LEN, dtype=torch.Tensor ,truncating="post",padding="post")
#         # input_ids = input_ids.astype(dtype = 'int64')
#         # input_ids = torch.tensor(input_ids)
#         input_ids = encoding['input_ids'].squeeze(0) # Squeezing to remove extra dimension

#         # Removed the manual padding since pad_to_max_length=True is used
#         # attention_mask = pad_sequences(encoding['attention_mask'], maxlen=MAX_LEN, dtype=torch.Tensor ,truncating="post",padding="post")
#         # attention_mask = attention_mask.astype(dtype = 'int64')
#         # attention_mask = torch.tensor(attention_mask)
#         attention_mask = encoding['attention_mask'].squeeze(0) # Squeezing to remove extra dimension

#         return {
#         'text': text,
#         'input_ids': input_ids,
#         'attention_mask': attention_mask, # Removed flatten
#         'targets': torch.tensor(target, dtype=torch.long)
#         }

In [None]:
# Earlier split ratios that were tried (kept for reference):
# df_train, df_test = train_test_split(df, test_size=0.5, random_state=101)
# df_val, df_test = train_test_split(df_test, test_size=0.5, random_state=101)

# df_train, df_test = train_test_split(df, test_size=0.3, random_state=101)
# df_val, df_test = train_test_split(df_test, test_size=0.3, random_state=101)

# Hold out 40% of the rows, then cut that hold-out again 60/40 into validation
# and test: overall about 60% train / 24% validation / 16% test.
df_train, df_test = train_test_split(df, test_size=0.4, random_state=1)
df_val, df_test = train_test_split(df_test, test_size=0.4, random_state=1)

In [None]:
df_train.shape, df_val.shape, df_test.shape

# Custom Dataloader

In [None]:
def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False, num_workers=2):
  """Wrap a dataframe in a GovermentDataset and return a DataLoader over it.

  Args:
    df: DataFrame with a ``text`` column and an integer ``sentiment`` column.
    tokenizer: Tokenizer handed to the dataset.
    max_len: Sequence length every sample is padded/truncated to.
    batch_size: Number of samples per batch.
    shuffle: Reshuffle the samples at every epoch. Defaults to False (the
      previous behaviour); pass True for the training split.
    num_workers: Number of loader worker processes. Defaults to 2.

  Returns:
    torch.utils.data.DataLoader yielding the dicts produced by the dataset.
  """
  ds = GovermentDataset(
    texts=df.text.to_numpy(),
    targets=df.sentiment.to_numpy(),
    tokenizer=tokenizer,
    max_len=max_len
  )

  return DataLoader(
    ds,
    batch_size=batch_size,
    shuffle=shuffle,
    num_workers=num_workers
  )

In [None]:
# Samples per batch for all three loaders (8 was also tried).
BATCH_SIZE = 4
# BATCH_SIZE = 8

# One loader per split, all padded/truncated to MAX_LEN tokens.
# NOTE(review): create_data_loader does not request shuffling, so the training
# loader visits the samples in the same order in every epoch.
train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)

# Loading the pre-trained XLNet model for sequence classification from Hugging Face Transformers

In [None]:
from transformers import XLNetForSequenceClassification

model = XLNetForSequenceClassification.from_pretrained('xlnet-base-cased', num_labels = 3)
model = model.to(device)

In [None]:
model

# Setting Hyperparameters

In [None]:
EPOCHS = 5

# Split the parameters into two groups: biases and layer-norm weights are exempt
# from weight decay, everything else is decayed with 0.01.
param_optimizer = list(model.named_parameters())
# 'LayerNorm.*' is the BERT naming; XLNet names its normalisation modules
# 'layer_norm', so that spelling must be listed too or the layer-norm weights
# silently end up in the decayed group.
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight', 'layer_norm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0},
]
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=3e-6)

# One scheduler step is taken per batch, so the schedule spans every batch of every epoch.
total_steps = len(train_data_loader) * EPOCHS

# Linear decay of the learning rate to zero, without warm-up.
scheduler = get_linear_schedule_with_warmup(
  optimizer,
  num_warmup_steps=0,
  num_training_steps=total_steps
)

# Sanity check with one batch

In [None]:
data = next(iter(val_data_loader))
data.keys()

In [None]:
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)
targets = data['targets'].to(device)
# The dataset leaves a singleton dimension on input_ids; fold it away using the
# batch's own size instead of hard-coded numbers, so this cell keeps working when
# BATCH_SIZE or MAX_LEN is changed.
print(input_ids.reshape(input_ids.shape[0], -1).shape)  # batch size x seq length
print(attention_mask.shape) # batch size x seq length

In [None]:
input_ids[0]

In [None]:
# Single forward pass to confirm the model accepts a batch and returns a loss and
# logits. Gradients are not needed for this check, so skip building the autograd
# graph; the reshape uses the batch's own size rather than fixed numbers.
with torch.no_grad():
    outputs = model(input_ids.reshape(input_ids.shape[0], -1), token_type_ids=None, attention_mask=attention_mask, labels=targets)
outputs

In [None]:
type(outputs[0])

# Defining the training step function

In [None]:
from sklearn import metrics
def train_epoch(model, data_loader, optimizer, device, scheduler, n_examples):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Classifier whose forward call returns ``(loss, logits, ...)`` when
            given ``labels``.
        data_loader: Iterable of batches (dicts with ``input_ids``,
            ``attention_mask`` and ``targets`` tensors).
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        n_examples: Unused; kept so existing callers keep working. The number of
            samples is counted from the batches themselves.

    Returns:
        tuple: ``(accuracy, mean_loss)`` where accuracy is the fraction of all
        samples classified correctly and mean_loss is the mean of the batch losses.

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    model = model.train()
    losses = []
    correct = 0
    seen = 0

    for d in data_loader:
        batch_size = d["input_ids"].shape[0]
        # Fold (batch, 1, seq_len) into (batch, seq_len) without assuming a
        # particular sequence length.
        input_ids = d["input_ids"].reshape(batch_size, -1).to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        # Start every step from clean gradients, whatever ran before this call.
        optimizer.zero_grad()

        outputs = model(input_ids=input_ids, token_type_ids=None, attention_mask=attention_mask, labels=targets)
        loss = outputs[0]
        logits = outputs[1]

        _, prediction = torch.max(logits, dim=1)
        # Count hits over samples so a smaller final batch is weighted correctly
        # (averaging per-batch accuracies would over-weight it).
        correct += (prediction == targets).sum().item()
        seen += batch_size
        losses.append(loss.item())

        loss.backward()

        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

    if seen == 0:
        raise ValueError("data_loader yielded no batches")

    return correct / seen, np.mean(losses)

In [None]:
# from sklearn import metrics

# def eval_model(model, data_loader, device, scheduler, n_examples):
#     """
#     Evaluates the model on the given data loader.

#     Args:
#         model: The model to evaluate.
#         data_loader: The data loader to use for evaluation.
#         device: The device to use for evaluation.
#         scheduler: The learning rate scheduler.
#         n_examples: The number of examples in the data loader.

#     Returns:
#         A tuple containing the accuracy and loss of the model on the data loader.
#     """
#     model = model.eval()
#     losses = []
#     acc = 0
#     counter = 0

#     with torch.no_grad():
#         for d in data_loader:
#             # Get the actual batch size
#             batch_size = d["input_ids"].shape[0]
#             # Reshape according to the actual batch size
#             input_ids = d["input_ids"].reshape(batch_size, 512).to(device)
#             attention_mask = d["attention_mask"].to(device)
#             targets = d["targets"].to(device)

#             outputs = model(input_ids=input_ids, token_type_ids=None, attention_mask=attention_mask, labels=targets)
#             loss = outputs[0]
#             logits = outputs[1]

#             _, prediction = torch.max(outputs[1], dim=1)
#             targets = targets.cpu().detach().numpy()
#             prediction = prediction.cpu().detach().numpy()
#             accuracy = metrics.accuracy_score(targets, prediction)

#             acc += accuracy
#             losses.append(loss.item())
#             counter += 1

#     return acc / counter, np.mean(losses)

# Defining the evaluation function

In [None]:
def eval_model(model, data_loader, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: Classifier whose forward call returns ``(loss, logits, ...)`` when
            given ``labels``.
        data_loader: Iterable of batches (dicts with ``input_ids``,
            ``attention_mask`` and ``targets`` tensors).
        device: Device the batch tensors are moved to.
        n_examples: Unused; kept so existing callers keep working. The number of
            samples is counted from the batches themselves.

    Returns:
        tuple: ``(accuracy, mean_loss)`` where accuracy is the fraction of all
        samples classified correctly and mean_loss is the mean of the batch losses.

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    model = model.eval()
    losses = []
    correct = 0
    seen = 0

    with torch.no_grad():
        for d in data_loader:
            batch_size = d["input_ids"].shape[0]
            # Fold (batch, 1, seq_len) into (batch, seq_len) without assuming a
            # particular sequence length.
            input_ids = d["input_ids"].reshape(batch_size, -1).to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(input_ids=input_ids, token_type_ids=None, attention_mask=attention_mask, labels=targets)
            loss = outputs[0]
            logits = outputs[1]

            _, prediction = torch.max(logits, dim=1)
            # Count hits over samples so a smaller final batch is weighted
            # correctly (averaging per-batch accuracies would over-weight it).
            correct += (prediction == targets).sum().item()
            seen += batch_size
            losses.append(loss.item())

    if seen == 0:
        raise ValueError("data_loader yielded no batches")

    return correct / seen, np.mean(losses)

# Fine-tuning the pre-trained model

In [None]:
%%time
import os

# Per-epoch metrics, plus the best validation accuracy seen so far.
history = defaultdict(list)
best_accuracy = 0
# Checkpoint location for the best-scoring weights.
best_model_path = '/content/drive/My Drive/NLP/Sentiment Analysis Series/models/xlnet_model.bin'

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(
        model, train_data_loader, optimizer, device, scheduler, len(df_train)
    )
    print(f'Train loss {train_loss} Train accuracy {train_acc}')

    val_acc, val_loss = eval_model(model, val_data_loader, device, len(df_val))
    print(f'Val loss {val_loss} Val accuracy {val_acc}')
    print()

    epoch_metrics = (
        ('train_acc', train_acc),
        ('train_loss', train_loss),
        ('val_acc', val_acc),
        ('val_loss', val_loss),
    )
    for metric_name, metric_value in epoch_metrics:
        history[metric_name].append(metric_value)

    # Checkpoint only when validation accuracy improves on the best so far.
    if val_acc > best_accuracy:
        # Make sure the target directory exists before writing.
        os.makedirs(os.path.dirname(best_model_path), exist_ok=True)
        torch.save(model.state_dict(), best_model_path)
        best_accuracy = val_acc

# Evaluation of the fine-tuned model

In [None]:
# model.load_state_dict(torch.load('/content/drive/My Drive/NLP/Sentiment Analysis Series/models/xlnet_model.bin'))

model.load_state_dict(torch.load('/content/drive/My Drive/NLP/Sentiment Analysis Series/models/xlnet_model.bin', weights_only=True))

In [None]:
model = model.to(device)

In [None]:
test_acc, test_loss = eval_model(
  model,
  test_data_loader,
  device,
  len(df_test)
)

print('Test Accuracy :', test_acc)
print('Test Loss :', test_loss)

In [None]:
def get_predictions(model, data_loader, device=None):
    """Run ``model`` over ``data_loader`` and collect its predictions.

    Args:
        model: Classifier whose forward call returns ``(loss, logits, ...)`` when
            given ``labels``.
        data_loader: Iterable of batches (dicts with ``text``, ``input_ids``,
            ``attention_mask`` and ``targets``).
        device: Device to run on. Defaults to the device the model's parameters
            live on, so the function does not depend on a notebook-level global.

    Returns:
        tuple: ``(texts, predictions, prediction_probs, real_values)`` --
        the list of input texts, the predicted class ids, the softmax
        probabilities per class and the true class ids (tensors on the CPU).

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    model = model.eval()
    if device is None:
        device = next(model.parameters()).device

    review_texts = []
    pred_batches = []
    prob_batches = []
    target_batches = []

    with torch.no_grad():
        for d in data_loader:
            batch_size = d["input_ids"].shape[0]
            # Fold (batch, 1, seq_len) into (batch, seq_len) without assuming a
            # particular sequence length.
            input_ids = d["input_ids"].reshape(batch_size, -1).to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(input_ids=input_ids, token_type_ids=None, attention_mask=attention_mask, labels=targets)
            # With labels supplied the output is (loss, logits, ...).
            logits = outputs[1]

            _, preds = torch.max(logits, dim=1)
            probs = F.softmax(logits, dim=1)

            review_texts.extend(d["text"])
            # Keep whole batches and concatenate once at the end instead of
            # stacking one tensor per sample.
            pred_batches.append(preds.cpu())
            prob_batches.append(probs.cpu())
            target_batches.append(targets.cpu())

    if not pred_batches:
        raise ValueError("data_loader yielded no batches")

    predictions = torch.cat(pred_batches)
    prediction_probs = torch.cat(prob_batches)
    real_values = torch.cat(target_batches)
    return review_texts, predictions, prediction_probs, real_values

In [None]:
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(
  model,
  test_data_loader
)

In [None]:
from sklearn.metrics import classification_report

print(classification_report(y_test, y_pred, target_names=class_names, zero_division=1))

In [None]:
cm = confusion_matrix(y_test, y_pred)

In [None]:
def plot_confusion_matrix(cm, class_names):
    """Draw ``cm`` as an annotated heatmap with ``class_names`` on both axes."""
    _, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    ax.set_title('Confusion Matrix')
    plt.show()

# Display names for the labels, in class-id order (neutral, positive, negative)
class_names = ["Neutral", "Positive", "Negative"]
plot_confusion_matrix(cm, class_names)

In [None]:
print(classification_report(y_test, y_pred, target_names=class_names))

# Custom prediction function on raw text

In [None]:
def predict_sentiment(text):
    """Classify one raw text with the fine-tuned model and print the result.

    Uses the notebook-level ``tokenizer``, ``model``, ``MAX_LEN`` and
    ``class_names``. Prints the probability of each class, the input text and
    the predicted label; returns nothing.

    Args:
        text: The raw text to classify.
    """
    review_text = text

    # truncation=True lets the tokenizer shorten over-long input itself, so the
    # special tokens it adds are kept; cutting the encoded sequence afterwards
    # would drop whatever sits at its end.
    encoded_review = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )

    # Run on whichever device the model currently lives on. The tokenizer output
    # is already a (1, MAX_LEN) tensor, so no re-padding or reshape is needed.
    model_device = next(model.parameters()).device
    input_ids = encoded_review['input_ids'].to(model_device)
    attention_mask = encoded_review['attention_mask'].to(model_device)

    # Inference only: disable dropout and gradient tracking.
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    # Without labels the first output holds the logits; take the single sample.
    logits = outputs[0][0].cpu()

    probs = F.softmax(logits, dim=-1).numpy().tolist()
    _, prediction = torch.max(logits, dim=-1)

    print("Neutral score:", probs[0])
    print("Positive score:", probs[1])
    print("Negative score:", probs[2])
    print(f'Review Text: {review_text}')
    print(f'Sentiment  : {class_names[int(prediction)]}')

In [None]:
text = "Pemerintahan Prabowo dan Gibran membawa angin segar bagi Indonesia dengan kebijakan inovatif yang fokus pada kemajuan ekonomi, pendidikan, dan kesejahteraan rakyat."
predict_sentiment(text)

In [None]:
text = "Pemerintahan Prabowo dan Gibran dianggap gagal total dalam menepati janji kampanye. Banyak program tidak jelas arahnya dan rakyat merasa kecewa dengan hasil kerja yang jauh dari harapan."
predict_sentiment(text)

In [None]:
text = "Apakah kinerja 100 hari Pemerintahan Prabowo dan Gibran sesuai dengan visi dan misinya?"
predict_sentiment(text)

# Pytorch

In [None]:
from transformers import XLNetForSequenceClassification, XLNetTokenizer

# Export the fine-tuned model and its tokenizer, both already in memory from the
# cells above. They must NOT be re-created from "xlnet-base-cased" here: that
# would replace the fine-tuned weights (and the 3-class head) with the untrained
# base checkpoint, and that is what would end up on disk.
model.save_pretrained("saved_xlnet/")
tokenizer.save_pretrained("saved_xlnet/")

In [None]:
torch.save(model.state_dict(), "saved_xlnet/pytorch_model.bin")

In [None]:
import os
# List the exported files; the message names the folder that is actually listed.
print("Isi folder saved_xlnet:", os.listdir("saved_xlnet/"))

In [None]:
from transformers import XLNetForSequenceClassification, XLNetConfig
import torch

# Build a configuration with a 3-class head and the label names that match the
# encoding used for training (sentiment2label: 0 = neutral, 1 = positive,
# 2 = negative). The names are set before the model is created so the model
# carries them from the start.
config = XLNetConfig.from_pretrained("xlnet-base-cased")
config.num_labels = 3
config.id2label = {
    0: "neutral",
    1: "positive",
    2: "negative"
}
config.label2id = {
    "neutral": 0,
    "positive": 1,
    "negative": 2
}

# Create the model from the modified configuration
model = XLNetForSequenceClassification(config)

# Load the fine-tuned weights; weights_only=True matches the earlier load of the
# same checkpoint and avoids unpickling arbitrary objects.
model.load_state_dict(torch.load('/content/drive/My Drive/NLP/Sentiment Analysis Series/models/xlnet_model.bin', map_location='cpu', weights_only=True))

# Double-check
print(model.config.num_labels)  # must be 3

In [None]:
# Write the modified configuration to the "xlnet_model_fixed" directory.
# NOTE(review): only the configuration is saved here, not the model weights or the
# tokenizer -- presumably those are meant to be placed alongside it; confirm.
config.save_pretrained("/content/drive/My Drive/NLP/Sentiment Analysis Series/models/xlnet_model_fixed")

In [None]:
print("Jumlah label:", model.config.num_labels)

print("id2label:", model.config.id2label)
print("label2id:", model.config.label2id)