<a href="https://colab.research.google.com/github/hayaseleu/Anomlay_Detection/blob/main/Efficient_GAN_Anomaly_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install wandb

Collecting wandb
  Downloading wandb-0.12.6-py2.py3-none-any.whl (1.7 MB)
[K     |████████████████████████████████| 1.7 MB 4.3 MB/s 
[?25hCollecting yaspin>=1.0.0
  Downloading yaspin-2.1.0-py3-none-any.whl (18 kB)
Collecting GitPython>=1.0.0
  Downloading GitPython-3.1.24-py3-none-any.whl (180 kB)
[K     |████████████████████████████████| 180 kB 56.7 MB/s 
Collecting shortuuid>=0.5.0
  Downloading shortuuid-1.0.1-py3-none-any.whl (7.5 kB)
Collecting subprocess32>=3.5.3
  Downloading subprocess32-3.5.4.tar.gz (97 kB)
[K     |████████████████████████████████| 97 kB 6.3 MB/s 
Collecting sentry-sdk>=1.0.0
  Downloading sentry_sdk-1.4.3-py2.py3-none-any.whl (139 kB)
[K     |████████████████████████████████| 139 kB 56.3 MB/s 
[?25hCollecting pathtools
  Downloading pathtools-0.1.2.tar.gz (11 kB)
Collecting configparser>=3.8.1
  Downloading configparser-5.1.0-py3-none-any.whl (19 kB)
Collecting docker-pycreds>=0.4.0
  Downloading docker_pycreds-0.4.0-py2.py3-none-any.whl (9.0 kB)
Colle

model

In [2]:
import torch
import torch.nn as nn


def make_Encoder(input_size, z_dim, channel, nef, n_extra_layer=0):
    """
    Build the layers of the BiGAN Encoder as one ``nn.Sequential``.

    Layer order: a stride-1 convolution that keeps the spatial size, a pyramid
    of stride-2 convolutions that halves the size down to 8x8 while doubling
    the filters, ``n_extra_layer`` optional stride-1 blocks, and a final
    ``nn.Linear`` taking ``cnef * 8 * 8`` inputs.  The Sequential contains no
    flatten step, so the caller has to reshape the 8x8 feature map to a vector
    before applying the last (Linear) layer.

    input_size : the image size of the data (a power of two, at least 16)
    z_dim : the dimension of the latent space
    channel : the number of channels of the data
    nef : the number of filters in the encoder (reached at the 8x8 stage)
    n_extra_layer : number of additional stride-1 conv blocks after the pyramid

    Raises AssertionError when input_size is not a supported size.
    """
    assert input_size % 16 == 0, "input size has to be a multiple of 16"

    main = nn.Sequential()

    # Walk from the final 8x8 stage back up to the input resolution to find the
    # filter count of the first convolution.  Using `<` plus a final equality
    # check (instead of looping `while tisize != input_size`) guarantees
    # termination for sizes such as 48 or 0 that pass the modulo test above.
    cnef, tisize = nef, 8
    while tisize < input_size:
        cnef = cnef // 2
        tisize = tisize * 2
    assert tisize == input_size, "input size has to be a power of two (16, 32, 64, ...)"

    main.add_module(
        "initial_Conv-{}-{}".format(channel, cnef),
        nn.Conv2d(channel, cnef, kernel_size=3, stride=1, padding=1, bias=False),
    )
    # the number of stride is the default setting of tf
    # output size is the same as input_size
    main.add_module("initial_LeakyRU-{}".format(cnef), nn.LeakyReLU(0.1, inplace=True))
    csize = input_size

    while csize > 8:
        # official kernel_size is 3 but changed to 4
        main.add_module(
            "pyramid_Conv-{}-{}".format(cnef, cnef * 2),
            nn.Conv2d(cnef, cnef * 2, kernel_size=4, stride=2, padding=1, bias=False),
        )
        main.add_module(
            "pyramid_BatchNorm-{}".format(cnef * 2), nn.BatchNorm2d(cnef * 2)
        )
        main.add_module(
            "pyramid_LeakyReLU-{}".format(cnef * 2), nn.LeakyReLU(0.1, inplace=True)
        )
        csize = csize // 2
        cnef = cnef * 2

    for idx in range(n_extra_layer):
        # The layer index is part of the name: add_module() replaces an existing
        # entry with the same name, so without it only one extra block survived.
        main.add_module(
            "extra-{}_Conv-{}-{}".format(idx, cnef, cnef),
            nn.Conv2d(cnef, cnef, kernel_size=3, stride=1, padding=1, bias=False),
        )
        main.add_module(
            "extra-{}_BatchNorm-{}".format(idx, cnef), nn.BatchNorm2d(cnef)
        )
        main.add_module(
            "extra-{}_LeakyReLU-{}".format(idx, cnef), nn.LeakyReLU(0.1, inplace=True)
        )

    main.add_module(
        "last_linear-{}-{}".format(cnef * 8 * 8, z_dim), nn.Linear(cnef * 8 * 8, z_dim),
    )

    return main


class NetE(nn.Module):
    """
    the network of Encoder

    Wraps the layers produced by ``make_Encoder``: every layer except the last
    one forms ``self.pyramid`` (the convolutional part), and the last layer
    (the Linear projection to the latent space) forms ``self.linear``.
    """

    def __init__(self, CONFIG):
        """
        CONFIG : object exposing ``input_size``, ``z_dim``, ``channel`` and
                 ``nef`` attributes, forwarded to ``make_Encoder``.
        """
        super(NetE, self).__init__()

        model = make_Encoder(
            CONFIG.input_size, CONFIG.z_dim, CONFIG.channel, CONFIG.nef
        )
        layers = list(model.children())

        self.pyramid = nn.Sequential(*layers[:-1])
        self.linear = nn.Sequential(layers[-1])

    def forward(self, x, CONFIG=None):
        """
        Encode a batch of images into latent vectors.

        x : input batch fed to the convolutional pyramid
        CONFIG : accepted for backward compatibility and ignored; the flatten
                 width is now taken from the feature map itself.

        Returns the output of the final Linear layer, one row per sample.
        """
        out = self.pyramid(x)
        # Flatten per sample.  The previous `view(-1, CONFIG.nef * 8 * 8)` could
        # silently change the batch dimension whenever the real channel count
        # differed from CONFIG.nef.
        out = out.view(out.size(0), -1)
        out = self.linear(out)

        return out


def make_Generator(input_size, z_dim, channel, ngf, n_extra_layer=0):
    """
    Build the layers of the BiGAN Generator as one ``nn.Sequential``.

    Layer order: two Linear + BatchNorm1d + ReLU blocks (z_dim -> 1024 ->
    ngf * 8 * 8), a pyramid of stride-2 transposed convolutions that doubles
    the spatial size while halving the filters, ``n_extra_layer`` optional
    stride-1 blocks, and a final stride-2 transposed convolution followed by
    Tanh.  The Sequential contains no reshape step, so the caller has to
    reshape the output of the sixth layer to (batch, ngf, 8, 8) before applying
    the remaining layers.

    input_size : the image size of the data (a power of two, at least 16)
    z_dim : the dimension of the latent space
    channel : the number of channels of the image
    ngf : the number of Generator's filters (at the 8x8 stage)
    n_extra_layer : number of additional stride-1 blocks after the pyramid

    Raises AssertionError when input_size is not a supported size.
    """
    assert input_size % 16 == 0, "input size has to be a multiple of 16"

    # Each stride-2 layer doubles the 8x8 seed, so only powers of two can be
    # produced.  Reject other sizes up front instead of silently building a
    # generator whose output resolution differs from input_size.
    reachable = 16
    while reachable < input_size:
        reachable = reachable * 2
    assert reachable == input_size, "input size has to be a power of two (16, 32, 64, ...)"

    main = nn.Sequential()

    main.add_module(
        "initial_Linear-{}-{}".format(z_dim, 1024), nn.Linear(z_dim, 1024, bias=False),
    )
    main.add_module("initial_BatchNorm-{}".format(1024), nn.BatchNorm1d(1024))
    main.add_module("initial_ReLU-{}".format(1024), nn.ReLU(inplace=True))

    main.add_module(
        "second_Linear-{}-{}".format(1024, ngf * 8 * 8),
        nn.Linear(1024, ngf * 8 * 8, bias=False),
    )
    main.add_module(
        "second_BatchNorm-{}".format(ngf * 8 * 8), nn.BatchNorm1d(ngf * 8 * 8)
    )
    main.add_module("second_ReLU-{}".format(ngf * 8 * 8), nn.ReLU(inplace=True))
    csize = 8
    cngf = ngf

    # Stop one doubling short of input_size: the last transposed convolution
    # below performs the final upsampling step.
    while csize < input_size // 2:
        main.add_module(
            "pyramid_Convt-{}-{}".format(cngf, cngf // 2),
            nn.ConvTranspose2d(
                cngf, cngf // 2, kernel_size=4, stride=2, padding=1, bias=False
            ),
        )
        main.add_module(
            "pyramid_BatchNorm-{}".format(cngf // 2), nn.BatchNorm2d(cngf // 2)
        )
        main.add_module("pyramid_ReLU-{}".format(cngf // 2), nn.ReLU(inplace=True))
        cngf = cngf // 2
        csize = csize * 2

    for idx in range(n_extra_layer):
        # The layer index is part of the name: add_module() replaces an existing
        # entry with the same name, so without it only one extra block survived.
        main.add_module(
            "extra-{}_Convt-{}-{}".format(idx, cngf, cngf),
            nn.ConvTranspose2d(
                cngf, cngf, kernel_size=3, stride=1, padding=1, bias=False
            ),
        )
        main.add_module(
            "extra-{}_BatchNorm-{}".format(idx, cngf), nn.BatchNorm2d(cngf)
        )
        main.add_module("extra-{}_ReLU-{}".format(idx, cngf), nn.ReLU(inplace=True))

    main.add_module(
        "last_Convt-{}-{}".format(cngf, channel),
        nn.ConvTranspose2d(
            cngf, channel, kernel_size=4, stride=2, padding=1, bias=False
        ),
    )
    main.add_module("last_Tanh-{}".format(channel), nn.Tanh())

    return main


class NetG(nn.Module):
    """
    the network of Generator

    Wraps the layers produced by ``make_Generator``: the first six layers (two
    Linear/BatchNorm1d/ReLU blocks) form ``self.linear`` and the remaining
    transposed-convolution layers form ``self.pyramid``.
    """

    def __init__(self, CONFIG):
        """
        CONFIG : object exposing ``input_size``, ``z_dim``, ``channel`` and
                 ``ngf`` attributes, forwarded to ``make_Generator``.
        """
        super(NetG, self).__init__()

        model = make_Generator(
            CONFIG.input_size, CONFIG.z_dim, CONFIG.channel, CONFIG.ngf
        )
        layers = list(model.children())

        self.linear = nn.Sequential(*layers[:6])
        self.pyramid = nn.Sequential(*layers[6:])

    def forward(self, z, CONFIG=None):
        """
        Generate a batch of images from latent vectors.

        z : latent batch; any trailing singleton dimensions, e.g.
            (batch, z_dim, 1, 1), are flattened to (batch, z_dim) first
        CONFIG : accepted for backward compatibility and ignored, so both
                 ``G(z)`` and ``G(z, CONFIG)`` work.

        Returns the Tanh output of the transposed-convolution pyramid.
        """
        # The first layer is nn.Linear, which acts on the last dimension only,
        # so the latent batch must be 2-D.
        z = z.reshape(z.size(0), -1)
        out = self.linear(z)
        # (batch size, channel, height, width); the channel count is inferred
        # from the Linear output width instead of being read from CONFIG.
        out = out.view(z.size(0), -1, 8, 8)
        out = self.pyramid(out)

        return out


def make_Discriminator(input_size, z_dim, channel, ndf, n_extra_layer=0):
    """
    Build the three sub-networks of the BiGAN Discriminator.

    D_x  : convolutional branch for images (stride-2 convolutions, each followed
           by LeakyReLU and Dropout)
    D_z  : Linear(z_dim, 512) + LeakyReLU + Dropout branch for latent vectors
    D_xz : joint head taking the concatenation of both branches
           (512 + cndf * 8 * 8 features) through Linear(…, 1024) + LeakyReLU +
           Dropout and a final Linear(1024, 1)

    input_size : the image size of the data (a power of two, at least 16)
    z_dim : the dimension of the latent space
    channel : the number of channels of the image
    ndf : the number of Discriminator's filters (at the final stage)
    n_extra_layer : number of additional stride-1 blocks before the last conv

    Returns the tuple (D_x, D_z, D_xz).
    Raises AssertionError when input_size is not a supported size.

    NOTE(review): D_xz assumes D_x ends on an 8x8 map.  With input_size == 16
    the two stride-2 convolutions below reduce the image to 4x4, so the widths
    would not match at forward time — confirm whether 16 should be rejected.
    """
    assert input_size % 16 == 0, "input_size has to be a multiple of 16"

    # Find the filter count of the first convolution.  Using `<` plus a final
    # equality check (instead of looping `while tisize != input_size`)
    # guarantees termination for sizes such as 48 or 0.
    cndf, tisize = ndf * 2, 16
    while tisize < input_size:
        cndf = cndf // 2
        tisize = tisize * 2
    assert tisize == input_size, "input_size has to be a power of two (16, 32, 64, ...)"

    # D(x)
    D_x = nn.Sequential()

    D_x.add_module(
        "initial_Conv-{}-{}".format(channel, cndf),
        nn.Conv2d(channel, cndf, kernel_size=4, stride=2, padding=1, bias=False),
    )
    D_x.add_module("initial_LeakyReLU-{}".format(cndf), nn.LeakyReLU(0.1, inplace=True))
    D_x.add_module("initial_Dropout-{}".format(cndf), nn.Dropout(inplace=True))
    csize = input_size // 2

    while csize > 16:
        D_x.add_module(
            "pyramid_Conv-{}-{}".format(cndf, cndf * 2),
            nn.Conv2d(cndf, cndf * 2, kernel_size=4, stride=2, padding=1, bias=False),
        )
        D_x.add_module(
            "pyramid_LeakyReLU-{}".format(cndf * 2), nn.LeakyReLU(0.1, inplace=True),
        )
        D_x.add_module("pyramid_Dropout-{}".format(cndf * 2), nn.Dropout(inplace=True))
        csize = csize // 2
        cndf = cndf * 2

    for idx in range(n_extra_layer):
        # The layer index is part of the name: add_module() replaces an existing
        # entry with the same name, so without it only one extra block survived.
        D_x.add_module(
            "extra-{}_Conv-{}-{}".format(idx, cndf, cndf),
            nn.Conv2d(cndf, cndf, kernel_size=3, stride=1, padding=1, bias=False),
        )
        D_x.add_module(
            "extra-{}_LeakyReLU-{}".format(idx, cndf), nn.LeakyReLU(0.1, inplace=True)
        )
        D_x.add_module(
            "extra-{}_Dropout-{}".format(idx, cndf), nn.Dropout(inplace=True)
        )

    D_x.add_module(
        "last_Conv-{}-{}".format(cndf, cndf),
        nn.Conv2d(cndf, cndf, kernel_size=4, stride=2, padding=1, bias=False),
    )
    D_x.add_module("last_LeakyReLU-{}".format(cndf), nn.LeakyReLU(0.1, inplace=True))
    # Named "last_…" rather than "pyramid_…": the old name equalled that of the
    # final pyramid dropout, so add_module() replaced that entry in place and no
    # dropout was appended after the last convolution.
    D_x.add_module("last_Dropout-{}".format(cndf), nn.Dropout(inplace=True))

    # D(z)
    D_z = nn.Sequential()

    D_z.add_module("z_Linear", nn.Linear(z_dim, 512))
    D_z.add_module("z_LeakyReLU", nn.LeakyReLU(0.1, inplace=True))
    D_z.add_module("z_Dropout", nn.Dropout(inplace=True))

    # D(x,z)
    D_xz = nn.Sequential()
    D_xz.add_module(
        "concat_Linear-{}-{}".format(512 + cndf * 8 * 8, 1024),
        nn.Linear(512 + cndf * 8 * 8, 1024),
    )
    D_xz.add_module("concat_LeakyReLU-{}".format(1024), nn.LeakyReLU(0.1, inplace=True))
    D_xz.add_module("concat_Dropout-{}".format(1024), nn.Dropout(inplace=True))
    D_xz.add_module("last_Linear-{}-{}".format(1024, 1), nn.Linear(1024, 1))

    return D_x, D_z, D_xz


class NetD(nn.Module):
    """
    the network of Discriminator

    Holds the three parts returned by ``make_Discriminator``: the image branch
    (``layer_x``), the latent branch (``layer_z``) and the joint head, which is
    split into ``feature`` (everything but the last layer) and ``classifier``
    (the last Linear layer).
    """

    def __init__(self, CONFIG):
        """
        CONFIG : object exposing ``input_size``, ``z_dim``, ``channel`` and
                 ``ndf`` attributes, forwarded to ``make_Discriminator``.
        """
        super(NetD, self).__init__()
        D_x, D_z, D_xz = make_Discriminator(
            CONFIG.input_size, CONFIG.z_dim, CONFIG.channel, CONFIG.ndf
        )
        # D(x)
        layer_x = list(D_x.children())
        self.layer_x = nn.Sequential(*layer_x)
        # D(z)
        layer_z = list(D_z.children())
        self.layer_z = nn.Sequential(*layer_z)
        # D(x,z)
        layer = list(D_xz.children())
        self.feature = nn.Sequential(*layer[:-1])
        self.classifier = nn.Sequential(layer[-1])

    def forward(self, x, z, CONFIG=None):
        """
        Score (image, latent) pairs.

        x : image batch for the convolutional branch
        z : latent batch for the Linear branch
        CONFIG : accepted for backward compatibility and ignored; the flatten
                 width is now taken from the feature map itself.

        Returns ``(out, feature)``: the output of the final Linear layer and the
        activations of the layer before it, flattened to one row per sample.
        """
        x_out = self.layer_x(x)
        # Flatten per sample.  The previous `view(-1, CONFIG.ndf * 8 * 8)` could
        # silently change the batch dimension on a width mismatch.
        x_out = x_out.view(x_out.size(0), -1)

        z_out = self.layer_z(z)

        y = torch.cat([x_out, z_out], dim=1)
        out = self.feature(y)

        feature = out.view(out.size(0), -1)

        out = self.classifier(out)

        return out, feature

trainer

In [3]:
import torch
import torch.nn as nn
from torchvision.utils import save_image


import time
from PIL import Image

import wandb


def train(G, D, E, z_dim, dataloader, CONFIG, no_wandb):
    """
    Train the Generator, Discriminator and Encoder of a BiGAN-style model.

    Every mini-batch performs three updates in turn:
      1. D is trained to score (x, E(x)) as real (label 1) and (G(z), z) as
         fake (label 0);
      2. G is trained so that D scores (G(z), z) as real;
      3. E is trained so that D scores (x, E(x)) as fake.
    After every epoch a grid of images generated from a fixed latent batch is
    written to "fake_imges.png" and, unless ``no_wandb`` is set, the mean
    losses, the epoch time and that image are logged to Weights & Biases.

    G, D, E : the networks, called as ``G(z, CONFIG)``, ``E(x, CONFIG)`` and
              ``D(x, z, CONFIG)``; D must return ``(logits, feature)``
    z_dim : the dimension of the latent space
    dataloader : iterable yielding dicts with an "img" entry (the image batch)
    CONFIG : object exposing ``num_epochs`` and ``num_fakeimg``; also passed to
             the networks
    no_wandb : when true, nothing is sent to Weights & Biases

    Returns the trained ``(G, D, E)``.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("Device:", device)

    lr_ge = 0.0001
    lr_d = 0.0001 / 4
    beta1, beta2 = 0.5, 0.999

    g_optimizer = torch.optim.Adam(G.parameters(), lr_ge, (beta1, beta2))
    d_optimizer = torch.optim.Adam(D.parameters(), lr_d, (beta1, beta2))
    e_optimizer = torch.optim.Adam(E.parameters(), lr_ge, (beta1, beta2))

    # Binary Cross Entropy
    criterion = nn.BCEWithLogitsLoss(reduction="mean")

    # Fixed latent batch for the per-epoch preview.  It is 2-D (batch, z_dim),
    # the same layout as the latent batches sampled in the loop below.
    fixed_z = torch.randn(CONFIG.num_fakeimg, z_dim, device=device)

    G.to(device)
    D.to(device)
    E.to(device)

    G.train()
    D.train()
    E.train()

    torch.backends.cudnn.benchmark = True

    num_epochs = CONFIG.num_epochs

    for epoch in range(num_epochs):
        t_epoch_start = time.time()

        print("----------------------(train)----------------------")
        print("Epoch {}/{}".format(epoch, num_epochs))
        print("---------------------------------------------------")

        d_loss_meter = AverageMeter("D_loss", ":.4e")
        g_loss_meter = AverageMeter("G_loss", ":.4e")
        e_loss_meter = AverageMeter("E_loss", ":.4e")

        for samples in dataloader:
            imges = samples["img"]

            imges = imges.to(device)
            mini_batch_size = imges.size()[0]

            # BCEWithLogitsLoss needs floating-point targets; torch.full with an
            # integer fill value would produce integer labels.
            label_real = torch.ones(mini_batch_size, device=device)
            label_fake = torch.zeros(mini_batch_size, device=device)

            """
            learning Discriminator
            """
            # Only D is updated in this step, so E and G are run without
            # building a graph; D receives exactly the same gradients.
            with torch.no_grad():
                z_out_real = E(imges, CONFIG)
            d_out_real, _ = D(imges, z_out_real, CONFIG)

            input_z = torch.randn(mini_batch_size, z_dim, device=device)
            with torch.no_grad():
                fake_imges = G(input_z, CONFIG)
            d_out_fake, _ = D(fake_imges, input_z, CONFIG)

            d_loss_real = criterion(d_out_real.view(-1), label_real)
            d_loss_fake = criterion(d_out_fake.view(-1), label_fake)

            d_loss = d_loss_real + d_loss_fake
            d_loss_meter.update(d_loss.item())

            d_optimizer.zero_grad()
            d_loss.backward()
            d_optimizer.step()

            """
            learning Generator
            """
            input_z = torch.randn(mini_batch_size, z_dim, device=device)
            fake_imges = G(input_z, CONFIG)
            d_out_fake, _ = D(fake_imges, input_z, CONFIG)

            g_loss = criterion(d_out_fake.view(-1), label_real)
            g_loss_meter.update(g_loss.item())

            g_optimizer.zero_grad()
            g_loss.backward()
            # Step the optimizer; the loss tensor itself has no step() method.
            g_optimizer.step()

            """
            learning Encoder
            """
            z_out_real = E(imges, CONFIG)
            d_out_real, _ = D(imges, z_out_real, CONFIG)
            # use label_fake to calculate log(1-D)
            e_loss = criterion(d_out_real.view(-1), label_fake)
            e_loss_meter.update(e_loss.item())

            e_optimizer.zero_grad()
            e_loss.backward()
            e_optimizer.step()

        t_epoch_finish = time.time()
        print("---------------------------------------------------")
        print(
            "Epoch{}|| D_Loss :{:.4f} || G_Loss :{:.4f} || E_Loss :{:.4f}".format(
                epoch, d_loss_meter.avg, g_loss_meter.avg, e_loss_meter.avg,
            )
        )
        print("timer:  {:.4f} sec.".format(t_epoch_finish - t_epoch_start))

        # Preview from the fixed latent batch.  eval() + no_grad() keep this
        # pass from updating BatchNorm statistics or building a graph; training
        # mode is restored right after.
        G.eval()
        with torch.no_grad():
            fake_imges = G(fixed_z, CONFIG)
        G.train()
        save_image(fake_imges, "fake_imges.png")

        if not no_wandb:
            wandb.log(
                {
                    "train_time": t_epoch_finish - t_epoch_start,
                    "d_loss": d_loss_meter.avg,
                    "g_loss": g_loss_meter.avg,
                    "e_loss": e_loss_meter.avg,
                },
                step=epoch,
            )

            img = Image.open("fake_imges.png")
            wandb.log({"image": [wandb.Image(img)]}, step=epoch)

    return G, D, E

#libs

dataloader

meter

transform

weight

In [4]:
from torch.utils import data
from PIL import Image
import pandas as pd


class Dataset(data.Dataset):
    """
    Image dataset described by a CSV file.

    Each CSV row must provide the columns "img_path" (file opened with PIL),
    "cls_id" and "cls_label".
    """

    def __init__(self, csv_file, transform=None):
        """
        csv_file : path of the CSV file, read with ``pandas.read_csv``
        transform : optional callable applied to every opened image.  The
                    former default, the string "None", is still accepted and
                    treated as "no transform"; it used to be called as a
                    function and crashed on the first item.
        """
        super().__init__()
        self.df = pd.read_csv(csv_file)
        if isinstance(transform, str) and transform == "None":
            transform = None
        self.transform = transform

    def __len__(self):
        """Number of rows in the CSV file."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Load the sample at positional index ``idx``.

        Returns a dict with the keys "img" (the image, transformed when a
        transform was given), "cls_id", "label" (the "cls_label" column) and
        "img_path".
        """
        # Look the row up once instead of once per column.
        row = self.df.iloc[idx]

        img_path = row["img_path"]
        img = Image.open(img_path)

        if self.transform is not None:
            img = self.transform(img)

        sample = {
            "img": img,
            "cls_id": row["cls_id"],
            "label": row["cls_label"],
            "img_path": img_path,
        }

        return sample

In [5]:
class AverageMeter(object):
    """Keeps the most recent value together with a running weighted mean."""

    def __init__(self, name, fmt=":f"):
        # `fmt` is a format-spec suffix (e.g. ":.4e") spliced into __str__.
        self.name, self.fmt = name, fmt
        self.reset()

    def reset(self):
        """Set the last value, mean, weighted sum and sample count back to 0."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record `val` as the latest value, counted `n` times in the mean."""
        weighted = val * n
        self.val = val
        self.sum += weighted
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        # Builds e.g. "{name} {val:.4e} ({avg:.4e})" and fills it from the
        # instance attributes.
        pieces = ["{name} {val", self.fmt, "} ({avg", self.fmt, "})"]
        return "".join(pieces).format(**vars(self))

In [6]:
from torchvision import transforms

# the library for Image Transformation
class ImageTransform():
    """Callable preprocessing: ToTensor followed by Normalize(mean, std)."""

    def __init__(self, mean, std):
        # The two steps run in list order when the instance is called.
        steps = [
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ]
        self.data_transform = transforms.Compose(steps)

    def __call__(self, img):
        """Apply the composed transform to `img` and return the result."""
        transformed = self.data_transform(img)
        return transformed

In [7]:
import torch.nn as nn


# helper to initialize the network weights
def weights_init(m):
    """
    Re-initialize one module in place, chosen by its class name.

    Classes whose name contains "Conv" get weights drawn from N(0.0, 0.02);
    classes whose name contains "BatchNorm" get weights drawn from N(1.0, 0.02)
    and a zero bias.  Every other module is left untouched.
    """
    layer_type = type(m).__name__

    if "Conv" in layer_type:
        # convolution biases are not re-initialized here
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return

    if "BatchNorm" in layer_type:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

train

In [8]:
!pip install addict

Collecting addict
  Downloading addict-2.4.0-py3-none-any.whl (3.8 kB)
Installing collected packages: addict
Successfully installed addict-2.4.0


In [9]:
import torch
from torch.utils.data import DataLoader

import yaml
from addict import Dict
import os
import argparse



import wandb


def get_parameters(argv=None):
    """
    Parse the training options.

    argv : optional list of argument strings, e.g.
           ``["config.yaml", "--no_wandb"]``.  When None (the default) argparse
           reads ``sys.argv[1:]`` as before.  Passing the list explicitly is
           the way to call this from a notebook, where ``sys.argv`` holds the
           kernel's own arguments and parsing it fails with
           "unrecognized arguments: -f".

    Returns the argparse namespace with ``config`` (path of the config file)
    and ``no_wandb`` (True when --no_wandb was given).
    """

    parser = argparse.ArgumentParser(description="take parameters like num_epochs...")

    parser.add_argument("config", type=str, help="path of a config file for training")

    parser.add_argument("--no_wandb", action="store_true", help="Add --no_wandb option")

    return parser.parse_args(argv)

In [10]:
# NOTE(review): inside a notebook kernel sys.argv carries the kernel's own
# arguments, so this call exits with "unrecognized arguments: -f" (see the
# output captured below this cell).  Run the code as a script, or supply the
# arguments explicitly.
args = get_parameters()

# Load the YAML file named on the command line.  addict's Dict gives the
# attribute-style access (CONFIG.z_dim, CONFIG.batch_size, ...) used below.
with open(args.config) as config_file:
    CONFIG = Dict(yaml.safe_load(config_file))

# NOTE(review): `train_dataset` is not created in any visible cell.  It is
# presumably a Dataset(csv_file=..., transform=ImageTransform(mean, std)); the
# CSV path and normalization constants are not shown here — define it before
# this line.
train_dataloader = DataLoader(train_dataset, batch_size=CONFIG.batch_size, shuffle=True)

G = NetG(CONFIG)
D = NetD(CONFIG)
E = NetE(CONFIG)

G.apply(weights_init)
D.apply(weights_init)
E.apply(weights_init)

if not args.no_wandb:
    # wandb.watch / wandb.log need an active run; start one unless an earlier
    # cell already did.
    # NOTE(review): no project or run name is visible in this file, so the
    # wandb defaults are used — set them here if needed.
    if wandb.run is None:
        wandb.init(config=CONFIG)
    # Magic
    wandb.watch(G, log="all")
    wandb.watch(D, log="all")
    wandb.watch(E, log="all")

G_update, D_update, E_update = train(
    G,
    D,
    E,
    z_dim=CONFIG.z_dim,
    dataloader=train_dataloader,
    CONFIG=CONFIG,
    no_wandb=args.no_wandb,
)

os.makedirs(CONFIG.save_dir, exist_ok=True)

# Call torch.save.  The previous `torch.save = (...)` rebound the name to a
# tuple, so nothing was written and torch.save was broken for the session.
torch.save(G_update.state_dict(), os.path.join(CONFIG.save_dir, "G.prm"))
torch.save(D_update.state_dict(), os.path.join(CONFIG.save_dir, "D.prm"))
torch.save(E_update.state_dict(), os.path.join(CONFIG.save_dir, "E.prm"))

print("Done")

usage: ipykernel_launcher.py [-h] [--no_wandb] config
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: ignored

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
