In [1]:
%matplotlib inline

import tensorflow as tf
from tensorflow.keras import layers
print(tf.__version__)
import os
import time
import numpy as np
import cv2
import matplotlib.pyplot as plt
from IPython.display import clear_output

2.0.0-rc0


In [2]:
# parameters
ANOMALY = 2
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 100

In [3]:
def batch_resize(imgs, size: tuple):
    img_out = np.empty((imgs.shape[0],) + size)
    for i in range(imgs.shape[0]):
        img_out[i] = cv2.resize(imgs[i], size, interpolation=cv2.INTER_CUBIC)
    return img_out

In [4]:
# dataset
data_train, data_test = tf.keras.datasets.mnist.load_data()
x_train, y_train = data_train
x_test, y_test = data_test
x_train = x_train.astype(np.float32)
x_test = x_test.astype(np.float32)
# resize to (32, 32)
x_train = batch_resize(x_train, (32, 32))[..., None]
x_test = batch_resize(x_test, (32, 32))[..., None]
# normalization
mean = x_train.mean()
stddev = x_train.std()
x_train = (x_train-mean)/stddev
x_test = (x_test-mean)/stddev
print(x_train.shape, x_train.shape)

(60000, 32, 32, 1) (60000, 32, 32, 1)


In [5]:
# define abnoraml data and normal
# training data only contains normal
x_train = x_train[y_train!=ANOMALY]
y_test = (y_test==ANOMALY).astype(np.float32)

In [6]:
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, x_train))
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, x_test))

In [7]:
train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=False)

In [8]:
class Option:
    def __init__(self, is_train):
        if is_train:
            self.batch_size = 300
            self.isize = 32
            self.ckpt_dir = "ckpt"
            self.nz = 100
            self.nc = 1
            self.ndf = 64
            self.ngf = 64
            self.extralayers = 0
            self.niter = 15
            self.lr = 2e-4
            self.w_adv = 1. # Adversarial loss weight
            self.w_con = 50. # Reconstruction loss weight
            self.w_enc = 1. # Encoder loss weight.
            self.beta1 = 0.5
        else:
            self.batch_size = 5
            # TODO

In [9]:
class Conv_BN_Act(tf.keras.layers.Layer):
    
    def __init__(self, filters, ks, act_type, is_bn=True, padding='same', strides=1, conv_tran=False):
        super(Conv_BN_Act, self).__init__()
        if conv_tran:
            self.conv = layers.Conv2DTranspose(filters, ks, strides=strides, padding=padding, use_bias=False)
        else:
            self.conv = layers.Conv2D(filters, ks, strides=strides, padding=padding, use_bias=False)
            
        self.is_bn = is_bn
        if is_bn:
            self.bn = layers.BatchNormalization(epsilon=1e-05, momentum=0.9)
            
        if act_type=='LeakyReLU':
            self.act = layers.LeakyReLU(alpha=0.2)
            self.erase_act=False
        elif act_type=='ReLU':
            self.act = layers.ReLU()
            self.erase_act=False
        elif act_type=='Tanh':
            self.act = layers.Activation(tf.tanh)
            self.erase_act=False
        elif act_type=='':
            self.erase_act=True
        else:
            raise ValueError
            
    def call(self, x):
        x = self.conv(x)
        x = self.bn(x) if self.is_bn else x
        x = x if self.erase_act else self.act(x)
        return x

In [10]:
class Encoder(tf.keras.layers.Layer):
    """ DCGAN ENCODER NETWORK
    """
    
    def __init__(self, isize, nz, nc, ndf, n_extra_layers=0, output_features=False):
        """
        Params:
            isize(int): input image size
            nz(int): num of latent dims
            nc(int): num of input dims
            ndf(int): num of discriminator(Encoder) filters
        """
        super(Encoder, self).__init__()
        assert isize % 16 == 0, "isize has to be a multiple of 16"
        
        self.in_block = Conv_BN_Act(filters=ndf, ks=4, act_type='LeakyReLU', is_bn=False, strides=2)
        csize, cndf = isize / 2, ndf
        
        self.extra_blocks = []
        for t in range(n_extra_layers):
            extra = Conv_BN_Act(filters=cndf, ks=3, act_type='LeakyReLU')
            self.extra_blocks.append(extra)
        
        self.body_blocks = []
        while csize > 4:
            in_feat = cndf
            out_feat = cndf * 2
            body = Conv_BN_Act(filters=out_feat, ks=4, act_type='LeakyReLU', strides=2)
            self.body_blocks.append(body)
            cndf = cndf * 2
            csize = csize / 2
        
        # state size. K x 4 x 4
        self.output_features = output_features
        self.out_conv = layers.Conv2D(filters=nz, kernel_size=4, padding='valid')
            
    def call(self, x):
        x = self.in_block(x)
        for block in self.extra_blocks:
            x = block(x)
        for block in self.body_blocks:
            x = block(x)
        last_features = x
        out = self.out_conv(last_features)
        if self.output_features:
            return out, last_features
        else:
            return out
            

In [11]:
class Decoder(tf.keras.layers.Layer):
    
    def __init__(self, isize, nz, nc, ngf, n_extra_layers=0):
        """
        Params:
            isize(int): input image size
            nz(int): num of latent dims
            nc(int): num of input dims
            ngf(int): num of Generator(Decoder) filters
        """
        super(Decoder, self).__init__()
        assert isize % 16 == 0, "isize has to be a multiple of 16"
        cngf, tisize = ngf // 2, 4
        while tisize != isize:
            cngf = cngf * 2
            tisize = tisize * 2
        
        self.in_block = Conv_BN_Act(filters=cngf, ks=4, act_type='ReLU', padding='valid', conv_tran=True)
            
        csize, _ = 4, cngf
        self.body_blocks = []
        while csize < isize // 2:
            body = Conv_BN_Act(filters=cngf//2, ks=4, act_type='ReLU', strides=2, conv_tran=True)
            self.body_blocks.append(body)
            cngf = cngf // 2
            csize = csize * 2
            
        # Extra layers
        self.extra_blocks = []
        for t in range(n_extra_layers):
            extra = Conv_BN_Act(filters=cngf, ks=3, act_type='ReLU', conv_tran=True)
            self.extra_blocks.append(extra)
        
        self.out_block = Conv_BN_Act(filters=nc, ks=4, act_type='Tanh', strides=2, is_bn=False, conv_tran=True)
            
    def call(self, x):
        x = self.in_block(x)
        for block in self.body_blocks:
            x = block(x)
        for block in self.extra_blocks:
            x = block(x)
        x = self.out_block(x)
        return x

In [12]:
class NetG(tf.keras.Model):
    
    def __init__(self, opt):
        super(NetG, self).__init__()
        self.encoder1 = Encoder(opt.isize, opt.nz, opt.nc, opt.ngf, opt.extralayers)
        self.decoder = Decoder(opt.isize, opt.nz, opt.nc, opt.ngf, opt.extralayers)
        self.encoder2 = Encoder(opt.isize, opt.nz, opt.nc, opt.ngf, opt.extralayers)

    def call(self, x):
        latent_i = self.encoder1(x)
        gen_img = self.decoder(latent_i)
        latent_o = self.encoder2(gen_img)
        return latent_i, gen_img, latent_o
    
    def num_params(self):
        return sum([np.prod(var.shape.as_list()) for var in self.trainable_variables])

In [13]:
class NetD(tf.keras.Model):
    """ DISCRIMINATOR NETWORK
    """
    def __init__(self, opt):
        super(NetD, self).__init__()
        self.encoder = Encoder(opt.isize, 1, opt.nc, opt.ngf, opt.extralayers, output_features=True)
        self.sigmoid = layers.Activation(tf.sigmoid)

    def call(self, x):
        output, last_features = self.encoder(x)
        output = self.sigmoid(output)
        return output, last_features

In [14]:
class Runner:
    def __init__(self, train_dataset, test_dataset=None):
#         self.model = model
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
    
    def train_step(self, x, y):
        raise NotImplementedError
            
    def fit(self, num_epoch, log_freq=500):
        for step, (x_batch_train, y_batch_train) in enumerate(self.train_dataset):
            loss = self.train_step(x_batch_train, y_batch_train)
            if step % log_freq == 0:
                print('step: {}, loss: {}'.format(step, loss))
    
    def save(self):
        pass
    
    def load(self):
        pass
        
    def evaluate(self):
        pass
        


In [15]:
class Ganomaly(Runner):
    
    def __init__(self, opt, train_dataset, test_dataset=None):
        super(Ganomaly, self).__init__(train_dataset, test_dataset)
        self.opt = opt
        self.G = NetG(self.opt)
        self.D = NetD(self.opt)
        
        # label
        self.real_label = tf.ones([self.opt.batch_size,], dtype=tf.float32)
        self.fake_label = tf.zeros([self.opt.batch_size,], dtype=tf.float32)
        
        # loss
        l2_loss = tf.keras.losses.MeanSquaredError()
        l1_loss = tf.keras.losses.MeanAbsoluteError()
        bce_loss = tf.keras.losses.BinaryCrossentropy()
        
        # optimizer
        self.d_optimizer = tf.keras.optimizers.Adam(self.opt.lr, beta_1=self.opt.beta1, beta_2=0.999)
        self.g_optimizer = tf.keras.optimizers.Adam(self.opt.lr, beta_1=self.opt.beta1, beta_2=0.999)
        
        # adversarial loss (use feature matching)
        self.l_adv = l2_loss 
        # contextual loss
        self.l_con = l1_loss
        # Encoder loss
        self.l_enc = l2_loss
        # discriminator loss
        self.l_bce = bce_loss 

#     @tf.function
    def train_step(self, x, y):
        self.input = x
        with tf.GradientTape() as g_tape, tf.GradientTape() as d_tape:
            self.latent_i, self.gen_img, self.latent_o = self.G(self.input)
            self.pred_real, self.feat_real = self.D(self.input)
            self.pred_fake, self.feat_fake = self.D(self.gen_img)
            g_loss = self.g_loss()
            d_loss = self.d_loss()
        
        g_grads = g_tape.gradient(g_loss, self.G.trainable_weights)
        d_grads = d_tape.gradient(d_loss, self.D.trainable_weights)
        self.g_optimizer.apply_gradients(zip(g_grads, self.G.trainable_weights))
        self.d_optimizer.apply_gradients(zip(d_grads, self.D.trainable_weights))
        if d_loss < 1e-5: 
            self.D = NetD(self.opt)
            print('re-init D')
        return g_loss, d_loss
        
    def g_loss(self):
        self.err_g_adv = self.l_adv(self.feat_real, self.feat_fake)
        self.err_g_con = self.l_con(self.input, self.gen_img)
        self.err_g_enc = self.l_enc(self.latent_i, self.latent_o)
        g_loss = self.err_g_adv * self.opt.w_adv + \
                self.err_g_con * self.opt.w_con + \
                self.err_g_enc * self.opt.w_enc
        return g_loss
    
    def d_loss(self):
        self.err_d_real = self.l_bce(self.pred_real, self.real_label)
        self.err_d_fake = self.l_bce(self.pred_fake, self.fake_label)
        d_loss = (self.err_d_real + self.err_d_fake) * 0.5
        return d_loss

In [16]:
opt = Option(is_train=True)
ganomaly = Ganomaly(opt, train_dataset, test_dataset)

In [17]:
ganomaly.fit(opt.niter)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

step: 0, loss: (<tf.Tensor: id=990, shape=(), dtype=float32, numpy=34.034542>, <tf.Tensor: id=1071, shape=(), dtype=float32, numpy=7.7511034>)
step: 500, loss: (<tf.Tensor: id=507740, shape=(), dtype=float32, numpy=17.2167>, <tf.Tensor: id=507821, shape=(), dtype=float32, numpy=0.02059581>)
