In [3]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from tqdm.auto import tqdm
import cv2
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from pathlib import Path
import glob
import warnings

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Suppress every warning for the rest of the session.
warnings.filterwarnings(action='ignore')

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Report CUDA availability and, when a GPU is present, its name.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name())

False


In [6]:
# Hyper-parameters and run settings read by the cells below.
CFG = dict(
    IMG_SIZE=299,        # side length passed to A.Resize for both height and width
    EPOCHS=1,            # number of passes handed to train_fn
    LEARNING_RATE=3e-4,  # learning rate given to the Adam optimizer
    BATCH_SIZE=16,       # batch size for every DataLoader
    SEED=777,            # seed for seed_everything and train_test_split
)

In [7]:

def seed_everything(seed):
    """Seed the Python, NumPy and PyTorch random generators and request deterministic cuDNN.

    Args:
        seed: Integer seed applied to ``random``, ``numpy.random`` and ``torch``
            (CPU and all CUDA devices). It is also written to the
            ``PYTHONHASHSEED`` environment variable.
    """
    random.seed(seed)
    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    # this assignment presumably only affects child processes - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible CUDA device, not only the current one.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different kernels from run to run, which
    # works against the deterministic flag above; keep it disabled.
    torch.backends.cudnn.benchmark = False

# Apply the seed stored in CFG['SEED'] to all random generators.
seed_everything(CFG['SEED'])

In [8]:
class PadSquare(A.ImageOnlyTransform):
    """Pad an image with a constant border so that height and width become equal.

    The missing rows/columns are split between the two sides of the shorter
    axis; when the difference is odd, the extra row/column goes to the
    bottom/right.

    Args:
        value: Border value forwarded to ``cv2.copyMakeBorder``.
        always_apply: Forwarded to the albumentations base class.
        p: Probability of applying the transform, forwarded to the base class.
    """

    def __init__(self, value=0, always_apply=False, p=1.0):
        # Pass by keyword so the call does not depend on the positional order
        # of the base-class parameters.
        super().__init__(always_apply=always_apply, p=p)
        self.value = value

    def apply(self, image, **params):
        """Return ``image`` padded to a square with a constant border."""
        # Only the first two axes are needed; slicing also accepts 2-D
        # (single-channel) arrays, which a three-way unpack would reject.
        h, w = image.shape[:2]
        max_dim = max(h, w)
        pad_h = max_dim - h
        pad_w = max_dim - w
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left
        return cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=self.value)

    def get_transform_init_args_names(self):
        """Return the names of the constructor arguments used for serialization."""
        return ("value",)

In [9]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV and optionally pairs them with labels.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with
            ``img_path_list``, or ``None`` for unlabeled data.
        transforms: Optional callable invoked as ``transforms(image=image)``
            whose result is indexed with ``'image'``.
    """

    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        """Load one image (converted BGR -> RGB) and return it, with its label when labels exist.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file at the stored path.
        """
        img_path = self.img_path_list[index]
        image = cv2.imread(str(img_path))
        # cv2.imread signals failure by returning None rather than raising;
        # report the offending path instead of failing later in cvtColor.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transforms:
            image = self.transforms(image=image)['image']
        if self.label_list is not None:
            return image, self.label_list[index]
        return image

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)


In [10]:

# Preprocessing pipeline: pad to a square with black borders, resize to
# IMG_SIZE x IMG_SIZE, normalize each channel with mean 0.5 / std 0.5, and
# convert to a tensor.
train_transform = A.Compose(
    transforms=[
        PadSquare(value=(0, 0, 0)),
        A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
        A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
        ToTensorV2(),
    ]
)

# Validation and test data go through the very same pipeline object.
test_transform = train_transform

In [11]:
class Generator(nn.Module):
    """Convolutional encoder that maps a 3-channel image batch to a latent vector.

    Three conv -> batch-norm -> ReLU stages (32, 64 and 128 channels) are each
    followed by a pooling layer: 2x2 max pooling after the first two stages and
    global average pooling after the last. A linear layer then projects the
    128 pooled features to ``latent_dim``.
    """

    def __init__(self, latent_dim=128):
        super().__init__()
        stage_channels = [(3, 32), (32, 64), (64, 128)]
        last_stage = len(stage_channels) - 1
        layers = []
        # Layers are created in the same order as they appear in the network,
        # so Sequential indices (and hence state_dict keys) run 0..11.
        for stage_idx, (in_ch, out_ch) in enumerate(stage_channels):
            layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU())
            if stage_idx == last_stage:
                layers.append(nn.AdaptiveAvgPool2d((1, 1)))
            else:
                layers.append(nn.MaxPool2d(2))
        self.encoder = nn.Sequential(*layers)
        self.fc = nn.Linear(128, latent_dim)

    def forward(self, x):
        """Encode ``x`` and return the latent projection, one row per sample."""
        pooled = self.encoder(x)
        # Collapse the (128, 1, 1) pooled map of each sample into a flat vector.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [12]:
class Classifier(nn.Module):
    """Single linear layer that turns a latent vector into class logits."""

    def __init__(self, latent_dim=128, num_classes=7):
        super().__init__()
        self.fc = nn.Linear(in_features=latent_dim, out_features=num_classes)

    def forward(self, z):
        """Return the logits produced by the linear layer for latent input ``z``."""
        logits = self.fc(z)
        return logits

In [13]:
class AutoClassifierUnit(nn.Module):
    """One encoder/classifier pair: a Generator followed by a Classifier."""

    def __init__(self, latent_dim=128, num_classes=7):
        super().__init__()
        self.generator = Generator(latent_dim)
        self.classifier = Classifier(latent_dim, num_classes)

    def forward(self, x):
        """Return ``(latent, logits)`` for the input batch ``x``."""
        latent = self.generator(x)
        logits = self.classifier(latent)
        return latent, logits

In [14]:
class CooperativeAutoClassifier(nn.Module):
    """Ensemble of AutoClassifierUnit modules trained with a shared loss.

    Every unit receives the same input. The loss averages the units'
    cross-entropy terms and adds a penalty that pulls their latent vectors
    towards each other.

    Args:
        latent_dim: Size of each unit's latent vector.
        num_classes: Number of output classes per unit.
        num_units: How many units to build (default 3).

    Raises:
        ValueError: If ``num_units`` is smaller than 1.
    """

    def __init__(self, latent_dim=128, num_classes=7, num_units=3):
        super().__init__()
        if num_units < 1:
            raise ValueError(f"num_units must be >= 1, got {num_units}")
        self.acus = nn.ModuleList([AutoClassifierUnit(latent_dim, num_classes) for _ in range(num_units)])

    def forward(self, x):
        """Run every unit on ``x``.

        Returns:
            A pair ``(zs, logits_list)`` holding, in unit order, each unit's
            latent output and logits.
        """
        zs, logits_list = [], []
        for acu in self.acus:
            z, logits = acu(x)
            zs.append(z)
            logits_list.append(logits)
        return zs, logits_list

    def compute_loss(self, logits_list, zs, labels, lambda_coop=0.1):
        """Combine mean cross-entropy with the mean pairwise latent MSE.

        Args:
            logits_list: Per-unit logits, as returned by ``forward``.
            zs: Per-unit latent outputs, as returned by ``forward``.
            labels: Target class indices passed to ``F.cross_entropy``.
            lambda_coop: Weight of the pairwise latent-agreement term.

        Returns:
            ``mean CE + lambda_coop * mean pairwise MSE``. With fewer than two
            latents there are no pairs, so only the cross-entropy term is
            returned.
        """
        ce_loss = sum(F.cross_entropy(logits, labels) for logits in logits_list) / len(logits_list)
        num_latents = len(zs)
        # No pairs exist for a single unit; skipping the term also avoids the
        # division by n * (n - 1) == 0 below.
        if num_latents < 2:
            return ce_loss
        coop_loss = sum(
            F.mse_loss(zs[i], zs[j])
            for i in range(num_latents)
            for j in range(i + 1, num_latents)
        )
        # n * (n - 1) / 2 unordered pairs -> multiply by 2 and divide by n * (n - 1).
        coop_loss = 2 * coop_loss / (num_latents * (num_latents - 1))
        return ce_loss + lambda_coop * coop_loss

In [15]:
def train_fn(model, train_loader, val_loader, optimizer, device, epochs):
    """Train ``model`` and keep the weights of the epoch with the best validation macro-F1.

    After every epoch the model is scored with ``validate``. Whenever the score
    beats the best so far, a copy of the weights is stored and also written to
    ``best_model(cac).pth``.

    Args:
        model: Module exposing ``compute_loss(logits_list, zs, labels)`` whose
            forward pass returns ``(zs, logits_list)``.
        train_loader: Iterable yielding ``(imgs, labels)`` batches.
        val_loader: Loader handed to ``validate``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the model and batches are moved to.
        epochs: Number of training epochs.

    Returns:
        A state dict holding a copy of the best-scoring weights. If no epoch
        ran, a copy of the current weights is returned (nothing is saved to
        disk in that case).
    """

    def _snapshot():
        # state_dict() hands back references to the live parameter tensors,
        # which later optimizer steps keep mutating; clone to freeze a copy.
        return {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

    model.to(device)
    # Start below any possible score so the first epoch is always recorded,
    # even when its F1 is exactly 0.
    best_f1, best_model = float('-inf'), None
    for epoch in range(1, epochs + 1):
        model.train()
        train_losses = []
        for imgs, labels in tqdm(train_loader, desc=f"Epoch {epoch}"):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            zs, logits_list = model(imgs)
            loss = model.compute_loss(logits_list, zs, labels)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())
        val_f1 = validate(model, val_loader, device)
        print(f"Epoch {epoch}: Train Loss={np.mean(train_losses):.4f}, Val F1={val_f1:.4f}")
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_model = _snapshot()

            save_path = "best_model(cac).pth"
            torch.save(best_model, save_path)
            print(f"Best model saved (epoch {epoch}, F1={val_f1:.4f}) → {save_path}")
    if best_model is None:
        # No epoch ran; return something load_state_dict can consume.
        best_model = _snapshot()
    return best_model

In [16]:
def validate(model, val_loader, device):
    """Score ``model`` on ``val_loader`` and return the macro-averaged F1.

    The model is switched to eval mode. For each batch the logits of all units
    are averaged and the arg-max class is taken as the prediction.
    """
    model.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            _, per_unit_logits = model(batch_images.to(device))
            # Ensemble by averaging the logits of all units.
            ensemble_logits = sum(per_unit_logits) / len(per_unit_logits)
            batch_pred = ensemble_logits.argmax(1).cpu().numpy()
            predicted.extend(batch_pred)
            expected.extend(batch_labels.numpy())
    return f1_score(expected, predicted, average='macro')

In [17]:
def inference(model, test_loader, device, label_encoder):
    """Predict a class for every batch item in ``test_loader`` and decode it.

    The model is switched to eval mode. Per batch, the logits of all units are
    averaged and the arg-max index is collected. The collected indices are
    mapped back through ``label_encoder.inverse_transform``.
    """
    model.eval()
    predicted_ids = []
    with torch.no_grad():
        for batch_images in tqdm(test_loader):
            _, per_unit_logits = model(batch_images.to(device))
            # Ensemble by averaging the logits of all units.
            ensemble_logits = sum(per_unit_logits) / len(per_unit_logits)
            predicted_ids += ensemble_logits.argmax(1).cpu().numpy().tolist()
    return label_encoder.inverse_transform(predicted_ids)

In [18]:
# Load and prepare data: one row per file under ./train/<class_dir>/, with the
# class directory name used as the label.
# glob returns paths in an arbitrary, filesystem-dependent order; sort them so
# the seeded train/validation split sees the same row order on every run.
df = pd.DataFrame({'img_path': sorted(glob.glob('./train/*/*'))})
df['rock_type'] = df['img_path'].apply(lambda x: Path(x).parts[-2])

In [19]:
# Show the assembled path/label table as plain text.
print(df)

                                      img_path       rock_type
0             ./train\Andesite\TRAIN_00000.jpg        Andesite
1             ./train\Andesite\TRAIN_00001.jpg        Andesite
2             ./train\Andesite\TRAIN_00002.jpg        Andesite
3             ./train\Andesite\TRAIN_00003.jpg        Andesite
4             ./train\Andesite\TRAIN_00004.jpg        Andesite
...                                        ...             ...
380015  ./train\Weathered_Rock\TRAIN_37164.jpg  Weathered_Rock
380016  ./train\Weathered_Rock\TRAIN_37165.jpg  Weathered_Rock
380017  ./train\Weathered_Rock\TRAIN_37166.jpg  Weathered_Rock
380018  ./train\Weathered_Rock\TRAIN_37167.jpg  Weathered_Rock
380019  ./train\Weathered_Rock\TRAIN_37168.jpg  Weathered_Rock

[380020 rows x 2 columns]


In [20]:

# Stratified 80/20 split on the class label, seeded from CFG for repeatability.
train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['rock_type'], random_state=CFG['SEED'])
# Take explicit copies so the column assignments below write to independent
# frames instead of objects derived from `df`.
train_df, val_df = train_df.copy(), val_df.copy()
# Fit the label encoder on the training labels only, then reuse it for validation.
le = preprocessing.LabelEncoder()
train_df['rock_type'] = le.fit_transform(train_df['rock_type'])
val_df['rock_type'] = le.transform(val_df['rock_type'])

In [21]:
# Wrap the two splits in datasets; validation uses the test-time transform.
train_dataset = CustomDataset(
    img_path_list=train_df['img_path'].values,
    label_list=train_df['rock_type'].values,
    transforms=train_transform,
)
val_dataset = CustomDataset(
    img_path_list=val_df['img_path'].values,
    label_list=val_df['rock_type'].values,
    transforms=test_transform,
)

# Only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [23]:
# Build the ensemble model and an Adam optimizer over all of its parameters.
model = CooperativeAutoClassifier(
    latent_dim=128,
    num_classes=7,
)
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=CFG['LEARNING_RATE'],
)

In [None]:
# Training code: run the training loop, then load the weights it returned back into the model.
best_weights = train_fn(model, train_loader, val_loader, optimizer, device, CFG['EPOCHS'])
model.load_state_dict(best_weights)

In [25]:
# Read the checkpoint that train_fn wrote to disk, mapping its tensors onto `device`.
# NOTE(review): torch.load relies on pickle; only load checkpoint files you
# trust (consider passing weights_only=True if the installed torch supports it).
checkpoint = torch.load("best_model(cac).pth", map_location=device)

In [28]:
# Restore the model weights from the checkpoint loaded above.
model.load_state_dict(checkpoint)

<All keys matched successfully>

In [29]:
# Build an unlabeled dataset from the paths listed in test.csv and predict
# their classes; predictions are decoded back to class names with `le`.
test_df = pd.read_csv('./test.csv')
test_dataset = CustomDataset(
    img_path_list=test_df['img_path'].values,
    label_list=None,
    transforms=test_transform,
)
test_loader = DataLoader(dataset=test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)
preds = inference(model=model, test_loader=test_loader, device=device, label_encoder=le)

100%|██████████| 5938/5938 [3:30:31<00:00,  2.13s/it]  


In [30]:
# Fill the sample submission's rock_type column with the predictions and write
# the result without the index column.
submit = pd.read_csv('./sample_submission.csv').assign(rock_type=preds)
submit.to_csv('./baseline_submit(cac).csv', index=False)