<a href="https://colab.research.google.com/github/Denev6/CapStone/blob/main/base.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install torchsummary, which provides the `summary` helper imported below.
!pip install torchsummary

# CNN 살펴보기

In [None]:
import os
import gc
import warnings
from google.colab import drive

import numpy as np 
import pandas as pd
from tqdm.auto import tqdm, trange
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from sklearn.metrics import (
    f1_score,
    accuracy_score,
    recall_score,
    precision_score,
    confusion_matrix,
)

In [None]:
# Attach Google Drive at the location join_path() builds on, then silence
# Python warnings for the rest of the session.
drive.mount(mountpoint="/content/drive")
warnings.simplefilter(action="ignore")

def clear():
    """Release memory that is no longer referenced.

    Runs Python's garbage collector first and then asks PyTorch to release
    its cached CUDA memory, in that order.
    """
    for release in (gc.collect, torch.cuda.empty_cache):
        release()

def join_path(*args, root="/content/drive/MyDrive"):
    """Build a path below the notebook's base directory.

    Args:
        *args: Path components appended to ``root`` in order.
        root: Base directory. Defaults to the Google Drive folder this
            notebook mounts; pass another directory to run elsewhere.

    Returns:
        str: ``root`` joined with ``args`` by ``os.path.join``.
    """
    return os.path.join(root, *args)

# Data files used by this notebook, ordered as
# (landmark features, pose features, labels).
TRAIN_CSV = [
    join_path("tempData", "feat_xtrain.csv"),
    join_path("tempData", "pose_xtrain.csv"),
    join_path("tempData", "ytrain.csv"),
]
TEST_CSV = [
    join_path("tempData", "feat_xtest.csv"),
    join_path("tempData", "pose_xtest.csv"),
    join_path("tempData", "ytest.csv"),
]

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Training hyperparameters.
BATCH_SIZE = 8
EPOCHS = 5
LEARNING_RATE = 1e-6

# Path for the model file (not referenced by the cells visible here).
MODEL_PATH = join_path("tdcn.pth")

## Dataset

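Train Data — reference preprocessing that writes the reduced `tempData` CSV files (shown as a snippet, not run as a notebook cell)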
```python
clear()
data_len = 500  # number of samples (label rows) to keep
rows_per_sample = 5000  # each sample spans this many consecutive feature rows

# Keep only the feature rows that belong to the first `data_len` samples.
for csv in ["feat_xtrain.csv", "pose_xtrain.csv"]:
    data = join_path("Capstone", "data", csv)
    df = pd.read_csv(data)
    df = df.iloc[: data_len * rows_per_sample, :]
    df.to_csv(join_path("tempData", csv), index=False)

# Keep the matching labels (second column of the source file).
data = join_path("Capstone", "data", "ytrain.csv")
df = pd.read_csv(data)
df = df.iloc[:data_len, 1]
# Name the output explicitly: reusing the loop variable `csv` here would
# write the labels over pose_xtrain.csv instead of creating ytrain.csv.
df.to_csv(join_path("tempData", "ytrain.csv"), index=False)
```

In [None]:
class CustomDataset(Dataset):
    """Dataset serving fixed-length row windows from two feature CSV files.

    Sample ``i`` is made of rows ``i * seq_len`` up to (not including)
    ``(i + 1) * seq_len`` of both feature tables, paired with the ``i``-th
    value of the first column of the label table.

    Args:
        x1_file: CSV path of the first feature table (landmark data).
        x2_file: CSV path of the second feature table (pose data).
        y_file: CSV path of the label table; only its first column is read.
        mode: Accepted for call compatibility; it is not used.
        seq_len: Number of consecutive feature rows that form one sample.

    Raises:
        ValueError: If ``seq_len`` is not positive, or if a feature table has
            fewer than ``len(labels) * seq_len`` rows.
    """

    def __init__(self, x1_file, x2_file, y_file, mode=None, seq_len=5000):
        if seq_len <= 0:
            raise ValueError(f"seq_len must be positive, got {seq_len}")

        x1_df = pd.read_csv(x1_file)  # landmark data
        x2_df = pd.read_csv(x2_file)  # pose data
        y_df = pd.read_csv(y_file)

        x1 = x1_df.values
        x2 = x2_df.values
        y = y_df.iloc[:, 0].values

        # Fail early: with too few feature rows the slices in __getitem__
        # would silently come back short or empty.
        required_rows = len(y) * seq_len
        for name, table in (("x1_file", x1), ("x2_file", x2)):
            if len(table) < required_rows:
                raise ValueError(
                    f"{name} has {len(table)} rows but {len(y)} labels with "
                    f"seq_len={seq_len} need at least {required_rows}"
                )

        self.seq_len = seq_len
        self.x1_data = torch.FloatTensor(x1)
        self.x2_data = torch.FloatTensor(x2)
        self.y_data = torch.IntTensor(y)

    def __len__(self):
        """Return the number of samples, i.e. the number of labels."""
        return len(self.y_data)

    def __getitem__(self, index):
        """Return ``((x1_window, x2_window), label)`` for sample ``index``."""
        start = index * self.seq_len
        stop = start + self.seq_len
        return (
            self.x1_data[start:stop],
            self.x2_data[start:stop],
        ), self.y_data[index]

In [None]:
# Datasets: each CSV list is ordered (landmark features, pose features, labels).
training_data = CustomDataset(
    x1_file=TRAIN_CSV[0],
    x2_file=TRAIN_CSV[1],
    y_file=TRAIN_CSV[2],
    mode="train",
)
test_data = CustomDataset(
    x1_file=TEST_CSV[0],
    x2_file=TEST_CSV[1],
    y_file=TEST_CSV[2],
)

# Only the training loader shuffles its samples.
train_dataloader = DataLoader(dataset=training_data, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=BATCH_SIZE)

## CNN

In [None]:
class CNN(nn.Module):
    """Two-input convolutional classifier with one shared feature extractor.

    Both inputs pass through the same ``conv`` stack; the two feature maps
    are concatenated along the last axis and fed to the fully connected
    ``classifier`` head.

    Args:
        num_label: Number of output scores produced by the final layer.
    """

    def __init__(self, num_label=2):
        super().__init__()

        # Shared extractor: two (dilated conv -> ELU -> 2x2 max-pool) stages
        # followed by batch normalisation over the 8 output channels.
        extractor_layers = [
            nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(5, 2), dilation=2, padding="same"),
            nn.ELU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=4, out_channels=8, kernel_size=(5, 2), dilation=2, padding="same"),
            nn.ELU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(num_features=8, affine=True),
        ]
        self.conv = nn.Sequential(*extractor_layers)

        # 350000 = 8 channels * 1250 rows * (34 + 1) columns, the flattened
        # size for inputs of (5000, 136) and (5000, 6) as passed to summary()
        # in this notebook; other input sizes will not fit this layer.
        head_layers = [
            nn.Flatten(start_dim=1, end_dim=-1),
            nn.Linear(in_features=350000, out_features=1024),
            nn.ELU(inplace=True),
            nn.Linear(in_features=1024, out_features=64),
            nn.LeakyReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(in_features=64, out_features=num_label),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, feat_x, pose_x):
        """Return the class scores for a (landmark, pose) input pair.

        Args:
            feat_x: First input, run through ``conv`` first.
            pose_x: Second input, run through the same ``conv`` afterwards.

        Returns:
            Output of ``classifier`` applied to the two feature maps joined
            along the last dimension.
        """
        feature_maps = [self.conv(stream) for stream in (feat_x, pose_x)]
        joined = torch.cat(feature_maps, dim=-1)
        return self.classifier(joined)

In [None]:
# Two-output classifier trained with cross-entropy loss and SGD with momentum.
model = CNN(num_label=2)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    params=model.parameters(),
    lr=LEARNING_RATE,
    momentum=0.9,
)

In [None]:
# Move the model to the selected device, then print the per-layer output
# shapes for one landmark input (1, 5000, 136) and one pose input (1, 5000, 6).
model.to(DEVICE)
summary(
    model,
    input_size=[(1, 5000, 136), (1, 5000, 6)],
    batch_size=BATCH_SIZE,
)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [8, 4, 5000, 136]              44
               ELU-2          [8, 4, 5000, 136]               0
         MaxPool2d-3           [8, 4, 2500, 68]               0
            Conv2d-4           [8, 8, 2500, 68]             328
               ELU-5           [8, 8, 2500, 68]               0
         MaxPool2d-6           [8, 8, 1250, 34]               0
       BatchNorm2d-7           [8, 8, 1250, 34]              16
            Conv2d-8            [8, 4, 5000, 6]              44
               ELU-9            [8, 4, 5000, 6]               0
        MaxPool2d-10            [8, 4, 2500, 3]               0
           Conv2d-11            [8, 8, 2500, 3]             328
              ELU-12            [8, 8, 2500, 3]               0
        MaxPool2d-13            [8, 8, 1250, 1]               0
      BatchNorm2d-14            [8, 8, 

## Train

In [None]:
def train(train_loader, test_loader, model, loss_fn, optimizer):
    """Train ``model`` for ``EPOCHS`` epochs and log the losses per epoch.

    After each epoch the model is evaluated with ``test`` and one table row
    with the mean training loss and the test loss is written via
    ``tqdm.write``.

    Args:
        train_loader: Iterable of ``((x1, x2), label)`` training batches.
        test_loader: Loader passed to ``test`` after every epoch.
        model: Module called as ``model(x1, x2)``.
        loss_fn: Loss called as ``loss_fn(pred, label.long())``.
        optimizer: Optimizer stepped once per training batch.

    Returns:
        The same ``model`` object after training.
    """
    model.to(DEVICE)
    loss_fn.to(DEVICE)
    optimizer.zero_grad()
    batches_per_epoch = len(train_loader)

    progress = trange(1, EPOCHS + 1)
    for header_line in ("\nEpoch | Train Loss | Test Loss", "-" * 30):
        tqdm.write(header_line)

    for epoch in progress:
        model.train()
        running_loss = 0
        for (feat, pose), target in train_loader:
            # Insert a channel axis at position 1, e.g. [B, 5000, 136] ->
            # [B, 1, 5000, 136] for the shapes used in this notebook.
            feat, pose = (part.unsqueeze(1).to(DEVICE) for part in (feat, pose))
            target = target.to(DEVICE)  # one label per sample: [B]

            scores = model(feat, pose)  # class scores: [B, num_label]
            batch_loss = loss_fn(scores, target.long())
            running_loss += batch_loss.item()

            batch_loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Mean of the per-batch losses for this epoch.
        train_loss = running_loss / batches_per_epoch

        test_loss = test(test_loader, model, loss_fn)
        tqdm.write(f"{epoch:5} | {train_loss:10.5f} | {test_loss:9.5f}")
    return model


def test(test_loader, model, loss_fn, return_metrics=False):
    """Evaluate ``model`` on ``test_loader`` without computing gradients.

    Args:
        test_loader: Iterable of ``((x1, x2), label)`` batches.
        model: Module called as ``model(x1, x2)``.
        loss_fn: Loss called as ``loss_fn(pred, label.long())``.
        return_metrics: When False, return only the loss; when True, also
            collect predictions and return classification metrics.

    Returns:
        float: Mean of the per-batch losses when ``return_metrics`` is False.
        dict: With keys ``accuracy``, ``f1``, ``f1-macro``, ``recall``,
            ``precision``, ``loss`` (the same mean loss) and ``matrix`` (the
            flattened confusion matrix) when ``return_metrics`` is True.
        The loss is NaN when the loader yields no batches.
    """
    model.eval()
    num_batches = len(test_loader)
    total_loss = 0
    true_labels = list()
    pred_values = list()

    with torch.no_grad():
        for (x1, x2), label in test_loader:
            # Insert the channel axis the convolution layers expect.
            x1 = x1.unsqueeze(1).to(DEVICE)
            x2 = x2.unsqueeze(1).to(DEVICE)
            label = label.to(DEVICE)

            pred = model(x1, x2)
            loss = loss_fn(pred, label.long())
            total_loss += loss.item()

            if return_metrics:
                true_labels += label.detach().cpu().numpy().tolist()
                pred_values += pred.argmax(-1).detach().cpu().numpy().tolist()

    # Normalise once so both return paths report the same quantity; the
    # metrics path previously returned the un-normalised sum.
    mean_loss = total_loss / num_batches if num_batches else float("nan")

    if not return_metrics:
        # During training only the loss value is checked.
        return mean_loss

    # After training has finished, compute the evaluation metrics.
    accuracy = accuracy_score(true_labels, pred_values)
    f1 = f1_score(true_labels, pred_values)
    f1_macro = f1_score(true_labels, pred_values, average="macro")
    recall = recall_score(true_labels, pred_values)
    precision = precision_score(true_labels, pred_values)
    matrix = confusion_matrix(true_labels, pred_values).ravel()

    return {
        "accuracy": accuracy,
        "f1": f1,
        "f1-macro": f1_macro,
        "recall": recall,
        "precision": precision,
        "loss": mean_loss,
        "matrix": matrix,
    }

In [None]:
# Train the model
model = train(
    train_loader=train_dataloader,
    test_loader=test_dataloader,
    model=model,
    loss_fn=loss_fn,
    optimizer=optimizer,
)

  0%|          | 0/5 [00:00<?, ?it/s]


Epoch | Train Loss | Test Loss
------------------------------
    1 |    0.67854 |   0.70533
    2 |    0.66904 |   0.70619
    3 |    0.66265 |   0.69999
    4 |    0.65952 |   0.70330
    5 |    0.65994 |   0.69991


## Test

In [None]:
clear()
metrics = test(test_dataloader, model, loss_fn, return_metrics=True)

# Scalar scores, each printed with three decimals after a fixed-width label.
score_rows = (
    ("Accuracy:  ", "accuracy"),
    ("F1-score:  ", "f1"),
    ("F1-macro:  ", "f1-macro"),
    ("Recall:    ", "recall"),
    ("Precision: ", "precision"),
)
for prefix, key in score_rows:
    print(f"{prefix}{metrics[key]:.3f}")

print("-" * 30)
# The flattened confusion matrix is unpacked in the order TN, FP, FN, TP.
tn, fp, fn, tp = metrics["matrix"]
for tag, count in (("TN", tn), ("FP", fp), ("FN", fn), ("TP", tp)):
    print(f"{tag}: {count}")

Accuracy:  0.277
F1-score:  0.393
F1-macro:  0.249
Recall:    0.786
Precision: 0.262
------------------------------
TN: 2
FP: 31
FN: 3
TP: 11


In [None]:
def show_probs(test_data, model, max=6):
    """Print softmax class probabilities for a few randomly drawn samples.

    Samples are drawn one at a time in shuffled order. For each accepted
    sample one line ``"<label>: [<p0>  <p1>]"`` is printed, for up to
    ``max - max // 2`` samples with label 0 and up to ``max // 2`` samples
    with label 1. Samples with any other label are skipped.

    Args:
        test_data: Dataset yielding ``((x1, x2), label)`` items.
        model: Module called as ``model(x1, x2)``; its output must hold
            exactly two scores per sample.
        max: Total number of samples to print. The name shadows the builtin
            but is kept so existing keyword callers keep working.
    """
    dataloader = DataLoader(test_data, batch_size=1, shuffle=True)

    model.eval()
    # Per-label quota; label 0 gets the extra slot when ``max`` is odd.
    quota = {0: max - max // 2, 1: max // 2}
    shown = {0: 0, 1: 0}

    if all(shown[cls] >= quota[cls] for cls in quota):
        return  # nothing to print (max <= 0)

    with torch.no_grad():
        for (x1, x2), label in dataloader:
            target = label.item()
            # Decide before touching the features so skipped samples are
            # never transferred to the device.
            if target not in quota or shown[target] >= quota[target]:
                continue
            shown[target] += 1

            x1 = x1.unsqueeze(1).to(DEVICE)
            x2 = x2.unsqueeze(1).to(DEVICE)

            pred = model(x1, x2)
            pred = F.softmax(pred, dim=-1)
            normal, abnormal = pred.squeeze(0)
            print(f"{target}: [{normal:.3f}  {abnormal:.3f}]")

            # Stop as soon as every quota is filled instead of loading one
            # more sample first.
            if all(shown[cls] >= quota[cls] for cls in quota):
                break


# Print class probabilities for ten test samples.
show_probs(test_data, model, max=10)

0: [0.493  0.507]
0: [0.489  0.511]
1: [0.493  0.507]
1: [0.493  0.507]
0: [0.493  0.507]
0: [0.493  0.507]
1: [0.493  0.507]
0: [0.493  0.507]
1: [0.511  0.489]
1: [0.493  0.507]
