# DCGAN - Custom Dataset
## Deep Convolutional GAN

This is much the same as the previous notebook we looked at (still using the [official Keras DCGAN implementation](https://keras.io/examples/generative/dcgan_overriding_train_step/)) but I have some extra code at the start to allow you to make your own dataset from a YouTube video!

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os
import gdown
from zipfile import ZipFile
import cv2
import math
import random

In [None]:
# Report how many GPU devices TensorFlow can see.
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

We have to install [YouTube-DL](https://youtube-dl.org/) via pip in the cell below.

In [None]:
!pip install --upgrade --quiet youtube_dl

All you need to do is to run the cell below. It just contains helper functions to download a YouTube video and to extract frames from the video to make the dataset. Feel free to poke around if you're interested.

In [None]:
# Some helper functions we will use to make the dataset

from __future__ import unicode_literals
import youtube_dl


class MyLogger:
    """Logger passed to youtube_dl: drops debug and warning messages, prints errors."""

    def debug(self, msg):
        """Discard debug messages."""
        return None

    def warning(self, msg):
        """Discard warning messages."""
        return None

    def error(self, msg):
        """Print error messages so that failures stay visible."""
        print(msg)


def my_hook(d):
    """Progress hook: print a message once the status dictionary reports 'finished'.

    Only the 'status' entry of d is read.
    """
    if d['status'] != 'finished':
        return
    print('Done downloading.')

def download_youtube_video(_url):
  """Download the video at _url with youtube_dl and return its info dictionary.

  The output template names the file '<id>.<ext>'. When the extracted result
  contains an 'entries' list, the first entry is returned instead of the result
  itself.
  """
  options = {
      # mp4 only, with a frame height between 256 and 400 pixels
      'format': '(mp4)[height>=256][height<=400]',
      'outtmpl': '%(id)s.%(ext)s',
      'logger': MyLogger(),
      'progress_hooks': [my_hook],
  }
  with youtube_dl.YoutubeDL(options) as downloader:
      info = downloader.extract_info(_url, download=True)

  return info['entries'][0] if 'entries' in info else info

def analyse_video(_videoPath, _darkThreshold=2):
  """Classify every frame of a video as dark or valid by its mean brightness.

  Args:
    _videoPath: path of the video file to read with OpenCV.
    _darkThreshold: a frame whose mean greyscale value is below this counts
      as dark (default 2, the value previously hard-coded).

  Returns:
    (validFrames, darkFrames): two lists of zero-based frame indices.
  """
  frameCount = 0
  darkFrames = []
  validFrames = []

  vidcap = cv2.VideoCapture(_videoPath)
  try:
    success, frame = vidcap.read()
    while success:
      grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

      # Mean over all pixels in a single pass.
      if grey.mean() < _darkThreshold:
        darkFrames.append(frameCount)
      else:
        validFrames.append(frameCount)

      success, frame = vidcap.read()
      frameCount += 1
  finally:
    # Always free the decoder / file handle, even if a frame fails to convert.
    vidcap.release()

  print(f'Found {len(darkFrames)} dark frames.')
  return validFrames, darkFrames

def extract_frames(_videoPath, _outputPath, _num, _size):
  """Save square crops of randomly chosen non-dark frames as numbered JPEGs.

  Each chosen frame is resized so that its height is _size[0], then a centred
  square is cropped out. If the video has fewer valid frames than _num, every
  valid frame is used and randomly chosen ones contribute two crops (left edge
  and right edge) instead of one. A frame gives at most two crops, so at most
  2 * (number of valid frames) images can be produced.

  Args:
    _videoPath: path of the video file.
    _outputPath: directory that receives '0000.jpg', '0001.jpg', ...
    _num: number of images wanted.
    _size: target image size; only _size[0] is used, so crops are square.

  Returns:
    The number of images actually written.

  Raises:
    ValueError: if no valid (non-dark) frames were found in the video.
  """
  SIZE = _size[0]
  validFrames, _ = analyse_video(_videoPath)
  if not validFrames:
    raise ValueError(f"No usable frames found in '{_videoPath}'.")

  # Sets give O(1) membership tests in the per-frame loop below.
  if _num > len(validFrames):
    # Cap the doubles: sampling without replacement cannot pick more frames
    # than exist.
    numDoubles = min(_num - len(validFrames), len(validFrames))
    doubles = set(np.random.choice(validFrames, size=numDoubles, replace=False).tolist())
    frames = set(validFrames)
  else:
    doubles = set()
    frames = set(np.random.choice(validFrames, size=_num, replace=False).tolist())

  saved = 0
  count = 0
  vidcap = cv2.VideoCapture(_videoPath)
  try:
    success, frame = vidcap.read()

    if success:
      frameHeight, frameWidth = frame.shape[:2]
      scaleFactor = SIZE / frameHeight
      newWidth = int(frameWidth * scaleFactor)
      # NOTE(review): padding is negative when the resized frame is narrower
      # than SIZE (portrait video); the crops below assume landscape input.
      padding = int((newWidth - SIZE) / 2)

    while success:
      if count in frames:
        frame = cv2.resize(frame, (newWidth, SIZE), interpolation=cv2.INTER_AREA)

        if count in doubles:
          # Left-edge and right-edge squares.
          crops = [frame[0:SIZE, 0:SIZE],
                   frame[0:SIZE, padding*2:SIZE+padding*2]]
        else:
          # Centred square.
          crops = [frame[0:SIZE, padding:SIZE+padding]]

        for crop in crops:
          target = os.path.join(_outputPath, f'{saved:04}.jpg')
          try:
            written = cv2.imwrite(target, crop)
          except cv2.error:
            written = False
          # imwrite reports failure through its return value, so only count
          # (and number) images that really reached the disk.
          if written:
            saved += 1
          else:
            print("Error saving frame.")

      count += 1
      success, frame = vidcap.read()
  finally:
    vidcap.release()

  # The title comes from the notebook-level videoInfo when it exists; fall back
  # to the path so the function also works when called on its own.
  info = globals().get('videoInfo')
  title = info['title'] if info else _videoPath
  print(f"Saved {saved} images from video '{title}'")

  return saved

# Choose a YouTube Video

Find a video on YouTube which is about 4-10 minutes long. The video can be anything, but ideally it will be visually consistent throughout. Timelapse videos are perfect, [like this video of clouds forming](https://www.youtube.com/watch?v=NJfI_GaEyJw), and [this video of life underwater](https://www.youtube.com/watch?v=J2BKd5e15Jc) has nice consistent colours and forms. However, it is entirely up to you; maybe it would be interesting to try a random video of Lady Gaga dresses... who knows!

Paste the YouTube video URL in the cell below, __replacing the url that is between the single quotes__.

If the URL is long like this:

```
https://www.youtube.com/watch?v=NJfI_GaEyJw&ab_channel=wizard327
                                           ^
                       We don't need the stuff after this & symbol.
```

just trim the end off after (and including) the '__&__' symbol.

In [None]:
url = 'https://www.youtube.com/watch?v=J2BKd5e15Jc' # Coral Reef
videoInfo = download_youtube_video(url)
# The download is saved as '<id>.<ext>' (the 'outtmpl' option), so build the
# file name from those same fields rather than parsing the page URL, which
# gives the wrong name for URLs with extra parameters or short links.
videoFile = "{0}.{1}".format(videoInfo['id'], videoInfo['ext'])

If your video has an intro and an outro we can trim them off! Change the `startTime` and `endTime` values below. The values need to be in seconds, so if the good bit of the video starts at 38 seconds then enter `38` for `startTime`. If the end credits start at 9:14, then an easy way to find the seconds is `(9*60)+14`.

__If you don't need to trim the video down then just don't run the cell below.__

In [None]:
# Optional trimming of the video to remove intro / end credits.
# Skip this if your video does not need trimming.

from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

def trim_video(_video, _start, _end):
  """Copy the section of _video between _start and _end into a new file.

  Args:
    _video: path of the video file to trim.
    _start: start of the section to keep, in seconds.
    _end: end of the section to keep, in seconds.

  Returns:
    The name of the trimmed file: 'trimmed' plus the extension of _video.
  """
  # Work from the argument (not notebook globals) so the function trims
  # whichever file it is given.
  extension = os.path.splitext(_video)[1]  # includes the leading dot
  trimmedVideo = f"trimmed{extension}"
  ffmpeg_extract_subclip(_video, _start, _end, targetname=trimmedVideo)
  return trimmedVideo


# Section of the video to keep, in seconds (here 0:38 up to 9:14).
startTime = 38
endTime = (9 * 60) + 14
videoFile = trim_video(_video=videoFile, _start=startTime, _end=endTime)

Next we make some directories and extract 3000 images from the video.

In [None]:
DATASET_DIR = 'dataset'
IMAGES_DIR = f"{DATASET_DIR}/images"
OUTPUT_DIR = 'output'
IMAGE_SIZE = (64, 64)
# Create each directory independently; exist_ok lets the cell be re-run, and an
# already existing directory no longer prevents the others from being created.
# Creating IMAGES_DIR also creates DATASET_DIR.
for directory in (IMAGES_DIR, OUTPUT_DIR):
  os.makedirs(directory, exist_ok=True)
# The video was downloaded into Colab's working directory, /content.
NUM_IMAGES = extract_frames(f"/content/{videoFile}", IMAGES_DIR, 3000, IMAGE_SIZE)

As before, we then turn it into a _TensorFlow dataset_.

In [None]:
# Load the images under DATASET_DIR as unlabelled batches of 32, resized to IMAGE_SIZE.
raw_dataset = keras.preprocessing.image_dataset_from_directory(
    DATASET_DIR,
    label_mode=None,
    image_size=IMAGE_SIZE,
    batch_size=32,
)
# Divide every pixel value by 255.
dataset = raw_dataset.map(lambda batch: batch / 255.0)

All things going well, if you run the cell below you should see frames from the video you chose being taken from the dataset we created.

In [None]:
# Show the first image of the first batch only.
for batch in dataset.take(1):
    first_image = (batch.numpy() * 255).astype("int32")[0]
    plt.axis("off")
    plt.imshow(first_image)

# Defining the Model

This is the same as the previous example.

In [None]:
# Discriminator: three stride-2 convolutions (each followed by LeakyReLU),
# then flatten, dropout and a single sigmoid output.
discriminator_layers = [keras.Input(shape=(64, 64, 3))]
for filters in (64, 128, 128):
    discriminator_layers += [
        layers.Conv2D(filters, kernel_size=4, strides=2, padding="same"),
        layers.LeakyReLU(alpha=0.2),
    ]
discriminator_layers += [
    layers.Flatten(),
    layers.Dropout(0.2),
    layers.Dense(1, activation="sigmoid"),
]

discriminator = keras.Sequential(discriminator_layers, name="discriminator")
discriminator.summary()


In [None]:
latent_dim = 128

# Generator: project the latent vector to an 8x8x128 volume, apply three
# stride-2 transposed convolutions (each followed by LeakyReLU), then a final
# 3-channel convolution with a sigmoid activation.
generator_layers = [
    keras.Input(shape=(latent_dim,)),
    layers.Dense(8 * 8 * 128),
    layers.Reshape((8, 8, 128)),
]
for filters in (128, 256, 512):
    generator_layers += [
        layers.Conv2DTranspose(filters, kernel_size=4, strides=2, padding="same"),
        layers.LeakyReLU(alpha=0.2),
    ]
generator_layers.append(
    layers.Conv2D(3, kernel_size=5, padding="same", activation="sigmoid")
)

generator = keras.Sequential(generator_layers, name="generator")
generator.summary()

In [None]:
class GAN(keras.Model):
    """Keras model that trains a generator and a discriminator against each other."""

    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store one optimizer per network, the shared loss and two running-mean loss metrics."""
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        """The discriminator and generator loss metrics, in that order."""
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        """Run one discriminator update followed by one generator update.

        Returns a dict with the current values of the 'd_loss' and 'g_loss' metrics.
        """
        batch_size = tf.shape(real_images)[0]
        latent_shape = (batch_size, self.latent_dim)

        # ---- Discriminator update ----
        # Generate a batch of fakes and stack them in front of the real batch.
        fake_images = self.generator(tf.random.normal(shape=latent_shape))
        mixed_images = tf.concat([fake_images, real_images], axis=0)

        # Label convention used here: generated images are 1, real images are 0.
        fake_targets = tf.ones((batch_size, 1))
        real_targets = tf.zeros((batch_size, 1))
        targets = tf.concat([fake_targets, real_targets], axis=0)
        # Noisy labels - an important trick.
        targets += 0.05 * tf.random.uniform(tf.shape(targets))

        with tf.GradientTape() as d_tape:
            d_loss = self.loss_fn(targets, self.discriminator(mixed_images))
        d_weights = self.discriminator.trainable_weights
        d_grads = d_tape.gradient(d_loss, d_weights)
        self.d_optimizer.apply_gradients(zip(d_grads, d_weights))

        # ---- Generator update ----
        # Fresh latent points, labelled with the "real" label (0) so that the
        # generator is rewarded when the discriminator takes its output for real.
        fresh_latents = tf.random.normal(shape=latent_shape)
        real_label_targets = tf.zeros((batch_size, 1))

        # Only the generator's weights are updated in this step.
        with tf.GradientTape() as g_tape:
            verdicts = self.discriminator(self.generator(fresh_latents))
            g_loss = self.loss_fn(real_label_targets, verdicts)
        g_weights = self.generator.trainable_weights
        g_grads = g_tape.gradient(g_loss, g_weights)
        self.g_optimizer.apply_gradients(zip(g_grads, g_weights))

        # ---- Metrics ----
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }

In [None]:
class GANMonitor(keras.callbacks.Callback):
    """Callback that saves sample images from the generator after every epoch.

    Images are written to OUTPUT_DIR as 'generated_img_<epoch>_<index>.png'.

    Args:
        num_img: number of images to generate per epoch.
        latent_dim: length of the latent vectors fed to the generator.
    """

    def __init__(self, num_img=1, latent_dim=128):
        # Let the base Callback set up its own state.
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        """Generate num_img images from random latent vectors and save them as PNGs."""
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        # Scale by 255 and keep the NumPy array (the conversion result was
        # previously discarded).
        generated_images = (generated_images * 255).numpy()
        for i in range(self.num_img):
            img = keras.preprocessing.image.array_to_img(generated_images[i])
            img.save(os.path.join(OUTPUT_DIR, "generated_img_%03d_%d.png" % (epoch, i)))


# Train the GAN

Now our dataset is much much smaller than [CelebA](http://mmlab.ie.cuhk.edu.hk/projects/CelebA.html) and so this will train a lot quicker.

Unfortunately Keras likes to output a lot of its own information during training, which makes it difficult to display images using `imshow` or something similar. But as before, images from the generator are saved to a directory called `output`, which you can find in the file explorer on the left.

In [None]:
epochs = 60

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)

# Both networks use Adam with the same learning rate and a shared
# binary cross-entropy loss.
discriminator_optimizer = keras.optimizers.Adam(learning_rate=0.0001)
generator_optimizer = keras.optimizers.Adam(learning_rate=0.0001)
gan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    loss_fn=keras.losses.BinaryCrossentropy(),
)

# The monitor callback saves one generated image per epoch.
monitor = GANMonitor(num_img=1, latent_dim=latent_dim)
gan.fit(dataset, epochs=epochs, callbacks=[monitor])

# Extra: Saving a Model

In [None]:
# Make a directory to save the model into. exist_ok lets this cell be re-run
# without failing because the directory is already there.
os.makedirs('model', exist_ok=True)
gan.generator.save('model') # We're just saving the generator here as that's the interesting bit!

In [None]:
# Give Colab access to your Google Drive by mounting it at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy the 'model' folder to your drive
# -r means 'copy recursively' so it copies all the files and folders
# inside the 'model' directory.
!cp -r /content/model/ /content/drive/MyDrive/

# Look at outputs for each epoch (Step 3)

In [None]:
#Get image paths
my_images = []
for root, dirs, files in os.walk(OUTPUT_DIR, topdown=False):
    for name in files:
        if ".DS_Store" not in name:
            my_images.append(os.path.join(root, name))
# Convert once, after the walk has finished: converting inside the loop turned
# the list into an array, so .append failed on any second directory.
# Sorting puts the zero-padded 'generated_img_<epoch>_<i>.png' names in epoch order.
my_images = np.array(sorted(my_images))

In [None]:
#Show images
# Lay the saved images out in a grid ten columns wide.
cols = 10
num_to_show = len(my_images)
rows = int(np.ceil(num_to_show / cols))
plt.figure(figsize=(12,12))
for position, image_path in enumerate(my_images, start=1):
    bgr_image = cv2.imread(image_path)
    resized_image = cv2.resize(bgr_image, IMAGE_SIZE)
    # OpenCV loads images as BGR; convert to RGB before plotting.
    rgb_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
    plt.subplot(rows, cols, position)
    plt.imshow(rgb_image, interpolation="bilinear")
    plt.axis('off')
plt.show()

# Interpolate between two points (Step 4)

In [None]:
#edited from https://machinelearningmastery.com/how-to-interpolate-and-perform-vector-arithmetic-with-faces-using-a-generative-adversarial-network/
from numpy import asarray
from numpy.random import randn
from numpy.random import randint
from numpy import linspace
from keras.models import load_model

 # generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes=10):
  """Return an (n_samples, latent_dim) array of standard-normal latent vectors.

  n_classes is accepted but not used.
  """
  # Draw all values in one flat run, then shape them into one row per sample.
  return randn(latent_dim * n_samples).reshape(n_samples, latent_dim)

# uniform interpolation between two points in latent space
def interpolate_points(p1, p2, n_steps=10):
  """Return n_steps points evenly spaced on the straight line from p1 to p2.

  The first point is p1 and the last is p2; the result is an array with one
  row per step.
  """
  blend_weights = linspace(0, 1, num=n_steps)
  return asarray([(1.0 - weight) * p1 + weight * p2 for weight in blend_weights])

# create a plot of generated images
def plot_generated(examples):
  """Show the given images in a new 12x12 figure, laid out three per row."""
  columns = 3
  total = len(examples)
  grid_rows = int(np.ceil(total / columns))
  plt.figure(figsize=(12,12))
  for position, example in enumerate(examples, start=1):
    # One axis-free subplot per image.
    plt.subplot(grid_rows, columns, position)
    plt.axis('off')
    plt.imshow(example)
  plt.show()

### How many images do you want?

Pick the number of images to interpolate. It will pick two new random points to interpolate between every time you run it! 

In [None]:
num_images = 20

# generate two random points in latent space
pts = generate_latent_points(latent_dim, 2)
print(pts.shape)
# interpolate between the two points in latent space
interpolated = interpolate_points(pts[0], pts[1], num_images)
# generate images, scale them by 255 and keep the result as a NumPy array
# (the conversion result was previously discarded)
X = (gan.generator(interpolated) * 255).numpy()
generated_images = [keras.preprocessing.image.array_to_img(image) for image in X]
# plot_generated opens its own figure, so none is created here; the extra
# plt.figure call only produced an empty figure.
plot_generated(generated_images)