# Task - PREPARE DATASET
Mount Google Drive, load the CSV file "/content/drive/MyDrive/Colab Notebooks/sentiment-analysis-training-data.csv" into a pandas DataFrame, and display the first few rows of the DataFrame.

## Mount Google Drive

Connect Google Drive to the current Colab session.


In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

## Load CSV File

Load the CSV file from the specified path into a pandas DataFrame.


In [None]:
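import pandas as pd
from urllib.error import URLError  # HTTPError is a subclass of URLError

try:
    df = pd.read_csv('https://github.com/erlanggadewasakti/Prinsip-Sains-Data/releases/download/prod/sa-psd-dataset.csv')
    print("Dataset loaded successfully.")
except (FileNotFoundError, URLError) as e:
    # A remote read fails with a URL/HTTP error rather than FileNotFoundError
    print(f"Error: dataset not found. Check that the path or URL is correct: {e}")
except Exception as e:
    print(f"An error occurred while reading the file: {e}")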

# Task - EDA
Perform a comprehensive sentiment analysis EDA on the "ecinstruct-sentiment-analysis-samples.csv" dataset, focusing on visualizations and insights. The analysis should include data loading and inspection, data preprocessing (extracting sentiment labels and cleaning input text), target distribution analysis (sentiment countplot), text statistics analysis (character length, word count, average word length with histograms and boxplots), content analysis (N-grams with bar plots), visual analysis (word clouds for all text and per sentiment category), and a summary of key insights. Use the required libraries: pandas, matplotlib.pyplot, seaborn, nltk, and wordcloud.

## Load & Inspect Data

Load the data from the CSV file and display basic information such as data types, the first few rows, and the number of missing values.


In [None]:
display(df.head())
df.info()
print("\nNumber of missing values per column:")
print(df.isnull().sum())

## Data Preprocessing

Create new columns for the clean sentiment label and the cleaned input text. The text is lowercased; the punctuation and digit removal steps are present in the code but commented out, so they are not applied here.


In [None]:
import re

# Extract sentiment labels
df['sentiment'] = df['output'].str.replace(r'^[A-E]:\s*', '', regex=True)

# Clean the input text
df['cleaned_input'] = df['input'].str.lower()
# df['cleaned_input'] = df['cleaned_input'].str.replace(r'[^\w\s]', '', regex=True) # Remove punctuation
# df['cleaned_input'] = df['cleaned_input'].str.replace(r'\d+', '', regex=True) # Remove numbers

df['sentiment'] = df['sentiment'].map({'very positive' : 'positive', 'very negative' : 'negative','positive':'positive','negative':'negative','neutral':'neutral'})

# Display the first few rows with new columns
display(df[['output', 'sentiment', 'input', 'cleaned_input']].head())
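
To make the label extraction concrete, here is a minimal standalone sketch of the same two steps on a few made-up strings. It assumes the raw `output` values look like `A: very positive`, which is inferred from the regex above and not confirmed by the data. The names `raw_outputs`, `label_map`, and `extract_sentiment` are illustrative only.

```python
import re

# Hypothetical raw labels in the "<letter>: <label>" format the regex expects
raw_outputs = ["A: very positive", "C: neutral", "E: very negative", "B: positive"]

# Same collapse of five classes into three as in the cell above
label_map = {
    "very positive": "positive",
    "positive": "positive",
    "neutral": "neutral",
    "negative": "negative",
    "very negative": "negative",
}

def extract_sentiment(raw):
    """Strip the leading option letter, then collapse to three classes."""
    label = re.sub(r"^[A-E]:\s*", "", raw)
    # Unknown labels return None here; pandas .map() would give NaN instead
    return label_map.get(label)

sentiments = [extract_sentiment(raw) for raw in raw_outputs]
print(sentiments)
```

Any label outside the mapping ends up missing, so it may be worth checking `df['sentiment'].isnull().sum()` after the mapping step.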

## Sentiment Distribution Analysis

Visualize the sentiment distribution with a bar chart and report any class imbalance.


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

sentiment_colors = {'positive': 'green', 'negative': 'orange', 'neutral': 'blue'}


plt.figure(figsize=(8, 6))
ax = sns.countplot(x='sentiment', data=df, hue='sentiment', palette=sentiment_colors, legend=False)

# Add count labels above each bar
for container in ax.containers:
    ax.bar_label(container, fmt='%d')

plt.title('Sentiment Distribution')
plt.xlabel('Sentiment')
plt.ylabel('Count')
plt.show()
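
The plot shows the imbalance visually; a numeric summary can make it easier to report. The sketch below uses made-up labels and plain Python to compute class proportions and a simple imbalance ratio (largest class divided by smallest class). The ratio is one possible summary, chosen here for illustration, not a metric taken from the notebook.

```python
from collections import Counter

# Made-up labels standing in for df['sentiment']
labels = ["positive"] * 6 + ["negative"] * 3 + ["neutral"] * 1

counts = Counter(labels)
total = sum(counts.values())

# Share of each class in the dataset
proportions = {label: n / total for label, n in counts.items()}

# Largest class size divided by smallest class size (1.0 means perfectly balanced)
imbalance_ratio = max(counts.values()) / min(counts.values())

print(proportions)
print(f"Imbalance ratio: {imbalance_ratio:.1f}")
```

On the real DataFrame, `df['sentiment'].value_counts()` and `df['sentiment'].value_counts(normalize=True)` give the same counts and proportions directly.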

## Text Statistics Analysis

Compute the character length, word count, and average word length of each review. Visualize the distributions of these text statistics by sentiment using histograms and boxplots.


In [None]:
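df['char_length'] = df['cleaned_input'].str.len()
df['word_count'] = df['cleaned_input'].str.split().str.len()
# Note: char_length includes whitespace, so this ratio slightly overstates the true mean word length
# Mask zero word counts so whitespace-only strings give NaN instead of inf
df['avg_word_length'] = df['char_length'] / df['word_count'].where(df['word_count'] > 0)
df['avg_word_length'] = df['avg_word_length'].fillna(0) # Empty or missing text gets 0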

display(df[['cleaned_input', 'char_length', 'word_count', 'avg_word_length']].head())
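
A small worked example may help when reading the `avg_word_length` column. The cell above divides the full character length (spaces included) by the word count, which is not quite the mean length of the words themselves. The sketch below compares the two on one made-up review; the variable names are illustrative.

```python
text = "great phone works well"  # made-up review

char_length = len(text)   # counts letters and the three spaces
words = text.split()
word_count = len(words)

# Ratio used in the cell above: spaces are counted as characters
ratio_with_spaces = char_length / word_count

# Mean length of the words themselves
mean_word_length = sum(len(word) for word in words) / word_count

print(char_length, word_count, ratio_with_spaces, mean_word_length)
```

The gap between the two values is roughly one character per word, so comparisons between sentiment classes are unlikely to change, but the absolute values should be read with this in mind.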

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Histograms for text statistics by sentiment
text_stats = ['char_length', 'word_count', 'avg_word_length']
for stat in text_stats:
    plt.figure(figsize=(10, 6))
    sns.histplot(data=df, x=stat, hue='sentiment', kde=True, multiple='stack', palette=sentiment_colors)
    plt.title(f'Distribution of {stat} by Sentiment')
    plt.xlabel(stat)
    plt.ylabel('Frequency')
    plt.show()

In [None]:
# Boxplots for text statistics by sentiment
text_stats = ['char_length', 'word_count', 'avg_word_length']
for stat in text_stats:
    plt.figure(figsize=(10, 6))
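    # Assign hue explicitly; passing palette without hue is deprecated in recent seaborn versions
    sns.boxplot(data=df, x='sentiment', y=stat, hue='sentiment', palette=sentiment_colors, legend=False)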
    plt.title(f'Distribution of {stat} by Sentiment')
    plt.xlabel('Sentiment')
    plt.ylabel(stat)
    plt.show()

## Content Analysis (N-grams)

Remove stopwords and count unigram, bigram, and trigram frequencies in the cleaned text. Visualize the top 20 n-grams for each sentiment category.


In [None]:
import nltk
from sklearn.feature_extraction.text import CountVectorizer
from nltk.corpus import stopwords
import matplotlib.pyplot as plt
import seaborn as sns

try:
    stopwords_english = stopwords.words('english')
except LookupError:
    nltk.download('stopwords')
    stopwords_english = stopwords.words('english')

print("Libraries imported and stopwords downloaded.")

In [None]:
ngram_ranges = [(1, 1), (2, 2), (3, 3)]
sentiments = df['sentiment'].unique()


for sentiment in sentiments:
    print(f"Analyzing sentiment: {sentiment}")
    sentiment_df = df[df['sentiment'] == sentiment]
    cleaned_text = sentiment_df['cleaned_input'].dropna() # Drop NaN values

    if cleaned_text.empty:
        print(f"No cleaned text available for sentiment: {sentiment}")
        continue

    current_sentiment_color = sentiment_colors.get(sentiment, 'gray') # Default to gray if sentiment not found

    for n_range in ngram_ranges:
        print(f"  Analyzing {n_range}-grams")
        vectorizer = CountVectorizer(ngram_range=n_range, stop_words=stopwords_english)
        try:
            X = vectorizer.fit_transform(cleaned_text)
        except ValueError as e:
            print(f"  Could not fit vectorizer for {n_range}-grams and sentiment {sentiment}: {e}")
            continue

        sum_words = X.sum(axis=0)
        words_freq = [(word, sum_words[0, idx]) for word, idx in vectorizer.vocabulary_.items()]
        words_freq = sorted(words_freq, key=lambda x: x[1], reverse=True)
        top_ngrams = words_freq[:20]

        if not top_ngrams:
            print(f"  No {n_range}-grams found for sentiment: {sentiment}")
            continue

        top_ngrams_df = pd.DataFrame(top_ngrams, columns=['ngram', 'count'])

        plt.figure(figsize=(10, 6))
        sns.barplot(x='count', y='ngram', data=top_ngrams_df, color=current_sentiment_color)
        plt.title(f'Top 20 {n_range}-grams for Sentiment: {sentiment}')
        plt.xlabel('Count')
        plt.ylabel(f'{n_range}-gram')
        plt.tight_layout()
        plt.show()

print("N-gram analysis complete.")
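
When reading the bigram and trigram charts, it helps to know how stopword removal interacts with n-grams. To my understanding, `CountVectorizer` drops stop words before it builds n-grams, so a bigram can join two words that were not adjacent in the original text. The sketch below imitates that behaviour in plain Python on made-up sentences; the tiny `stop_words` set and the `ngrams` helper are hypothetical and much simpler than the real tokenizer.

```python
from collections import Counter

stop_words = {"the", "is", "a", "and", "it"}  # tiny hypothetical stop word list
docs = ["the battery is great", "great battery and a great price"]

def ngrams(text, n):
    """Drop stop words first, then slide a window of n tokens."""
    tokens = [token for token in text.split() if token not in stop_words]
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

bigram_counts = Counter(bigram for doc in docs for bigram in ngrams(doc, 2))
print(bigram_counts.most_common(3))
```

Here "battery great" is counted for the first sentence even though the original wording was "battery is great", which is why some top n-grams in the charts can look slightly ungrammatical.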

## Visual Analysis (Word Clouds)

Create a word cloud from all of the cleaned text and a separate word cloud for each sentiment category.


In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from collections import Counter
import nltk
from nltk.corpus import stopwords

# Download stopwords if not already downloaded
try:
    stopwords_english = stopwords.words('english')
except LookupError:
    nltk.download('stopwords')
    stopwords_english = stopwords.words('english')


# Word cloud for all cleaned text
all_text = ' '.join(df['cleaned_input'].dropna())
wordcloud = WordCloud(width=800, height=400, background_color='white', stopwords=stopwords_english).generate(all_text)

plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('Word Cloud of All Text (excluding stopwords)')
plt.show()

# Identify overall most common words (excluding stopwords)
all_words = all_text.split()
all_words = [word for word in all_words if word not in stopwords_english]
overall_word_counts = Counter(all_words)
# Get the 50 most common words across all sentiments
most_common_overall = set([word for word, count in overall_word_counts.most_common(50)])


# Word clouds per sentiment category, excluding overall most common words
sentiments = df['sentiment'].unique()

for sentiment in sentiments:
    sentiment_text = ' '.join(df[df['sentiment'] == sentiment]['cleaned_input'].dropna())
    if sentiment_text:
        # Remove overall most common words from sentiment-specific text
        sentiment_words = sentiment_text.split()
        sentiment_words_filtered = [word for word in sentiment_words if word not in most_common_overall and word not in stopwords_english]
        filtered_sentiment_text = ' '.join(sentiment_words_filtered)

        if filtered_sentiment_text:
            wordcloud = WordCloud(width=800, height=400, background_color='white', stopwords=stopwords_english).generate(filtered_sentiment_text)
            plt.figure(figsize=(10, 5))
            plt.imshow(wordcloud, interpolation='bilinear')
            plt.axis('off')
            plt.title(f'Word Cloud for Sentiment: {sentiment} (excluding overall common words)')
            plt.show()
        else:
             print(f"No significant words remaining for sentiment: {sentiment} after filtering.")

    else:
        print(f"No cleaned text available for sentiment: {sentiment}")

# Task - PREPROCESSING DATASET

## Advanced Text Cleaning for LLM Compatibility

Clean the 'cleaned_input' column further by removing URLs, HTML tags, and special characters, and by normalizing whitespace. Display a few examples of the cleaned text.


In [None]:
import re

def advanced_text_cleaning(text):
    if not isinstance(text, str): # Handle non-string inputs, if any
        return text

    # 1. Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

    # 2. Remove HTML tags
    text = re.sub(r'<.*?>', '', text)

    # 3. Remove special characters (keep letters, numbers, and spaces)
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)

    # 4. Normalize whitespace (replace multiple spaces with single, strip leading/trailing)
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# Apply the cleaning function to the 'cleaned_input' column
df['further_cleaned_input'] = df['cleaned_input'].apply(advanced_text_cleaning)

# Display the first few rows with the relevant columns to verify
display(df[['input', 'cleaned_input', 'further_cleaned_input']].head())
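
To see what each step does, the sketch below applies the same four substitutions, in the same order, to two made-up strings. The function body is a condensed copy of the one above (without the non-string guard); the sample inputs are assumptions chosen to expose edge cases.

```python
import re

def advanced_text_cleaning(text):
    """Same four steps, in the same order, as the cell above."""
    text = re.sub(r'http\S+|www\S+|https\S+', '', text)  # 1. URLs
    text = re.sub(r'<.*?>', '', text)                     # 2. HTML tags
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)            # 3. special characters
    return re.sub(r'\s+', ' ', text).strip()              # 4. whitespace

sample = "check <b>this</b> out: https://example.com/item?id=1 it's 100% great!!"
cleaned = advanced_text_cleaning(sample)
print(cleaned)

# A URL inside a tag attribute: the URL pattern runs to the next space
linked = '<a href="http://example.com">link</a> works'
cleaned_link = advanced_text_cleaning(linked)
print(cleaned_link)
```

Two things stand out. Removing the apostrophe turns "it's" into "its". In the second string, the URL pattern consumes everything up to the next space, including the link text and closing tag, so the tag residue "a href" survives while "link" is lost. If such inputs occur in the data, stripping HTML tags before URLs might be worth considering.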

## Sentiment Label Encoding

Convert the categorical sentiment labels ('positive', 'negative', 'neutral') into a numeric representation. Display the mapping and the updated DataFrame.


In [None]:
sentiment_mapping = {'positive': 2, 'neutral': 1, 'negative': 0}
df['sentiment_encoded'] = df['sentiment'].map(sentiment_mapping)

print("Sentiment Mapping:")
print(sentiment_mapping)

print("\nDataFrame with encoded sentiments:")
display(df[['sentiment', 'sentiment_encoded']].head())
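
Predictions from the model come back as integers, so an inverse mapping is useful for turning them into readable labels again. The sketch below derives it from the same `sentiment_mapping`; `id_to_label` and `target_names` are illustrative names, not part of the notebook.

```python
sentiment_mapping = {'positive': 2, 'neutral': 1, 'negative': 0}

# Invert the mapping: integer id -> label
id_to_label = {idx: label for label, idx in sentiment_mapping.items()}

# Labels ordered by id, the order expected by reports and confusion matrices
target_names = [id_to_label[idx] for idx in sorted(id_to_label)]

predicted_ids = [2, 0, 1, 2]  # made-up model outputs
predicted_labels = [id_to_label[idx] for idx in predicted_ids]

print(target_names)
print(predicted_labels)
```

Deriving `target_names` this way keeps axis labels in evaluation plots consistent with the encoding if the mapping ever changes.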

## Training, Validation, and Test Split

Split the DataFrame into training (70%), validation (15%), and test (15%) sets using `train_test_split`, stratified to preserve the sentiment distribution. Display the shape of each set and the initial sentiment distribution in the training set.


In [None]:
from sklearn.model_selection import train_test_split

# Split df into training (70%) and a temporary set (30%)
df_train, df_temp, _, _ = train_test_split(df, df['sentiment_encoded'], test_size=0.3, random_state=42, stratify=df['sentiment_encoded'])

# Split the temporary set into validation (15%) and test (15%)
# test_size=0.5 because 0.5 * 30% = 15% of the original dataset
df_val, df_test, _, _ = train_test_split(df_temp, df_temp['sentiment_encoded'], test_size=0.5, random_state=42, stratify=df_temp['sentiment_encoded'])

print("Shape of training set:", df_train.shape)
print("Shape of validation set:", df_val.shape)
print("Shape of test set:", df_test.shape)

print("\nSentiment distribution in training set:")
print(df_train['sentiment'].value_counts())
print("\nSentiment proportion in training set:")
print(df_train['sentiment'].value_counts(normalize=True))
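
The two-stage split can be checked with simple arithmetic: 30% is held out first, then half of that becomes the test set. The sketch below computes the expected set sizes for a given number of rows. It assumes the held-out part is rounded up when the fraction does not divide evenly, which is my understanding of how `train_test_split` behaves; `split_sizes` is a hypothetical helper.

```python
import math

def split_sizes(n_samples, temp_fraction=0.3, test_fraction_of_temp=0.5):
    """Expected (train, validation, test) sizes for the two-stage split."""
    # Assumption: the held-out part is rounded up, the remainder stays behind
    n_temp = math.ceil(temp_fraction * n_samples)
    n_train = n_samples - n_temp
    n_test = math.ceil(test_fraction_of_temp * n_temp)
    n_val = n_temp - n_test
    return n_train, n_val, n_test

print(split_sizes(1000))
print(split_sizes(1001))
```

Comparing these numbers with the printed shapes is a quick sanity check that no rows were lost or duplicated during the split.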

In [None]:
# Create data for the barplot before oversampling
split_counts_before_oversampling = pd.DataFrame({
  'Split': ['Training', 'Training', 'Training', 'Validation', 'Validation', 'Validation', 'Testing', 'Testing', 'Testing'],
  'Sentiment': ['positive', 'neutral', 'negative', 'positive', 'neutral', 'negative', 'positive', 'neutral', 'negative'],
  'Count': [
    len(df_train[df_train['sentiment'] == 'positive']),
    len(df_train[df_train['sentiment'] == 'neutral']),
    len(df_train[df_train['sentiment'] == 'negative']),
    len(df_val[df_val['sentiment'] == 'positive']),
    len(df_val[df_val['sentiment'] == 'neutral']),
    len(df_val[df_val['sentiment'] == 'negative']),
    len(df_test[df_test['sentiment'] == 'positive']),
    len(df_test[df_test['sentiment'] == 'neutral']),
    len(df_test[df_test['sentiment'] == 'negative'])
  ]
})

# Create the barplot
plt.figure(figsize=(12, 6))
ax = sns.barplot(x='Split', y='Count', hue='Sentiment', data=split_counts_before_oversampling, palette=sentiment_colors)

# Add value labels on top of each bar
for container in ax.containers:
  ax.bar_label(container, fmt='%d')

plt.title('Distribution of Data Across Training, Validation, and Testing Sets (Before Oversampling)')
plt.xlabel('Data Split')
plt.ylabel('Count')
plt.legend(title='Sentiment')
plt.show()

# Print summary statistics
print("\nSummary Before Oversampling:")
print("="*50)
print("\nTraining Set:")
print(df_train['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_train)}")

print("\nValidation Set:")
print(df_val['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_val)}")

print("\nTesting Set:")
print(df_test['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_test)}")

## Oversampling the Training Set

Oversample the training set by duplicating rows (resampling with replacement) to balance the sentiment classes. The validation and test sets must not be oversampled, to prevent data leakage. Display the sentiment distribution and the shape of the training set after oversampling.


In [None]:
# Identify the majority class and its count
class_counts = df_train['sentiment'].value_counts()
majority_class = class_counts.idxmax()
majority_count = class_counts.max()

# Oversample minority classes by sampling with replacement up to majority_count
oversampled_parts = []
for sentiment_label in df_train['sentiment'].unique():
    sentiment_df = df_train[df_train['sentiment'] == sentiment_label]
    if len(sentiment_df) < majority_count:
        sentiment_df = sentiment_df.sample(majority_count, replace=True, random_state=42)
    oversampled_parts.append(sentiment_df)

# Concatenate once; starting from an empty DataFrame would cast the columns to object dtype
df_train_oversampled = pd.concat(oversampled_parts)

print("Shape of oversampled training set:", df_train_oversampled.shape)
print("\nSentiment distribution in oversampled training set:")
print(df_train_oversampled['sentiment'].value_counts())
print("\nSentiment proportion in oversampled training set:")
print(df_train_oversampled['sentiment'].value_counts(normalize=True))
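
The cell above draws all `majority_count` rows of a minority class with replacement, so an original minority row can, by chance, be left out of the oversampled set. The sketch below shows a variant, not the notebook's method, that keeps every original row and samples only the missing number of rows. It uses made-up data and plain Python; all names are illustrative.

```python
import random
from collections import Counter

random.seed(42)  # fixed seed so the sketch is repeatable

# Made-up training rows: (text, label)
train_rows = [
    ("love it", "positive"), ("works well", "positive"), ("great value", "positive"),
    ("very happy", "positive"), ("as described", "positive"),
    ("it is fine", "neutral"), ("average", "neutral"),
    ("broke fast", "negative"),
]

# Group rows by label
by_label = {}
for row in train_rows:
    by_label.setdefault(row[1], []).append(row)

majority_count = max(len(rows) for rows in by_label.values())

balanced_rows = []
for label, rows in by_label.items():
    deficit = majority_count - len(rows)
    # Keep every original row, then draw only the missing rows with replacement
    balanced_rows.extend(rows + random.choices(rows, k=deficit))

balanced_counts = Counter(label for _, label in balanced_rows)
print(balanced_counts)
```

Either approach balances the classes; the difference only matters when minority classes are small enough that losing a few distinct examples is noticeable.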

In [None]:
# Create data for the barplot after oversampling
split_counts_after_oversampling = pd.DataFrame({
  'Split': ['Training', 'Training', 'Training', 'Validation', 'Validation', 'Validation', 'Testing', 'Testing', 'Testing'],
  'Sentiment': ['positive', 'neutral', 'negative', 'positive', 'neutral', 'negative', 'positive', 'neutral', 'negative'],
  'Count': [
    len(df_train_oversampled[df_train_oversampled['sentiment'] == 'positive']),
    len(df_train_oversampled[df_train_oversampled['sentiment'] == 'neutral']),
    len(df_train_oversampled[df_train_oversampled['sentiment'] == 'negative']),
    len(df_val[df_val['sentiment'] == 'positive']),
    len(df_val[df_val['sentiment'] == 'neutral']),
    len(df_val[df_val['sentiment'] == 'negative']),
    len(df_test[df_test['sentiment'] == 'positive']),
    len(df_test[df_test['sentiment'] == 'neutral']),
    len(df_test[df_test['sentiment'] == 'negative'])
  ]
})

# Create the barplot
plt.figure(figsize=(12, 6))
ax = sns.barplot(x='Split', y='Count', hue='Sentiment', data=split_counts_after_oversampling, palette=sentiment_colors)

# Add value labels on top of each bar
for container in ax.containers:
  ax.bar_label(container, fmt='%d')

plt.title('Distribution of Data Across Training, Validation, and Testing Sets (After Oversampling)')
plt.xlabel('Data Split')
plt.ylabel('Count')
plt.legend(title='Sentiment')
plt.show()

# Print summary statistics
print("\nSummary After Oversampling:")
print("="*50)
print("\nTraining Set:")
print(df_train_oversampled['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_train_oversampled)}")

print("\nValidation Set:")
print(df_val['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_val)}")

print("\nTesting Set:")
print(df_test['sentiment'].value_counts().sort_index())
print(f"Total: {len(df_test)}")

In [None]:
# Create data for the barplot
split_counts = pd.DataFrame({
  'Split': ['Training', 'Validation', 'Testing'],
  'Count': [len(df_train_oversampled), len(df_val), len(df_test)]
})

# Create the barplot
plt.figure(figsize=(10, 6))
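# Assign hue explicitly; passing palette without hue is deprecated in recent seaborn versions
ax = sns.barplot(x='Split', y='Count', hue='Split', data=split_counts, palette='viridis', legend=False)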

# Add value labels on top of each bar
for container in ax.containers:
  ax.bar_label(container, fmt='%d')

plt.title('Distribution of Data Across Training, Validation, and Testing Sets')
plt.xlabel('Data Split')
plt.ylabel('Count')
plt.show()

# Display sentiment distribution for each split
print("Sentiment Distribution in Training Set:")
print(df_train_oversampled['sentiment'].value_counts())
print(f"\nTotal Training Samples: {len(df_train_oversampled)}")

print("\nSentiment Distribution in Validation Set:")
print(df_val['sentiment'].value_counts())
print(f"\nTotal Validation Samples: {len(df_val)}")

print("\nSentiment Distribution in Testing Set:")
print(df_test['sentiment'].value_counts())
print(f"\nTotal Testing Samples: {len(df_test)}")

# Task - TRAINING MODEL
**Tokenization and Data Preparation for the BERT Model**: Tokenize the 'further_cleaned_input' column with the BERT tokenizer. Analyze the token-length distribution to choose an efficient 'max_length'. Convert the tokenized data and sentiment labels into PyTorch TensorDataset and DataLoader objects for the training, validation, and test sets.

## Tokenization and Data Preparation for the BERT Model


In [None]:
from transformers import AutoTokenizer

# Load a pre-trained BERT tokenizer
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

print("BERT Tokenizer loaded successfully.")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Calculate token lengths for the oversampled training data
# Explicitly truncate to the model's maximum length (e.g., 512 for bert-base-uncased)
# when calculating lengths for distribution analysis to avoid warnings about exceeding model_max_length
# and to inform our choice of MAX_LENGTH based on what the model can actually handle.
token_lengths = [len(tokenizer.encode(str(text), add_special_tokens=True, truncation=True, max_length=tokenizer.model_max_length)) for text in df_train_oversampled['further_cleaned_input']]

# Plot the distribution of token lengths
plt.figure(figsize=(10, 6))
sns.histplot(token_lengths, bins=50, kde=True)
plt.title('Distribution of Token Lengths in Oversampled Training Data (Truncated at 512)')
plt.xlabel('Number of Tokens')
plt.ylabel('Frequency')
plt.show()

# Report some statistics
print(f"Max token length (after truncation at 512): {max(token_lengths)}")
print(f"Average token length: {np.mean(token_lengths):.2f}")
print(f"90th percentile token length: {np.percentile(token_lengths, 90):.2f}")
print(f"95th percentile token length: {np.percentile(token_lengths, 95):.2f}")
print(f"99th percentile token length: {np.percentile(token_lengths, 99):.2f}")

# Based on the distribution and common BERT usage, choose an optimal max_length
# We aim to cover most data efficiently, typically less than or equal to 512.
# Use the 95th percentile, rounded up, so that about 95% of texts fit without truncation.
# (An earlier run reported a 95th percentile of 228.55 and a 99th percentile of 425.00, which gives 229.)
MAX_LENGTH = int(np.ceil(np.percentile(token_lengths, 95)))

print(f"\nChosen MAX_LENGTH for tokenization: {MAX_LENGTH}")
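
The choice of `MAX_LENGTH` trades truncation against padding: a small value cuts off long texts, while a large value fills most sequences with padding tokens. The sketch below quantifies both effects for made-up token lengths, assuming every sequence is padded to `max_length` as in the tokenization step of this notebook. The helper names are hypothetical.

```python
# Made-up token lengths standing in for the real distribution
token_lengths = [12, 18, 25, 31, 40, 47, 58, 73, 120, 410]

def truncated_fraction(lengths, max_length):
    """Share of sequences that would lose tokens at this max_length."""
    return sum(length > max_length for length in lengths) / len(lengths)

def padding_fraction(lengths, max_length):
    """Share of all token slots filled with padding when padding to max_length."""
    real_tokens = sum(min(length, max_length) for length in lengths)
    return 1 - real_tokens / (len(lengths) * max_length)

for candidate in (128, 512):
    print(candidate,
          truncated_fraction(token_lengths, candidate),
          round(padding_fraction(token_lengths, candidate), 3))
```

In this toy data, moving from 128 to 512 avoids truncating one sequence but raises the padded share from about 57% to about 84%, which is the kind of trade-off a percentile-based choice tries to balance.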

In [None]:
def tokenize_data(texts, tokenizer, max_length):
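    # Cast to str so missing or non-string values do not break the tokenizer (matches the length analysis above)
    return tokenizer([str(t) for t in texts], padding='max_length', truncation=True, max_length=max_length, return_tensors='pt')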

# Apply tokenization to the oversampled training, validation, and test sets
X_train_tokenized = tokenize_data(df_train_oversampled['further_cleaned_input'], tokenizer, MAX_LENGTH)
X_val_tokenized = tokenize_data(df_val['further_cleaned_input'], tokenizer, MAX_LENGTH)
X_test_tokenized = tokenize_data(df_test['further_cleaned_input'], tokenizer, MAX_LENGTH)

print("Tokenization applied to training, validation, and test sets.")
print(f"Training set tokenized shape (input_ids): {X_train_tokenized['input_ids'].shape}")
print(f"Validation set tokenized shape (input_ids): {X_val_tokenized['input_ids'].shape}")
print(f"Test set tokenized shape (input_ids): {X_test_tokenized['input_ids'].shape}")

In [None]:
import torch

# Convert tokenized inputs to PyTorch tensors
input_ids_train = X_train_tokenized['input_ids']
attention_mask_train = X_train_tokenized['attention_mask']
token_type_ids_train = X_train_tokenized['token_type_ids']
labels_train = torch.tensor(df_train_oversampled['sentiment_encoded'].values.astype(int))

input_ids_val = X_val_tokenized['input_ids']
attention_mask_val = X_val_tokenized['attention_mask']
token_type_ids_val = X_val_tokenized['token_type_ids']
labels_val = torch.tensor(df_val['sentiment_encoded'].values.astype(int))

input_ids_test = X_test_tokenized['input_ids']
attention_mask_test = X_test_tokenized['attention_mask']
token_type_ids_test = X_test_tokenized['token_type_ids']
labels_test = torch.tensor(df_test['sentiment_encoded'].values.astype(int))

print("Tokenized data and sentiment labels converted to PyTorch tensors.")
print(f"Labels train shape: {labels_train.shape}")
print(f"Labels val shape: {labels_val.shape}")
print(f"Labels test shape: {labels_test.shape}")

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Create TensorDataset for training, validation, and test sets
train_dataset = TensorDataset(input_ids_train, attention_mask_train, token_type_ids_train, labels_train)
val_dataset = TensorDataset(input_ids_val, attention_mask_val, token_type_ids_val, labels_val)
test_dataset = TensorDataset(input_ids_test, attention_mask_test, token_type_ids_test, labels_test)

print("TensorDatasets created successfully.")
print(f"Training dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

In [None]:
BATCH_SIZE = 32

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print("DataLoaders created successfully.")
print(f"Number of batches in training DataLoader: {len(train_dataloader)}")
print(f"Number of batches in validation DataLoader: {len(val_dataloader)}")
print(f"Number of batches in test DataLoader: {len(test_dataloader)}")

## Loading the BERT Model and Configuring the Training Environment

Load the pre-trained `BertForSequenceClassification` model from Hugging Face Transformers. Move the model to the GPU (if available) and set up the optimizer (AdamW) and the learning rate scheduler.


In [None]:
from transformers import BertForSequenceClassification, get_linear_schedule_with_warmup
import torch
from torch.optim import AdamW

# Set device (GPU if available)
if torch.cuda.is_available():
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

# Load pre-trained BERT model for sequence classification
num_labels = 3
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels = num_labels,
    output_attentions = False,
    output_hidden_states = False,
)

model.to(device)
print("BERT model loaded and moved to device successfully.")

# Initialize AdamW optimizer
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
print("AdamW optimizer initialized.")

# Set up learning rate scheduler
EPOCHS = 5
total_steps = len(train_dataloader) * EPOCHS

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps = 0,
    num_training_steps = total_steps
)
print(f"Learning rate scheduler initialized for {total_steps} training steps.")
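
With `num_warmup_steps = 0`, the scheduler lowers the learning rate in a straight line from its initial value to zero over `total_steps`. The sketch below reproduces that shape in plain Python, based on my understanding of `get_linear_schedule_with_warmup`; it is not the library's code. The value `total_steps = 1000` is a made-up example, and `linear_lr` is a hypothetical helper.

```python
def linear_lr(step, base_lr, total_steps, warmup_steps=0):
    """Learning rate at a given step for linear warmup followed by linear decay."""
    if step < warmup_steps:
        # Warmup phase: ramp up from 0 to base_lr
        return base_lr * step / max(1, warmup_steps)
    # Decay phase: fall linearly from base_lr to 0 at total_steps
    remaining = (total_steps - step) / max(1, total_steps - warmup_steps)
    return base_lr * max(0.0, remaining)

base_lr = 2e-5       # same initial value as the optimizer above
total_steps = 1000   # made-up; the notebook uses len(train_dataloader) * EPOCHS

for step in (0, 250, 500, 1000):
    print(step, linear_lr(step, base_lr, total_steps))
```

Because `scheduler.step()` is called once per batch, the rate is already at half its initial value midway through training and reaches zero at the final batch.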


In [None]:
import time
import datetime
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Function to calculate accuracy
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)

# Function to format elapsed time
def format_time(elapsed):
    elapsed_rounded = int(round((elapsed)))
    return str(datetime.timedelta(seconds=elapsed_rounded))

# Set the seed for reproducible results
seed_val = 42
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Store the average training loss of each epoch
loss_values = []

# Training loop
for epoch_i in range(0, EPOCHS):
    print(f'\n======== Epoch {epoch_i + 1} / {EPOCHS} ========')
    print('Training...')

    t0 = time.time()
    total_loss = 0
    model.train() # Set the model to training mode

    for step, batch in enumerate(train_dataloader):
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print(f'  Batch {step:>5,}  of  {len(train_dataloader):>5,}.    Elapsed: {elapsed}.')

        b_input_ids = batch[0].to(device)
        b_input_mask = batch[1].to(device)
        b_labels = batch[3].to(device)

        model.zero_grad()

        outputs = model(
            b_input_ids,
            token_type_ids=None,
            attention_mask=b_input_mask,
            labels=b_labels
        )

        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # Clip the norm of the gradients to 1.0
        optimizer.step()
        scheduler.step()

    avg_train_loss = total_loss / len(train_dataloader)
    loss_values.append(avg_train_loss)
    print(f'  Average training loss: {avg_train_loss:.2f}')
    print(f'  Training epoch took: {format_time(time.time() - t0)}')

    print('\nRunning Validation...')

    t0 = time.time()
    model.eval() # Set the model to evaluation mode
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0

    # Lists to store predictions and true labels for confusion matrix
    val_preds = []
    val_labels = []

    for batch in val_dataloader:
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_token_type_ids, b_labels = batch

        with torch.no_grad():
            outputs = model(
                b_input_ids,
                token_type_ids=None,
                attention_mask=b_input_mask,
                labels=b_labels
            )

        logits = outputs.logits
        loss = outputs.loss
        eval_loss += loss.item()

        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1

        # Store predictions and labels
        val_preds.extend(np.argmax(logits, axis=1))
        val_labels.extend(label_ids)

    print(f'  Validation Loss: {eval_loss / nb_eval_steps:.2f}')
    print(f'  Validation Accuracy: {eval_accuracy / nb_eval_steps:.2f}')
    print(f'  Validation took: {format_time(time.time() - t0)}')

    # Display Confusion Matrix
    cm = confusion_matrix(val_labels, val_preds)
    print(f"\n  Confusion Matrix (Epoch {epoch_i + 1}):")
    print(cm)

print('\nTraining complete!')
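
One detail of the validation accuracy is worth illustrating. The loop averages the accuracy of each batch, which weights a small final batch as heavily as a full one. The sketch below compares that average with the accuracy over all samples, using made-up batch results.

```python
# (correct predictions, batch size) per validation batch; made-up numbers
batches = [(24, 32), (24, 32), (4, 4)]

# What the loop reports: the unweighted mean of per-batch accuracies
mean_of_batch_accuracies = sum(correct / size for correct, size in batches) / len(batches)

# Accuracy over all samples, independent of how they were batched
overall_accuracy = (
    sum(correct for correct, _ in batches) / sum(size for _, size in batches)
)

print(f"Mean of batch accuracies: {mean_of_batch_accuracies:.4f}")
print(f"Overall accuracy:         {overall_accuracy:.4f}")
```

The gap is usually small when there are many batches. If an exact figure is needed, it could be computed from the lists collected in the loop, for example `np.mean(np.array(val_preds) == np.array(val_labels))`.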

## Save the Model to Local Storage

Save the trained model and tokenizer to a local directory so they can be reused later without retraining.

In [None]:
import os

# Directory to save the model
output_dir = './model_save/'

# Create output directory if needed
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

print(f"Saving model to {output_dir}")

# Save a trained model, configuration and tokenizer using `save_pretrained()`.
# They can then be reloaded using `from_pretrained()`
model_to_save = model.module if hasattr(model, 'module') else model  # Take care of distributed/parallel training
model_to_save.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

print("Model and tokenizer saved successfully.")

## Load the Model from Local Storage

Reload the saved model and tokenizer for testing.

In [None]:
# Load the BERT model and tokenizer
print(f"Loading model from {output_dir}")
model = BertForSequenceClassification.from_pretrained(output_dir)
tokenizer = AutoTokenizer.from_pretrained(output_dir)

# Move the model to the selected device (GPU if available, otherwise CPU).
model.to(device)
print("Model loaded successfully.")

## Final Evaluation on the Validation Set

Evaluate the model on the validation set after training to check that it is not overfitting.

In [None]:
from sklearn.metrics import classification_report

# Final evaluation on the validation dataset
print("Final Evaluation on Validation Dataset...")
print("="*50)

t0 = time.time()

model.eval()  # Set the model to evaluation mode
val_loss = 0
val_accuracy = 0
nb_val_steps = 0

# Lists to store predictions and true labels
val_preds_final = []
val_labels_final = []

for batch in val_dataloader:
  batch = tuple(t.to(device) for t in batch)
  b_input_ids, b_input_mask, b_token_type_ids, b_labels = batch

  with torch.no_grad():
    outputs = model(
      b_input_ids,
      token_type_ids=None,
      attention_mask=b_input_mask,
      labels=b_labels
    )

  logits = outputs.logits
  loss = outputs.loss
  val_loss += loss.item()

  logits = logits.detach().cpu().numpy()
  label_ids = b_labels.to('cpu').numpy()

  tmp_val_accuracy = flat_accuracy(logits, label_ids)
  val_accuracy += tmp_val_accuracy
  nb_val_steps += 1

  # Store predictions and labels
  val_preds_final.extend(np.argmax(logits, axis=1))
  val_labels_final.extend(label_ids)

avg_val_loss = val_loss / nb_val_steps
avg_val_accuracy = val_accuracy / nb_val_steps

print(f"Validation Loss: {avg_val_loss:.4f}")
print(f"Validation Accuracy: {avg_val_accuracy:.4f}")
print(f"Validation evaluation took: {format_time(time.time() - t0)}")

# Display Confusion Matrix for validation set
print("\n" + "="*50)
print("Confusion Matrix on Validation Set:")
print("="*50)
cm_val = confusion_matrix(val_labels_final, val_preds_final)
print(cm_val)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm_val, annot=True, fmt='d', cmap='Greens',
      xticklabels=['negative', 'neutral', 'positive'],
      yticklabels=['negative', 'neutral', 'positive'])
plt.title('Confusion Matrix - Validation Set')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

# Classification report
print("\n" + "="*50)
print("Classification Report (Validation Set):")
print("="*50)
print(classification_report(val_labels_final, val_preds_final,
              target_names=['negative', 'neutral', 'positive']))

print("\n" + "="*50)
print("Overfitting Check:")
print("="*50)
print(f"Training Loss (Final Epoch): {loss_values[-1]:.4f}")
print(f"Validation Loss: {avg_val_loss:.4f}")
print(f"Difference: {abs(avg_val_loss - loss_values[-1]):.4f}")
if avg_val_loss > loss_values[-1] * 1.2:
    print("⚠️ WARNING: Model might be overfitting! Validation loss is significantly higher than training loss.")
elif avg_val_loss <= loss_values[-1] * 1.1:
    print("✓ Model is generalizing well. No significant overfitting detected.")
else:
    print("⚠️ CAUTION: There is some difference between training and validation loss. Monitor closely.")
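
The overfitting check above compares the validation loss against the final-epoch training loss using two ratio thresholds (1.1 and 1.2). As a minimal sketch, the same rule can be wrapped in a small helper so it can be reused with other loss pairs. The function name `classify_loss_gap`, its parameter names, and the returned labels are hypothetical and not part of the notebook; the loss values in the example are made up.

```python
def classify_loss_gap(train_loss, val_loss, ok_ratio=1.1, warn_ratio=1.2):
    """Label the gap between training and validation loss.

    Mirrors the thresholds used in the cell above:
    - val_loss above train_loss * warn_ratio -> "overfitting"
    - val_loss at or below train_loss * ok_ratio -> "ok"
    - anything in between -> "caution"
    """
    if val_loss > train_loss * warn_ratio:
        return "overfitting"
    if val_loss <= train_loss * ok_ratio:
        return "ok"
    return "caution"


# Illustrative values only, not results from this notebook.
for train, val in [(0.50, 0.70), (0.50, 0.52), (0.50, 0.575)]:
    print(f"train={train:.3f} val={val:.3f} -> {classify_loss_gap(train, val)}")
```

In the notebook this could be called as `classify_loss_gap(loss_values[-1], avg_val_loss)`. The thresholds remain heuristics, so the label is a prompt to look closer rather than a verdict.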

# Testing Model


In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Testing the model on the test dataset
print("Testing the model on the test dataset...")
print("="*50)

t0 = time.time()

model.eval()  # Set the model to evaluation mode
test_loss = 0
test_accuracy = 0
nb_test_steps = 0

# Lists to store predictions and true labels
test_preds = []
test_labels_list = []

for batch in test_dataloader:
  batch = tuple(t.to(device) for t in batch)
  b_input_ids, b_input_mask, b_token_type_ids, b_labels = batch

  with torch.no_grad():
    outputs = model(
      b_input_ids,
      token_type_ids=None,
      attention_mask=b_input_mask,
      labels=b_labels
    )

  logits = outputs.logits
  loss = outputs.loss
  test_loss += loss.item()

  logits = logits.detach().cpu().numpy()
  label_ids = b_labels.to('cpu').numpy()

  tmp_test_accuracy = flat_accuracy(logits, label_ids)
  test_accuracy += tmp_test_accuracy
  nb_test_steps += 1

  # Store predictions and labels
  test_preds.extend(np.argmax(logits, axis=1))
  test_labels_list.extend(label_ids)

avg_test_loss = test_loss / nb_test_steps
avg_test_accuracy = test_accuracy / nb_test_steps

print(f"Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {avg_test_accuracy:.4f}")
print(f"Testing took: {format_time(time.time() - t0)}")

# Display Confusion Matrix for test set
print("\n" + "="*50)
print("Confusion Matrix on Test Set:")
print("="*50)
cm_test = confusion_matrix(test_labels_list, test_preds)
print(cm_test)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm_test, annot=True, fmt='d', cmap='Blues',
      xticklabels=['negative', 'neutral', 'positive'],
      yticklabels=['negative', 'neutral', 'positive'])
plt.title('Confusion Matrix - Test Set')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

# Classification report
print("\n" + "="*50)
print("Classification Report:")
print("="*50)
print(classification_report(test_labels_list, test_preds,
              target_names=['negative', 'neutral', 'positive']))
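
The validation and test loops report accuracy as the mean of per-batch accuracies, which weights every batch equally. Assuming `flat_accuracy` returns the fraction of correct predictions within one batch, that mean can differ from the accuracy over all samples whenever the last batch is smaller than the others. The sketch below shows the difference on made-up predictions; the helper names `batch_mean_accuracy` and `overall_accuracy` are hypothetical.

```python
def accuracy(preds, labels):
    """Fraction of positions where the prediction equals the label."""
    correct = sum(p == y for p, y in zip(preds, labels))
    return correct / len(labels)


def batch_mean_accuracy(batches):
    """Average of per-batch accuracies (each batch weighted equally)."""
    return sum(accuracy(p, y) for p, y in batches) / len(batches)


def overall_accuracy(batches):
    """Accuracy computed over all samples pooled together."""
    all_preds = [p for preds, _ in batches for p in preds]
    all_labels = [y for _, labels in batches for y in labels]
    return accuracy(all_preds, all_labels)


# Illustrative data only: one full batch of 4 and a final batch of 1.
example_batches = [
    ([0, 1, 2, 2], [0, 1, 2, 1]),  # 3 of 4 correct
    ([0], [1]),                    # 0 of 1 correct
]

print(f"Mean of batch accuracies:  {batch_mean_accuracy(example_batches):.4f}")
print(f"Accuracy over all samples: {overall_accuracy(example_batches):.4f}")
```

Because `test_preds` and `test_labels_list` already hold every prediction and label, the pooled figure corresponds to the accuracy row printed by `classification_report`.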

In [None]:
from google.colab import drive
from google.colab import files
from google.colab import auth
from google.auth import default
from googleapiclient.discovery import build
import shutil
import os

# --- 1. Identify User ---
try:
    print("Retrieving user information...")
    auth.authenticate_user()
    creds, _ = default()
    oauth_service = build('oauth2', 'v2', credentials=creds)
    user_info = oauth_service.userinfo().get().execute()
    current_email = user_info.get('email')
    print(f"✅ Logged in as: {current_email}")
except Exception as e:
    print(f"⚠️ Could not retrieve email address automatically. (Error: {e})")

# --- 2. Save to Google Drive ---
# Check if Drive is already mounted to avoid "Mountpoint must not already contain files" error
if os.path.exists('/content/drive/MyDrive'):
    print("✅ Google Drive is already mounted.")
else:
    print("Mounting Google Drive...")
    drive.mount('/content/drive', force_remount=True)

# Define source (Colab VM) and destination (Google Drive)
source_dir = './model_save/'
# Saving to 'PSD_Model_Save' folder in your Drive
destination_dir = '/content/drive/MyDrive/PSD_Model_Save/'

# Create destination directory if it doesn't exist
if not os.path.exists(destination_dir):
    os.makedirs(destination_dir)
    print(f"Created directory: {destination_dir}")

# Copy the model files
print(f"Copying files from {source_dir} to {destination_dir}...")
# Using system command for recursive copy
exit_code = os.system(f'cp -r "{source_dir}"* "{destination_dir}"')
if exit_code == 0:
    print(f"✅ Model successfully saved to Google Drive at: {destination_dir}")
    if 'current_email' in locals():
        print(f"   (Account: {current_email})")
else:
    print("❌ Error copying files to Google Drive.")

# --- 3. Download to Local ---
print("\nPreparing file for local download...")
zip_base_name = 'bert_model_save'
zip_filename = zip_base_name + '.zip'

# Zip the directory
shutil.make_archive(zip_base_name, 'zip', source_dir)
print(f"Created zip file: {zip_filename}")

try:
    print(f"Attempting to download {zip_filename}...")
    files.download(zip_filename)
    print("✅ Download command sent.")
except Exception as e:
    print("⚠️ Could not trigger direct download (common in VS Code remote kernels).")
    print(f"Error: {e}")
    print(f"👉 You can download the file '{zip_filename}' manually from the file explorer on the left,")
    print(f"   or access the copy we just saved in your Google Drive folder: {destination_dir}")
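
The copy step in the cell above shells out to `cp -r` through `os.system`, which reports only an exit code. As an alternative sketch (not what the cell does), `shutil.copytree` with `dirs_exist_ok=True` (Python 3.8+) performs a recursive copy in Python and raises an exception on failure. The helper `copy_model_dir` is hypothetical, and temporary folders with a placeholder `config.json` stand in for `./model_save/` and the Drive folder.

```python
import os
import shutil
import tempfile


def copy_model_dir(source_dir, destination_dir):
    """Recursively copy source_dir into destination_dir and list the result."""
    # dirs_exist_ok=True allows copying into a folder that already exists.
    shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True)
    return sorted(os.listdir(destination_dir))


# Demonstration with temporary folders standing in for the real paths.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "model_save")
    dst = os.path.join(tmp, "drive_copy")
    os.makedirs(src)
    with open(os.path.join(src, "config.json"), "w") as f:
        f.write("{}")  # placeholder file, not a real model config
    copied = copy_model_dir(src, dst)
    print(copied)
```

With this approach a failed copy surfaces as an exception carrying the reason, which can be caught with `try`/`except` in the same way as the download step.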