## Setup

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


## Prepare the  data


In [None]:
import tensorflow as tf
from tensorflow import keras
import time
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os.path, sys
import os
import pandas as pd
from zipfile import ZipFile
from tqdm import tqdm
from PIL import Image
import os
from skimage.transform import resize
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array


def load_data(img_folder):
  img_data_array=[]
  class_name=[]
  def imagetensor(imagedir):
    img = tf.keras.preprocessing.image.load_img(imagedir, color_mode='rgb', target_size = (64,64))
    image_array = tf.keras.preprocessing.image.img_to_array(img)
    return image_array 

  for dir1 in os.listdir(img_folder):
      for file in os.listdir(os.path.join(img_folder, dir1)):
          image_path= os.path.join(img_folder, dir1,  file)
          image= imagetensor(image_path)
          
          img_data_array.append(image)
          class_name.append(dir1)

          
  target_dict={k: v for v, k in enumerate(np.unique(class_name))}
  target_val=  [target_dict[class_name[i]] for i in range(len(class_name))]
  target_val = list(map(int,target_val))

  images = np.array(img_data_array)
  labels = np.array(target_val)
  train_images, test_images, train_labels, test_labels = train_test_split(images, labels, test_size=0.20, random_state=42)
  print("DATA LOADED")
 
  return (train_images, train_labels), (test_images, test_labels)
  

In [None]:
!unzip /content/drive/MyDrive/affect_net_4.zip -d /content

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/affect_net_4/sad/image0000013.jpg  
  inflating: /content/affect_net_4/sad/image0000026.jpg  
  inflating: /content/affect_net_4/sad/image0000027.jpg  
  inflating: /content/affect_net_4/sad/image0000028.jpg  
  inflating: /content/affect_net_4/sad/image0000032.jpg  
  inflating: /content/affect_net_4/sad/image0000042.jpg  
  inflating: /content/affect_net_4/sad/image0000047.jpg  
  inflating: /content/affect_net_4/sad/image0000062.jpg  
  inflating: /content/affect_net_4/sad/image0000073.jpg  
  inflating: /content/affect_net_4/sad/image0000093.jpg  
  inflating: /content/affect_net_4/sad/image0000096.jpg  
  inflating: /content/affect_net_4/sad/image0000111.jpg  
  inflating: /content/affect_net_4/sad/image0000118.jpg  
  inflating: /content/affect_net_4/sad/image0000145.jpg  
  inflating: /content/affect_net_4/sad/image0000151.jpg  
  inflating: /content/affect_net_4/sad/image0000162.jpg  
  infla

In [None]:
IMG_SHAPE = (64, 64, 3)

# Size of the noise vector
noise_dim = 128

(train_images, train_labels), (test_images, test_labels) = load_data('/content/affect_net_4')
print(f"Number of examples: {len(train_images)}")
print(f"Shape of the images in the dataset: {train_images.shape[1:]}")

train_images = train_images.reshape(train_images.shape[0], *IMG_SHAPE).astype("float32")
train_images = (train_images - 127.5) / 127.5

DATA LOADED
Number of examples: 16000
Shape of the images in the dataset: (64, 64, 3)


In [None]:
test_images = test_images.reshape(test_images.shape[0], *IMG_SHAPE).astype("float32")
test_images = (test_images - 127.5) / 127.5

In [None]:
from tensorflow.keras.utils import to_categorical

train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)


## Create the discriminator

In [None]:
def conv_block(
    x,
    filters,
    activation,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding="same",
    use_bias=True,
    use_bn=False,
    use_dropout=False,
    drop_value=0.5,
):
    x = layers.Conv2D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )(x)
    if use_bn:
        x = layers.BatchNormalization()(x)
    x = activation(x)
    if use_dropout:
        x = layers.Dropout(drop_value)(x)
    return x


def get_discriminator_model():
    img_input = layers.Input(shape=IMG_SHAPE)
    # Zero pad the input to make the input images size to (32, 32, 1).
    x = layers.ZeroPadding2D((2, 2))(img_input)
    x = conv_block(
        x,
        64,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        use_bias=True,
        activation=layers.LeakyReLU(0.2),
        use_dropout=False,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        128,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=True,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        256,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=True,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        512,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=False,
        drop_value=0.3,
    )

    x = layers.Flatten()(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(1)(x)

    d_model = keras.models.Model(img_input, x, name="discriminator")
    return d_model


d_model = get_discriminator_model()
d_model.summary()

Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 64, 64, 3)]       0         
                                                                 
 zero_padding2d_2 (ZeroPaddi  (None, 68, 68, 3)        0         
 ng2D)                                                           
                                                                 
 conv2d_14 (Conv2D)          (None, 34, 34, 64)        4864      
                                                                 
 leaky_re_lu_14 (LeakyReLU)  (None, 34, 34, 64)        0         
                                                                 
 conv2d_15 (Conv2D)          (None, 17, 17, 128)       204928    
                                                                 
 leaky_re_lu_15 (LeakyReLU)  (None, 17, 17, 128)       0         
                                                     

## Create the generator

In [None]:
def upsample_block(
    x,
    filters,
    activation,
    kernel_size=(3, 3),
    strides=(1, 1),
    up_size=(2, 2),
    padding="same",
    use_bn=False,
    use_bias=True,
    use_dropout=False,
    drop_value=0.3,
):
    x = layers.UpSampling2D(up_size)(x)
    x = layers.Conv2D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )(x)

    if use_bn:
        x = layers.BatchNormalization()(x)

    if activation:
        x = activation(x)
    if use_dropout:
        x = layers.Dropout(drop_value)(x)
    return x


def get_generator_model():
    noise = layers.Input(shape=(noise_dim,))
    x = layers.Dense(8 * 8 * 1024, use_bias=False)(noise)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.2)(x)

    x = layers.Reshape((8, 8, 1024))(x)
    x = upsample_block(
        x,
        512,
        layers.LeakyReLU(0.2),
        strides=(1, 1),
        use_bias=False,
        use_bn=True,
        padding="same",
        use_dropout=False,
    )
    x = upsample_block(
        x,
        256,
        layers.LeakyReLU(0.2),
        strides=(1, 1),
        use_bias=False,
        use_bn=True,
        padding="same",
        use_dropout=False,
    )
    x = upsample_block(
        x, 3, layers.Activation("tanh"), strides=(1, 1), use_bias=False, use_bn=True
    )
    # At this point, we have an output which has the same shape as the input, (32, 32, 1).
    # We will use a Cropping2D layer to make it (28, 28, 1).
    #x = layers.Cropping2D((2, 2))(x)

    g_model = keras.models.Model(noise, x, name="generator")
    return g_model


g_model = get_generator_model()
g_model.summary()

Model: "generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 128)]             0         
                                                                 
 dense_5 (Dense)             (None, 65536)             8388608   
                                                                 
 batch_normalization_8 (Batc  (None, 65536)            262144    
 hNormalization)                                                 
                                                                 
 leaky_re_lu_11 (LeakyReLU)  (None, 65536)             0         
                                                                 
 reshape_1 (Reshape)         (None, 8, 8, 1024)        0         
                                                                 
 up_sampling2d_3 (UpSampling  (None, 16, 16, 1024)     0         
 2D)                                                     

## Create the WGAN-GP model

Now that we have defined our generator and discriminator, it's time to implement
the WGAN-GP model. We will also override the `train_step` for training.

In [None]:
class WGAN(keras.Model):
    def __init__(
        self,
        discriminator,
        generator,
        latent_dim,
        discriminator_extra_steps=3,
        gp_weight=10.0,
    ):
        super(WGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
        super(WGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Calculates the gradient penalty.

        This loss is calculated on an interpolated image
        and added to the discriminator loss.
        """
        # Get the interpolated image
        alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated image.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t to this interpolated image.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        # Get the batch size
        batch_size = tf.shape(real_images)[0]

        # For each batch, we are going to perform the
        # following steps as laid out in the original paper:
        # 1. Train the generator and get the generator loss
        # 2. Train the discriminator and get the discriminator loss
        # 3. Calculate the gradient penalty
        # 4. Multiply this gradient penalty with a constant weight factor
        # 5. Add the gradient penalty to the discriminator loss
        # 6. Return the generator and discriminator losses as a loss dictionary

        # Train the discriminator first. The original paper recommends training
        # the discriminator for `x` more steps (typically 5) as compared to
        # one step of the generator. Here we will train it for 3 extra steps
        # as compared to 5 to reduce the training time.
        for i in range(self.d_steps):
            # Get the latent vector
            random_latent_vectors = tf.random.normal(
                shape=(batch_size, self.latent_dim)
            )
            with tf.GradientTape() as tape:
                # Generate fake images from the latent vector
                fake_images = self.generator(random_latent_vectors, training=True)
                # Get the logits for the fake images
                fake_logits = self.discriminator(fake_images, training=True)
                # Get the logits for the real images
                real_logits = self.discriminator(real_images, training=True)

                # Calculate the discriminator loss using the fake and real image logits
                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
                # Calculate the gradient penalty
                gp = self.gradient_penalty(batch_size, real_images, fake_images)
                # Add the gradient penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight

            # Get the gradients w.r.t the discriminator loss
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Train the generator
        # Get the latent vector
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        with tf.GradientTape() as tape:
            # Generate fake images using the generator
            generated_images = self.generator(random_latent_vectors, training=True)
            # Get the discriminator logits for fake images
            gen_img_logits = self.discriminator(generated_images, training=True)
            # Calculate the generator loss
            g_loss = self.g_loss_fn(gen_img_logits)

        # Get the gradients w.r.t the generator loss
        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        # Update the weights of the generator using the generator optimizer
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )
        return {"d_loss": d_loss, "g_loss": g_loss}


In [None]:
# Instantiate the optimizer for both networks
# (learning_rate=0.0002, beta_1=0.5 are recommended)
generator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
discriminator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)

checkpoint_dir = '/content/drive/MyDrive/training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")


## Create a Keras callback that periodically saves generated images and saves the model

In [None]:
import math
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=g_model,
                                 discriminator=d_model)
class GANMonitor(keras.callbacks.Callback):
    def __init__(self, num_img, latent_dim=128):
        self.num_img = num_img
        self.latent_dim = latent_dim
        

    def on_epoch_end(self, epoch, logs=None):
        checkpoint.save(file_prefix = checkpoint_prefix)
        def image_grid(imgs, rows, cols):
          assert len(imgs) == rows*cols

          w, h = imgs[0].size
          grid = Image.new('RGB', size=(cols*w, rows*h))
          grid_w, grid_h = grid.size
          
          for i, img in enumerate(imgs):
              grid.paste(img, box=(i%cols*w, i//cols*h))
          return grid
        
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images = (generated_images * 127.5) + 127.5

        filenames = []

        for i in range(self.num_img):
          img = generated_images[i].numpy()
          img = keras.preprocessing.image.array_to_img(img)
          filenames.append(img)

        image = image_grid(filenames, 9,16)

        image.save("generated_img_{i}_{epoch}.png".format(i=i, epoch=str(epoch + 96)))


## Train the end-to-end model

In [None]:
%cd /content/drive/MyDrive/images_v2

/content/drive/.shortcut-targets-by-id/1m_cSF5KHWFFf8JsD-CJjScsUfh8m2MwN/images_v2


In [None]:
import os.path
checkpoint.restore(tf.train.latest_checkpoint('/content/drive/MyDrive/training_checkpoints/ckpt-155.data-00000-of-00001'))

<tensorflow.python.training.tracking.util.InitializationOnlyStatus at 0x7fac2e09c0d0>

In [None]:
# Define the loss functions for the discriminator,
# which should be (fake_loss - real_loss).
# We will add the gradient penalty later to this loss function.
import os.path
checkpoint.restore(tf.train.latest_checkpoint('/content/drive/MyDrive/training_checkpoints/ckpt-155.data-00000-of-00001'))


def discriminator_loss(real_img, fake_img):
    real_loss = tf.reduce_mean(real_img)
    fake_loss = tf.reduce_mean(fake_img)
    return fake_loss - real_loss


# Define the loss functions for the generator.
def generator_loss(fake_img):
    return -tf.reduce_mean(fake_img)


# Set the number of epochs for trainining.
epochs = 200
BATCH_SIZE = 256

# Instantiate the customer `GANMonitor` Keras callback.
cbk = GANMonitor(num_img=16*9, latent_dim=noise_dim)

wgan = WGAN(
    discriminator=d_model,
    generator=g_model,
    latent_dim=noise_dim,
    discriminator_extra_steps=3,
)

# Compile the wgan model
wgan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    g_loss_fn=generator_loss,
    d_loss_fn=discriminator_loss,
)
wgan.fit(train_images, batch_size=BATCH_SIZE, epochs=epochs, callbacks=[cbk])

Epoch 1/200
 8/63 [==>...........................] - ETA: 2:31 - d_loss: -13.7761 - g_loss: -10.3560

KeyboardInterrupt: ignored

In [None]:
%cd /drive/MyDrive/
d_model.save('d_model')
g_model.save('g_model')

## Gif Preperation

In [None]:
import glob
import imageio

In [None]:
%cd /content/drive/MyDrive/images_v2

/content/drive/MyDrive/images_matrix


In [None]:
anim_file = 'wgan_v2.gif'

    
with imageio.get_writer(anim_file, mode='I') as writer:
  filenames = glob.glob('generated*.png')
  filenames = sorted(filenames)
  
  print(filenames)
  for filename in filenames:
    image = imageio.imread(filename)
    writer.append_data(image)
  image = imageio.imread(filename)
  writer.append_data(image)

['generated_img_143_0.png', 'generated_img_143_1.png', 'generated_img_143_10.png', 'generated_img_143_11.png', 'generated_img_143_12.png', 'generated_img_143_13.png', 'generated_img_143_14.png', 'generated_img_143_15.png', 'generated_img_143_16.png', 'generated_img_143_17.png', 'generated_img_143_18.png', 'generated_img_143_19.png', 'generated_img_143_2.png', 'generated_img_143_20.png', 'generated_img_143_21.png', 'generated_img_143_22.png', 'generated_img_143_23.png', 'generated_img_143_24.png', 'generated_img_143_25.png', 'generated_img_143_26.png', 'generated_img_143_27.png', 'generated_img_143_28.png', 'generated_img_143_29.png', 'generated_img_143_3.png', 'generated_img_143_30.png', 'generated_img_143_31.png', 'generated_img_143_32.png', 'generated_img_143_33.png', 'generated_img_143_34.png', 'generated_img_143_35.png', 'generated_img_143_36.png', 'generated_img_143_37.png', 'generated_img_143_38.png', 'generated_img_143_39.png', 'generated_img_143_4.png', 'generated_img_143_40.pn

#Classifier

In [None]:
from tensorflow.keras import Sequential, layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.utils import to_categorical
disc = checkpoint.discriminator

new_model = Sequential()
for i in range(len(disc.layers) - 1):
    new_model.add(disc.layers[i])

# freeze the layers 
for layer in new_model.layers:
    layer.trainable = False    

new_model.add(Dense(256, activation = 'relu'))
new_model.add(Dense(4, activation='softmax'))
new_model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 zero_padding2d_1 (ZeroPaddi  (None, 68, 68, 3)        0         
 ng2D)                                                           
                                                                 
 conv2d_7 (Conv2D)           (None, 34, 34, 64)        4864      
                                                                 
 batch_normalization_4 (Batc  (None, 34, 34, 64)       256       
 hNormalization)                                                 
                                                                 
 leaky_re_lu_7 (LeakyReLU)   (None, 34, 34, 64)        0         
                                                                 
 conv2d_8 (Conv2D)           (None, 17, 17, 128)       204928    
                                                                 
 batch_normalization_5 (Batc  (None, 17, 17, 128)     

In [None]:
new_model.compile(optimizer=tf.keras.optimizers.Adam(.0001),loss =tf.keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])



history1 = new_model.fit(train_images, train_labels, batch_size=16, epochs=200,
                        validation_data=(test_images, test_labels))

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

In [None]:
gen = checkpoint.generator

In [None]:
random_latent_vectors = tf.random.normal(shape=(1, 128))
generated_images = gen(random_latent_vectors)

In [None]:
new_model.predict_classes(generated_images)

AttributeError: ignored

In [None]:
train_images[0]

array([[[ 0.62352943, -0.35686275, -0.5764706 ],
        [ 0.6313726 , -0.35686275, -0.67058825],
        [ 0.3882353 , -0.4509804 , -0.21568628],
        ...,
        [-0.31764707, -0.27058825, -0.14509805],
        [-0.44313726, -0.40392157, -0.23137255],
        [-0.4117647 , -0.37254903, -0.2       ]],

       [[ 0.6392157 , -0.38039216, -0.58431375],
        [ 0.5686275 , -0.35686275, -0.6392157 ],
        [ 0.1764706 , -0.5686275 , -0.3019608 ],
        ...,
        [-0.31764707, -0.27058825, -0.14509805],
        [-0.48235294, -0.44313726, -0.28627452],
        [-0.41960785, -0.38039216, -0.22352941]],

       [[ 0.5921569 , -0.28627452, -0.45882353],
        [ 0.52156866, -0.29411766, -0.52156866],
        [ 0.08235294, -0.654902  , -0.37254903],
        ...,
        [-0.30980393, -0.2627451 , -0.13725491],
        [-0.46666667, -0.41960785, -0.29411766],
        [-0.41960785, -0.37254903, -0.24705882]],

       ...,

       [[ 0.78039217,  0.12156863, -0.7882353 ],
        [ 0

In [None]:
from sklearn.preprocessing import LabelBinarizer 
label_binarizer = LabelBinarizer()

label_binarizer.inverse_transform([train_labels[0]])


NotFittedError: ignored