In [None]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
import cv2
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [None]:
# One directory per emotion class; each is listed with Path.iterdir() in the
# next cell to build the (path, label, target) table.
# NOTE(review): "Suprise" (sic) presumably mirrors the folder name on disk —
# confirm against the dataset before correcting the spelling.
angry_path=r"/kaggle/input/human-face-emotions/Data/Angry"
fear_path=r"/kaggle/input/human-face-emotions/Data/Fear"
happy_path=r"/kaggle/input/human-face-emotions/Data/Happy"
sad_path=r"/kaggle/input/human-face-emotions/Data/Sad"
suprise_path=r"/kaggle/input/human-face-emotions/Data/Suprise"

In [None]:
def _collect_image_rows(folder, label, target):
    """Return one record per regular file found directly inside ``folder``.

    Parameters
    ----------
    folder : str or Path
        Directory to list (not recursive).
    label : str
        Human-readable class name stored under the ``"label"`` key.
    target : int
        Integer class index stored under the ``"targets"`` key.

    Returns
    -------
    list of dict
        Records with the keys ``"path"``, ``"label"`` and ``"targets"``.
    """
    # iterdir() yields entries in filesystem order, which is not guaranteed to
    # be stable; sorting makes the row order identical on every run.
    return [
        {"path": str(file_path), "label": label, "targets": target}
        for file_path in sorted(Path(folder).iterdir())
        if file_path.is_file()
    ]


# (directory, label, integer target) for every emotion class. The order here
# fixes both the target indices and the row order of ``all_df``.
class_specs = [
    (angry_path, "angry", 0),
    (fear_path, "fear", 1),
    (happy_path, "happy", 2),
    (sad_path, "sad", 3),
    (suprise_path, "suprise", 4),
]

# dicts preserve insertion order, so the frames stay in class_specs order.
class_frames = {
    label: pd.DataFrame(_collect_image_rows(folder, label, target))
    for folder, label, target in class_specs
}

# Keep the per-class frames reachable under their original names.
angry_df = class_frames["angry"]
fear_df = class_frames["fear"]
happy_df = class_frames["happy"]
sad_df = class_frames["sad"]
suprise_df = class_frames["suprise"]

all_df = pd.concat(list(class_frames.values()), ignore_index=True)

print("dataframe concat complete")

In [None]:
# Fixed seed so that the train/val/test membership is identical on every run;
# without it each execution evaluates on a different test set.
SPLIT_SEED=42

# 70% train, then the remaining 30% is split 70/30 again,
# i.e. roughly 21% validation and 9% test of the full table.
# NOTE(review): confirm the second test_size=0.3 is intended (0.5 would give an even val/test split).
train_df,tmp_df=train_test_split(all_df,test_size=0.3,stratify=all_df["targets"],random_state=SPLIT_SEED)
val_df,test_df=train_test_split(tmp_df,test_size=0.3,stratify=tmp_df["targets"],random_state=SPLIT_SEED)

print(train_df.shape)
print(val_df.shape)
print(test_df.shape)

In [None]:

def visualize_samples(df,nrows=3,ncols=3,nsample=9):
    """Show a random sample of images from ``df`` in an ``nrows`` x ``ncols`` grid.

    Parameters
    ----------
    df : DataFrame
        Must provide a ``"path"`` column (image file path) and a ``"label"``
        column (text shown in the title).
    nrows, ncols : int
        Grid dimensions.
    nsample : int
        Requested number of images; capped by ``len(df)`` and by the number
        of grid cells.

    Each title shows the label and the image size as ``height x width``.
    Files that cannot be decoded are marked as unreadable instead of raising.
    """
    # Never draw more rows than there are axes to put them on.
    n_show=min(nsample,len(df),nrows*ncols)
    df_sample=df.sample(n=n_show)
    # squeeze=False keeps ``axs`` a 2-D array even for a 1x1 grid, where
    # plt.subplots would otherwise return a bare Axes without .flatten().
    fig,axs=plt.subplots(nrows,ncols,figsize=(ncols*3, nrows*3),squeeze=False)
    axs=axs.flatten()
    for ax,(_,row) in zip(axs,df_sample.iterrows()):
        label=row['label']
        bgr=cv2.imread(row["path"])
        if bgr is None:
            # cv2.imread signals a missing/corrupt file by returning None.
            ax.set_title(f"label : {label}\n(unreadable)",fontsize=10)
            ax.axis("off")
            continue
        img=cv2.cvtColor(bgr,cv2.COLOR_BGR2RGB)
        h,w=img.shape[:2]

        ax.imshow(img)
        ax.set_title(f"label : {label}\n{h} x {w}",fontsize=10)

    # Hide grid cells that received no image.
    for ax in axs[n_show:]:
        ax.axis("off")

    plt.tight_layout()
    plt.show()

visualize_samples(train_df)

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Training pipeline: resize, light geometric/photometric augmentation,
# normalisation, tensor conversion.
tr_augment = A.Compose(
    [
        A.Resize(height=48, width=48, p=1),
        A.HorizontalFlip(p=0.3),
        # Geometric jitter, so that a face whose position, size or angle is
        # slightly off is still recognised as the same expression:
        #   shift_limit=0.05  -> translate by up to about 5% horizontally/vertically
        #   scale_limit=0.10  -> zoom in/out within a 10% range
        #   rotate_limit=10   -> rotate within about 10 degrees
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.10, rotate_limit=10, p=0.5),
        # Photometric change (lighting / camera).
        A.RandomBrightnessContrast(p=0.3),
        A.Normalize(mean=(0.5,), std=(0.5,)),
        ToTensorV2(),
    ]
)

# Validation/test pipeline: deterministic steps only.
val_augment = A.Compose(
    [
        A.Resize(height=48, width=48, p=1),
        # Images are loaded as grayscale, so a single channel statistic is given.
        A.Normalize(mean=(0.5,), std=(0.5,)),
        ToTensorV2(),
    ]
)



In [None]:
class EmotionCustom(torch.utils.data.Dataset):
    """Map-style dataset that loads an image as grayscale and returns ``(image, target)``.

    Parameters
    ----------
    path : sequence of str
        Image file paths, indexed by sample.
    targets : sequence of int
        Class index for each entry of ``path``.
    augmentor : callable
        Invoked as ``augmentor(image=array)`` and expected to return a mapping
        whose ``"image"`` entry is the transformed sample.

    Raises
    ------
    ValueError
        At construction if ``path`` and ``targets`` differ in length; on item
        access if ``augmentor`` is None.
    OSError
        On item access if the image file cannot be read or decoded.
    """
    def __init__(self,path,targets,augmentor):
        super().__init__()
        if len(path)!=len(targets):
            raise ValueError(
                f"path and targets must have the same length, got {len(path)} and {len(targets)}"
            )
        self.path=path
        self.targets=targets
        self.augmentor=augmentor
    def __len__(self):
        return len(self.path)
    def __getitem__(self,idx):
        # Check the configuration before doing any disk I/O.
        if self.augmentor is None:
            raise ValueError("augmentor must be need")
        path_idx=self.path[idx]
        cv2_img=cv2.imread(path_idx,cv2.IMREAD_GRAYSCALE)  # (H,W)
        if cv2_img is None:
            # cv2.imread returns None instead of raising; fail here with the
            # offending path rather than deep inside the augmentation pipeline.
            raise OSError(f"could not read image (missing or undecodable): {path_idx}")
        img=self.augmentor(image=cv2_img)['image']
        targets=torch.tensor(self.targets[idx],dtype=torch.long)
        return img,targets
       

In [None]:
def _as_dataset(frame, augmentor):
    """Build an EmotionCustom from the ``path`` / ``targets`` columns of ``frame``."""
    return EmotionCustom(frame["path"].to_list(), frame["targets"].to_list(), augmentor)


# Only the training split goes through the randomised augmentation pipeline.
train_custom = _as_dataset(train_df, tr_augment)
val_custom = _as_dataset(val_df, val_augment)
test_custom = _as_dataset(test_df, val_augment)

# Settings shared by all three loaders; only the training loader shuffles.
_loader_kwargs = dict(batch_size=128, num_workers=4, pin_memory=True)
train_loader = DataLoader(train_custom, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_custom, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_custom, shuffle=False, **_loader_kwargs)
print("done")

In [None]:
# Pull one batch from the training loader and print the shapes of the
# image batch (k) and the target batch (v) as a sanity check.
k, v = next(iter(train_loader))
for _batch_part in (k, v):
    print(_batch_part.shape)

In [None]:
from torchvision.models import resnet34,ResNet34_Weights

# Number of emotion classes (targets 0..4 in the dataframe).
num_classes=5

# Start from ResNet-34 with the default pretrained weights.
model = resnet34(weights=ResNet34_Weights.DEFAULT)
# Replace the stem convolution with a newly created 1-input-channel, 3x3, stride-1 convolution.
model.conv1=nn.Conv2d(in_channels=1,out_channels=64, kernel_size=3, stride=1, padding=1, bias=False )
# nn.Identity() returns its input unchanged, i.e. it does nothing. It is a
# placeholder that keeps the module structure intact: "at least in the stem,
# preserve as much resolution as possible". Author's note: commenting out or
# deleting the maxpool attribute raises an error.
model.maxpool=nn.Identity()
# Author's note: because the stem was replaced, the earliest part gets
# essentially no benefit from pretraining. This may help or it may hurt.
# Swap the classification head for one with num_classes outputs.
model.fc=nn.Linear(model.fc.in_features, num_classes)

In [None]:
import torch.optim as optim
from torchmetrics import Accuracy, Recall

device="cuda" if torch.cuda.is_available() else "cpu"

# Two parameter groups so the newly created layers can use a larger learning
# rate than the pretrained backbone.
backbone_params = []
new_params = []

for name, p in model.named_parameters():
    if not p.requires_grad:
        continue

    # conv1 / fc are the parts that were newly created above; the
    # prefix test only matches top-level modules with those names.
    if name.startswith("conv1") or name.startswith("fc"):
        new_params.append(p)
    else:
        backbone_params.append(p)

optimizer = optim.Adam(
    [
        {"params": backbone_params, "lr": 1e-4},
        {"params": new_params,      "lr": 1e-3},
    ],
    weight_decay=1e-4
)

loss_func=nn.CrossEntropyLoss()
# Default mode ("min"): halve the learning rates after 4 epochs without improvement.
scheduler=ReduceLROnPlateau(optimizer, factor=0.5, patience=4)
acc_metric=Accuracy(task="multiclass",num_classes=num_classes).to(device)
# The task wrapper defaults to average="micro", which for single-label
# multiclass data is numerically identical to accuracy. Macro averaging
# (unweighted mean of per-class recall) makes this a distinct metric.
recall_metric=Recall(task="multiclass", num_classes=num_classes, average="macro").to(device)

In [None]:
def is_improvement(best_value,value):
    """Return True when ``value`` is strictly greater than ``best_value``.

    "Higher is better" semantics: use it with metrics such as accuracy.
    """
    return bool(value>best_value)

class EarlyStop:
    """Counts consecutive non-improving evaluations and signals when to stop.

    Parameters
    ----------
    early_stop : int
        Number of consecutive non-improving calls after which
        ``stop_logic`` returns True.
    stop_count : int, optional
        Initial value of the non-improvement counter (default 0).
    """
    def __init__(self,early_stop,stop_count=0):
        self.early_stop=early_stop
        # The argument used to be accepted but ignored; it now seeds the counter.
        self.stop_count=stop_count
    def stop_logic(self,best_value,value):
        """Update the counter for one evaluation; return True when training should stop."""
        if is_improvement(best_value,value):
            self.stop_count=0
            return False
        # Count first, then compare, so that exactly ``early_stop`` consecutive
        # misses trigger the stop (same rule as Trainer.fit).
        self.stop_count+=1
        if self.stop_count>=self.early_stop:
            print("early_stopped")
            return True
        return False

class Weights_ChkPoint:
    """Writes a checkpoint file whenever the monitored value improves.

    Parameters
    ----------
    path : str
        Directory the checkpoint files are written to.
    """
    def __init__(self,path):
        self.path=path
        self.count=0
        # Best value seen so far; used when the caller does not pass one.
        self.best_value=float("-inf")
    def save(self,value,state_dict=None,best_value=None):
        """Save ``state_dict`` if ``value`` improves on the reference value.

        Parameters
        ----------
        value : float
            Newly measured metric (higher is better).
        state_dict : object
            Object handed to ``torch.save``; required.
        best_value : float, optional
            Reference to compare against. Defaults to the best value this
            instance has saved so far.

        Returns
        -------
        bool
            True if a file was written, False otherwise.

        Raises
        ------
        ValueError
            If ``state_dict`` is not provided.
        """
        if state_dict is None:
            raise ValueError("state_dict is required to write a checkpoint")
        reference=self.best_value if best_value is None else best_value
        if not is_improvement(reference,value):
            return False
        if self.path:
            os.makedirs(self.path,exist_ok=True)
        torch.save(state_dict,os.path.join(self.path,f"{self.count}_{value:.4f}.pt"))
        self.count+=1
        self.best_value=max(self.best_value,value)
        return True

In [None]:
from typing import List
from dataclasses import dataclass,field
from tqdm import tqdm

@dataclass
class History:
    """Per-epoch metric log; every field defaults to its own empty list."""

    # Training-side series.
    training_accuracy: List[float] = field(default_factory=list)
    training_recall: List[float] = field(default_factory=list)
    training_loss: List[float] = field(default_factory=list)
    # Validation-side series.
    val_accuracy: List[float] = field(default_factory=list)
    val_recall: List[float] = field(default_factory=list)
    val_loss: List[float] = field(default_factory=list)


# Module-level instance handed to the Trainer.
history = History()


class Trainer:
    """Training/validation loop with LR scheduling, checkpointing and early stopping.

    Parameters
    ----------
    train_loader, val_loader : iterable
        Yield ``(inputs, targets)`` batches and support ``len()``.
    model : torch.nn.Module
        Moved to ``device`` on construction.
    optimizer, loss_func, scheduler :
        ``scheduler.step`` is called with the validation loss once per epoch.
    metric_acc, metric_rec :
        Metric objects exposing ``reset`` / ``update(preds, target)`` /
        ``compute``; moved to ``device`` on construction.
    device : str or torch.device
    history : History
        Receives one value per epoch for every tracked series.
    mode : str
        ``"max"`` monitors validation accuracy (higher is better); any other
        value monitors validation loss (lower is better). The monitored value
        drives checkpointing and early stopping.

    Attributes
    ----------
    best_value : float
        Best monitored value seen so far.
    best_path : str or None
        Path of the most recently written (i.e. best) checkpoint.
    """
    def __init__(self,train_loader,val_loader,model,optimizer,loss_func,
                 scheduler,metric_acc,metric_rec,device,history,mode="min"):
        self.model=model.to(device)
        self.train_loader=train_loader
        self.val_loader=val_loader
        self.optimizer=optimizer
        self.loss_func=loss_func
        self.scheduler=scheduler
        self.metric_acc=metric_acc.to(device)
        self.metric_rec=metric_rec.to(device)
        self.device=device
        self.history=history
        # Anything other than "max" is treated as "min", as before.
        self.mode="max" if mode=="max" else "min"
        self.best_value=float('-inf') if self.mode=="max" else float('inf')
        self.best_path=None

    def _is_better(self,value):
        """Return True if ``value`` beats ``best_value`` in the direction given by ``mode``."""
        if self.mode=="max":
            return value>self.best_value
        return value<self.best_value

    def _run_epoch(self,loader,desc,epoch,train):
        """Run one pass over ``loader``; return ``(accuracy, recall, mean batch loss)``."""
        self.metric_acc.reset()
        self.metric_rec.reset()
        self.model.train(train)   # train(False) is equivalent to eval()
        loss_sum=0.0
        avg_loss=0.0
        with tqdm(total=len(loader),desc=f"{desc} {epoch}",leave=True) as bar, torch.set_grad_enabled(train):
            for batch_idx,(x,y) in enumerate(loader):
                x=x.to(self.device)
                y=y.to(self.device)
                logits=self.model(x)
                loss=self.loss_func(logits,y)
                if train:
                    self.optimizer.zero_grad()
                    loss.backward()
                    self.optimizer.step()
                loss_sum+=loss.item()
                avg_loss=loss_sum/(batch_idx+1)
                # Predicted class = index of the largest logit along the last dimension.
                preds=logits.argmax(dim=-1)
                self.metric_acc.update(preds,y)
                self.metric_rec.update(preds,y)
                bar.update(1)

                # Refresh the progress-bar readout every 10 batches.
                if batch_idx%10==0:
                    acc=self.metric_acc.compute().item()
                    recall=self.metric_rec.compute().item()
                    bar.set_postfix({"acc": acc, "recall":recall, "loss":avg_loss,"epoch":epoch})
        return self.metric_acc.compute().item(), self.metric_rec.compute().item(),avg_loss

    def training_epoch(self,epoch):
        """Train for one epoch; return ``(accuracy, recall, mean batch loss)``."""
        return self._run_epoch(self.train_loader,"training",epoch,train=True)

    def validating_epoch(self,epoch):
        """Evaluate on the validation loader without gradients; same return shape as ``training_epoch``."""
        return self._run_epoch(self.val_loader,"validating",epoch,train=False)

    def fit(self,epochs,early_stop,path):
        """Train for up to ``epochs`` epochs.

        A checkpoint ``"{epoch}_{monitored value}.pt"`` is written to ``path``
        whenever the monitored value improves. Training stops once
        ``early_stop`` consecutive epochs bring no improvement.

        Returns
        -------
        History
            The history object passed at construction, with one entry per
            completed epoch in every series.
        """
        os.makedirs(path,exist_ok=True)
        stop_count=0
        for epoch in range(epochs):
            training_accuracy,training_recall,training_loss=self.training_epoch(epoch)
            self.history.training_accuracy.append(training_accuracy)
            self.history.training_recall.append(training_recall)
            self.history.training_loss.append(training_loss)
            val_accuracy,val_recall,val_loss=self.validating_epoch(epoch)
            self.history.val_accuracy.append(val_accuracy)
            self.history.val_recall.append(val_recall)
            self.history.val_loss.append(val_loss)
            self.scheduler.step(val_loss)

            # Previously the comparison was always "lower is better", so
            # mode="max" (best_value=-inf) could never register an improvement.
            monitored=val_accuracy if self.mode=="max" else val_loss
            if self._is_better(monitored):
                self.best_value=monitored
                stop_count=0
                self.best_path=os.path.join(path,f"{epoch}_{monitored}.pt")
                torch.save(self.model.state_dict(),self.best_path)
            else:
                stop_count+=1
                if stop_count>=early_stop:
                    print(f"early_stopped. current epoch : {epoch}")
                    break

        return self.history

In [None]:
# Directory that receives the checkpoints written during training.
output_path = r"/kaggle/working/"

t = Trainer(
    train_loader=train_loader,
    val_loader=val_loader,
    model=model,
    optimizer=optimizer,
    loss_func=loss_func,
    scheduler=scheduler,
    metric_acc=acc_metric,
    metric_rec=recall_metric,
    device=device,
    history=history,
    mode="min",
)
# Up to 30 epochs; stop after 10 consecutive epochs without improvement.
history = t.fit(epochs=30, early_stop=10, path=output_path)

In [None]:
# Trainer.fit only writes a checkpoint when the monitored value improves, so
# the most recently written .pt file is the best model of the latest run.
# Selecting it by modification time avoids hard-coding a file name that only
# exists for one particular (unseeded) training run.
checkpoint_files=sorted(Path(output_path).glob("*.pt"),key=lambda ckpt: ckpt.stat().st_mtime)
if not checkpoint_files:
    raise FileNotFoundError(f"no .pt checkpoint found in {output_path}")
best_checkpoint=checkpoint_files[-1]
# map_location lets a checkpoint written on GPU be loaded on a CPU-only session.
model.load_state_dict(torch.load(best_checkpoint,map_location=device))
print(f"loaded {best_checkpoint.name}")
print('done')

In [None]:
from sklearn.metrics import confusion_matrix

class Predictor:
    """Runs a model over a loader and collects predicted and true class indices."""

    def __init__(self, model, test_loader, device):
        self.device = device
        self.test_loader = test_loader
        self.model = model.to(device)

    def pred(self):
        """Return ``(predicted, expected)`` as two flat Python lists.

        The model is put in eval mode and gradients are disabled. Each batch's
        prediction is the argmax of the model output over the last dimension.
        """
        predicted = []
        expected = []
        self.model.eval()
        progress = tqdm(total=len(self.test_loader), desc="predicting", leave=True)
        with progress, torch.no_grad():
            for inputs, labels in self.test_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                class_ids = self.model(inputs).argmax(dim=-1)

                predicted += class_ids.detach().cpu().tolist()
                expected += labels.detach().cpu().tolist()
                progress.update(1)

        return predicted, expected

# Collect predictions and ground-truth targets for the held-out test split.
p = Predictor(model=model, test_loader=test_loader, device=device)
pred, true = p.pred()

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

# Passing the full label list keeps the matrix num_classes x num_classes even
# if some class never appears in ``true`` or ``pred``.
cm = confusion_matrix(true, pred, labels=list(range(num_classes)))
acc = accuracy_score(true, pred)

# The accuracy was computed but never shown before.
print(f"test accuracy: {acc:.4f}")

cm