In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score
from PIL import Image
from tqdm import tqdm
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import cv2  # 추가: OpenCV 라이브러리

# Grad-CAM implementation.
class GradCAM:
    """Compute Grad-CAM heatmaps for one layer of a classification model.

    The forward hook on ``target_layer`` is installed only for the duration
    of a call and removed afterwards, so creating many ``GradCAM`` objects
    for the same model does not accumulate hooks on the layer.

    Args:
        model: Module that maps an input batch to per-class scores.
        target_layer: Sub-module of ``model`` whose output is visualised.
            Its per-sample output must have at least three dimensions,
            because the gradient is averaged over axes 1 and 2.

    Attributes:
        activation: Detached output of ``target_layer`` from the last call.
        gradient: Gradient of the selected class score with respect to that
            output from the last call.
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradient = None
        self.activation = None

    def _save_gradient(self, grad):
        """Tensor hook: remember the gradient flowing into the layer output."""
        self.gradient = grad

    def _forward_hook(self, module, inputs, output):
        """Module hook: remember the layer output and watch its gradient."""
        if output.requires_grad:
            # A tensor hook replaces the deprecated Module.register_backward_hook.
            output.register_hook(self._save_gradient)
        # Detach so the stored activation does not keep the graph alive.
        self.activation = output.detach()

    def __call__(self, x, class_idx=None):
        """Return the Grad-CAM map for the first sample of ``x``.

        Args:
            x: Input batch for ``self.model``; only sample 0 is explained.
            class_idx: Class whose score is back-propagated. Defaults to the
                highest-scoring class of sample 0.

        Returns:
            A 2-D ``float32`` NumPy array with values in ``[0, 1]``. The map
            is all zeros when no location contributes positively.

        Raises:
            RuntimeError: If the target layer produced no activation or
                received no gradient during the pass.
        """
        self.gradient = None
        self.activation = None

        handle = self.target_layer.register_forward_hook(self._forward_hook)
        try:
            # Gradients are required even when the caller is inside no_grad().
            with torch.enable_grad():
                self.model.zero_grad()
                output = self.model(x)

                if class_idx is None:
                    # Restrict argmax to sample 0; a flat argmax over the whole
                    # batch would return an index that is not a class id.
                    class_idx = output[0].argmax().item()

                output[0][class_idx].backward()
        finally:
            handle.remove()

        if self.activation is None or self.gradient is None:
            raise RuntimeError(
                "GradCAM: target_layer was not reached or received no gradient"
            )

        gradient = self.gradient[0].detach().cpu().numpy()
        activation = self.activation[0].detach().cpu().numpy()

        # Channel weights are the spatially averaged gradients.
        weights = gradient.mean(axis=(1, 2))
        # Weighted sum over the channel axis.
        cam = np.tensordot(weights, activation, axes=1).astype(np.float32)

        cam = np.maximum(cam, 0)
        peak = cam.max()
        if peak > 0:
            # Skip normalisation for an all-zero map to avoid 0/0 -> NaN.
            cam = cam / peak
        return cam

# Dataset class definition.
class ImageDataset(Dataset):
    """Image dataset backed by a two-column table of (file name, target).

    A row's file name is matched against the files in ``path`` by suffix:
    the first directory entry whose name ends with the row's file name is
    used. The directory is listed once, on first access, and the result is
    cached; files added to ``path`` afterwards are not seen.

    Args:
        df: DataFrame whose rows unpack to ``(name, target)``.
        path: Directory containing the image files.
        transform: Optional callable invoked as ``transform(image=array)``
            that returns a mapping with an ``'image'`` entry.
    """

    def __init__(self, df, path, transform=None):
        self.df = df.values
        self.path = path
        self.transform = transform
        # Maps a requested name to the matching file; built lazily.
        self._suffix_index = None

    def __len__(self):
        return len(self.df)

    def _build_suffix_index(self):
        """List ``self.path`` once and map each wanted name to its first match."""
        wanted = set(self.df[:, 0])
        index = {}
        for fname in os.listdir(self.path):
            # fname.endswith(name) holds exactly when name is one of fname's
            # suffixes, so probing every suffix against the set of wanted
            # names avoids rescanning the directory for every sample.
            for start in range(len(fname) + 1):
                suffix = fname[start:]
                if suffix in wanted:
                    # setdefault keeps the first match in listing order.
                    index.setdefault(suffix, fname)
        return index

    def _resolve_path(self, name):
        """Return the full path of the file in ``self.path`` ending with ``name``.

        Raises:
            FileNotFoundError: If no file in the directory ends with ``name``.
        """
        if self._suffix_index is None:
            self._suffix_index = self._build_suffix_index()
        try:
            matched = self._suffix_index[name]
        except KeyError:
            raise FileNotFoundError(f"File not found: {name}") from None
        return os.path.join(self.path, matched)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for row ``idx``.

        The image is always decoded as 3-channel RGB, so grayscale or RGBA
        files yield the same array layout as RGB ones.
        """
        name, target = self.df[idx]
        img_path = self._resolve_path(name)
        with Image.open(img_path) as pil_img:
            img = np.array(pil_img.convert('RGB'))
        if self.transform:
            img = self.transform(image=img)['image']
        return img, target

# Train the model for a single epoch.
def train_one_epoch(loader, model, optimizer, loss_fn, device):
    """Run one optimisation pass over ``loader`` and report epoch metrics.

    Args:
        loader: Iterable yielding ``(image, targets)`` tensor batches.
        model: Network being trained; switched to training mode here.
        optimizer: Optimiser stepped once per batch.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Dict with ``"train_loss"`` (mean of the per-batch losses),
        ``"train_acc"`` and ``"train_f1"`` (macro-averaged).
    """
    model.train()
    running_loss = 0
    all_preds, all_targets = [], []

    progress = tqdm(loader)
    for image, targets in progress:
        image, targets = image.to(device), targets.to(device)

        optimizer.zero_grad()
        logits = model(image)
        batch_loss = loss_fn(logits, targets)
        batch_loss.backward()
        optimizer.step()

        loss_value = batch_loss.item()
        running_loss += loss_value
        all_preds.extend(logits.argmax(dim=1).detach().cpu().numpy())
        all_targets.extend(targets.detach().cpu().numpy())

        progress.set_description(f"Loss: {loss_value:.4f}")

    return {
        "train_loss": running_loss / len(loader),
        "train_acc": accuracy_score(all_targets, all_preds),
        "train_f1": f1_score(all_targets, all_preds, average='macro'),
    }

# Model evaluation and Grad-CAM visualisation of misclassified samples.
def _save_grad_cam_overlays(model, target_layer, incorrect_samples, device):
    """Write one Grad-CAM overlay image per misclassified sample.

    Each overlay is saved under ``/root/incorrect_images_CAM/`` as
    ``cam_<target>_<pred>_<file name>``. The source image is read from
    ``/root/data/train/`` using the base name of the recorded path.

    NOTE: relies on the module-level ``transform`` and ``img_size``.
    """
    os.makedirs("/root/incorrect_images_CAM/", exist_ok=True)
    if not incorrect_samples:
        return

    grad_cam = GradCAM(model, target_layer)
    for target, pred, img_path in incorrect_samples:
        img_name = os.path.basename(img_path)
        src_path = os.path.join("/root/data/train/", img_name)  # image from the train directory
        with Image.open(src_path) as src:
            img = src.convert('RGB').resize((img_size, img_size))
        img_tensor = transform(image=np.array(img))['image'].unsqueeze(0).to(device)

        # Heatmap for the (wrong) predicted class.
        cam = grad_cam(img_tensor, class_idx=pred)
        # A degenerate map may contain NaN/inf; casting those to uint8 is undefined.
        cam = np.nan_to_num(cam, nan=0.0, posinf=1.0, neginf=0.0)

        # Undo the ImageNet normalisation to recover a displayable image.
        image_np = img_tensor.cpu().data.numpy()[0].transpose(1, 2, 0)
        image_np = np.array([0.229, 0.224, 0.225]) * image_np + np.array([0.485, 0.456, 0.406])
        image_np = np.clip(image_np, 0, 1)

        # Resize the CAM to the image resolution.
        cam = np.uint8(255 * cam)
        cam = np.uint8(Image.fromarray(cam).resize((image_np.shape[1], image_np.shape[0]), Image.LANCZOS))

        # Colour-map with OpenCV (BGR output), then convert to RGB.
        heatmap = cv2.applyColorMap(cam, cv2.COLORMAP_JET)
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

        # Blend the heatmap over the image and rescale to the 0-1 range.
        superimposed_img = heatmap * 0.4 + np.uint8(image_np * 255)
        superimposed_img = np.clip(superimposed_img / 255.0, 0, 1)

        cam_path = os.path.join("/root/incorrect_images_CAM/", f"cam_{target}_{pred}_{img_name}")
        plt.imsave(cam_path, superimposed_img)


def evaluate(loader, model, loss_fn, device, target_layer, train_df):
    """Evaluate ``model`` on ``loader`` and visualise its mistakes.

    Args:
        loader: DataLoader over a dataset exposing ``df`` (2-D array whose
            first column holds file names) and ``path``. File names are
            paired with predictions by position, so the loader must iterate
            the dataset in order (no shuffling).
        model: Network to evaluate; switched to eval mode here.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        device: Device the batches are moved to.
        target_layer: Layer of ``model`` used for the Grad-CAM heatmaps.
        train_df: Kept for backward compatibility and no longer consulted;
            the true label is taken from the batch targets.

    Returns:
        Dict with ``"val_loss"`` (mean of the per-batch losses),
        ``"val_acc"``, ``"val_f1"`` (macro-averaged) and
        ``"incorrect_samples"``, a list of ``(target, pred, img_path)``.
    """
    model.eval()
    val_loss = 0
    preds_list = []
    targets_list = []
    incorrect_samples = []

    names = loader.dataset.df[:, 0]
    offset = 0  # index of the first dataset row of the current batch

    with torch.no_grad():
        for image, targets in loader:
            image = image.to(device)
            targets = targets.to(device)

            preds = model(image)
            loss = loss_fn(preds, targets)

            val_loss += loss.item()
            preds_np = preds.argmax(dim=1).detach().cpu().numpy()
            targets_np = targets.detach().cpu().numpy()

            preds_list.extend(preds_np)
            targets_list.extend(targets_np)

            # Take the file names of THIS batch; zipping against the whole
            # name column would pair every batch with the first rows.
            batch_names = names[offset:offset + len(targets_np)]
            offset += len(targets_np)

            for target, pred, img_name in zip(targets_np, preds_np, batch_names):
                if target != pred:
                    img_path = os.path.join(loader.dataset.path, img_name)
                    incorrect_samples.append((int(target), int(pred), img_path))

    # Grad-CAM needs gradients, so it runs outside the no_grad block.
    _save_grad_cam_overlays(model, target_layer, incorrect_samples, device)

    val_loss /= len(loader)
    val_acc = accuracy_score(targets_list, preds_list)
    val_f1 = f1_score(targets_list, preds_list, average='macro')

    ret = {
        "val_loss": val_loss,
        "val_acc": val_acc,
        "val_f1": val_f1,
        "incorrect_samples": incorrect_samples
    }

    return ret

# Hyper-parameters and run configuration.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data_path = '/root/data/'
model_name = 'efficientnet_b0'  # timm architecture name
img_size = 224  # square input side; also read as a global inside evaluate()
LR = 1e-3
EPOCHS = 30
BATCH_SIZE = 32
num_workers = 4
n_splits = 5  # number of stratified cross-validation folds
patience = 3  # epochs without improvement before early stopping

# Transform definition: resize, ImageNet normalisation, conversion to tensor.
# Also read as a global inside evaluate() for the Grad-CAM inputs.
transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# Load the training table (must contain a 'target' column; each row is
# unpacked as (file name, target) by ImageDataset).
df = pd.read_csv("/root/data/train.csv")

# Best result across all folds, updated after each fold finishes.
best_overall_model = None
best_overall_f1 = 0
best_overall_loss = float('inf')
best_fold_idx = -1  # 1-based fold number; -1 means no fold selected yet

all_incorrect_samples = []  # misclassified samples collected from every fold

skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Stratified K-fold training. For every fold a fresh model is trained with
# early stopping, and the checkpoint with the best validation macro-F1
# (ties broken by lower validation loss) is written to
# best_model_fold_<n>_<model_name>.pth.
fold_weights = []  # best validation F1 of each fold
for fold, (train_idx, val_idx) in enumerate(skf.split(df, df['target'])):
    print(f"Fold {fold + 1}")

    train_df = df.iloc[train_idx]
    val_df = df.iloc[val_idx]

    train_dataset = ImageDataset(train_df, "/root/data/train_aug/", transform=transform)
    val_dataset = ImageDataset(val_df, "/root/data/train_aug/", transform=transform)

    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=False
    )

    # Must stay unshuffled: evaluate() pairs predictions with file names by position.
    val_loader = DataLoader(
        val_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )

    model = timm.create_model(
        model_name,
        pretrained=True,
        num_classes=len(df['target'].unique())
    ).to(device)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)

    best_val_loss = float('inf')
    best_val_f1 = 0
    best_incorrect_samples = []  # misclassifications of the saved checkpoint
    early_stopping_counter = 0

    # Layer visualised by Grad-CAM.
    target_layer = model.conv_head

    for epoch in range(EPOCHS):
        train_ret = train_one_epoch(train_loader, model, optimizer, loss_fn, device=device)
        val_ret = evaluate(val_loader, model, loss_fn, device=device, target_layer=target_layer, train_df=df)
        val_ret['epoch'] = epoch

        log = f"Epoch {epoch + 1}/{EPOCHS}\n"
        for k, v in train_ret.items():
            log += f"Train {k}: {v:.4f}\n"
        for k, v in val_ret.items():
            if k != 'incorrect_samples':  # the sample list is not a scalar metric
                log += f"Val {k}: {v:.4f}\n"
        print(log)

        # Use a single ordering (F1 first, loss as tie-breaker). Accepting an
        # improvement in EITHER metric while overwriting BOTH bests lets the
        # saved checkpoint drift to one that is worse on both than an earlier epoch.
        improved = val_ret['val_f1'] > best_val_f1 or (
            val_ret['val_f1'] == best_val_f1 and val_ret['val_loss'] < best_val_loss
        )
        if improved:
            best_val_loss = val_ret['val_loss']
            best_val_f1 = val_ret['val_f1']
            best_incorrect_samples = val_ret['incorrect_samples']
            torch.save(model.state_dict(), f"best_model_fold_{fold + 1}_{model_name}.pth")
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1

        if early_stopping_counter >= patience:
            print("Early stopping")
            break

    # Record the fold's best F1 (not the F1 of whichever epoch ran last).
    fold_weights.append(best_val_f1)
    # Keep the misclassifications that belong to the saved checkpoint.
    all_incorrect_samples.extend(best_incorrect_samples)

    # Track the best fold with the same ordering as the checkpoint rule, so a
    # fold with higher F1 is not rejected because its loss is slightly higher.
    if (best_val_f1, -best_val_loss) > (best_overall_f1, -best_overall_loss):
        best_overall_f1 = best_val_f1
        best_overall_loss = best_val_loss
        best_fold_idx = fold + 1

# Save the misclassified validation samples collected across folds.
incorrect_df = pd.DataFrame(all_incorrect_samples, columns=['target', 'pred', 'img_path'])
incorrect_df.to_csv("/root/incorrect_predictions.csv", index=False)

print(f"Best Model from Fold {best_fold_idx} with F1: {best_overall_f1} and Loss: {best_overall_loss}")

if best_fold_idx < 0:
    # Fail with a clear message instead of a missing "fold_-1" checkpoint file.
    raise RuntimeError("No fold was selected as best; there is no checkpoint to load")

# Predict on the test set with the best fold's checkpoint and save the result.
test_df = pd.read_csv("/root/data/sample_submission.csv")
tst_dataset = ImageDataset(test_df, "/root/data/test/", transform=transform)
tst_loader = DataLoader(
    tst_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True
)

# The checkpoint overwrites every weight, so pretrained weights are not needed.
model = timm.create_model(
    model_name,
    pretrained=False,
    num_classes=len(df['target'].unique())
).to(device)

# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
model.load_state_dict(
    torch.load(f"best_model_fold_{best_fold_idx}_{model_name}.pth", map_location=device)
)
model.eval()

fold_preds = []
with torch.no_grad():
    for image, _ in tqdm(tst_loader):
        image = image.to(device)
        preds = model(image)
        fold_preds.append(preds.cpu().numpy())

fold_preds = np.concatenate(fold_preds, axis=0)
final_preds = np.argmax(fold_preds, axis=1)

pred_df = pd.DataFrame(test_df, columns=['ID'])
pred_df['target'] = final_preds
# Sanity check: predictions must line up row-for-row with the submission template.
sample_submission_df = pd.read_csv("/root/data/sample_submission.csv")
assert (sample_submission_df['ID'] == pred_df['ID']).all()
pred_df.to_csv("/root/efficient_net_test.csv", index=False)
