# 목표

Conv 기반의 D 모델만 단독으로 학습이 가능한지 확인한다.

1. 가상의 G 모델(CLSTMC, pretrained)로 testset에 대해 prediction들을 생성
2. 생성한 데이터를 npy 파일로 저장
3. D 모델이 가짜와 진짜 데이터를 구분할 수 있는지(예측 데이터와 실제 데이터 사이에 실제로 차이가 있는지)를 확인한다.

In [50]:
from multiprocessing import cpu_count
from pathlib import Path
from typing import List, Union, AnyStr
from math import pi
import pickle

import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
from sklearn import model_selection
from tqdm import tqdm
import torch_optimizer
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import random
from torchvision.models.resnet import resnet18

import torch_burn as tb
import utils

In [2]:
# Fix the random seed (0) via torch_burn so repeated runs of the notebook are
# comparable. NOTE(review): presumably seeds Python/NumPy/PyTorch RNGs — confirm
# against torch_burn's implementation.
tb.seed_everything(0)

In [3]:
# Name of this experiment; its checkpoint directory is created eagerly so that
# later cells can write into it without checking for existence.
EXPERIMENT_NAME = '1201-GAN-G-D'
CHECKPOINT_DIR = Path('checkpoint') / EXPERIMENT_NAME
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

# CSV files are selected by filename suffix: '*scene3_0.csv' for training and
# '*scene3_1.csv' for testing. Sorting keeps the file order deterministic.
DATA_DIR = Path('data/1116')
TRAIN_FILES = sorted(DATA_DIR.glob('*scene3_0.csv'))
TEST_FILES = sorted(DATA_DIR.glob('*scene3_1.csv'))

# Create Dataset

In [4]:
def load_result(checkpoint_dir):
    """Load the three result arrays stored in an experiment directory.

    Args:
        checkpoint_dir: Directory (str or path-like) that contains
            ``result-X.npy``, ``result-Y.npy`` and ``result-P.npy``.

    Returns:
        A tuple ``(X, Y, P)`` with the arrays read from those files, in that
        order.
    """
    root = Path(checkpoint_dir)
    filenames = ('result-X.npy', 'result-Y.npy', 'result-P.npy')
    return tuple(np.load(root / filename) for filename in filenames)

In [5]:
# Load the saved X / Y / P result arrays from the '1130-Scene3-CLSTMC4-X60'
# checkpoint directory.
X, Y, P = load_result('checkpoint/1130-Scene3-CLSTMC4-X60')

In [6]:
# Display the shapes of the loaded arrays as a quick sanity check.
X.shape, Y.shape, P.shape

((54167, 60, 6), (54167, 3), (54167, 3))

In [58]:
class DetectorDataset(Dataset):
    """Fixed-length windows of real (``Y``) and predicted (``P``) sequences.

    Both sequences are cut into consecutive, non-overlapping windows of ``L``
    samples. The first ``Ysize`` items are windows of ``Y`` labelled ``0``;
    the remaining ``Psize`` items are windows of ``P`` labelled ``1``.
    Trailing samples that do not fill a whole window are dropped.

    Args:
        Y: 2-D array-like laid out as ``(samples, channels)``.
        P: 2-D array-like laid out as ``(samples, channels)``.
        L: Window length in samples; must be a positive integer.

    Raises:
        ValueError: If ``L`` is not positive.
    """

    def __init__(self, Y, P, L):
        super(DetectorDataset, self).__init__()

        if L <= 0:
            raise ValueError(f'window length L must be positive, got {L}')

        # Stored channel-first, (channels, samples), so that a window can be
        # fed to Conv1d directly.
        self.Y = torch.tensor(Y, dtype=torch.float32).transpose(0, 1)
        self.P = torch.tensor(P, dtype=torch.float32).transpose(0, 1)
        self.L = L
        # Number of complete windows available from each source.
        self.Ysize = self.Y.shape[1] // self.L
        self.Psize = self.P.shape[1] // self.L
        self.dssize = self.Ysize + self.Psize

    def __len__(self):
        """Return the total number of windows (real + predicted)."""
        return self.dssize

    def __getitem__(self, idx):
        """Return ``(window, target)`` for item ``idx``.

        ``window`` has shape ``(channels, L)``; ``target`` is a float tensor
        of shape ``(1,)`` holding ``0`` for a ``Y`` window and ``1`` for a
        ``P`` window.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        if idx < 0:
            idx += self.dssize
        if not 0 <= idx < self.dssize:
            raise IndexError(f'index out of range for dataset of size {self.dssize}')

        if idx < self.Ysize:
            source, label = self.Y, 0
        else:
            idx -= self.Ysize
            source, label = self.P, 1

        # Window k covers samples [k * L, (k + 1) * L). Using `idx` itself as
        # the start offset would produce overlapping windows and only ever
        # touch the beginning of the sequence.
        start = idx * self.L
        window = source[:, start:start + self.L]
        target = torch.tensor([label], dtype=torch.float32)
        return window, target

In [59]:
# Build the detector dataset from the loaded Y and P arrays, using windows of
# 60 samples.
ds = DetectorDataset(Y, P, 60)

# Create Model

## Model Common Parts

In [9]:
class ResBlock1d(nn.Module):
    """Two-convolution 1-D residual block with an optional projection shortcut.

    The main path is Conv1d -> BatchNorm1d -> LeakyReLU -> Conv1d ->
    BatchNorm1d. Its output is added to the shortcut and passed through a
    final LeakyReLU.

    Args:
        inchannels: Number of input channels.
        channels: Number of output channels.
        kernel_size: Kernel size of both main-path convolutions. Padding is
            ``kernel_size // 2``, which preserves length only for odd sizes.
        stride: Stride of the first convolution (and of the shortcut
            projection).
        groups: ``groups`` argument passed to every convolution.
    """

    expansion = 1

    def __init__(self, inchannels, channels, kernel_size, stride=1, groups=1):
        super(ResBlock1d, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv1d(inchannels, channels, kernel_size, padding=kernel_size // 2, stride=stride, groups=groups),
            nn.BatchNorm1d(channels),
            nn.LeakyReLU(),
            nn.Conv1d(channels, channels, kernel_size, padding=kernel_size // 2, groups=groups),
            nn.BatchNorm1d(channels)
        )
        self.act = nn.LeakyReLU()

        # The shortcut must match the main path in both channel count and
        # length, so a 1x1 projection is needed whenever either changes.
        # Checking only the channel count breaks the residual add for
        # stride > 1 with inchannels == channels.
        self.conv2 = None
        if inchannels != channels or stride != 1:
            self.conv2 = nn.Sequential(
                nn.Conv1d(inchannels, channels, 1, stride=stride, groups=groups),
                nn.BatchNorm1d(channels)
            )

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(batch, inchannels, length)``."""
        identity = x if self.conv2 is None else self.conv2(x)

        out = self.conv1(x)
        out = out + identity
        return self.act(out)

## D Model - ResNet18 1d

In [45]:
class ConvDetector(nn.Module):
    """ResNet-style 1-D convolutional detector producing one score per sample.

    A strided stem convolution and max-pool are followed by four stages of
    residual blocks (64, 128, 256, 512 channels), global average pooling, a
    linear layer and a sigmoid.

    Args:
        block: Residual block class (or factory). It is called as
            ``block(inchannels, channels, kernel_size, stride=...)`` and must
            expose an ``expansion`` attribute.
        layers: Sequence of four ints, the number of blocks in each stage.
    """

    def __init__(self, block, layers):
        super(ConvDetector, self).__init__()

        self.inchannels = 64

        self.conv1 = nn.Sequential(
            nn.Conv1d(3, self.inchannels, 7, stride=2, padding=3, bias=False),
            nn.BatchNorm1d(self.inchannels),
            nn.ReLU()
        )
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512 * block.expansion, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Score a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, 3, length)``.

        Returns:
            Tensor of shape ``(batch, 1)`` with sigmoid outputs in ``[0, 1]``.
        """
        x = self.conv1(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        x = self.sigmoid(x)

        return x

    def _make_layer(self, block, channels, blocks, stride=1):
        """Build one stage consisting of ``blocks`` residual blocks.

        Only the first block receives ``stride`` (it performs the stage's
        downsampling and channel change); the remaining blocks keep the
        resolution. ``self.inchannels`` is updated to the stage's output
        width for the next stage.
        """
        # Forward `stride` to the first block; dropping it here would make
        # the stride=2 stages silently skip their downsampling.
        layers = [block(self.inchannels, channels, 3, stride=stride)]
        self.inchannels = channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inchannels, channels, 3))

        return nn.Sequential(*layers)

## G Model - Conv Layer

In [39]:
class GModel(nn.Module):
    """Small length-preserving 1-D convolutional network (3 -> 64 -> 64 -> 3).

    Two ``Conv1d(kernel=3, padding=1) -> BatchNorm1d -> LeakyReLU`` stages are
    followed by a 1x1 convolution that maps back to 3 channels, so the output
    has the same ``(batch, 3, length)`` shape as the input.
    """

    def __init__(self):
        super().__init__()

        width = 64
        stages = [
            nn.Conv1d(3, width, 3, padding=1),
            nn.BatchNorm1d(width),
            nn.LeakyReLU(),
            nn.Conv1d(width, width, 3, padding=1),
            nn.BatchNorm1d(width),
            nn.LeakyReLU(),
            nn.Conv1d(width, 3, 1),
        ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` of shape ``(batch, 3, length)`` through the conv stack."""
        return self.conv(x)

## 모델 생성

In [46]:
# Detector (D): ConvDetector built from ResBlock1d with two blocks in each of
# the four stages, moved to the GPU.
D = ConvDetector(ResBlock1d, [2, 2, 2, 2]).cuda()

In [47]:
# Generator (G): the small convolutional GModel, moved to the GPU.
G = GModel().cuda()

In [48]:
# Fetch the first dataset item and display the shapes of its input window and
# its target.
x, y = ds[0]
x.shape, y.shape

(torch.Size([3, 60]), torch.Size([1]))

In [49]:
# Smoke-test G on a batch of one (unsqueeze adds the batch dimension) and
# display the output shape.
G(x.unsqueeze(0).cuda()).shape

torch.Size([1, 3, 60])

# Training

In [61]:
# Hold out 20% of the windows (rounded down) and randomly split the dataset
# into a training subset and a held-out subset.
valid_size = int(len(ds) * 0.2)
ds_train, ds_test = random_split(ds, [len(ds) - valid_size, valid_size])

In [63]:
# Batched loaders for both subsets: 256 items per batch, 8 worker processes,
# pinned host memory.
# NOTE(review): the held-out loader is also shuffled; shuffling is normally
# unnecessary for evaluation — confirm whether this is intentional.
dl_train = DataLoader(ds_train, batch_size=256, num_workers=8, pin_memory=True, shuffle=True)
dl_test  = DataLoader(ds_test,  batch_size=256, num_workers=8, pin_memory=True, shuffle=True)

In [None]:
for epoch in range(1, 101):
    # Training D
    with tqdm(total=len(dl_train), position=0, ncols=100) as t:
        for x, y in dl_train:
            x = x.cuda()
            y = y.cuda()
            p = G(x)
            
            loss_D = D(y)
            z
            
        
    # Training G
    with tqdm(total=len(dl_train), position=0, ncols=100) as t: