## Chapter 7: Semi-Supervised GAN

In [19]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

from tensorflow.keras import backend as K

from keras.datasets import mnist
from keras.layers import (Activation, BatchNormalization, Concatenate, Dense,
                          Dropout, Flatten, Input, Lambda, Reshape)
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.models import Model, Sequential
from keras.optimizers import Adam
from keras.utils import to_categorical

ModuleNotFoundError: No module named 'keras.engine.training_utils'

Using Tensorflow backend

## Dataset

In [2]:
class Dataset(object):
    def __init__(self, num_labeled):
        
        # Number labeled example to use fro training
        self.num_labeled = num_labeled
        # Load the MNIST dataset
        (self.x_train, self.y_train), (self.x_test, self.y_test) = mnist.load_data()
        
        def preprocess_imgs(x):
            #Reshape [0, 255] grayscale values to [-1, 1]
            x = (x.astype(np.float32) - 127.5)/127.5
            # Expand image dimensions to width x height x channels
            x = np.expand_dims(x ,axis=3)
            return x
        def preprocess_labels(y):
            return y.reshape(-1, 1)
        
        # Training data
        self.x_train = preprocess_imgs(self.x_train)
        self.y_train = preprocess_labels(self.y_train)
        
        # Testing data
        self.x_test = preprocess_imgs(self.x_test)
        self.y_test = preprocess_labels(self.y_test)
        
    def batch_labeled(self, batch_size):
        # Get a random batch of labeled images and their labels
        idx = np.random.randint(0, self.num_labeled, batch_size)
        imgs = self.x_train[idx]
        labels = self.y_train[idx]
        return imgs,labels
    
    def batch_unlabeled(self, batch_size):
        # Get a random batch of unlabeled images
        idx = np.random.randint(self.num_labeled, self.x_train.shape[0], batch_size)
        imgs = self.x_train[idx]
        return imgs
    
    def training_set(self):
        x_train = self.x_train[range(self.num_labeled)]
        y_train = self.y_train[range(self.num_labeled)]
        return x_train, y_train
    
    def test_set(self):
        return self.x_test, self.y_test

In [4]:
# Number of labeled examples to use (rest will be used as unlabeled)
num_labeled = 100
dataset = Dataset(num_labeled)

## Semi-Supervied GAN

In [5]:
img_rows = 28
img_cols = 28
channels = 1

# Input image dimensions
img_shape = (img_rows, img_cols, channels)

# Size of the noise vector, used as input to the generator
z_dim = 100

# Number of classes in the dataset
num_classes = 10

## Generator

In [6]:
def build_generator(z_dim):
    model = Sequential()
    
    # Reshape input into 7x7x256 tensor via a fully connected layer
    model.add(Dense(256*7*7, input_dim=z_dim))
    model.add(Reshape((7,7,256)))
    
    # Transposed convolution layer, from 7x7x256 into 14x14x128 tensor
    model.add(Conv2DTranspose(128, kernel_size=3, strides=2, padding='same'))
    
    # Batch normalization
    model.add(BatchNormalization())
    
    # Leak Relu activation
    model.add(LeakyReLU(alpha=0.01))
    
    # Transposed convolution layer, from 14x14x128 to 14x14x64 tensor
    model.add(Conv2DTranspose(64, kernel_size=3, strides=1, padding='same'))
    
    # Batch normalization
    model.add(BatchNormalization())
    
    # Leak Relu activation
    model.add(LeakyReLU(alpha=0.01))
    
    # Transposed convolution layer, form 14x14x64 to 28x28x1 tensor
    model.add(Conv2DTranspose(1, kernel_size=3, strides=2, padding='same'))
    
    # Output layer with tanh activation
    model.add(Activation('tanh'))
    
    return model

## Discriminator

In [7]:
def build_discriminator(img_shape):
    model = Sequential()
    
    # Convolutional layer, from 28x28x1 into 14x14x32 tensor
    model.add(Conv2D(32,
                    kernel_size=3,
                    strides=2,
                    input_shape=img_shape,
                    padding='same'))
    
    # Leak Relu activation
    model.add(LeakyReLU(alpha=0.01))
    
    # Convolutional layer, from 14x14x28 into 7x7x64 tensor
    model.add(Conv2D(64,
                    kernel_size=3,
                    strides=2,
                    input_shape=img_shape,
                    padding='same'))
    
    # Batch normalization
    model.add(BatchNormalization())
    
    # Leak Relu activation
    model.add(LeakyReLU(alpha=0.01))
    
    # COnvolutional layer, from 7x7x14 tensor into 3x3x28 tensor
    model.add(Conv2D(128,
                    kernel_size=3,
                    strides=2,
                    input_shape=img_shape,
                    padding='same'))
    
    # Batch normalization
    model.add(BatchNormalization())
    
    # Leak Relu activation
    model.add(LeakyReLU(alpha=0.01))
    
    # Dropout
    model.add(Dropout(0.5))
    
    # Flatten the tensor
    model.add(Flatten())
    
    # Fully connected layer with num_classes neurons
    model.add(Dense(num_classes))
    
    return model

In [8]:
def build_discriminator_supervised(discriminator):
    model = Sequential()
    model.add(discriminator)
    
    # Softmax activation, giving predicted probability distribution over the real classes
    model.add(Activation('softmax'))
    return model

In [9]:
def build_discriminator_unsupervised(discriminator):
    model = Sequential()
    model.add(discriminator)
    
    def predict(x):
        
        # Transform distribution over real classes into a binary real-vs-fake probability
        prediction = 1.0 - (1.0/
                            (K.sum(K.exp(x), axis=-1, keepdims=True) + 1.0))
        return prediction
    
    # Real-vs-Fake output neuron defined above
    model.add(Lambda(predict))
    return model

## Build the Model

In [10]:
def build_gan(generator, discriminator):
    model = Sequential()
    
    # Combined the Genrator-> DIscriminator model
    model.add(generator)
    model.add(discriminator)
    return model

## Discrimiantor

In [11]:
# Core Discriminator network
# These layers are shred during supervised and unsupervised training
discriminator_net = build_discriminator(img_shape)

# Build & Compile the discriminator for unsupervised training
discriminator_supervised = build_discriminator_supervised(discriminator_net)
discriminator_supervised.compile(loss='categorical_crossentropy',
                                metrics=['accuracy'],
                                optimizer=adam_v2.Adam())

# Build & Compile the Discriminator for unsupervised training
discriminator_unsupervised = build_discriminator_unsupervised(discriminator_net)
discriminator_unsupervised.compile(loss='binary_crossentropy',
                                  optimizer=adam_v2.Adam())

2022-03-18 14:04:19.554220: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2022-03-18 14:04:19.597900: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-03-18 14:04:19.598186: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:01:00.0 name: NVIDIA GeForce GTX 1050 Ti computeCapability: 6.1
coreClock: 1.392GHz coreCount: 6 deviceMemorySize: 3.94GiB deviceMemoryBandwidth: 104.43GiB/s
2022-03-18 14:04:19.598280: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/ffmpeg/lib:/usr/local/cuda-10.1/lib64
2022-03-18 14:04:19.598372: W tensorflow/stream_exe

## Generator

In [12]:
# Build the generator
generator = build_generator(z_dim)

# Keep Discriminator's parameters constant for Generator training
discriminator_unsupervised.trainable = False

# Build and Compile GAN model with fixed Discriminator to train the Generator
# Note that we are using the Discriminator version with unsupervised output
gan = build_gan(generator, discriminator_unsupervised)
gan.compile(loss='binary_crossentropy', optimizer=adam_v2.Adam())

## Training

In [13]:
supervised_losses = []
iteration_checkpoints = []

def train(iterations, batch_size, sample_interval):
    # Labels for real images: all ones
    real = np.ones((batch_size, 1))
    
    # Labels for fake images: all zeros
    fake = np.zeros((batch_size, 1))
    
    for iteration in range(iterations):
        # --------------
        # Training D----
        # --------------
        # Get labeled examples
        imgs, labels = dataset.batch_labeled(batch_size)
        
        # One-hot encode labels
        labels = utils.to_catagorical(labels, num_classes=num_classes)
        
        # Get unlabeled examples
        imgs_unlabeled = dataset.batch_unlabeled(batch_size)
        
        # Generate a batch of fake images
        z = np.random.normal(0, 1, (batch_size, z_dim))
        gen_imgs = generator.predict(z)
        
        # Train on real labeled example
        d_loss_supervised, accuracy = discriminator_supervised.train_on_batch(imgs, labels)
        
        # Train on real unlabeled examples
        d_loss_real = discriminator_unsupervised.train_on_batch(imgs_unlabeled, real)
        
        # Train on fake examples
        d_loss_fake = discriminator_unsupervised.train_on_batch(gen_imgs, fake)
        d_loss_unsupervised = 0.5*np.add(d_loss_real, d_loss_fake)
        
        # -------------
        # Training G---
        # -------------
        
        # Gnerate a batch of fake images
        z = np.random.normal(0, 1, (batch_size, z_dim))
        gen_imgs = generator.predict(z)
        
        # Train G
        g_loss = gan.train_on_batch(z, np.ones((batch_size, 1)))
        if (iteration + 1) % sample_interval == 0:
            # Save D supervised classification loss to be plotted after training
            supervised_losses.append(d_loss_supervised)
            iteration_checkpoints.append(iteration + 1)
            # Output training progress
            print(
                "%d [D loss supervised: %.4f, acc.: %.2f%%] [D loss unsupervised: %.4f] [G loss: %f]"
                % (iteration + 1, d_loss_supervised, 100 * accuracy,
                   d_loss_unsupervised, g_loss))

## Train the Model and Inspect Output
Note that the 'Discrepancy between trainable weights anf collected trainable' warning from Keras is expected. It is by design: The G's trainable parameters are intentionally held constant during D training and vice versa

In [14]:
# Set hyperparameters
iterations = 8000
batch_size = 32
sample_interval = 800
# Train teh SGAN for the specified number of iterations
train(iterations, batch_size, sample_interval)

AttributeError: module 'tensorflow.python.keras.utils' has no attribute 'to_catagorical'

In [20]:
import tensorflow
print(tensorflow.__version__)

2.5.0
