# Library Import

In [None]:
import numpy as np  # 수치 계산을 위한 라이브러리
import easydict  # 딕셔너리처럼 값을 다룰 수 있는 간편한 방법을 제공하는 라이브러리
import torch  # PyTorch 메인 라이브러리
import torch.nn as nn  # 신경망 모델을 구성하기 위한 모듈
import torch.nn.functional as F  # 신경망에서 자주 사용하는 함수들을 포함하는 모듈
from torch.utils import data  # 데이터 로딩 및 처리 유틸리티
from torch.utils.data import DataLoader  # 데이터를 배치로 묶어주는 유틸리티
from torchvision import datasets, transforms  # 컴퓨터 비전용 데이터셋과 데이터 변환을 위한 유틸리티
from PIL import Image  # 이미지 처리를 위한 라이브러리
from sklearn.metrics import roc_auc_score  # ROC AUC 점수를 계산하기 위한 함수

# Dataset Load and Preprocessing

In [None]:
class MNIST_loader(data.Dataset):  # Dataset 클래스를 상속받아 MNIST 데이터를 로드하는 클래스 정의
    """Preprocessing을 포함한 dataloader를 구성"""

    def __init__(self, data, target, transform):  # 클래스 초기화 메서드
        self.data = data  # 데이터셋의 입력 이미지들
        self.target = target  # 데이터셋의 레이블들
        self.transform = transform  # 이미지 전처리 변환 함수

    def __getitem__(self, index):  # 주어진 인덱스의 데이터를 반환하는 메서드
        x = self.data[index]  # 인덱스에 해당하는 이미지 데이터
        y = self.target[index]  # 인덱스에 해당하는 레이블
        if self.transform:  # 변환 함수가 존재하면
            x = Image.fromarray(x.numpy(), mode='L')  # numpy 배열을 PIL 이미지로 변환
            x = self.transform(x)  # 변환 함수 적용
        return x, y  # 전처리된 이미지와 레이블 반환

    def __len__(self):  # 데이터셋의 크기를 반환하는 메서드
        return len(self.data)  # 데이터셋의 전체 이미지 개수 반환

In [None]:
def get_mnist(args, data_dir='../data/'):  # MNIST 데이터셋을 로드하고 데이터 로더를 반환하는 함수
    """get dataloaders"""
    # 각 클래스에 대한 GCN을 적용한 후의 최소 및 최대 값 (원래 구현에 따라)
    # GCN (Global Contrast Normalization) : 전역적 명암 대비 조절 정규화
    min_max = [(-0.8826567065619495, 9.001545489292527),
               (-0.6661464580883915, 20.108062262467364),
               (-0.7820454743183202, 11.665100841080346),
               (-0.7645772083211267, 12.895051191467457),
               (-0.7253923114302238, 12.683235701611533),
               (-0.7698501867861425, 13.103278415430502),
               (-0.778418217980696, 10.457837397569108),
               (-0.7129780970522351, 12.057777597673047),
               (-0.8280402650205075, 10.581538445782988),
               (-0.7369959242164307, 10.697039838804978)]

    # 데이터 변환을 정의 (텐서 변환, GCN 적용, 정규화)
    transform = transforms.Compose([transforms.ToTensor(),  # 이미지를 텐서로 변환 : pixel값 [0, 225] -> [0.0, 1.0]
                                    transforms.Lambda(lambda x: global_contrast_normalization(x)),  # GCN 적용
                                    # pixel값별 정규화 적용
                                    transforms.Normalize([min_max[args.normal_class][0]],  # mean
                                                         [min_max[args.normal_class][1] \  # std
                                                         -min_max[args.normal_class][0]])])

    # MNIST 데이터셋 로드 (train=True: 훈련 데이터, train=False: 테스트 데이터)
    train = datasets.MNIST(root=data_dir, train=True, download=True)
    test = datasets.MNIST(root=data_dir, train=False, download=True)

    # 훈련 데이터와 레이블 추출
    x_train = train.data
    y_train = train.targets

    # 정상 클래스에 해당하는 데이터만 선택 (train은 정상 데이터만 사용)
    x_train = x_train[np.where(y_train==args.normal_class)]
    y_train = y_train[np.where(y_train==args.normal_class)]

    # MNIST_loader 객체를 생성하여 훈련 데이터 로더 생성
    data_train = MNIST_loader(x_train, y_train, transform)
    # batch 적용
    dataloader_train = DataLoader(data_train, batch_size=args.batch_size,
                                  shuffle=True, num_workers=0)

    # 테스트 데이터와 레이블 추출
    x_test = test.data
    y_test = test.targets

    # 정상 클래스는 0으로, 나머지 클래스는 1로 변환 (정상 vs 비정상 분류)
    y_test = np.where(y_test==args.normal_class, 0, 1)

    # MNIST_loader 객체를 생성하여 테스트 데이터 로더 생성
    data_test = MNIST_loader(x_test, y_test, transform)
    dataloader_test = DataLoader(data_test, batch_size=args.batch_size,
                                 shuffle=False, num_workers=0)

    # 훈련 데이터 로더와 테스트 데이터 로더 반환
    return dataloader_train, dataloader_test


In [None]:
def global_contrast_normalization(x):
    """Apply global contrast normalization to tensor."""
    mean = torch.mean(x)  # 텐서의 모든 요소(픽셀)에 대한 평균을 계산
    x -= mean  # 평균을 제거하여 텐서의 중심을 맞춤
    x_scale = torch.mean(torch.abs(x))  # 텐서 요소의 절대값에 대한 평균을 계산
    x /= x_scale  # 절대값의 평균으로 나누어 대비를 정규화
    return x  # 정규화된 텐서를 반환


# Model Class

In [None]:
class DeepSVDD_network(nn.Module):  # nn.Module을 상속받아 DeepSVDD 네트워크 클래스 정의
    def __init__(self, z_dim=32):  # 클래스 초기화 메서드
        super(DeepSVDD_network, self).__init__()  # 부모 클래스(nn.Module)의 초기화 메서드 호출
        self.pool = nn.MaxPool2d(2, 2)  # 2x2 크기의 최대 풀링 레이어 정의

        self.conv1 = nn.Conv2d(1, 8, 5, bias=False, padding=2)  # 1채널 입력, 8채널 출력, 5x5 커널 크기의 첫 번째 conv 레이어
        self.bn1 = nn.BatchNorm2d(8, eps=1e-04, affine=False)  # 첫 번째 배치 정규화 레이어, 8채널

        self.conv2 = nn.Conv2d(8, 4, 5, bias=False, padding=2)  # 8채널 입력, 4채널 출력, 5x5 커널 크기의 두 번째 conv 레이어
        self.bn2 = nn.BatchNorm2d(4, eps=1e-04, affine=False)  # 두 번째 배치 정규화 레이어, 4채널

        self.fc1 = nn.Linear(4 * 7 * 7, z_dim, bias=False)  # 4*7*7 입력, z_dim 출력의 fc 레이어

    def forward(self, x):  # 순전파 메서드 정의
        x = self.conv1(x)  # 첫 번째 합성곱 레이어 적용
        # 입력: (N, 1, 28, 28), 출력: (N, 8, 28, 28) (패딩을 주어 크기 유지)
        x = self.pool(F.leaky_relu(self.bn1(x)))  # 첫 번째 배치 정규화, Leaky ReLU 활성화 함수, 최대 풀링 적용
        # 출력: (N, 8, 14, 14) (2x2 풀링으로 인해 공간 차원 절반으로 감소)

        x = self.conv2(x)  # 두 번째 합성곱 레이어 적용
        # 출력: (N, 4, 14, 14) (패딩을 주어 크기 유지)
        x = self.pool(F.leaky_relu(self.bn2(x)))  # 두 번째 배치 정규화, Leaky ReLU 활성화 함수, 최대 풀링 적용
        # 출력: (N, 4, 7, 7) (2x2 풀링으로 인해 공간 차원 절반으로 감소)

        x = x.view(x.size(0), -1)  # 텐서를 (배치 크기, 나머지 요소) 형태로 변형 (Flatten)
        # 출력: (N, 4 * 7 * 7) = (N, 196)

        return self.fc1(x)  # fc 레이어 적용 후 반환
        # 출력: (N, z_dim)


In [None]:
class pretrain_autoencoder(nn.Module):  # nn.Module을 상속받아 Autoencoder 네트워크 클래스 정의
    def __init__(self, z_dim=32):  # 클래스 초기화 메서드
        super(pretrain_autoencoder, self).__init__()
        self.z_dim = z_dim  # 잠재 공간(z) 차원
        self.pool = nn.MaxPool2d(2, 2)  # 2x2 크기의 최대 풀링 레이어 정의

        # 인코더 부분
        self.conv1 = nn.Conv2d(1, 8, 5, bias=False, padding=2)  # 1채널 입력, 8채널 출력, 5x5 커널 크기의 첫 번째 conv 레이어
        self.bn1 = nn.BatchNorm2d(8, eps=1e-04, affine=False)  # 첫 번째 배치 정규화 레이어, 8채널
        self.conv2 = nn.Conv2d(8, 4, 5, bias=False, padding=2)  # 8채널 입력, 4채널 출력, 5x5 커널 크기의 두 번째 conv 레이어
        self.bn2 = nn.BatchNorm2d(4, eps=1e-04, affine=False)  # 두 번째 배치 정규화 레이어, 4채널
        self.fc1 = nn.Linear(4 * 7 * 7, z_dim, bias=False)  # 4*7*7 입력, z_dim 출력의 fc 레이어

        # 디코더 부분
        self.deconv1 = nn.ConvTranspose2d(2, 4, 5, bias=False, padding=2)  # 2채널 입력, 4채널 출력, 5x5 커널 크기의 첫 번째 Transpose conv 레이어
        self.bn3 = nn.BatchNorm2d(4, eps=1e-04, affine=False)  # 세 번째 배치 정규화 레이어, 4채널
        self.deconv2 = nn.ConvTranspose2d(4, 8, 5, bias=False, padding=3)  # 4채널 입력, 8채널 출력, 5x5 커널 크기의 두 번째 Transpose conv 레이어
        self.bn4 = nn.BatchNorm2d(8, eps=1e-04, affine=False)  # 네 번째 배치 정규화 레이어, 8채널
        self.deconv3 = nn.ConvTranspose2d(8, 1, 5, bias=False, padding=2)  # 8채널 입력, 1채널 출력, 5x5 커널 크기의 세 번째 Transpose conv 레이어

    def encoder(self, x):  # 인코더 부분
        x = self.conv1(x)  # 첫 번째 합성곱 레이어 적용
        # 입력: (N, 1, 28, 28), 출력: (N, 8, 28, 28) (패딩을 주어 크기 유지)
        x = self.pool(F.leaky_relu(self.bn1(x)))  # 첫 번째 배치 정규화, Leaky ReLU 활성화 함수, 최대 풀링 적용
        # 출력: (N, 8, 14, 14) (2x2 풀링으로 인해 공간 차원 절반으로 감소)

        x = self.conv2(x)  # 두 번째 합성곱 레이어 적용
        # 출력: (N, 4, 14, 14) (패딩을 주어 크기 유지)
        x = self.pool(F.leaky_relu(self.bn2(x)))  # 두 번째 배치 정규화, Leaky ReLU 활성화 함수, 최대 풀링 적용
        # 출력: (N, 4, 7, 7) (2x2 풀링으로 인해 공간 차원 절반으로 감소)

        x = x.view(x.size(0), -1)  # 텐서를 (배치 크기, 나머지 요소) 형태로 변형 (Flatten)
        # 출력: (N, 4 * 7 * 7) = (N, 196)

        return self.fc1(x)  # fc 레이어 적용 후 반환
        # 출력: (N, z_dim)

    def decoder(self, x):  # 디코더 부분
        x = x.view(x.size(0), int(self.z_dim / 16), 4, 4)  # 텐서를 (배치 크기, 채널, 높이, 너비) 형태로 변형
        # 입력: (N, z_dim), 출력: (N, 2, 4, 4) (여기서 2는 z_dim을 16으로 나눈 값)

        x = F.interpolate(F.leaky_relu(x), scale_factor=2)  # Leaky ReLU 활성화 함수 적용 후 2배 업샘플링
        # 출력: (N, 2, 8, 8)

        x = self.deconv1(x)  # 첫 번째 Transpose conv 레이어 적용
        # 출력: (N, 4, 8, 8)

        x = F.interpolate(F.leaky_relu(self.bn3(x)), scale_factor=2)  # 세 번째 배치 정규화, Leaky ReLU 활성화 함수, 2배 업샘플링 적용
        # 출력: (N, 4, 16, 16)

        x = self.deconv2(x)  # 두 번째 Transpose conv 레이어 적용
        # 출력: (N, 8, 18, 18) (패딩으로 인해 크기 증가)

        x = F.interpolate(F.leaky_relu(self.bn4(x)), scale_factor=2)  # 네 번째 배치 정규화, Leaky ReLU 활성화 함수, 2배 업샘플링 적용
        # 출력: (N, 8, 36, 36)

        x = self.deconv3(x)  # 세 번째 Transpose conv 레이어 적용
        # 출력: (N, 1, 36, 36)

        return torch.sigmoid(x)  # 출력 값을 [0, 1] 범위로 제한하기 위해 Sigmoid 함수 적용

    def forward(self, x):  # 순전파 메서드 정의
        z = self.encoder(x)  # 인코더를 통해 입력 이미지를 잠재 공간(z)으로 변환
        x_hat = self.decoder(z)  # 디코더를 통해 잠재 공간(z)에서 재구성된 이미지를 생성
        return x_hat  # 재구성된 이미지 반환


# Train Class

In [None]:
class TrainerDeepSVDD:
    def __init__(self, args, data_loader, device):  # 클래스 초기화 메서드
        self.args = args  # 인자로 받은 설정 값들
        self.train_loader = data_loader  # 데이터 로더
        self.device = device  # 사용할 디바이스 (CPU 또는 GPU)

    def pretrain(self):
        """ DeepSVDD 모델에서 사용할 가중치를 학습시키는 AutoEncoder 학습 단계 """
        # Autoencoder 모델 초기화 및 디바이스로 이동
        ae = pretrain_autoencoder(self.args.latent_dim).to(self.device)
        ae.apply(weights_init_normal)  # 가중치 초기화

        # Adam 옵티마이저 초기화
        optimizer = torch.optim.Adam(ae.parameters(), lr=self.args.lr_ae,
                               weight_decay=self.args.weight_decay_ae)

        # 학습률 스케줄러 초기화
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                    milestones=self.args.lr_milestones, gamma=0.1)

        ae.train()  # 모델을 학습 모드로 전환
        for epoch in range(self.args.num_epochs_ae):  # 지정된 에폭 수만큼 반복
            total_loss = 0  # 총 손실 초기화
            for x, _ in self.train_loader:  # 데이터 로더에서 미니 배치 단위로 데이터 가져오기
                x = x.float().to(self.device)  # 입력 데이터를 플로트형으로 변환 후 디바이스로 이동

                optimizer.zero_grad()  # 옵티마이저의 기울기 초기화
                x_hat = ae(x)  # Autoencoder를 통해 입력 데이터 재구성
                reconst_loss = torch.mean(torch.sum((x_hat - x) ** 2, dim=tuple(range(1, x_hat.dim()))))  # 재구성 손실 계산
                reconst_loss.backward()  # 역전파를 통해 기울기 계산
                optimizer.step()  # 옵티마이저를 통해 모델 파라미터 업데이트

                total_loss += reconst_loss.item()  # 총 손실에 현재 배치의 손실 더하기
            scheduler.step()  # 학습률 스케줄러 갱신
            print('Pretraining Autoencoder... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss/len(self.train_loader)))  # 현재 에폭과 평균 손실 출력
        self.save_weights_for_DeepSVDD(ae, self.train_loader)  # 학습된 가중치를 저장하는 메서드 호출

    def save_weights_for_DeepSVDD(self, model, dataloader):
        """학습된 AutoEncoder 가중치를 DeepSVDD모델에 Initialize해주는 함수"""
        c = self.set_c(model, dataloader)  # hyper sphere의 중심 c를 설정
        net = DeepSVDD_network(self.args.latent_dim).to(self.device)  # DeepSVDD 네트워크 초기화
        state_dict = model.state_dict()  # AutoEncoder의 가중치를 가져옴
        net.load_state_dict(state_dict, strict=False)  # 가중치를 DeepSVDD 네트워크에 로드
        torch.save({'center': c.cpu().data.numpy().tolist(),
                    'net_dict': net.state_dict()}, '../weights/pretrained_parameters.pth')  # 가중치와 중심을 파일로 저장

    def set_c(self, model, dataloader, eps=0.1):
        """Initializing the center for the hypersphere"""
        model.eval()  # 모델을 평가 모드로 전환
        z_ = []
        with torch.no_grad():  # 기울기 계산을 하지 않음
            for x, _ in dataloader:
                x = x.float().to(self.device)
                z = model.encoder(x)  # 인코더를 통해 잠재 벡터 추출
                z_.append(z.detach())  # 추출된 벡터를 리스트에 추가
        z_ = torch.cat(z_)  # 리스트를 하나의 텐서로 결합
        c = torch.mean(z_, dim=0)  # 각 차원의 평균을 계산하여 중심을 설정
        c[(abs(c) < eps) & (c < 0)] = -eps  # 중심 값이 작은 경우 epsilon 값으로 설정
        c[(abs(c) < eps) & (c > 0)] = eps
        return c

    def train(self):
        """Deep SVDD model 학습"""
        net = DeepSVDD_network(self.args.latent_dim).to(self.device)  # DeepSVDD 네트워크 초기화

        if self.args.pretrain == True:  # 사전 학습된 가중치를 사용할 경우
            state_dict = torch.load('../weights/pretrained_parameters.pth')  # 가중치 파일 로드
            net.load_state_dict(state_dict['net_dict'])  # 네트워크에 가중치 로드
            c = torch.Tensor(state_dict['center']).to(self.device)  # 중심 로드
        else:  # 사전 학습된 가중치를 사용하지 않을 경우
            net.apply(weights_init_normal)  # 가중치 초기화
            c = torch.randn(self.args.latent_dim).to(self.device)  # 랜덤 중심 설정

        optimizer = torch.optim.Adam(net.parameters(), lr=self.args.lr,
                                     weight_decay=self.args.weight_decay)  # Adam 옵티마이저 초기화
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                    milestones=self.args.lr_milestones, gamma=0.1)  # 학습률 스케줄러 초기화

        net.train()  # 모델을 학습 모드로 전환
        for epoch in range(self.args.num_epochs):
            total_loss = 0
            for x, _ in self.train_loader:
                x = x.float().to(self.device)

                optimizer.zero_grad()  # 옵티마이저의 기울기 초기화
                z = net(x)  # 입력 데이터를 네트워크에 통과시켜 잠재 벡터 추출
                loss = torch.mean(torch.sum((z - c) ** 2, dim=1))  # 중심과의 거리 제곱 합으로 손실 계산
                loss.backward()  # 역전파를 통해 기울기 계산
                optimizer.step()  # 옵티마이저를 통해 모델 파라미터 업데이트

                total_loss += loss.item()  # 총 손실에 현재 배치의 손실 더하기
            scheduler.step()  # 학습률 스케줄러 갱신
            print('Training Deep SVDD... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss / len(self.train_loader)))  # 현재 에폭과 평균 손실 출력
        self.net = net  # 학습된 네트워크 저장
        self.c = c  # 학습된 중심 저장

        return self.net, self.c  # 학습된 네트워크와 중심 반환

def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1 and classname != 'Conv':
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)  # conv 레이어 가중치를 정규분포로 초기화
    elif classname.find("Linear") != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)  # fc 레이어 가중치를 정규분포로 초기화


# Train

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
args = easydict.EasyDict({
       'num_epochs':50,
       'num_epochs_ae':50,
       'lr':1e-3,
       'lr_ae':1e-3,
       'weight_decay':5e-7,
       'weight_decay_ae':5e-3,
       'lr_milestones':[50],
       'batch_size':1024,
       'pretrain':True,
       'latent_dim':32,
       'normal_class':0
                })

if __name__ == '__main__':

    # Train/Test Loader 불러오기
    dataloader_train, dataloader_test = get_mnist(args)

    # Network 학습준비, 구조 불러오기
    deep_SVDD = TrainerDeepSVDD(args, dataloader_train, device)

    # DeepSVDD를 위한 DeepLearning pretrain 모델로 Weight 학습 (AutoEncoder)
    if args.pretrain:
        deep_SVDD.pretrain()

    # 학습된 가중치로 Deep_SVDD모델 Train
    net, c = deep_SVDD.train()

# Val

In [None]:
def eval(net, c, dataloader, device):
    """Testing the Deep SVDD model"""
    scores = []  # 점수를 저장할 리스트
    labels = []  # 레이블을 저장할 리스트
    net.eval()  # 모델을 평가 모드로 전환
    print('Testing...')

    with torch.no_grad():  # 기울기 계산을 하지 않음
        for x, y in dataloader:  # 데이터 로더에서 배치를 반복하여 가져옴
            x = x.float().to(device)  # 입력 데이터를 float type으로 변환 후 디바이스로 이동
            z = net(x)  # 네트워크를 통해 잠재 벡터 추출
            score = torch.sum((z - c) ** 2, dim=1)  # 중심과의 거리 제곱 합으로 점수 계산

            scores.append(score.detach().cpu())  # 계산된 점수를 리스트에 추가
            labels.append(y.cpu())  # 레이블을 리스트에 추가

    labels, scores = torch.cat(labels).numpy(), torch.cat(scores).numpy()  # 리스트를 하나의 텐서로 결합하고 numpy 배열로 변환
    print('ROC AUC score: {:.2f}'.format(roc_auc_score(labels, scores) * 100))  # ROC AUC 점수 계산 및 출력
    return labels, scores  # 레이블과 점수를 반환
