In [14]:
#Declare class

from tensorflow.keras import layers as L 
import tensorflow as tf
from functools import reduce

import numpy as np

class ResidualConv(tf.keras.Model):
    """Two-convolution residual block: ``x + act(conv(bn(act(conv(bn(x))))))``.

    Args:
        filters: Number of output channels of both convolutions. Because the
            block output is added to its input, the input must already carry
            this many channels.
        kernel_size: Kernel size forwarded to both ``Conv2D`` layers.
        use_bn: When true (default) a ``BatchNormalization`` layer precedes
            each convolution. When false the normalisation is skipped.
        activation: Callable applied after each convolution.
        **kwargs: Extra keyword arguments forwarded to both ``Conv2D`` layers.
            NOTE(review): passing ``strides`` > 1 here would change the spatial
            size and break the residual addition.
    """

    def __init__(self, filters, kernel_size, use_bn=True, activation=tf.nn.elu, **kwargs):
        super().__init__()
        # Previously `use_bn` was accepted but ignored; it now controls whether
        # the normalisation layers are created at all.
        self.use_bn = use_bn
        self.bn1 = tf.keras.layers.BatchNormalization() if use_bn else None
        self.c1 = tf.keras.layers.Conv2D(filters, kernel_size, padding='SAME', **kwargs)
        self.bn2 = tf.keras.layers.BatchNormalization() if use_bn else None
        self.c2 = tf.keras.layers.Conv2D(filters, kernel_size, padding='SAME', **kwargs)
        self.activation = activation

    def call(self, x):
        """Apply the two (optionally normalised) convolutions and add the skip path."""
        y = self.bn1(x) if self.use_bn else x
        y = self.activation(self.c1(y))
        if self.use_bn:
            y = self.bn2(y)
        y = self.activation(self.c2(y))
        # Identity shortcut: shapes of `y` and `x` must match.
        return y + x

def round_even(x):
    """Return the next odd number strictly greater than ``x``.

    Despite the name, the result is always odd for integer input: an even
    ``x`` maps to ``x + 1`` and an odd ``x`` maps to ``x + 2``. It is used to
    derive a larger kernel size for the strided convolutions.
    """
    is_even = (x % 2 == 0)
    return x + 1 if is_even else x + 2
    
class ConvRegressor(tf.keras.Model):
    """Stack of strided + plain convolutions followed by a linear output conv.

    For every entry in ``filters`` two layers are created: a stride-2 layer
    with kernel size ``round_even(kernel_size)`` and a stride-1 layer with
    ``kernel_size``. A final un-activated ``Conv2D`` with ``filters[-1]``
    channels closes the stack.

    Args:
        filters: Non-empty sequence of channel counts, one per stride-2 stage.
        kernel_size: Base kernel size of the stride-1 layers and output conv.
        conv_type: Layer class used for the per-stage layers (e.g. ``Conv2D``
            to downsample or ``Conv2DTranspose`` to upsample).
        activation: Activation passed to every per-stage layer.
        **kwargs: Extra keyword arguments forwarded to every ``conv_type`` layer.

    Raises:
        ValueError: If ``filters`` is empty.
    """

    def __init__(
        self, filters, kernel_size,
        conv_type=tf.keras.layers.Conv2D, activation=tf.nn.elu,
        **kwargs
    ):
        super().__init__()
        filter_counts = list(filters)
        if not filter_counts:
            # The output layer needs a channel count; the original code relied
            # on the leaked loop variable and failed with a NameError here.
            raise ValueError('filters must contain at least one entry')
        self.filters = filters
        self.conv_type = conv_type
        self.convs = []
        for n_filter in filter_counts:
            # Stride-2 stage that changes the spatial resolution.
            self.convs.append(
                conv_type(n_filter, round_even(kernel_size), strides=2, padding='SAME', activation=activation, **kwargs)
            )
            # Stride-1 refinement at the new resolution.
            self.convs.append(conv_type(n_filter, kernel_size, padding='SAME', activation=activation, **kwargs))
        # Linear output projection; always a plain Conv2D regardless of conv_type.
        self.convs.append(tf.keras.layers.Conv2D(filter_counts[-1], kernel_size, padding='SAME'))

    def call(self, x):
        """Run ``x`` through every layer in creation order."""
        y = x
        for conv in self.convs:
            y = conv(y)
        return y

class ResidualConvRegressor(tf.keras.Model):
    """Strided convolutions interleaved with residual blocks, plus an output conv.

    For every entry in ``filters`` one stride-2 ``conv_type`` layer (kernel
    size ``round_even(kernel_size)``) is created, followed by
    ``n_freq_residual`` ``ResidualConv`` blocks. A final un-activated
    ``Conv2D`` with ``filters[-1]`` channels closes the stack.

    Args:
        filters: Non-empty sequence of channel counts, one per stride-2 stage.
        kernel_size: Kernel size of the residual blocks and the output conv.
        n_freq_residual: Number of residual blocks appended after each
            stride-2 layer.
        conv_type: Layer class used for the stride-2 layers.
        activation: Activation used by the stride-2 layers and residual blocks.
        **kwargs: Extra keyword arguments forwarded to the stride-2 layers and
            to every ``ResidualConv``.

    Raises:
        ValueError: If ``filters`` is empty.
    """

    def __init__(
        self, filters, kernel_size, n_freq_residual,
        conv_type=tf.keras.layers.Conv2D, activation=tf.nn.elu,
        **kwargs
    ):
        super().__init__()
        filter_counts = list(filters)
        if not filter_counts:
            # The output layer needs a channel count; the original code relied
            # on the leaked loop variable and failed with a NameError here.
            raise ValueError('filters must contain at least one entry')
        self.filters = filters
        self.n_freq_residual = n_freq_residual
        self.conv_type = conv_type
        self.convs = []
        for n_filter in filter_counts:
            # Stride-2 stage that changes the spatial resolution.
            self.convs.append(
                conv_type(
                    n_filter, round_even(kernel_size), strides=2,
                    activation=activation, padding='SAME', **kwargs)
            )
            # Residual refinement at the new resolution.
            for _ in range(n_freq_residual):
                self.convs.append(
                    ResidualConv(n_filter, kernel_size, activation=activation, **kwargs)
                )
        # Linear output projection; always a plain Conv2D regardless of conv_type.
        self.convs.append(tf.keras.layers.Conv2D(filter_counts[-1], kernel_size, padding='SAME'))

    def call(self, x):
        """Run ``x`` through every layer in creation order."""
        y = x
        for conv in self.convs:
            y = conv(y)
        return y
    
class ConvVAE(tf.keras.Model):
    """Convolutional variational auto-encoder with a square latent map.

    The encoder is a regressor built from ``dim_list`` followed by a dense
    layer producing ``2 * latent_dim`` values (mean and log-variance). The
    decoder reshapes the latent vector into a ``(s, s, 1)`` map with
    ``s = sqrt(latent_dim)`` and feeds it to a regressor built from the
    reversed ``dim_list`` using ``Conv2DTranspose``.

    Args:
        dim_list: Channel counts for the encoder stages; reversed for the decoder.
        latent_dim: Size of the latent vector. Must be a perfect square,
            because ``decode`` reshapes it to a square map.
        out_ch: Currently unused; kept for interface compatibility.
        kernel_size: Base kernel size forwarded to both regressors.
        **kwargs: Extra keyword arguments forwarded to ``create_regressor``.
    """

    def __init__(
        self, dim_list, latent_dim, out_ch=1, kernel_size=3, **kwargs
    ):
        super().__init__()
        self.latent_dim = latent_dim
        self.inference_net = self.create_regressor(
            dim_list, kernel_size, conv_type=tf.keras.layers.Conv2D,
            **kwargs)
        self.dense_inference = tf.keras.Sequential([
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim * 2),
        ])

        self.generative_net = self.create_regressor(
            dim_list[::-1], kernel_size, conv_type=tf.keras.layers.Conv2DTranspose,
            **kwargs)

    def create_regressor(self, dim_list, kernel_size, **kwargs):
        """Factory hook for the encoder/decoder stacks; subclasses may override."""
        return ConvRegressor(dim_list, kernel_size=kernel_size, **kwargs)

    def call(self, x):
        """Encode ``x``, draw one latent sample and return the decoder logits."""
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        return self.decode(z)

    @tf.function
    def sample(self, eps=None):
        """Decode ``eps`` (or 100 standard-normal draws) through a sigmoid."""
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        """Return ``(mean, logvar)`` of the approximate posterior for ``x``.

        The intermediate tensors are also stored on ``self.orig_latent`` and
        ``self.latent`` as a side effect.
        """
        self.orig_latent = self.inference_net(x)
        self.latent = self.dense_inference(self.orig_latent)
        mean, logvar = tf.split(self.latent, num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar, eps=None):
        """Draw ``mean + eps * exp(logvar / 2)``; ``eps`` defaults to N(0, 1) noise."""
        if eps is None:
            # Use the dynamic shape so this also works when the batch dimension
            # is unknown at trace time (e.g. inside a tf.function).
            eps = tf.random.normal(shape=tf.shape(mean))
        return eps * tf.exp(logvar * 0.5) + mean

    def decode(self, z, apply_sigmoid=False):
        """Map latent vectors ``z`` of shape ``(batch, latent_dim)`` to outputs.

        Args:
            z: Latent batch; its second dimension must be a perfect square.
            apply_sigmoid: When true, return ``sigmoid(logits)`` instead of logits.

        Raises:
            ValueError: If the latent size is not a perfect square.
        """
        flat_dim = z.shape[1]
        if flat_dim is None:
            # Static size unknown; fall back to the configured latent size.
            flat_dim = self.latent_dim
        side = int(round(np.sqrt(flat_dim)))
        if side * side != flat_dim:
            raise ValueError(
                'latent dimension must be a perfect square to be reshaped '
                'into a square map, got {}'.format(flat_dim))
        z = tf.reshape(z, (tf.shape(z)[0], side, side, 1))
        logits = self.generative_net(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs

        return logits
    
class ResidualConvVAE(ConvVAE):
    """``ConvVAE`` whose encoder and decoder are ``ResidualConvRegressor`` stacks.

    Args:
        dim_list: Channel counts for the encoder stages; reversed for the decoder.
        latent_dim: Size of the latent vector.
        n_freq_residual: Number of residual blocks after each strided layer.
        kernel_size: Base kernel size forwarded to both regressors.
        **kwargs: Extra keyword arguments forwarded to ``ConvVAE``.
    """

    def __init__(
        self, dim_list, latent_dim, n_freq_residual=4,
        kernel_size=3, **kwargs
    ):
        # Must be set before the parent constructor runs, because the parent
        # calls create_regressor(), which reads this attribute.
        self.n_freq_residual = n_freq_residual
        # Pass kernel_size by keyword: the parent's third positional parameter
        # is `out_ch`, so a positional call silently dropped the kernel size.
        super().__init__(dim_list, latent_dim, kernel_size=kernel_size, **kwargs)

    def create_regressor(self, dim_list, kernel_size, **kwargs):
        """Build a residual regressor instead of the plain convolutional one."""
        return ResidualConvRegressor(
            dim_list, kernel_size=kernel_size,
            n_freq_residual=self.n_freq_residual, **kwargs)
        
#net = ConvVAE([1, 16, 32], 64, kernel_size=3)
# Smoke test: build the residual VAE, push one random batch through it so the
# weights get created, then report the layer summary and the output shape.
net = ResidualConvVAE(
    dim_list=[1, 16, 32],
    latent_dim=64,
    n_freq_residual=4,
    kernel_size=3,
)
dummy_batch = tf.random.uniform((4, 64, 64, 1))
output = net(dummy_batch)
net.summary()
print(output.shape)

Model: "residual_conv_vae_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
residual_conv_regressor_2 (R multiple                  116714    
_________________________________________________________________
sequential_8 (Sequential)    multiple                  262272    
_________________________________________________________________
residual_conv_regressor_3 (R multiple                  108251    
Total params: 487,237
Trainable params: 485,669
Non-trainable params: 1,568
_________________________________________________________________
(4, 64, 64, 1)


In [2]:
# Load breast cancer dataset
from sklearn.datasets import load_wine, load_breast_cancer
from sklearn.model_selection import train_test_split

data = load_breast_cancer()

import pandas as pd
xs = pd.DataFrame(data.data, columns=data.feature_names)
ys = data.target

# Standardise every feature column in place and remember the statistics used,
# keyed by column name, as (mean, std) pairs.
# NOTE(review): the statistics are computed on the full table, i.e. before the
# train/test split below.
mu_std = {}
for col in xs.columns:
    mu, std = xs[col].mean(), xs[col].std()
    xs[col] = (xs[col] - mu) / std
    mu_std[col] = (mu, std)

# Split by row position; no random_state is given, so the split differs per run.
train_indices, test_indices = train_test_split(range(len(xs)))
train_xs, test_xs = xs.iloc[train_indices], xs.iloc[test_indices]

TRAIN_BUF = 1000
BATCH_SIZE = 32
TEST_BUF = 1000
n_input_dim = train_xs.shape[1]

# Shuffled, batched float32 pipelines for training and evaluation.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(train_xs.astype(np.float32))
    .shuffle(TRAIN_BUF)
    .batch(BATCH_SIZE)
)
test_dataset = (
    tf.data.Dataset.from_tensor_slices(test_xs.astype(np.float32))
    .shuffle(TEST_BUF)
    .batch(BATCH_SIZE)
)

In [21]:
# Load digits.
from sklearn.datasets import load_wine, load_digits
from sklearn.model_selection import train_test_split

digits, labels = load_digits(return_X_y=True)
# Scale pixel values by 1/16 and lay each sample out as an 8x8 single-channel image.
digits = digits / 16
digits = digits.astype(np.float32).reshape(-1, 8, 8, 1)

import pandas as pd
# Split by sample index; no random_state is given, so the split differs per run.
train_indices, test_indices = train_test_split(range(len(digits)))
# `digits` is already (N, 8, 8, 1), so indexing alone yields the final shape.
train_xs = digits[train_indices]
test_xs = digits[test_indices]

TRAIN_BUF = 1000
BATCH_SIZE = 32
TEST_BUF = 1000
# NOTE(review): for image data this is the image height (8), not a feature count.
n_input_dim = train_xs.shape[1]

# Shuffled, batched pipelines for training and evaluation.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(train_xs)
    .shuffle(TRAIN_BUF)
    .batch(BATCH_SIZE)
)
test_dataset = (
    tf.data.Dataset.from_tensor_slices(test_xs)
    .shuffle(TEST_BUF)
    .batch(BATCH_SIZE)
)

In [18]:
# Scratch cell: draw a trivial line to check that the plotting backend renders.
# Import locally so the cell does not depend on a later cell having run first.
import matplotlib.pyplot as plt

fig = plt.figure()
plt.plot([10, 10, 20], [30, 30, 30])
fig.show()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [26]:
#### from IPython import display
import IPython 
from collections import defaultdict 
from itertools import product
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm, trange
import time
import seaborn as sns

%matplotlib widget
#%matplotlib notebook


# Training hyper-parameters.
epochs = 200
latent_dim = 4  # reshaped to a 2x2 map by the decoder
num_examples_to_generate = 16

# Model and optimizer used by the training loop below.
#model = ResidualVAE([n_input_dim, 8, 4])
model = ResidualConvVAE(dim_list=[1, 16], latent_dim=latent_dim, kernel_size=3)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, beta_1=0.5, beta_2=0.99)

def gaussian_kl_divergence(mean, ln_var, raxis=1):
    """KL divergence of a diagonal Gaussian N(mean, exp(ln_var)) from N(0, I).

    Computes ``0.5 * (mean**2 + exp(ln_var) - ln_var - 1)`` element-wise and
    sums it over ``raxis``.
    """
    per_element = (tf.square(mean) + tf.exp(ln_var) - ln_var - 1) * 0.5
    return tf.reduce_sum(per_element, axis=raxis)


#@tf.function
def compute_loss(model, x):
    """Run one VAE forward pass on ``x`` and return its loss terms and raw tensors.

    Returns a dict with two entries:
      * ``'loss'``: scalar terms ``'Reconstruct'`` (mean of the squared error
        summed over axis 1) and ``'KL-d'`` (mean KL divergence scaled by 1e-3).
      * ``'raw'``: the element-wise squared error (``'AE'``), the input
        (``'Original'``) and the reconstruction (``'Reconstruct'``).

    NOTE(review): the reconstruction term sums over axis 1 only. For 4-D image
    batches that is a single spatial axis, not the whole sample; confirm this
    is intended.
    """
    mean, logvar = model.encode(x)
    reconst_x = model.decode(model.reparameterize(mean, logvar))

    # Element-wise squared error (stored under the historical 'AE' key).
    squared_error = (reconst_x - x) ** 2
    reconstruction_term = tf.reduce_mean(tf.reduce_sum(squared_error, axis=1))
    kl_term = tf.reduce_mean(gaussian_kl_divergence(mean, logvar)) * 1e-3

    scalar_losses = {
        'Reconstruct': reconstruction_term,
        'KL-d': kl_term,
    }
    raw_tensors = {
        'AE': squared_error,
        'Original': x,
        'Reconstruct': reconst_x,
    }
    return {'loss': scalar_losses, 'raw': raw_tensors}

#@tf.function
def compute_apply_gradients(model, x, optimizer):
    """Take one optimisation step on batch ``x`` and return the loss dict.

    The summed entries of ``compute_loss(model, x)['loss']`` are differentiated
    with respect to the model's trainable variables and applied via ``optimizer``.
    """
    with tf.GradientTape() as tape:
        losses = compute_loss(model, x)
        total_loss = sum(losses['loss'].values())
    variables = model.trainable_variables
    grads = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(grads, variables))
    return losses

#fig, axes = plt.subplots(3, 1, figsize=(5, 8))
# Live-updating figure on a 6x3 grid: row 0 holds the loss curves, row 1 the
# ELBO curve, and rows 2-5 hold one (original, reconstruction, error) image
# triple each.
fig = plt.figure(figsize=(5, 12))
grid_shape = (6, 3)
loss_ax = plt.subplot2grid(shape=grid_shape, loc=(0, 0), colspan=grid_shape[1], fig=fig)
elbo_ax = plt.subplot2grid(shape=grid_shape, loc=(1, 0), colspan=grid_shape[1], fig=fig)
# Use the builtin `object`: the `np.object` alias was deprecated in NumPy 1.20
# and removed in 1.24, where it raises AttributeError.
data_ax = np.zeros((4, 3), dtype=object)
for i in range(4):
    for j in range(3):
        data_ax[i, j] = plt.subplot2grid(shape=grid_shape, loc=(2 + i, j), fig=fig)


# Interactive mode so the canvas can be redrawn from inside the training loop.
plt.ion()
fig.show()
fig.canvas.draw()

# Training loop with live plotting.
# `loss_history` maps each loss-term name to one value per training batch;
# `elbo_history` receives one value per evaluated epoch.
loss_history = defaultdict(list)
elbo_history = []
with tf.device('/GPU:0'):
    for epoch in trange(1, epochs + 1):
        # --- training pass -------------------------------------------------
        # NOTE(review): start_time/end_time are recorded but never read here.
        start_time = time.time()
        for train_x in train_dataset:
            # NOTE(review): `data` rebinds a name that the breast-cancer cell
            # also uses for the loaded dataset.
            data = compute_apply_gradients(model, train_x, optimizer)
            for name, loss in data['loss'].items():
                loss_history[name].append(loss.numpy())
        end_time = time.time()

        # --- evaluation + plotting (condition is always true: every epoch) --
        if epoch % 1 == 0:
            # `loss` is rebound here to a running-mean metric over test batches.
            loss = tf.keras.metrics.Mean()
            raw = None
            for test_x in test_dataset:
                losses = compute_loss(model, test_x)
                # Keep the raw tensors of the first test batch for display.
                if raw is None:
                    raw = losses['raw']
                loss(sum(losses['loss'].values()))
            # Reported "ELBO" is the negated mean of the summed loss terms.
            elbo = -loss.result()
            elbo_history.append(elbo)
            
            # Redraw the per-term training loss curves on a log scale.
            loss_ax.clear()
            # `loss` is rebound again, now to each term's history list.
            for name, loss in loss_history.items():
                loss_ax.plot(loss, label=name, alpha=0.5)
            # NOTE(review): the series has one point per training batch, so the
            # 'epoch' label does not match the x-axis unit.
            loss_ax.set_xlabel('epoch')
            loss_ax.set_ylabel('Loss')
            loss_ax.set_yscale('log')
            loss_ax.legend()

            elbo_ax.clear()
            elbo_ax.plot(elbo_history)

            # Drop singleton axes so each sample can be passed to imshow.
            original = np.squeeze(raw['Original'].numpy())
            reconstruct = np.squeeze(raw['Reconstruct'].numpy())
            absolute_error = np.squeeze(raw['AE'].numpy())
            if np.any(np.isnan(absolute_error)):
                tqdm.write('The output is nan.')
            # Show the first four samples of the kept batch; assumes that batch
            # holds at least four samples.
            for i in range(4):
                # Column 0: input image.
                ax = data_ax[i, 0]
                ax.clear()
                ax.imshow(original[i], vmin=0.0, vmax=1.0)
                
                # Column 1: reconstruction.
                ax = data_ax[i, 1]
                ax.clear()
                ax.imshow(reconstruct[i], vmin=0.0, vmax=1.0)
                
                # Column 2: element-wise error stored under 'AE'.
                ax = data_ax[i, 2]
                ax.clear()
                ax.imshow(absolute_error[i], vmin=0.0, vmax=1.0)
            fig.canvas.draw()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

HBox(children=(FloatProgress(value=0.0, max=200.0), HTML(value='')))


