<a href="https://colab.research.google.com/github/M-H-Amini/GAN-Webinars/blob/main/SGAN_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#  In The Name Of ALLAH
#  Generative Adversarial Networks
#  PythonChallenge.ir
#  Mohammad Hossein Amini (mhamini@aut.ac.ir)
#  Lecture 2 - SGAN

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import keras.backend as K
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, LeakyReLU, Flatten, Reshape, Dense, Dropout, Activation, Lambda
from sklearn.utils import shuffle

In [None]:
class MHDS:
  """MNIST wrapper that exposes a small labeled split and an unlabeled split.

  The training set is shuffled with a fixed seed, pixel values are mapped
  with (x - 127.5) / 127.5 and a trailing channel axis is appended. The
  first `n_labeled` training samples (with labels) form the supervised set;
  the following `n_unlabeled` samples (labels dropped) form the
  unsupervised set.

  Args:
    n_labeled: number of labeled training samples to keep.
    n_unlabeled: number of unlabeled samples; None means "all remaining
      training samples".

  Attributes set: Xtrain, ytrain, Xtest, ytest, X_sup, y_sup, X_unsup,
  n_labeled, n_unlabeled.
  """

  def __init__(self, n_labeled=200, n_unlabeled=None):
    (self.Xtrain, self.ytrain), (self.Xtest, self.ytest) = mnist.load_data()
    # Fixed seed so the labeled subset is the same on every run.
    self.Xtrain, self.ytrain = shuffle(self.Xtrain, self.ytrain, random_state=0)

    self.Xtrain = np.expand_dims((self.Xtrain - 127.5) / 127.5, -1)
    self.Xtest = np.expand_dims((self.Xtest - 127.5) / 127.5, -1)

    self.X_sup = self.Xtrain[:n_labeled]
    self.y_sup = self.ytrain[:n_labeled]

    if n_unlabeled is None:
      self.X_unsup = self.Xtrain[n_labeled:]
    else:
      self.X_unsup = self.Xtrain[n_labeled:n_labeled + n_unlabeled]

    # Store the sizes that were actually obtained: slicing silently truncates
    # when more samples are requested than exist, and the samplers must never
    # draw an index beyond the real array length.
    self.n_labeled = len(self.X_sup)
    self.n_unlabeled = len(self.X_unsup)

  def batchSupervised(self, batch_size=32):
    """Return a random (images, labels) batch drawn without replacement.

    At most `batch_size` samples are returned; fewer if the labeled set is
    smaller than `batch_size`.
    """
    index = np.random.permutation(len(self.X_sup))[:batch_size]
    return self.X_sup[index], self.y_sup[index]

  def batchUnsupervised(self, batch_size=32):
    """Return a random batch of unlabeled images drawn without replacement.

    At most `batch_size` samples are returned; fewer if the unlabeled set is
    smaller than `batch_size`.
    """
    index = np.random.permutation(len(self.X_unsup))[:batch_size]
    return self.X_unsup[index]

# Build the dataset once at module level with the default split
# (n_labeled=200, every remaining training sample treated as unlabeled).
mhds = MHDS()

In [None]:
def show(X, y=None, r=4, c=4):
  """Plot the first r*c images of X in an r-by-c grid.

  Args:
    X: image batch indexed as X[n, :, :, 0] (channel 0 is displayed).
    y: optional per-image labels; when given, each is used as a subplot title.
    r: number of grid rows.
    c: number of grid columns.

  If X holds fewer than r*c images, the unused cells are left blank.
  """
  # sharex/sharey must be passed by keyword (they are keyword-only in current
  # Matplotlib); squeeze=False keeps `ax` 2-D even when r == 1 or c == 1.
  _, ax = plt.subplots(r, c, sharex=True, sharey=True, squeeze=False, figsize=(8, 8))
  n_images = len(X)
  for i in range(r):
    for j in range(c):
      k = i * c + j
      if k >= n_images:
        ax[i][j].axis('off')
        continue
      ax[i][j].imshow(X[k, :, :, 0])
      if y is not None:
        ax[i][j].set_title(str(y[k]))
  plt.show()

# Preview a random labeled batch; each subplot title is the sample's label.
# To preview unlabeled samples instead, use the commented-out call below.
# show(mhds.batchUnsupervised())
show(*mhds.batchSupervised())

In [None]:
def buildDisc(img_shape=(28, 28, 1), k=10):
  """Build the shared discriminator trunk.

  Three stride-2 'same'-padded 3x3 convolutions (32, 64, 128 filters), each
  followed by batch normalization and LeakyReLU(0.01), then dropout, flatten
  and a Dense layer with `k` outputs and no activation (raw scores).

  Args:
    img_shape: input image shape passed to the first convolution.
    k: number of output units.

  Returns:
    An uncompiled Sequential model.
  """
  net = Sequential()

  for idx, n_filters in enumerate((32, 64, 128)):
    # Only the first layer declares the input shape.
    first_layer_kwargs = {'input_shape': img_shape} if idx == 0 else {}
    net.add(Conv2D(n_filters, kernel_size=3, strides=2, padding='same', **first_layer_kwargs))
    net.add(BatchNormalization())
    net.add(LeakyReLU(0.01))

  for head_layer in (Dropout(0.5), Flatten(), Dense(k)):
    net.add(head_layer)

  return net

In [None]:
def buildSupervisedDisc(disc):
  """Return `disc` followed by a softmax activation (per-class probabilities).

  The returned model wraps the given `disc` object itself, so both share the
  same layers.
  """
  return Sequential([disc, Activation('softmax')])

def buildUnsupervisedDisc(disc):
  """Return `disc` followed by a single-output head computed from its scores.

  With Z = sum over the last axis of exp(score), the head outputs
  1 - 1 / (Z + 1), which equals Z / (Z + 1) and therefore lies in [0, 1).
  The returned model wraps the given `disc` object itself.
  """
  def predict(x):
    # keepdims=True keeps a trailing axis of size 1 on the output.
    z = K.sum(K.exp(x), axis=-1, keepdims=True)
    return 1. - 1. / (z + 1.)

  return Sequential([disc, Lambda(predict)])

In [None]:
# One trunk, two heads: disc_sup and disc_unsup both wrap the same `disc`
# instance, so training either head updates the shared trunk.
disc = buildDisc()
disc_sup = buildSupervisedDisc(disc)
disc_unsup = buildUnsupervisedDisc(disc)
# Supervised head: integer class labels -> sparse categorical cross-entropy.
disc_sup.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])
# Unsupervised head: single output trained with binary cross-entropy.
disc_unsup.compile(optimizer='adam', loss='bce', metrics=['acc'])

In [None]:
def buildGen(z_dim=100):
  """Build the generator: latent vector of size `z_dim` -> single-channel image.

  A Dense layer produces 7*7*256 units which are batch-normalized and
  reshaped to (7, 7, 256). Two stride-2 transposed convolutions (128 and 64
  filters, each with batch normalization and LeakyReLU(0.01)) follow, then a
  stride-1 transposed convolution with one filter and tanh activation.

  Returns:
    An uncompiled Sequential model.
  """
  layers = [
    Dense(7*7*256, input_shape=(z_dim,)),
    BatchNormalization(),
    Reshape((7, 7, 256)),
  ]

  # Upsampling stages.
  for n_filters in (128, 64):
    layers.append(Conv2DTranspose(n_filters, 3, 2, 'same'))
    layers.append(BatchNormalization())
    layers.append(LeakyReLU(0.01))

  # tanh keeps the output in [-1, 1].
  layers.append(Conv2DTranspose(1, 3, 1, 'same', activation='tanh'))
  return Sequential(layers)

In [None]:
def buildGan(disc, gen):
  """Stack `gen` and `disc` into one model: input -> gen -> disc.

  Note the argument order: the discriminator comes first in the signature,
  but the generator is the first stage of the returned model.
  """
  stacked = Sequential()
  stacked.add(gen)
  stacked.add(disc)
  return stacked

In [None]:
gen = buildGen()

# Mark the discriminator as non-trainable before the stacked model is
# compiled; the intent is that training `gan` only updates the generator.
# NOTE(review): disc_sup and disc_unsup were compiled earlier and share these
# layers - confirm with the Keras version in use that they still update the
# discriminator weights after this flag is flipped.
disc_unsup.trainable = False
gan = buildGan(disc_unsup, gen)
gan.compile(optimizer='adam', loss='bce')

In [None]:
def train(iterations=2000, batch_size=32, z_dim=100):
  """Run the semi-supervised GAN training loop.

  Each iteration performs, in order:
    1. one supervised step of `disc_sup` on a labeled batch,
    2. two steps of `disc_unsup`: real unlabeled images with target 1 and
       generated images with target 0,
    3. one step of `gan` on fresh noise with target 1 (generator update).
  Every 100 iterations `disc_sup` is evaluated on the test set and the
  current losses/accuracies are printed.

  Uses the module-level `mhds`, `gen`, `disc_sup`, `disc_unsup` and `gan`.

  Args:
    iterations: number of training iterations.
    batch_size: samples per batch for every step.
    z_dim: size of the generator's noise input; must match the `z_dim` the
      generator was built with.
  """
  # Targets are identical on every iteration, so build them once.
  ones = np.ones((batch_size, 1))
  zeros = np.zeros((batch_size, 1))

  for i in range(iterations):
    Xsup, ysup = mhds.batchSupervised(batch_size)
    Xunsup = mhds.batchUnsupervised(batch_size)
    noise = np.random.randn(batch_size, z_dim)
    Xfake = gen.predict(noise)

    ##  Supervised Training
    loss_sup, acc_sup = disc_sup.train_on_batch(Xsup, ysup)

    ##  Unsupervised Training
    loss_unsup_real = disc_unsup.train_on_batch(Xunsup, ones)
    # The "fake" step must see generator output; feeding real images with a
    # zero target would contradict the step above.
    loss_unsup_fake = disc_unsup.train_on_batch(Xfake, zeros)

    # Each result is [loss, acc]; average them element-wise.
    loss_unsup, _acc_unsup = 0.5 * np.add(loss_unsup_real, loss_unsup_fake)

    ##  Generator Training
    noise = np.random.randn(batch_size, z_dim)
    gan.train_on_batch(noise, ones)

    if i % 100 == 0:
      test_loss, test_acc = disc_sup.evaluate(mhds.Xtest, mhds.ytest)
      print(f'Iteration {i}:\tLoss supervised={loss_sup:4.2f}\tAcc supervised={acc_sup:4.2f}\t Loss Unsupervised={loss_unsup:4.2f}')
      print(f'Loss Test: {test_loss:4.2f}\tAcc Test:{test_acc:4.2f}')

# Run training with the default settings (2000 iterations, batch size 32).
train()