In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import AutoConfig, CvtForImageClassification
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from PIL import Image, UnidentifiedImageError
import os
from sklearn.metrics import confusion_matrix, f1_score, classification_report
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import seaborn as sns
import random
from collections import Counter
import optuna
from optuna import Trial
from optuna.samplers import TPESampler
from tqdm import tqdm
import logging

# ----------------------------
# 1. Konfigurasi Logging
# ----------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("optimization.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger()





In [None]:
# ----------------------------
# 2. Set seed for reproducibility
# ----------------------------
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 16
NUM_CLASSES = 20
EPOCHS = 100
IMAGE_SIZE = 224
DATASET_PATH = '/kaggle/input/d/phiard/aksara-jawa/v3/v3'  # Pastikan path ini benar

In [None]:
# ----------------------------
# 3. Transformasi Data
# ----------------------------
train_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.CenterCrop(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.CenterCrop(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



In [None]:
# ----------------------------
# 4. Custom Dataset
# ----------------------------
class CustomImageFolder(Dataset):
    def __init__(self, root, transform=None):
        self.samples = []
        self.targets = []
        self.transform = transform
        self.classes, self.class_to_idx = self._find_classes(root)
        self.num_classes = len(self.classes)

        for target_class in sorted(self.class_to_idx.keys()):
            class_index = self.class_to_idx[target_class]
            target_dir = os.path.join(root, target_class)
            if not os.path.isdir(target_dir):
                continue
            for root_, _, fnames in sorted(os.walk(target_dir, followlinks=True)):
                for fname in sorted(fnames):
                    path = os.path.join(root_, fname)
                    try:
                        with Image.open(path) as img:
                            img.verify()  # Verifikasi integritas gambar
                        self.samples.append((path, class_index))
                        self.targets.append(class_index)
                    except (UnidentifiedImageError, OSError):
                        logger.warning(f"Gambar korup dilewati: {path}")
                        continue

    def __getitem__(self, index):
        path, target = self.samples[index]
        with Image.open(path) as sample:
            sample = sample.convert('RGB')

        if self.transform:
            sample = self.transform(sample)

        return sample, target

    def __len__(self):
        return len(self.samples)

    def _find_classes(self, dir):
        classes = [d.name for d in os.scandir(dir) if d.is_dir()]
        classes.sort()
        class_to_idx = {classes[i]: i for i in range(len(classes))}
        return classes, class_to_idx



In [None]:
# ----------------------------
# 5. Membuat DataLoader
# ----------------------------
train_dir = os.path.join(DATASET_PATH, 'train')
val_dir = os.path.join(DATASET_PATH, 'val')

train_dataset = CustomImageFolder(train_dir, transform=train_transform)
val_dataset = CustomImageFolder(val_dir, transform=val_transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)


In [None]:
# ----------------------------
# 6. Verifikasi Distribusi Kelas
# ----------------------------
counter = Counter(train_dataset.targets)
logger.info("Distribusi Kelas dalam Training Set:")
for cls, count in counter.items():
    logger.info(f"Kelas {cls}: {count} sampel")



In [None]:
# ----------------------------
# 7. Menghitung Class Weights
# ----------------------------
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_dataset.targets),
    y=train_dataset.targets
)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
logger.info(f"Class Weights: {class_weights}")



In [None]:
# ----------------------------
# 8. Fungsi untuk Melatih Satu Epoch
# ----------------------------
def train_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for data, target in loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data).logits
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        _, predicted = output.max(1)
        correct += predicted.eq(target).sum().item()
        total += target.size(0)
    return total_loss / len(loader), 100. * correct / total



In [None]:

# ----------------------------
# 9. Fungsi untuk Validasi
# ----------------------------
def validate(model, loader, criterion):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    all_targets = []
    all_predictions = []
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data).logits
            loss = criterion(output, target)
            total_loss += loss.item()
            _, predicted = output.max(1)
            correct += predicted.eq(target).sum().item()
            total += target.size(0)
            all_targets.extend(target.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
    f1 = f1_score(all_targets, all_predictions, average='weighted')
    return total_loss / len(loader), 100. * correct / total, f1, all_targets, all_predictions

   
    
    

In [None]:
# ----------------------------
# 10. Definisikan Fungsi Objektif untuk Optuna
# ----------------------------
def objective(trial: Trial):
    # Saran hyperparameter
    lr = trial.suggest_loguniform('learning_rate', 1e-5, 1e-2)
    weight_decay = trial.suggest_loguniform('weight_decay', 1e-6, 1e-2)

    logger.info(f"Trial {trial.number}: Suggesting learning_rate={lr}, weight_decay={weight_decay}")

    # Konfigurasi model
    config = AutoConfig.from_pretrained("microsoft/cvt-13")
    config.num_labels = NUM_CLASSES
    config.hidden_dropout_prob = 0.1  # Mengurangi dropout

    model = CvtForImageClassification.from_pretrained(
        "microsoft/cvt-13", config=config, ignore_mismatched_sizes=True
    ).to(device)

    # Pastikan semua parameter dioptimalkan
    for param in model.parameters():
        param.requires_grad = True

    # Definisikan loss function tanpa label smoothing
    criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.0)

    # Definisikan optimizer dengan hyperparameter yang disarankan
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Definisikan scheduler
    scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

    # Variabel untuk tracking
    best_val_acc = 0
    counter = 0
    patience = 20  # Meningkatkan patience untuk early stopping

    # Loop Pelatihan dengan Progress Bar
    for epoch in range(EPOCHS):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_acc, val_f1, _, _ = validate(model, val_loader, criterion)

        # Update scheduler
        scheduler.step()

        # Early Stopping
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            counter = 0
        else:
            counter += 1

        # Melaporkan hasil ke Optuna
        trial.report(val_acc, epoch)

        # Memeriksa apakah trial harus dipangkas
        if trial.should_prune():
            logger.info(f"Trial {trial.number} pruned at epoch {epoch+1}")
            raise optuna.exceptions.TrialPruned()

        if counter >= patience:
            logger.info(f"Trial {trial.number} stopped early at epoch {epoch+1} due to no improvement.")
            break

    logger.info(f"Trial {trial.number} finished with best val_acc={best_val_acc}")
    return best_val_acc


In [None]:

# ----------------------------
# 11. Menambahkan Progress Bar untuk Optuna
# ----------------------------
class TqdmOptunaCallback:
    def __init__(self, total_trials):
        self.pbar = tqdm(total=total_trials, desc="Optuna Optimization", unit="trial")

    def __call__(self, study, trial):
        if trial.state == optuna.trial.TrialState.COMPLETE:
            self.pbar.update(1)
        elif trial.state == optuna.trial.TrialState.PRUNED:
            self.pbar.update(1)

    def close(self):
        self.pbar.close()

# ----------------------------
# 12. Membuat dan Menjalankan Studi Optuna
# ----------------------------
def run_optuna_optimization():
    sampler = TPESampler(seed=seed)
    study = optuna.create_study(direction='maximize', sampler=sampler)
    
    # Tentukan jumlah trial
    n_trials = 50

    # Inisialisasi callback dengan progress bar
    tqdm_callback = TqdmOptunaCallback(total_trials=n_trials)

    try:
        study.optimize(objective, n_trials=n_trials, callbacks=[tqdm_callback])
    except KeyboardInterrupt:
        logger.info("Optimization interrupted by user.")

    tqdm_callback.close()

    logger.info("Optimization selesai.")
    logger.info(f"Number of finished trials: {len(study.trials)}")
    logger.info("Best trial:")
    trial = study.best_trial

    logger.info(f"  Value: {trial.value}")
    logger.info("  Params: ")
    for key, value in trial.params.items():
        logger.info(f"    {key}: {value}")

    # Menampilkan hasil visualisasi (Opsional)
    try:
        import optuna.visualization as vis
        fig1 = vis.plot_optimization_history(study)
        fig2 = vis.plot_param_importances(study)
        fig1.show()
        fig2.show()
    except ImportError:
        logger.warning("Optuna visualization modules not installed.")

    return study



In [None]:
# ----------------------------
# 13. Menjalankan Optimisasi
# ----------------------------
if __name__ == "__main__":
    study = run_optuna_optimization()

    # ----------------------------
    # 14. Melatih Ulang dengan Hyperparameter Terbaik (Opsional)
    # ----------------------------
    best_lr = study.best_params['learning_rate']
    best_weight_decay = study.best_params['weight_decay']
    logger.info(f"Best hyperparameters found: learning_rate={best_lr}, weight_decay={best_weight_decay}")

    # Inisialisasi model lagi dengan hyperparameter terbaik
    config = AutoConfig.from_pretrained("microsoft/cvt-13")
    config.num_labels = NUM_CLASSES
    config.hidden_dropout_prob = 0.1

    model = CvtForImageClassification.from_pretrained(
        "microsoft/cvt-13", config=config, ignore_mismatched_sizes=True
    ).to(device)

    for param in model.parameters():
        param.requires_grad = True

    # Definisikan loss function
    criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.0)

    # Definisikan optimizer dengan hyperparameter terbaik
    optimizer = optim.AdamW(model.parameters(), lr=best_lr, weight_decay=best_weight_decay)

    # Definisikan scheduler
    scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

    # Definisikan writer TensorBoard
    writer = SummaryWriter()

    # Variabel untuk tracking
    best_val_acc = 0
    counter = 0
    patience = 20  # Meningkatkan patience untuk early stopping

    # Loop Pelatihan dengan Progress Bar
    for epoch in range(EPOCHS):
        epoch_pbar = tqdm(total=1, desc=f"Training Epoch {epoch+1}/{EPOCHS}", unit="epoch")
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_acc, val_f1, all_targets, all_predictions = validate(model, val_loader, criterion)

        # Logging ke TensorBoard
        writer.add_scalars('Loss', {'Train': train_loss, 'Val': val_loss}, epoch)
        writer.add_scalars('Accuracy', {'Train': train_acc, 'Val': val_acc}, epoch)
        writer.add_scalar('F1_Score/Val', val_f1, epoch)

        # Simpan model terbaik
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_cvt_model.pth")
            counter = 0  # Reset counter jika ada peningkatan
        else:
            counter += 1

        # Early stopping
        if counter >= patience:
            logger.info(f"Early stopping setelah epoch {epoch+1} karena tidak ada peningkatan akurasi validasi.")
            break

        # Menampilkan progres
        logger.info(f"Epoch {epoch+1}/{EPOCHS}")
        logger.info(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        logger.info(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val F1 Score: {val_f1:.4f}")

        # Update scheduler
        scheduler.step()

        epoch_pbar.update(1)
        epoch_pbar.close()

    writer.close()



In [None]:
 # ----------------------------
    # 15. Plot Confusion Matrix untuk Validasi
    # ----------------------------
    def plot_confusion_matrix(targets, predictions, class_names):
        cm = confusion_matrix(targets, predictions)
        plt.figure(figsize=(12, 10))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
        plt.xlabel("Prediksi")
        plt.ylabel("Sebenarnya")
        plt.title("Confusion Matrix")
        plt.show()

    plot_confusion_matrix(all_targets, all_predictions, train_dataset.classes)


In [None]:
# ----------------------------
    # 16. Tampilkan Prediksi per Kelas
    # ----------------------------
    def show_predictions_per_class(model, dataset, num_classes=20, cols=5):
        model.eval()
        rows = (num_classes + cols - 1) // cols
        fig, axs = plt.subplots(rows, cols, figsize=(15, 3 * rows))
        axs = axs.flatten()

        class_images = {i: None for i in range(num_classes)}

        for i in range(len(dataset)):
            image, label = dataset[i]
            image = image.unsqueeze(0).to(device)
            with torch.no_grad():
                output = model(image).logits
                _, predicted = output.max(1)
                if class_images[label] is None and predicted.item() == label:
                    class_images[label] = (image.squeeze().cpu(), label, predicted.item())
                    if all(value is not None for value in class_images.values()):
                        break

        for i in range(num_classes):
            ax = axs[i]
            if class_images[i] is not None:
                img, true_label, pred_label = class_images[i]
                # Denormalisasi gambar
                img = img * torch.tensor([0.229, 0.224, 0.225]).view(3,1,1) + torch.tensor([0.485, 0.456, 0.406]).view(3,1,1)
                img = img.permute(1, 2, 0).numpy()
                img = np.clip(img, 0, 1)
                ax.imshow(img)
                ax.set_title(f"True: {train_dataset.classes[true_label]}\nPred: {train_dataset.classes[pred_label]}")
                ax.axis('off')
            else:
                ax.text(0.5, 0.5, 'No Prediction', ha='center', va='center')
                ax.axis('off')

        for j in range(num_classes, len(axs)):
            axs[j].axis('off')

        plt.tight_layout()
        plt.show()

    show_predictions_per_class(model, val_dataset)


In [None]:
# ----------------------------
    # 17. Plot Loss dan Akurasi
    # ----------------------------
    plt.figure(figsize=(12, 5))

    # Plot Loss
    plt.subplot(1, 2, 1)
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    # Plot Akurasi
    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(train_accs) + 1), train_accs, label='Train Accuracy')
    plt.plot(range(1, len(val_accs) + 1), val_accs, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()

    # ----------------------------
  

In [None]:
  # 18. Evaluasi Tambahan: Classification Report
    # ----------------------------
    def classification_metrics(loader):
        model.eval()
        all_targets = []
        all_predictions = []
        with torch.no_grad():
            for data, target in loader:
                data, target = data.to(device), target.to(device)
                output = model(data).logits
                _, predicted = torch.max(output, 1)
                all_targets.extend(target.cpu().numpy())
                all_predictions.extend(predicted.cpu().numpy())
        logger.info(classification_report(all_targets, all_predictions, target_names=train_dataset.classes))

    logger.info("Classification Report untuk Training Set:")
    classification_metrics(train_loader)

    logger.info("Classification Report untuk Validation Set:")
    classification_metrics(val_loader)

