## 導入所有必要的程式庫和宣告

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import random

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.ensemble import IsolationForest

TARGET_SIZE = (28, 28)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## 載入 MNIST 訓練資料

In [2]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Resize(size=TARGET_SIZE)])
trainset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
testset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

class SubDataset(Dataset):
    LABEL_TO_INDEX = {
        1: 0, 3: 1, 5: 2, 7: 3
    }
    INDEX_TO_LABEL = [
        1, 3, 5, 7
    ]

    def __init__(self, full_dataset):
        self._data_pair = list()
        for data in full_dataset:
            img, label = data
            if label in self.LABEL_TO_INDEX.keys():
                self._data_pair.append((img, label))

    def __len__(self):
        return len(self._data_pair)

    def __getitem__(self, idx):
        img, label = self._data_pair[idx]
        return img, self.LABEL_TO_INDEX[label]

class AbnormDataset(Dataset):
    NORM_LABEL = [
        1, 3, 5, 7
    ]

    def __init__(self, full_dataset):
        self._data_pair = list()
        for data in full_dataset:
            img, label = data
            self._data_pair.append((img, label not in self.NORM_LABEL))

    def __len__(self):
        return len(self._data_pair)

    def __getitem__(self, idx):
        img, abnorm = self._data_pair[idx]
        return img, abnorm

t_subset = SubDataset(trainset)
t_loader = torch.utils.data.DataLoader(t_subset, batch_size=64, shuffle=True)
v_subset = SubDataset(testset)
v_loader = torch.utils.data.DataLoader(v_subset, batch_size=64, shuffle=True)
r_dataset = AbnormDataset(trainset)
r_loader = torch.utils.data.DataLoader(r_dataset, batch_size=64, shuffle=True)
a_dataset = AbnormDataset(testset)
a_loader = torch.utils.data.DataLoader(a_dataset, batch_size=64, shuffle=True)

## 建構用於分類網路（第一小題）

In [3]:
class FullyConnect(nn.Module):
    def __init__(self, in_size,
                       out_size,
                       activation=None):
        super().__init__()
        self.act = activation
        self.linear = nn.Linear(
            in_size,
            out_size,
            bias=True
        )

    def forward(self, x):
        x = self.linear(x)
        if not self.act is None:
            x = self.act(x)
        return x

class ConvBlock(nn.Module):
    def __init__(self, in_channels,
                       out_channels,
                       kernel_size,
                       activation=None):
        super().__init__()
        self.act = activation
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            padding="same",
            bias=False,
        )
        self.bn = nn.BatchNorm2d(
            out_channels,
            eps=1e-5
        )
        nn.init.kaiming_normal_(self.conv.weight,
                                mode="fan_out",
                                nonlinearity="relu")
    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        if not self.act is None:
            x = self.act(x)
        return x

class ClassifyNetwork(nn.Module):
    def __init__(self):
        super(ClassifyNetwork, self).__init__()
        self.img_size = TARGET_SIZE

        self.body = nn.Sequential(
            ConvBlock(1, 32, 7, nn.SiLU()),
            ConvBlock(32, 32, 3, nn.SiLU()),
            ConvBlock(32, 2, 3, nn.SiLU())
        )

        h, w = self.img_size
        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(p=0.1),
            FullyConnect(h * w * 2, 128, nn.SiLU()),
            nn.Dropout(p=0.1),
            FullyConnect(128, 4),
        )
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.body(x)
        x = self.head(x)
        return x

    def get_prob(self, x):
        x = self.forward(x)
        return self.softmax(x)

## 訓練 MNIST 分類網路（第一小題）

In [4]:
classify_net = ClassifyNetwork()
classify_net = classify_net.to(device)

cross_entroy = nn.CrossEntropyLoss()
opt = optim.SGD(classify_net.parameters(),
                lr=0.01,
                momentum=0.9,
                nesterov=True,
                weight_decay=0.001)

running_loss = list()
for e in range(10):
    classify_net.train()
    for imgs, labels in t_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        opt.zero_grad()
        loss = cross_entroy(classify_net(imgs), labels)
        loss.backward()
        opt.step()

        running_loss.append(loss.item())
        if len(running_loss) > 500:
            running_loss.pop(0)

    classify_net.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for imgs, labels in v_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = classify_net(imgs)
            _, pred = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (pred == labels).sum().item()
        print("epoch {} -> loss: {:.4f}, acc: {:.2f}%".format(
                  e+1, sum(running_loss)/len(running_loss), 100.0 * correct/total))

epoch 1 -> loss: 0.0895, acc: 99.21%
epoch 2 -> loss: 0.0260, acc: 99.48%
epoch 3 -> loss: 0.0185, acc: 99.38%
epoch 4 -> loss: 0.0160, acc: 99.46%
epoch 5 -> loss: 0.0129, acc: 99.66%
epoch 6 -> loss: 0.0109, acc: 99.58%
epoch 7 -> loss: 0.0089, acc: 99.61%
epoch 8 -> loss: 0.0084, acc: 99.68%
epoch 9 -> loss: 0.0070, acc: 99.63%
epoch 10 -> loss: 0.0062, acc: 99.29%


## 測試分類網路的檢測性能（第一小題）

In [5]:
classify_net.eval()
best_classify_net_acc = 0
for i in range(100):
    thres = 0.995 + 0.005 * (i/100)
    with torch.no_grad():
        correct = 0
        total = 0
        for imgs, abnorm in a_loader:
            imgs, abnorm = imgs.to(device), abnorm.to(device)
            prob, _ = torch.max(classify_net.get_prob(imgs), dim=1)
            pred_abnorm = torch.where(prob < thres, True, False)
            correct += (pred_abnorm == abnorm).sum().item()
            total += abnorm.size(0)
        classify_net_acc = correct/total
        if best_classify_net_acc < classify_net_acc:
            best_classify_net_acc = classify_net_acc
            print("Test... current best accuracy is {:.2f}%".format(100 * best_classify_net_acc))
print("Best accuracy: {:.2f}%".format(100 * best_classify_net_acc))

Test... current best accuracy is 78.80%
Test... current best accuracy is 78.86%
Test... current best accuracy is 78.88%
Test... current best accuracy is 78.91%
Test... current best accuracy is 78.98%
Test... current best accuracy is 79.00%
Test... current best accuracy is 79.06%
Test... current best accuracy is 79.12%
Test... current best accuracy is 79.16%
Test... current best accuracy is 79.22%
Test... current best accuracy is 79.26%
Test... current best accuracy is 79.33%
Test... current best accuracy is 79.36%
Test... current best accuracy is 79.40%
Test... current best accuracy is 79.50%
Test... current best accuracy is 79.54%
Test... current best accuracy is 79.59%
Test... current best accuracy is 79.62%
Test... current best accuracy is 79.68%
Test... current best accuracy is 79.74%
Test... current best accuracy is 79.86%
Test... current best accuracy is 79.97%
Test... current best accuracy is 80.03%
Test... current best accuracy is 80.11%
Test... current best accuracy is 80.16%


## 建構 AutoEncoder 的網路（第二小題）

In [6]:
class AutoEncoder(nn.Module):
    def __init__(self, ctx_size):
        super(AutoEncoder, self).__init__()
        self.img_size = TARGET_SIZE
        h, w = self.img_size

        self.encoder = nn.Sequential(
            FullyConnect(w * h, 256, nn.SiLU()),
            FullyConnect(256, 128, nn.SiLU()),
            FullyConnect(128, ctx_size, nn.SiLU()),
        )
        self.decoder = nn.Sequential(
            FullyConnect(ctx_size, 128, nn.SiLU()),
            FullyConnect(128, 256, nn.SiLU()),
            FullyConnect(256, w * h, nn.Sigmoid())
        )

    def encode(self, img):
        b, _, _, _ = img.shape
        h, w = self.img_size
        img = torch.reshape(img, (b, h * w))
        ctx = self.encoder(img)
        return ctx

    def decode(self, ctx):
        b, _ = ctx.shape
        h, w = self.img_size
        img = self.decoder(ctx)
        img = torch.reshape(img, (b, 1, h, w))
        return img

    def forward(self, x):
        ctx = self.encode(x)
        x = self.decode(ctx)
        return x

def train_auto_encoder(net, device, loader):
    net = net.to(device)
    net.train()
    bce_loss = nn.BCELoss()
    opt = optim.Adam(net.parameters(),
                     lr=0.002,
                     weight_decay=0.0)

    running_loss = list()

    for e in range(200):
        for imgs, _ in loader:
            imgs = imgs.to(device)
            opt.zero_grad()
            loss = bce_loss(net(imgs), imgs)
            loss.backward()
            opt.step()

            running_loss.append(loss.item())
            if len(running_loss) > 500:
                running_loss.pop(0)
        if (e+1) % 20 == 0:
            print("epoch {} -> loss: {:.4f}".format(
                      e+1, sum(running_loss)/len(running_loss)))
    return net

def compute_threshold(net, device, loader, factor=1.2):
    net.eval()
    bce_loss_without_reduction = nn.BCELoss(reduction='none')
    item_loss = torch.zeros(0).to(device)

    with torch.no_grad():
        for imgs, _ in loader:
            imgs = imgs.to(device)
            loss = bce_loss_without_reduction(net(imgs), imgs)
            loss = torch.flatten(loss, start_dim=1)
            loss = torch.mean(loss, dim=1)
            item_loss = torch.cat((item_loss, loss), 0)

    item_loss = item_loss.detach().cpu().numpy()
    mean = np.mean(item_loss)
    std = np.std(item_loss)
    thres = mean + factor * std
    return thres

def compute_accuracy(net, device, loader, thres):
    bce_loss_without_reduction = nn.BCELoss(reduction='none')
    with torch.no_grad():
        correct = 0
        total = 0
        for imgs, abnorm in loader:
            imgs, abnorm = imgs.to(device), abnorm.to(device)
            loss = bce_loss_without_reduction(net(imgs), imgs)
            loss = torch.flatten(loss, start_dim=1)
            loss = torch.mean(loss, dim=1)
            pred_abnorm = torch.where(loss > thres, True, False)
            correct += (pred_abnorm == abnorm).sum().item()
            total += abnorm.size(0)
    return correct/total

## 訓練 AutoEncoder 的網路（第二小題）

In [7]:
ae_net = AutoEncoder(2)
ae_net = ae_net.to(device)
ae_net = train_auto_encoder(ae_net, device, t_loader)

epoch 20 -> loss: 0.1449
epoch 40 -> loss: 0.1405
epoch 60 -> loss: 0.1377
epoch 80 -> loss: 0.1371
epoch 100 -> loss: 0.1366
epoch 120 -> loss: 0.1370
epoch 140 -> loss: 0.1364
epoch 160 -> loss: 0.1339
epoch 180 -> loss: 0.1350
epoch 200 -> loss: 0.1363


## 測試 AutoEncoder 的檢測性能（第二小題）

In [8]:
best_ae_net_acc = 0
for i in range(30):
    factor = i/20
    thres = compute_threshold(ae_net, device, t_loader, factor)
    ae_net_acc = compute_accuracy(ae_net, device, a_loader, thres)
    if best_ae_net_acc > ae_net_acc:
        break
    best_ae_net_acc = ae_net_acc
    print("Test... current best accuracy is {:.2f}%".format(100 * best_ae_net_acc))
print("Best accuracy: {:.2f}%".format(100 * best_ae_net_acc))

Test... current best accuracy is 79.10%
Test... current best accuracy is 81.14%
Test... current best accuracy is 83.24%
Test... current best accuracy is 85.09%
Test... current best accuracy is 86.14%
Test... current best accuracy is 86.96%
Best accuracy: 86.96%


## 建構 Denoising AutoEncoder 的網路（第三小題）

In [9]:
class DenoisingAutoEncoder(AutoEncoder):
    def __init__(self, ctx_size):
        super(DenoisingAutoEncoder, self).__init__(ctx_size)

    def add_noise(self, img, noise_factor=0.25):
        noise = torch.randn_like(img) * noise_factor
        noisy_img = img + noise
        return torch.clamp(noisy_img, 0., 1.)

    def forward(self, x):
        x = self.add_noise(x)
        ctx = self.encode(x)
        x = self.decode(ctx)
        return x

## 訓練 Denoising AutoEncoder 的網路（第三小題）

In [10]:
dae_net = DenoisingAutoEncoder(2)
dae_net = dae_net.to(device)
dae_net = train_auto_encoder(dae_net, device, t_loader)

epoch 20 -> loss: 0.2350
epoch 40 -> loss: 0.2349
epoch 60 -> loss: 0.2349
epoch 80 -> loss: 0.2351
epoch 100 -> loss: 0.2350
epoch 120 -> loss: 0.2350
epoch 140 -> loss: 0.2345
epoch 160 -> loss: 0.2350
epoch 180 -> loss: 0.2350
epoch 200 -> loss: 0.2351


## 測試 Denoising AutoEncoder 的檢測性能（第三小題）

In [11]:
best_dae_net_acc = 0
for i in range(30):
    factor = i/20
    thres = compute_threshold(dae_net, device, t_loader, factor)
    dae_net_acc = compute_accuracy(dae_net, device, a_loader, thres)
    if best_dae_net_acc > dae_net_acc:
        break
    best_dae_net_acc = dae_net_acc
    print("Test... current best accuracy is {:.2f}%".format(100 * best_dae_net_acc))
print("Best accuracy: {:.2f}%".format(100 * best_dae_net_acc))

Test... current best accuracy is 69.33%
Best accuracy: 69.33%


## 建構 Variational AutoEncoder 的網路（第四小題）

In [12]:
class VariationalAutoEncoder(nn.Module):
    def __init__(self, ctx_size):
        super(VariationalAutoEncoder, self).__init__()
        self.img_size = TARGET_SIZE
        self.ctx_size = ctx_size
        h, w = self.img_size

        self.encoder = nn.Sequential(
            FullyConnect(w * h, 256, nn.SiLU()),
            FullyConnect(256, 128, nn.SiLU()),
            FullyConnect(128, 2 * ctx_size),
        )
        self.decoder = nn.Sequential(
            FullyConnect(ctx_size, 128, nn.SiLU()),
            FullyConnect(128, 256, nn.SiLU()),
            FullyConnect(256, w * h, nn.Sigmoid())
        )
    
    def encode(self, img):
        b, _, _, _ = img.shape
        h, w = self.img_size
        img = torch.reshape(img, (b, h * w))
        x = self.encoder(img)
        mu, logvar = torch.split(x, self.ctx_size, dim=1)
        return self.reparameterize(mu, logvar)

    def decode(self, ctx):
        b, _ = ctx.shape
        h, w = self.img_size
        img = self.decoder(ctx)
        img = torch.reshape(img, (b, 1, h, w))
        return img

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def encode_with_others(self, img):
        b, _, _, _ = img.shape
        h, w = self.img_size
        img = torch.reshape(img, (b, h * w))
        x = self.encoder(img)
        mu, logvar = torch.split(x, self.ctx_size, dim=1)
        return self.reparameterize(mu, logvar), mu, logvar

    def forward_with_others(self, x):
        input = x
        x, mu, logvar = self.encode_with_others(x)
        x = self.decode(x)
        return x, mu, logvar

    def forward(self, x):
        input = x
        x, mu, logvar = self.encode_with_others(x)
        x = self.decode(x)
        return x

## 訓練 Variational AutoEncoder 的網路（第四小題）

In [13]:
def train_variational_auto_encoder(net, device, loader, loss_type="all"):
    assert loss_type in ["all", "bce", "kld"]
    net = net.to(device)
    net.train()
    bce_loss = nn.BCELoss()
    opt = optim.Adam(net.parameters(),
                     lr=0.002,
                     weight_decay=0.0)

    running_loss = list()

    for e in range(200):
        for imgs, _ in loader:
            imgs = imgs.to(device)
            opt.zero_grad()
            output, mu, logvar = net.forward_with_others(imgs)

            bce = bce_loss(output, imgs)
            kld = torch.mean(-0.5 * torch.sum(1 + logvar - mu ** 2 - logvar.exp(), dim = 1), dim = 0)
            if loss_type == "all":
                loss = bce + kld
            elif loss_type == "bce":
                loss = bce
            elif loss_type == "kld":
                loss = kld
            loss.backward()
            opt.step()

            running_loss.append(loss.item())
            if len(running_loss) > 500:
                running_loss.pop(0)
        if (e+1) % 20 == 0:
            print("epoch {} -> loss: {:.4f}".format(
                      e+1, sum(running_loss)/len(running_loss)))
    return net

vae_net = VariationalAutoEncoder(2)
vae_net = vae_net.to(device)
vae_net = train_variational_auto_encoder(vae_net, device, t_loader, "bce")

epoch 20 -> loss: 0.1375
epoch 40 -> loss: 0.1344
epoch 60 -> loss: 0.1325
epoch 80 -> loss: 0.1309
epoch 100 -> loss: 0.1296
epoch 120 -> loss: 0.1289
epoch 140 -> loss: 0.1275
epoch 160 -> loss: 0.1265
epoch 180 -> loss: 0.1260
epoch 200 -> loss: 0.1255


## 測試 Variational AutoEncoder 的檢測性能（第四小題）

In [14]:
best_vae_net_acc = 0
for i in range(30):
    factor = i/20
    thres = compute_threshold(vae_net, device, t_loader, factor)
    vae_net_acc = compute_accuracy(vae_net, device, a_loader, thres)
    if best_vae_net_acc > vae_net_acc:
        continue
    best_vae_net_acc = vae_net_acc
    print("Test... current best accuracy is {:.2f}%".format(100 * best_vae_net_acc))
print("Best accuracy: {:.2f}%".format(100 * best_vae_net_acc))

Test... current best accuracy is 77.41%
Test... current best accuracy is 78.07%
Test... current best accuracy is 79.21%
Test... current best accuracy is 80.06%
Test... current best accuracy is 80.25%
Test... current best accuracy is 80.35%
Test... current best accuracy is 83.00%
Test... current best accuracy is 83.56%
Test... current best accuracy is 88.49%
Best accuracy: 88.49%


## 使用 Isolated Forest method 對 Variational AutoEncoder 的結果做異常檢測（第五小題）

In [15]:
def train_isolation_forest(net, device, loader):
    net.eval()
    ctx_items = torch.zeros(0).to(device)
    abnorm_tag = torch.zeros((0), dtype=torch.bool).to(device)
    with torch.no_grad():
        num_abnorms = 0
        for imgs, abnorm in loader:
            imgs, abnorm = imgs.to(device), abnorm.to(device)
            ctx = net.encode(imgs)
            ctx_items = torch.cat((ctx_items, ctx), 0)
            abnorm_tag = torch.cat((abnorm_tag, abnorm), 0)
            num_abnorms += (abnorm == True).sum().item()

    ctx_items = ctx_items.detach().cpu().numpy()
    abnorm_tag = abnorm_tag.detach().cpu().numpy()
    num_items = ctx_items.shape[0]

    if num_abnorms > 0.2 * num_items:
        target_num_abnorms = 0.25 * (num_items - num_abnorms)
        num_abnorms = 0
        new_ctx_items = list()
        for c, tag in zip(ctx_items, abnorm_tag):
            if tag == True:
                if num_abnorms < target_num_abnorms:
                    new_ctx_items.append(c)
                    num_abnorms += 1
            else:
                new_ctx_items.append(c)
        ctx_items = np.array(new_ctx_items)
        num_items = ctx_items.shape[0]
    clf = IsolationForest(n_estimators=40,
                          contamination=num_abnorms/num_items,
                          max_samples=round(0.9 * num_items),
                          random_state=99)
    clf.fit(ctx_items)
    return clf

def compute_clf_accuracy(net, device, loader, clf):
    with torch.no_grad():
        total = 0
        correct = 0
        for imgs, abnorm in loader:
            imgs = imgs.to(device)
            ctx = net.encode(imgs)
            ctx = ctx.detach().cpu().numpy()
    
            abnorm = abnorm.detach().cpu().numpy()
            pred = clf.predict(ctx)
            pred = np.where(pred == -1, True, False)

            total += abnorm.shape[0]
            correct += (pred == abnorm).sum()
    return correct/total

vae_net_for_clf = VariationalAutoEncoder(4)
vae_net_for_clf = vae_net_for_clf.to(device)
vae_net_for_clf = train_variational_auto_encoder(vae_net_for_clf, device, t_loader, "all")

clf = train_isolation_forest(vae_net_for_clf, device, r_loader)
best_clf_acc = compute_clf_accuracy(vae_net_for_clf, device, a_loader, clf)
print("Accuracy: {:.2f}%".format(100 * best_clf_acc))

epoch 20 -> loss: 0.2351
epoch 40 -> loss: 0.2351
epoch 60 -> loss: 0.2347
epoch 80 -> loss: 0.2349
epoch 100 -> loss: 0.2350
epoch 120 -> loss: 0.2352
epoch 140 -> loss: 0.2351
epoch 160 -> loss: 0.2349
epoch 180 -> loss: 0.2351
epoch 200 -> loss: 0.2349
Accuracy: 44.94%
