## ver 1.1
> cutmix 사용안함
> 
> seed는 고정
> 
> classifier 학습 → 전체 학습의 2단계로 진행 (전체 학습 시에는 lr을 1e-5로 함. 1e-4는 과적합이 빨리 올 수 있다는 충고를 반영)
> 
> BatchNorm freeze는 안함
>
> acc : 0.9441233140655106, recall : 0.94255940562606
>
> confusion_matrix :
> [[122   0   3   2   0]
>  [  1  87   3   1   0]
>  [  0   1  85   1   1]
>  [  0   0   0 113   5]
>  [  0   2   1   8  83]]
>
> Grad-CAM 적용


In [None]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from albumentations import ToTensorV2
import albumentations as A
import random

import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.optim import Adam
from torchmetrics.classification import MulticlassF1Score, MulticlassAccuracy,MulticlassRecall

In [None]:
def seed_everything(seed: int = 42):
    """Seed the random number generators used in this notebook.

    The same seed is handed to Python's ``random``, NumPy's global RNG,
    ``torch.manual_seed`` and ``torch.cuda.manual_seed_all``; cuDNN is then
    put in deterministic mode with benchmarking (autotuning) switched off.

    Args:
        seed: Value passed to every seeding function.
    """
    seeders = (
        random.seed,                 # Python stdlib
        np.random.seed,              # NumPy global RNG
        torch.manual_seed,           # PyTorch
        torch.cuda.manual_seed_all,  # PyTorch, every CUDA device
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

seed_everything(42)

base_path=Path(r"/kaggle/input/flowers-recognition/flowers")

# Directory traversal order depends on the filesystem, so sort the paths.
# The row order feeds both the seeded shuffle and the label -> index mapping
# below; without a stable order the fixed seeds do not give a reproducible
# split or class numbering.
path_list=[
    {"label":img_path.parent.name,"path":img_path}   # label = name of the containing folder
    for img_path in sorted(base_path.rglob("*.jpg"))
]

df=pd.DataFrame(path_list)
# sort=True numbers the classes in sorted label order instead of order of appearance.
# NOTE: checkpoints trained with a different class numbering are not compatible.
df['targets']=pd.factorize(df['label'],sort=True)[0]
df=df.sample(frac=1,random_state=42).reset_index(drop=True)

# 70% train; the remaining 30% is split 60/40 -> 18% validation, 12% test (stratified by label)
train_df,tmp_df=train_test_split(df,test_size=0.3,stratify=df['label'],random_state=42)
val_df,test_df=train_test_split(tmp_df,test_size=0.4,stratify=tmp_df['label'],random_state=42)

In [None]:
def _train_augmentations():
    """Return a fresh list of the random train-time augmentations.

    Both the preview pipeline and the training pipeline are built from this
    single definition so they cannot drift apart. A new list of new transform
    objects is created on every call, so the two pipelines never share
    transform instances.
    """
    return [
        A.RandomResizedCrop(size=(224,224),scale=(0.8,1.0),ratio=(0.9,1.1),p=1),
        A.HorizontalFlip(p=0.3),
        A.Affine(scale=(0.9,1.1),rotate=(-15,15),border_mode=cv2.BORDER_REFLECT_101,p=0.3),
        A.ColorJitter(brightness=0.2,contrast=0.2,saturation=0.2,hue=0.03,p=0.6),
        A.CoarseDropout(num_holes_range=(1, 1), hole_height_range=(48, 48),
                        hole_width_range=(48, 48),fill=0,p=0.25),
    ]


# Augmentations only (no Normalize / ToTensorV2) so the result can be shown with imshow.
img_aug=A.Compose(_train_augmentations())

# Training pipeline: same augmentations, then normalization and tensor conversion.
# NOTE: the "resnet34" suffix is only a name; these pipelines feed the
# EfficientNet-B1 model built later in this file.
tr_resnet34=A.Compose([
    *_train_augmentations(),
    A.Normalize(mean=(0.485, 0.456, 0.406),std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

# Validation / test pipeline: deterministic resize, normalization, tensor conversion.
val_resnet34=A.Compose([
    A.Resize(224,224,p=1),
    A.Normalize(mean=(0.485, 0.456, 0.406),std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

In [None]:
def visualize(df,nrows=4,ncols=4,augment=None):
    """Show a random grid of images from ``df`` titled with label and size.

    Args:
        df: DataFrame with a ``path`` column (image file path) and a
            ``label`` column.
        nrows: Number of grid rows.
        ncols: Number of grid columns.
        augment: Optional callable invoked as ``augment(image=img)``; the
            ``'image'`` entry of its result is displayed instead of the raw
            image.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read one of the images.
    """
    df=df.sample(min(nrows*ncols,len(df)))
    # squeeze=False always yields a 2-D array of axes, so a 1x1 grid works too
    fig,axs=plt.subplots(nrows,ncols,figsize=(ncols*3,nrows*3),squeeze=False)
    axs=axs.flatten()
    for ax,(_,row) in zip(axs,df.iterrows()):
        # the column may hold pathlib.Path objects; pass a plain str to OpenCV
        img_path=str(row['path'])
        img=cv2.imread(img_path)
        if img is None:
            # cv2.imread reports failure by returning None rather than raising
            raise FileNotFoundError(f"could not read image: {img_path}")
        img=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
        if augment is not None:
            img=augment(image=img)['image']
        H,W=img.shape[:2]
        label=row['label']
        ax.imshow(img)
        ax.set_title(f"{label}\n{H}x{W}")
    # hide the cells left empty when fewer than nrows*ncols images were sampled
    for ax in axs[len(df):]:
        ax.axis("off")
    plt.show()

visualize(df,augment=img_aug)

In [None]:
class FlowerCustom(Dataset):
    """Dataset that reads an image from disk and applies an augmentation pipeline.

    Args:
        path: Sequence of image file paths.
        targets: Sequence of integer class indices, aligned with ``path``.
        augment: Callable invoked as ``augment(image=rgb_array)`` whose result
            exposes the transformed image under the ``'image'`` key. It is
            required when items are fetched.

    Raises:
        ValueError: If ``path`` and ``targets`` have different lengths.
    """
    def __init__(self,path,targets,augment=None):
        if len(path)!=len(targets):
            raise ValueError(
                f"path and targets must have the same length, got {len(path)} and {len(targets)}"
            )
        self.path=path
        self.targets=targets
        self.augment=augment

    def __len__(self):
        return len(self.path)

    def __getitem__(self,idx):
        """Return ``(image, target)`` for sample ``idx``.

        Raises:
            ValueError: If no augmentation pipeline was supplied.
            FileNotFoundError: If the image cannot be read by ``cv2.imread``.
        """
        # fail before touching the disk when the pipeline is missing
        if self.augment is None:
            raise ValueError("IMG Augment must be need")
        # entries may be pathlib.Path objects; pass a plain str to OpenCV
        img_path=str(self.path[idx])
        img=cv2.imread(img_path)
        if img is None:
            # cv2.imread reports failure by returning None rather than raising
            raise FileNotFoundError(f"could not read image: {img_path}")
        img=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
        img=self.augment(image=img)['image']
        targets=torch.tensor(self.targets[idx],dtype=torch.long)
        return img,targets


# Datasets: only the training split gets the random augmentation pipeline;
# validation and test share the deterministic one.
train_custom=FlowerCustom(
    path=train_df['path'].to_list(),
    targets=train_df['targets'].to_list(),
    augment=tr_resnet34,
)
val_custom=FlowerCustom(
    path=val_df['path'].to_list(),
    targets=val_df['targets'].to_list(),
    augment=val_resnet34,
)
test_custom=FlowerCustom(
    path=test_df['path'].to_list(),
    targets=test_df['targets'].to_list(),
    augment=val_resnet34,
)

# Loaders: batches of 32 with 4 workers; only the training loader shuffles.
train_loader=DataLoader(
    train_custom,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
val_loader=DataLoader(
    val_custom,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)
test_loader=DataLoader(
    test_custom,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

In [None]:
from torchvision.models import efficientnet_b1,EfficientNet_B1_Weights

# Use the GPU when one is available, otherwise the CPU
device="cuda" if torch.cuda.is_available() else "cpu"

# Pretrained EfficientNet-B1 whose last classifier layer is replaced by a 5-way linear head
weights = EfficientNet_B1_Weights.IMAGENET1K_V2
model=efficientnet_b1(weights=weights)
model.classifier[1]=nn.Linear(in_features=1280, out_features=5, bias=True)
model=model.to(device)

# Stage 1: freeze every parameter, then re-enable gradients for the classifier only
model.requires_grad_(False)
model.classifier.requires_grad_(True)

optimizer=Adam(
    model.classifier.parameters(),
    lr=1e-3,
    weight_decay=1e-4,
)
scheduler=ReduceLROnPlateau(optimizer,factor=0.1,patience=3)
loss_func=nn.CrossEntropyLoss()

# Metrics live on the same device as the model outputs
metric_rec=MulticlassRecall(num_classes=5,average="macro").to(device)
metric_acc=MulticlassAccuracy(num_classes=5).to(device)
metric_f1=MulticlassF1Score(num_classes=5,average='macro').to(device)



In [None]:
from typing import List
from dataclasses import dataclass,field
from tqdm import tqdm

@dataclass
class History:
    """Metric curves collected during training; every field starts as an empty list."""

    # training split
    training_accuracy: List[float] = field(default_factory=list)
    training_recall: List[float] = field(default_factory=list)
    training_loss: List[float] = field(default_factory=list)

    # validation split
    val_accuracy: List[float] = field(default_factory=list)
    val_recall: List[float] = field(default_factory=list)
    val_loss: List[float] = field(default_factory=list)


# Single history object handed to the trainers below
history = History()


class Trainer:
    """Train/validate loop with LR scheduling, best-checkpoint saving and early stopping.

    Args:
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        model: Module mapping inputs to ``(batch, num_classes)`` logits.
        optimizer: Optimizer stepped once per training batch.
        loss_func: Callable ``loss_func(logits, targets)`` returning a scalar tensor.
        scheduler: Object whose ``step(val_loss)`` is called once per epoch.
        metric_acc: Metric with ``reset()``, ``update(preds, targets)`` and ``compute()``.
        metric_rec: Second metric with the same interface.
        device: Device every batch is moved to.
        history: Object with ``training_*`` / ``val_*`` lists that ``fit`` appends to.
        mode: ``"max"`` when a larger monitored value is better; any other
            value means smaller is better.
        monitor: Validation value that drives checkpointing and early
            stopping: ``"val_loss"`` (default), ``"val_accuracy"`` or
            ``"val_recall"``.

    Attributes:
        best_value: Best monitored value seen so far.
        best_path: Path of the last checkpoint written by ``fit`` (the best
            one so far), or ``None`` if nothing has been saved yet.

    Raises:
        ValueError: If ``monitor`` is not one of the supported names.
    """

    _MONITORS=("val_loss","val_accuracy","val_recall")

    def __init__(self,train_loader,val_loader,model,optimizer,loss_func,
                 scheduler,metric_acc,metric_rec,device,history,mode="min",
                 monitor="val_loss"):
        if monitor not in self._MONITORS:
            raise ValueError(f"monitor must be one of {self._MONITORS}, got {monitor!r}")
        self.model=model
        self.train_loader=train_loader
        self.val_loader=val_loader
        self.optimizer=optimizer
        self.loss_func=loss_func
        self.scheduler=scheduler
        self.metric_acc=metric_acc
        self.metric_rec=metric_rec
        self.device=device
        self.history=history
        self.mode=mode
        self.monitor=monitor
        self.best_path=None
        if mode=="max":
            self.best_value=float('-inf')
        else:
            self.best_value=float('inf')

    def _is_improvement(self,value):
        """Return True when ``value`` beats ``best_value`` under the configured mode."""
        if self.mode=="max":
            return value>self.best_value
        return value<self.best_value

    def _run_epoch(self,loader,epoch,desc,train):
        """Run one pass over ``loader`` and return ``(accuracy, recall, mean batch loss)``.

        With ``train=True`` the model is put in train mode and the optimizer is
        stepped on every batch; otherwise the model is put in eval mode and no
        gradients are tracked.
        """
        self.metric_acc.reset()
        self.metric_rec.reset()
        if train:
            self.model.train()
        else:
            self.model.eval()
        loss_sum=0.0
        avg_loss=0.0
        with tqdm(total=len(loader),desc=f"{desc} {epoch}",leave=True) as bar:
            with torch.set_grad_enabled(train):
                for batch_idx,(x,y) in enumerate(loader):
                    x=x.to(self.device)
                    y=y.to(self.device)
                    logits=self.model(x)
                    loss=self.loss_func(logits,y)
                    if train:
                        self.optimizer.zero_grad()
                        loss.backward()
                        self.optimizer.step()
                    loss_sum+=loss.item()
                    # running mean of the per-batch losses seen so far
                    avg_loss=loss_sum/(batch_idx+1)
                    # logits are (B, num_classes); dim=1 is the class dimension
                    preds=logits.argmax(dim=1)
                    self.metric_acc.update(preds,y)
                    self.metric_rec.update(preds,y)
                    bar.update(1)

                    # refresh the progress-bar readout every 10 batches
                    if batch_idx%10==0:
                        acc=self.metric_acc.compute().item()
                        recall=self.metric_rec.compute().item()
                        bar.set_postfix({"acc": acc, "recall":recall, "loss":avg_loss,"epoch":epoch})
        return self.metric_acc.compute().item(), self.metric_rec.compute().item(),avg_loss

    def training_epoch(self,epoch):
        """Train for one epoch; returns ``(accuracy, recall, mean batch loss)``."""
        return self._run_epoch(self.train_loader,epoch,"training",train=True)

    def validating_epoch(self,epoch):
        """Evaluate on the validation loader; returns ``(accuracy, recall, mean batch loss)``."""
        return self._run_epoch(self.val_loader,epoch,"validating",train=False)

    def fit(self,epochs,early_stop,path):
        """Train for up to ``epochs`` epochs with checkpointing and early stopping.

        After every epoch the scheduler is stepped with the validation loss.
        When the monitored value improves, the model ``state_dict`` is saved to
        ``path`` as ``"{epoch}_{value}.pt"``; otherwise a counter is increased
        and training stops once it reaches ``early_stop``.

        Args:
            epochs: Maximum number of epochs.
            early_stop: Number of consecutive non-improving epochs that ends training.
            path: Directory for checkpoints (created if it does not exist).

        Returns:
            The history object, with one new entry per completed epoch in each list.
        """
        os.makedirs(path,exist_ok=True)
        stop_count=0
        for epoch in range(epochs):
            training_accuracy,training_recall,training_loss=self.training_epoch(epoch)
            self.history.training_accuracy.append(training_accuracy)
            self.history.training_recall.append(training_recall)
            self.history.training_loss.append(training_loss)
            val_accuracy,val_recall,val_loss=self.validating_epoch(epoch)
            self.history.val_accuracy.append(val_accuracy)
            self.history.val_recall.append(val_recall)
            self.history.val_loss.append(val_loss)
            # Rule of thumb: keep early_stop >= scheduler patience + 1
            # (e.g. patience=3 -> early_stop=5) so the LR can drop before training stops.
            self.scheduler.step(val_loss)

            monitored={
                "val_loss":val_loss,
                "val_accuracy":val_accuracy,
                "val_recall":val_recall,
            }[self.monitor]

            if self._is_improvement(monitored):
                self.best_value=monitored
                stop_count=0
                self.best_path=os.path.join(path,f"{epoch}_{monitored}.pt")
                torch.save(self.model.state_dict(),self.best_path)
            else:
                stop_count+=1
                if stop_count>=early_stop:
                    print(f"early_stopped. current epoch : {epoch}")
                    return self.history

        return self.history



In [None]:
# Stage 1: train with the classifier-only optimizer for at most 5 epochs,
# stopping after 2 consecutive epochs without a better validation loss.
output_path=r"/kaggle/working/"

t=Trainer(
    train_loader, val_loader, model, optimizer, loss_func, scheduler,
    metric_acc, metric_rec, device, history,
    mode="min",
)
history=t.fit(epochs=5,early_stop=2,path=output_path)

In [None]:

# Stage 2: reload a stage-1 checkpoint and fine-tune the whole network.
# NOTE: the file name embeds the epoch and validation loss of one particular
# run; point it at the checkpoint your own stage-1 run produced.
# map_location lets a checkpoint saved on one device load on another.
best_param=torch.load(r"/kaggle/working/4_0.31785393834114073.pt",map_location=device)
model.load_state_dict(best_param)
for p in model.parameters():
    p.requires_grad=True
optimizer=Adam(model.parameters(),lr=1e-5,weight_decay=1e-4)    # 1e-4 is somewhat aggressive for ~3,000 images
# The previous scheduler is bound to the stage-1 optimizer and would keep
# adjusting that one; build a new scheduler so the LR of this optimizer is
# actually reduced on plateau.
scheduler=ReduceLROnPlateau(optimizer,factor=0.1,patience=3)
t1=Trainer(train_loader,val_loader,model,optimizer,loss_func,scheduler,metric_acc,metric_rec,device,history,mode="min")
history=t1.fit(20,5,output_path)

In [None]:
# Load the fine-tuned weights used for evaluation and Grad-CAM.
# NOTE: the file name embeds the epoch and validation loss of one particular
# run; point it at the checkpoint your own stage-2 run produced.
# map_location lets a checkpoint saved on one device load on another.
final_param=torch.load(r"/kaggle/working/19_0.16000709250569345.pt",map_location=device)
model.load_state_dict(final_param)

import torch
import torch.nn.functional as F
import numpy as np
import cv2
import matplotlib.pyplot as plt

class GradCAM:
    """Grad-CAM heatmaps for one layer of a classification model.

    A forward hook and a full backward hook on ``target_layer`` capture the
    layer output and the gradient of the class score with respect to that
    output. Call ``remove_hooks()`` when done, or use the object as a context
    manager (``with GradCAM(model, layer) as cam: ...``) so the hooks are
    removed even if an error occurs.

    Args:
        model: Model returning ``(B, num_classes)`` logits.
        target_layer: Sub-module of ``model`` whose output is ``(B, C, h, w)``.
    """
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer

        self.activations = None
        self.gradients = None

        self.fwd_handle = self.target_layer.register_forward_hook(self._forward_hook)
        self.bwd_handle = self.target_layer.register_full_backward_hook(self._backward_hook)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.remove_hooks()
        return False

    def _forward_hook(self, module, inputs, output):
        # Only the values are needed later; detaching avoids keeping the
        # autograd graph alive through this attribute.
        self.activations = output.detach()  # (B,C,h,w)

    def _backward_hook(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()  # (B,C,h,w)

    def remove_hooks(self):
        """Detach both hooks from the target layer."""
        self.fwd_handle.remove()
        self.bwd_handle.remove()

    @torch.enable_grad()
    def generate(self, x, class_idx=None):
        """Compute Grad-CAM maps for a batch.

        Args:
            x: ``(B, 3, H, W)`` normalized input tensor.
            class_idx: Class to explain. ``None`` uses the predicted class of
                each sample, an ``int`` uses that class for the whole batch,
                otherwise one class index per sample.

        Returns:
            ``(cam, pred)``: ``cam`` is ``(B, H, W)`` scaled to [0, 1] per
            sample, ``pred`` is the ``(B,)`` predicted class indices. Both are
            detached.

        Raises:
            RuntimeError: If the hooks captured nothing, i.e. the target layer
                did not take part in the forward/backward pass.
        """
        self.model.zero_grad(set_to_none=True)
        # forget captures of a previous call so a hook that does not fire is noticed
        self.activations = None
        self.gradients = None

        logits = self.model(x)              # (B,num_classes)
        pred = logits.argmax(dim=1)         # (B,)

        if class_idx is None:
            class_idx = pred
        elif isinstance(class_idx, int):
            class_idx = torch.tensor([class_idx] * x.size(0), device=logits.device)
        else:
            # keep the index on the same device as the logits
            class_idx = torch.as_tensor(class_idx, device=logits.device).long()

        batch_index = torch.arange(x.size(0), device=logits.device)
        score = logits[batch_index, class_idx].sum()
        score.backward()
        # the backward pass also filled parameter gradients; do not leave them behind
        self.model.zero_grad(set_to_none=True)

        acts = self.activations             # (B,C,h,w)
        grads = self.gradients              # (B,C,h,w)
        if acts is None or grads is None:
            raise RuntimeError(
                "GradCAM hooks captured nothing; is target_layer used in model.forward?"
            )

        # channel weights = spatially averaged gradients
        weights = grads.mean(dim=(2, 3), keepdim=True)      # (B,C,1,1)
        cam = (weights * acts).sum(dim=1, keepdim=True)     # (B,1,h,w)
        cam = F.relu(cam)

        # upsample to the input resolution
        cam = F.interpolate(cam, size=x.shape[-2:], mode="bilinear", align_corners=False)
        cam = cam.squeeze(1)  # (B,H,W)

        # per-sample min-max scaling; the epsilon guards against a constant map
        cam_min = cam.view(cam.size(0), -1).min(dim=1)[0].view(-1, 1, 1)
        cam_max = cam.view(cam.size(0), -1).max(dim=1)[0].view(-1, 1, 1)
        cam = (cam - cam_min) / (cam_max - cam_min + 1e-8)

        return cam.detach(), pred.detach()


class Predict:
    """Run a trained classifier over a loader and visualize Grad-CAM overlays.

    Args:
        model: Model returning ``(B, num_classes)`` logits.
        test_loader: Iterable of ``(inputs, targets)`` batches.
        device: Device the inputs are moved to.

    Attributes:
        actual_list: Ground-truth class indices from the latest ``predict()`` call.
        pred_list: Predicted class indices from the latest ``predict()`` call.
    """
    def __init__(self, model, test_loader, device):
        self.model = model
        self.test_loader = test_loader
        self.actual_list = []
        self.pred_list = []
        self.device = device

    def predict(self):
        """Predict every batch of the loader.

        Returns:
            ``(actual_list, pred_list)``: ground-truth and predicted class
            indices, in loader order.
        """
        # start from empty lists so a second call does not append to the first call's results
        self.actual_list = []
        self.pred_list = []

        self.model.eval()
        with tqdm(total=len(self.test_loader), desc="predicting", leave=True) as bar:
            for x_test, y_test in self.test_loader:
                x_test = x_test.to(self.device)

                # targets are only recorded, so they never need to go to the device
                self.actual_list.extend(y_test.detach().cpu().numpy())

                with torch.no_grad():  # inference only, no gradients needed
                    logits = self.model(x_test)
                    preds = torch.argmax(logits, dim=-1)

                self.pred_list.extend(preds.detach().cpu().numpy())
                bar.update(1)

        return self.actual_list, self.pred_list

    # The helpers below are methods so that this class does not depend on
    # module-level utility functions being defined.
    def _denormalize(self, img_tensor, mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)):
        """Undo ``(x - mean) / std`` on a ``(3, H, W)`` tensor; returns ``(H, W, 3)`` uint8."""
        img = img_tensor.detach().cpu().float().numpy()     # (3,H,W)
        img = np.transpose(img, (1, 2, 0))                  # (H,W,3)
        img = img * np.array(std) + np.array(mean)
        img = np.clip(img, 0, 1)
        img = (img * 255).astype(np.uint8)
        return img

    def _overlay_cam(self, rgb_img_uint8, cam_01, alpha=0.45):
        """Blend a JET-colored heatmap of ``cam_01`` (values in [0, 1]) over an RGB uint8 image."""
        heatmap = (cam_01 * 255).astype(np.uint8)
        heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)  # BGR
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
        overlay = cv2.addWeighted(rgb_img_uint8, 1 - alpha, heatmap, alpha, 0)
        return overlay

    def gradcam_visualize(self, num_images=6, target_layer=None, use_true_label=False):
        """Plot Grad-CAM overlays for images of the first test batch.

        Args:
            num_images: Maximum number of images to show.
            target_layer: Layer to explain; ``None`` uses ``model.features[-1]``.
            use_true_label: If True the CAM is computed for the ground-truth
                class, otherwise for the predicted class.
        """
        self.model.eval()

        if target_layer is None:
            target_layer = self.model.features[-1]

        cam_extractor = GradCAM(self.model, target_layer)
        try:
            # take a single batch from the test loader
            x_batch, y_batch = next(iter(self.test_loader))
            x_batch = x_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            # Grad-CAM needs gradients, so this must not run under no_grad
            class_idx = y_batch if use_true_label else None
            cam, pred = cam_extractor.generate(x_batch, class_idx=class_idx)
        finally:
            # always take the hooks off the model, even if generation failed
            cam_extractor.remove_hooks()

        n_show = min(num_images, x_batch.size(0))
        plt.figure(figsize=(12, 8))

        for i in range(n_show):
            img = self._denormalize(x_batch[i])          # RGB uint8
            cam_i = cam[i].cpu().numpy()                 # (H,W)
            overlay = self._overlay_cam(img, cam_i)

            # 3 columns, as many rows as needed
            plt.subplot((n_show + 2)//3, 3, i + 1)
            plt.imshow(overlay)
            plt.title(f"pred={pred[i].item()} / gt={y_batch[i].item()}")
            plt.axis("off")

        plt.tight_layout()
        plt.show()


In [None]:
# Collect ground-truth and predicted class indices over the test loader
p=Predict(model=model,test_loader=test_loader,device=device)
actual_list,pred_list=p.predict()

In [None]:
# Show Grad-CAM overlays (computed for the predicted class) for up to 6 images of the first test batch
p.gradcam_visualize(num_images=6)