In [1]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import seaborn as sns
import os
from PIL import Image

from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.utils.data import DataLoader, WeightedRandomSampler
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision import models
import torch._dynamo
torch._dynamo.config.suppress_errors = True

import pytorch_lightning as pl
from pytorch_lightning import LightningModule
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger

from torchmetrics.classification import MulticlassAccuracy, F1Score
!uv pip install pytorch_optimizer
import pytorch_optimizer as optim1
import optuna

pl.seed_everything(42)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

[2mUsing Python 3.11.13 environment at: /usr[0m
[2mAudited [1m1 package[0m [2min 128ms[0m[0m


In [2]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms

class TestDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Path ke direktori berisi semua gambar test.
            transform (callable, optional): Transformasi yang akan diterapkan pada gambar.
        """
        self.root_dir = root_dir
        self.transform = transform
        allowed_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}
        self.image_files = sorted([
        f for f in os.listdir(root_dir) if os.path.isfile(os.path.join(root_dir, f)) and os.path.splitext(f)[1].lower() in allowed_extensions
        ])

    def __len__(self):
        """Mengembalikan jumlah total gambar dalam dataset."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Mengambil satu item data.

        Args:
            idx (int): Indeks dari item.
        
        Returns:
            tuple: (image, image_name) di mana image_name adalah nama file.
        """
        img_path = os.path.join(self.root_dir, self.image_files[idx])
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        image_name = self.image_files[idx]
        return image, image_name

In [3]:
weights = models.EfficientNet_B0_Weights.IMAGENET1K_V1
auto_transforms = weights.transforms()
print(auto_transforms)

ImageClassification(
    crop_size=[224]
    resize_size=[256]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BICUBIC
)


In [4]:
train= transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.RandomPerspective(),
            transforms.RandomAffine(0, shear=10, scale=(0.8,1.2)),
            transforms.ColorJitter(),
            transforms.Grayscale(num_output_channels=3),
            auto_transforms
])
val = transforms.Compose([
        auto_transforms
])
data_dir = '/kaggle/input/logika/Train/'

train_dataset = datasets.ImageFolder(root=f'{data_dir}Train', transform=train)
labels = train_dataset.targets
class_counts = torch.bincount(torch.tensor(labels))
print(f"Pemetaan kelas: {train_dataset.class_to_idx}")
print(f"Jumlah sampel per kelas: {class_counts}")
class_weights = 1.0 / class_counts.float()
print(f"Bobot untuk setiap kelas: {class_weights}")
weights_per_sample = class_weights[labels]
print(f"Panjang bobot per sampel: {len(weights_per_sample)}")
print("Contoh 5 bobot pertama:", weights_per_sample[:5])

sampler = WeightedRandomSampler(weights_per_sample, num_samples=len(weights_per_sample), replacement=True)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=False, num_workers=4, pin_memory=True, sampler=sampler)
print(f'Jumlah data test: {len(train_dataset)}')

Pemetaan kelas: {'balinese': 0, 'batak': 1, 'dayak': 2, 'javanese': 3, 'minangkabau': 4}
Jumlah sampel per kelas: tensor([776,  95,  69, 249, 563])
Bobot untuk setiap kelas: tensor([0.0013, 0.0105, 0.0145, 0.0040, 0.0018])
Panjang bobot per sampel: 1752
Contoh 5 bobot pertama: tensor([0.0013, 0.0013, 0.0013, 0.0013, 0.0013])
Jumlah data test: 1752


In [5]:
data_test_dir = '/kaggle/input/logika/Test/Test'

test_dataset = TestDataset(root_dir=data_test_dir, transform=val)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4, pin_memory=True)
print(f'Jumlah data test: {len(test_dataset)}')

Jumlah data test: 444


In [None]:
feature = next(iter(test_loader))

In [None]:
# for i in feature:
#     print(f'{i}: {len(i)}')

In [None]:
label2cat, idxclass = train_dataset.class_to_idx, train_dataset.classes
label2cat

## Arsitektur dan config

In [None]:
def conv_block(in_feature, out_feature, padding=1, stride=1,
             activation="relu", pool =True, maxpool=True, kernel_size=3,
             kernel_size_pool=2, pool_stride=2)-> list[nn.Sequential]:
    layers = [nn.Conv2d(in_feature, out_feature, kernel_size=kernel_size, padding=padding, stride=stride)]
    if activation == "relu":
        layers.append(nn.ReLU())
    elif activation == "leakyrelu":
        layers.append(nn.LeakyReLU())
    elif activation == "sigmoid":
        layers.append(nn.Sigmoid())
    elif activation == 'mish': layers.append(nn.Mish())
    elif activation == "tanh":
        layers.append(nn.Tanh())
    if pool:
        if maxpool:
            layers.append(nn.MaxPool2d(kernel_size=kernel_size_pool, stride=pool_stride))
        else:
            layers.append(nn.AvgPool2d(kernel_size=kernel_size_pool, stride=pool_stride))
    else:
        layers.append(nn.Identity())
    return nn.Sequential(*layers)


def linear_block(in_features, out_features, activation=None, dropout=0.0, batch_norm=None):
    layers = [nn.Linear(in_features, out_features)]
    if batch_norm:
        layers.append(nn.BatchNorm1d(out_features))
    if activation == 'relu':
        layers.append(nn.ReLU())
    elif activation == 'sigmoid':
        layers.append(nn.Sigmoid())
    elif activation == 'tanh':
        layers.append(nn.Tanh())
    elif activation == 'leakyrelu':
        layers.append(nn.LeakyReLU())
    elif activation == 'mish': layers.append(nn.Mish())
    elif activation == 'softmax':
        layers.append(nn.Softmax(dim=1))
    elif activation == 'elu':
        layers.append(nn.ELU())
    elif activation == 'selu':
        layers.append(nn.SELU())
    elif activation == 'lsoftmax':
        layers.append(nn.LogSoftmax(dim=1))
    if dropout > 0.0:
        layers.append(nn.Dropout(dropout))
    return nn.Sequential(*layers)

In [None]:
class EfficientNet(nn.Module):
    def __init__(self, dropout=0.0, freeze=True):
        super().__init__()
        weights = models.EfficientNet_B0_Weights.IMAGENET1K_V1
        self.model = models.efficientnet_b0(weights=weights)
        if freeze:
            for param in self.model.parameters():
                param.requires_grad = False
        num_in_features = self.model.classifier[1].in_features
        self.model.classifier = nn.Sequential(
            linear_block(num_in_features, 256, activation='mish', dropout=dropout, batch_norm=True),
            linear_block(256, 128, activation='mish', dropout=dropout, batch_norm=True),
            linear_block(128, 5, activation=None)
        )
    def forward(self, X):
        return self.model(X)
        
class PL(LightningModule):
    def __init__(self, model, learning_rate, class_weights) -> None:
        super().__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.criterion = nn.CrossEntropyLoss(weight=class_weights)
        self.macroF1 = F1Score(num_classes=5, average='macro', task='multiclass')
    
    def forward(self, X):
        return self.model(X)
    
    def _common_step(self, batch, batch_idx):
        X, labels = batch
        outputs = self(X) 
        loss = self.criterion(outputs, labels)
        macrof1 = self.macroF1(outputs, labels)
        return loss, macrof1

    def training_step(self, batch, batch_idx):
        loss, macroF1 = self._common_step(batch, batch_idx)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log('train_macrof1', macroF1, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, macroF1 = self._common_step(batch, batch_idx)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_macrof1', macroF1, on_epoch=True, prog_bar=True, logger=True)

    def test_step(self, batch, batch_idx):
        loss, macroF1 = self._common_step(batch, batch_idx)
        self.log('test_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('test_macrof1', macroF1, on_epoch=True, prog_bar=True, logger=True)

    def backward(self, loss, *args, **kwargs):
        loss.backward(create_graph=True)

    def configure_optimizers(self):
        optimizer = optim1.Lion(self.parameters(), lr=self.learning_rate)
        return optimizer

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """
        Langkah prediksi untuk satu batch data test.
        """
        images, image_names = batch
        outputs = self.forward(images)
        _, predicted_labels = torch.max(outputs, 1)
        return {"image_names": image_names, "preds": predicted_labels}

In [None]:
if torch.cuda.is_available():
    accelerator_type = 'gpu'
    devices_to_use = 1
else:
    accelerator_type = 'cpu'
    devices_to_use = 'auto'

checkpoint_callback = ModelCheckpoint(
    monitor='train_macrof1',
    dirpath='checkpoints/',
    filename='logikaui-{epoch:02d}-{train_macrof1:.2f}',
    save_top_k=1,
    mode='max'
)
early_stopping = EarlyStopping(
    monitor='train_loss',
    patience=10,
    mode='min',
)
lr_monitor_callback = LearningRateMonitor(logging_interval='epoch')

trainer1 = pl.Trainer(
    max_epochs=300,
    callbacks=[checkpoint_callback, early_stopping, lr_monitor_callback],
    logger=TensorBoardLogger("tb_logs", name="simple_model_experiment"),
    accelerator=accelerator_type,
    devices=devices_to_use,
    log_every_n_steps=10,
    deterministic=True,
    precision='16-mixed'

)

## Train

In [None]:
class_weights

In [None]:
model = PL(EfficientNet(dropout=0.1, freeze=True), learning_rate=5e-4, class_weights=class_weights)
# model = torch.compile(model) 

In [None]:
trainer1.fit(model, train_loader, val_dataloaders=None)

In [None]:
if __name__ == "__main__":
    preds = trainer1.predict(model, test_loader, ckpt_path='best')
    print(preds)

In [None]:
predictions = []
for batch_result in preds:
    # batch_result adalah dictionary yang kita return dari predict_step
    image_names = batch_result['image_names']
    preds = batch_result['preds'].cpu().numpy() # Pindahkan ke CPU
    
    for name, label in zip(image_names, preds):
        predictions.append({
            'id': name,      # Nama kolom sesuai standar Kaggle
            'style': label   # Nama kolom sesuai standar Kaggle
        })
predictions

In [None]:
submission_df = pd.DataFrame(predictions)

In [None]:
class_mapping = {'balinese': 0, 'batak': 1, 'dayak': 2, 'javanese': 3, 'minangkabau': 4}
idx_to_class = {v: k for k, v in class_mapping.items()}
# Cek tipe data dari kolom 'style'. Kemungkinan besar hasilnya 'object' (string).
print(f"Tipe data kolom 'style': {submission_df['style'].dtype}")
submission_df['id'] = submission_df['id'].str.split('.').str[0]

# Lihat nilai-nilai unik di dalam kolom tersebut.
# Perhatikan apakah ada tanda kutip, yang menandakan string.
print(f"Nilai unik di kolom 'style': {submission_df['style'].unique()}")
submission_df['style'] = submission_df['style'].map(idx_to_class)
submission_df.head()

In [None]:
submission_df.to_csv('submission.csv', index=False)