In [None]:
!pip install timm

In [None]:
import os
import pandas as pd
import numpy as np
from glob import glob
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import datasets, transforms, models
import timm 
from tqdm import tqdm
from sklearn.metrics import f1_score, accuracy_score
from sklearn.model_selection import StratifiedKFold

In [None]:
# CONFIG
# Input/output locations (Kaggle layout: each split is nested one level deep).
DATA_DIR = "/kaggle/input/srifoton"
TRAIN_DIR = os.path.join(DATA_DIR, "train", "train")
VAL_DIR = os.path.join(DATA_DIR, "val", "val")
TEST_DIR  = os.path.join(DATA_DIR, "test", "test")
OUT_FILE  = "/kaggle/working/submission.csv"
MODEL_DIR = "/kaggle/working/models/"
# Per-fold checkpoints are written here; create it up front so saving cannot fail.
os.makedirs(MODEL_DIR, exist_ok=True)

# Hyperparameters
BATCH_SIZE = 16           # training batch size
EPOCHS_PER_FOLD = 25      # epochs run for every cross-validation fold
IMG_SIZE = 384            # images are resized to IMG_SIZE x IMG_SIZE
BACKBONE = "efficientnet_v2_m"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LR = 1e-4                 # optimizer learning rate
N_FOLDS = 5               # number of cross-validation folds
SEED = 42                 # global RNG seed

# Seed the numpy and torch generators so augmentation, shuffling and weight
# initialisation are repeatable after Restart Kernel -> Run All.
np.random.seed(SEED)
torch.manual_seed(SEED)

print(f"Menggunakan device: {DEVICE}")
print(f"Model backbone: {BACKBONE}")
print(f"Ukuran gambar: {IMG_SIZE}x{IMG_SIZE}")
print(f"Strategi Pelatihan: {N_FOLDS}-Fold Cross-Validation")

# AUGMENTATION
def _build_pipeline(*augmentations):
    """Compose resize -> optional PIL-level augmentations -> tensor -> normalise."""
    return transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])


# Training pipeline: random flip, small affine jitter and colour jitter.
train_tfms = _build_pipeline(
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), shear=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
)

# Validation/test pipeline: no augmentation, only resize and normalisation.
val_test_tfms = _build_pipeline()

# DATASET SETUP FOR K-FOLD
# Two ImageFolder views of the training directory: one without a transform
# (only its sample list is read below) and one carrying the training transform.
full_dataset_for_split = datasets.ImageFolder(TRAIN_DIR)
train_val_ds = datasets.ImageFolder(TRAIN_DIR, transform=train_tfms)

class_names = train_val_ds.classes
n_classes = len(class_names)
print(f"Jumlah kelas: {n_classes}, Nama kelas: {class_names}")

# Parallel lists of file paths and integer class labels used for fold splitting.
image_paths = [sample_path for sample_path, _ in full_dataset_for_split.samples]
labels = [sample_label for _, sample_label in full_dataset_for_split.samples]

# Every entry directly inside the test directory, in sorted order.
test_imgs = glob(os.path.join(TEST_DIR, "*"))
test_imgs.sort()

class KFoldDatasetWrapper(Dataset):
    """Dataset exposing only the samples selected by one cross-validation fold.

    Args:
        all_paths: Sequence of image file paths for the whole training set.
        all_labels: Sequence of labels, parallel to ``all_paths``.
        indices: Iterable of positions into ``all_paths``/``all_labels`` that
            belong to this split.
        transform: Callable applied to the loaded RGB image, or ``None`` to
            return the image unchanged.
    """

    def __init__(self, all_paths, all_labels, indices, transform):
        # Materialise the subset once so __getitem__ is a plain list lookup.
        self.paths = [all_paths[i] for i in indices]
        self.labels = [all_labels[i] for i in indices]
        self.transform = transform

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        # The context manager closes the file handle deterministically instead
        # of leaving it to garbage collection; convert() returns an independent
        # in-memory copy, so the image stays usable after the file is closed.
        with Image.open(self.paths[idx]) as raw_image:
            image = raw_image.convert("RGB")
        # Explicit None check: a transform object that happens to be falsy
        # must still be applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, self.labels[idx]

# MODEL 
def get_model(backbone_name, n_classes, pretrained=True):
    """Build a classifier with an ``n_classes``-way output for the given backbone.

    Args:
        backbone_name: ``"xception"`` (created through timm) or
            ``"efficientnet_v2_m"`` (created through torchvision).
        n_classes: Number of output classes of the classification head.
        pretrained: Whether to request pretrained weights from the library.

    Returns:
        The constructed model.

    Raises:
        ValueError: If ``backbone_name`` is not one of the supported names.
    """
    if backbone_name not in ("xception", "efficientnet_v2_m"):
        raise ValueError("Backbone tidak didukung")

    if backbone_name == "xception":
        # timm builds the classification head itself from num_classes.
        net = timm.create_model('xception', pretrained=pretrained, num_classes=n_classes)
        print("Model Xception dari 'timm' berhasil dimuat.")
        return net

    # torchvision path: load the network, then swap the final linear layer so
    # its output size matches n_classes.
    net = models.efficientnet_v2_m(weights='DEFAULT' if pretrained else None)
    old_head = net.classifier[1]
    net.classifier[1] = nn.Linear(old_head.in_features, n_classes)
    print("Model EfficientNetV2-M berhasil dimuat.")
    return net

# TRAINING LOOP WITH STRATIFIED K-FOLD
def _train_one_epoch(model, loader, criterion, optimizer, device, desc):
    """Run one optimisation pass over ``loader``; return ``(mean_loss, accuracy)``."""
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    pbar = tqdm(loader, desc=desc)
    for imgs, lbls in pbar:
        imgs, lbls = imgs.to(device), lbls.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, lbls)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        # Weight by batch size so the epoch mean is per-sample, not per-batch.
        loss_sum += batch_loss * imgs.size(0)
        n_correct += (outputs.argmax(dim=1) == lbls).sum().item()
        n_seen += lbls.size(0)
        pbar.set_postfix(loss=batch_loss, acc=n_correct / n_seen)

    # max(..., 1) avoids ZeroDivisionError if the loader yielded no batches.
    return loss_sum / max(n_seen, 1), n_correct / max(n_seen, 1)


def _evaluate(model, loader, criterion, device, desc):
    """Score ``model`` on ``loader``; return ``(mean_loss, true_labels, predictions)``."""
    model.eval()
    loss_sum = 0.0
    y_true, y_pred = [], []
    with torch.no_grad():
        for imgs, lbls in tqdm(loader, desc=desc):
            imgs, lbls = imgs.to(device), lbls.to(device)
            outputs = model(imgs)
            loss_sum += criterion(outputs, lbls).item() * imgs.size(0)
            y_true.extend(lbls.cpu().numpy())
            y_pred.extend(outputs.argmax(dim=1).cpu().numpy())
    return loss_sum / max(len(y_true), 1), y_true, y_pred


skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=42)
fold_histories = []

for fold, (train_idx, val_idx) in enumerate(skf.split(image_paths, labels)):
    print(f"\n===== FOLD {fold+1}/{N_FOLDS} =====")

    # Training split gets augmentation; validation split gets the plain pipeline.
    train_dataset = KFoldDatasetWrapper(image_paths, labels, train_idx, transform=train_tfms)
    val_dataset = KFoldDatasetWrapper(image_paths, labels, val_idx, transform=val_test_tfms)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE*2, shuffle=False, num_workers=2, pin_memory=True)

    # Every fold starts from a freshly built model, optimizer and schedule.
    model = get_model(BACKBONE, n_classes).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=LR, weight_decay=1e-2)
    scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1, eta_min=1e-6)

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; starting at 0.0 would leave no file for this fold if the
    # validation accuracy never rose above zero, breaking ensemble loading.
    best_val_acc = -1.0
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': [], 'val_f1': []}

    for epoch in range(EPOCHS_PER_FOLD):
        print(f"\n--- Epoch {epoch+1}/{EPOCHS_PER_FOLD} ---")

        # Training
        avg_train_loss, avg_train_acc = _train_one_epoch(
            model, train_loader, criterion, optimizer, DEVICE,
            desc=f"Training Fold {fold+1}",
        )

        # Validation
        avg_val_loss, all_labels, all_preds = _evaluate(
            model, val_loader, criterion, DEVICE,
            desc=f"Validating Fold {fold+1}",
        )
        val_acc = accuracy_score(all_labels, all_preds)
        val_f1 = f1_score(all_labels, all_preds, average='macro')

        # The learning-rate schedule advances once per epoch.
        scheduler.step()
        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(avg_train_acc)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_acc)
        history['val_f1'].append(val_f1)

        print(f"Fold {fold+1} Epoch {epoch+1}: Train Loss: {avg_train_loss:.4f}, Acc: {avg_train_acc:.4f} | Val Loss: {avg_val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}")

        # Keep only the weights of the epoch with the best validation accuracy.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            model_path = os.path.join(MODEL_DIR, f"best_model_fold_{fold}.pth")
            torch.save(model.state_dict(), model_path)
            print(f"Val ACC meningkat. Menyimpan model ke {model_path}")

    fold_histories.append(history)

    # Drop this fold's model/optimizer before the next fold allocates its own,
    # otherwise both sets of weights and optimizer state coexist on the device.
    del model, optimizer, scheduler
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

print("\nPelatihan K-Fold selesai.")

# PLOT CURVES
# Curves of the most recently completed fold. The fold number in the titles is
# taken from how many histories were recorded, so it stays correct even when
# training stopped before all folds finished.
last_fold_history = fold_histories[-1]
last_fold_number = len(fold_histories)

fig, (ax_loss, ax_acc, ax_f1) = plt.subplots(1, 3, figsize=(18, 6))

ax_loss.plot(last_fold_history['train_loss'], label="Train Loss")
ax_loss.plot(last_fold_history['val_loss'], label="Val Loss")
ax_loss.set_title(f"Loss Curve (Fold {last_fold_number})")

ax_acc.plot(last_fold_history['train_acc'], label="Train Accuracy")
ax_acc.plot(last_fold_history['val_acc'], label="Val Accuracy")
ax_acc.set_title(f"Accuracy Curve (Fold {last_fold_number})")

ax_f1.plot(last_fold_history['val_f1'], label="Val F1-Score (Macro)")
ax_f1.set_title(f"F1-Score Curve (Fold {last_fold_number})")

# One history entry is appended per epoch, so the x axis is the epoch index.
for ax in (ax_loss, ax_acc, ax_f1):
    ax.set_xlabel("Epoch")
    ax.legend()

plt.show()

# INFERENCE
# Test-time augmentation views: the plain validation pipeline, an always-on
# horizontal flip (p=1.0), and an affine view with rotation sampled from
# +/-10 degrees. The last view is random, so it differs from call to call.
_tta_augmentations = [
    transforms.RandomHorizontalFlip(p=1.0),
    transforms.RandomAffine(degrees=10),
]

tta_tfms = [val_test_tfms] + [
    transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        augmentation,
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    for augmentation in _tta_augmentations
]

def ensemble_tta_predict(models_list, img_path, tta_transforms, device):
    """Predict the class index of one image using model ensembling and TTA.

    Each transform in ``tta_transforms`` produces one view of the image. Every
    model scores every view; softmax probabilities are averaged over the views
    of a model, then over the models, and the arg-max of that mean is returned.

    Args:
        models_list: Iterable of models to ensemble (each is put in eval mode).
        img_path: Path of the image file to classify.
        tta_transforms: Iterable of callables mapping a PIL image to a tensor.
        device: Device the input tensors are moved to before the forward pass.

    Returns:
        int: Index of the class with the highest averaged probability.

    Raises:
        ValueError: If ``models_list`` or ``tta_transforms`` is empty. Without
            this check the mean over an empty list is NaN and the function
            would silently answer class 0.
    """
    models_list = list(models_list)
    tta_transforms = list(tta_transforms)
    if not models_list:
        raise ValueError("models_list must contain at least one model")
    if not tta_transforms:
        raise ValueError("tta_transforms must contain at least one transform")

    # Close the file handle deterministically; convert() yields an in-memory
    # copy that remains valid after the file is closed.
    with Image.open(img_path) as raw_image:
        img = raw_image.convert("RGB")

    # Build the augmented views once per image instead of once per model.
    # For random transforms this means every model scores the same views.
    views = [tfm(img).unsqueeze(0).to(device) for tfm in tta_transforms]

    all_model_probs = []
    with torch.no_grad():
        for model in models_list:
            model.eval()
            tta_probs = [F.softmax(model(x), dim=1).cpu().numpy() for x in views]
            # Average over TTA views for this model.
            all_model_probs.append(np.mean(tta_probs, axis=0))

    # Ensemble: average over models, then pick the most probable class.
    final_probs = np.mean(all_model_probs, axis=0)
    return int(np.argmax(final_probs))

# Rebuild one model per fold and restore its best checkpoint for ensembling.
models_ensemble = []
for fold in range(N_FOLDS):
    model_path = os.path.join(MODEL_DIR, f"best_model_fold_{fold}.pth")
    # pretrained=False: the weights come from the checkpoint loaded below.
    model = get_model(BACKBONE, n_classes, pretrained=False).to(DEVICE)
    # map_location remaps the saved tensors onto the current device, so a
    # checkpoint written on GPU still loads in a CPU-only session.
    state_dict = torch.load(model_path, map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.eval()
    models_ensemble.append(model)
print(f"\n{len(models_ensemble)} model dimuat untuk ensemble inference.")

In [None]:
# Predict the test set: one ensemble + TTA prediction per test image.
labels_pred = [
    ensemble_tta_predict(models_ensemble, img_path, tta_tfms, DEVICE)
    for img_path in tqdm(test_imgs, desc="Predict Test with Ensemble TTA")
]

# The submission pairs each test file name with its predicted class index.
ids_test = list(map(os.path.basename, test_imgs))
submission = pd.DataFrame({"Id": ids_test, "Predicted": labels_pred})
submission.to_csv(OUT_FILE, index=False)
print(f"\n[+] Submission disimpan ke {OUT_FILE}")
print(submission.head())