In [5]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import time
import os

# Configure directories for saving models and samples
SAMPLE_DIR = './MNIST/figs/generated_images_for_testRun1'
MODEL_DIR = './MNIST/models/models_for_testRun1'
BUFFER_SIZE = 60000
BATCH_SIZE = 256
IMG_SHAPE = (32, 32, 1)
NOISE_DIM = 100

# Ensure directories exist
os.makedirs(SAMPLE_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

# Preprocess images (resize to 32x32 and normalize to [-1, 1])
def preprocess_images(image):
    image = tf.image.resize(image, [32, 32])
    image = (image - 127.5) / 127.5  # Normalize to [-1, 1]
    return image

# Load and preprocess MNIST dataset
(train_images, _), (_, _) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype('float32')
train_images = tf.data.Dataset.from_tensor_slices(train_images)
train_images = train_images.map(preprocess_images)
train_images = train_images.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# Generator model
def build_generator():
    model = tf.keras.Sequential([
        layers.Dense(4 * 4 * 256, use_bias=False, input_shape=(NOISE_DIM,)),
        layers.BatchNormalization(),
        layers.ReLU(),

        layers.Reshape((4, 4, 256)),

        layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False),
        layers.BatchNormalization(),
        layers.ReLU(),

        layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False),
        layers.BatchNormalization(),
        layers.ReLU(),

        layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', activation='tanh')
    ])

    return model

# Discriminator model
def build_discriminator():
    model = tf.keras.Sequential([
        layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=IMG_SHAPE),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),

        layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),

        layers.Flatten(),
        layers.Dense(1)
    ])

    return model

# Define loss functions and optimizers
def discriminator_loss(real_output, fake_output):
    real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(real_output), real_output)
    fake_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.zeros_like(fake_output), fake_output)
    return real_loss + fake_loss

def generator_loss(fake_output):
    return tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(fake_output), fake_output)

# Instantiate models
generator = build_generator()
discriminator = build_discriminator()

# Optimizers
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

# Training step
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, NOISE_DIM])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss

# Generate and save images
def generate_and_save_images(model, epoch, test_input):
    predictions = model(test_input, training=False)

    fig = plt.figure(figsize=(6, 6))
    for i in range(predictions.shape[0]):
        plt.subplot(6, 6, i + 1)
        plt.imshow(predictions[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
        plt.axis('off')

    plt.savefig(os.path.join(SAMPLE_DIR, f'image_at_epoch_{epoch:04d}.png'))
    plt.close()

# Training loop
def train(dataset, epochs):
    seed = tf.random.normal([36, NOISE_DIM])

    for epoch in range(epochs):
        start = time.time()

        for image_batch in dataset:
            gen_loss, disc_loss = train_step(image_batch)

        print(f'Epoch {epoch + 1}, Gen Loss: {gen_loss:.4f}, Disc Loss: {disc_loss:.4f}, Time: {time.time() - start:.2f} sec')

        # Generate and save images at the end of each epoch
        generate_and_save_images(generator, epoch + 1, seed)

        # Save the model every 5 epochs
        if (epoch + 1) % 5 == 0:
            checkpoint_prefix = os.path.join(MODEL_DIR, "ckpt")
            generator.save_weights(checkpoint_prefix + f"_generator_epoch_{epoch+1}")
            discriminator.save_weights(checkpoint_prefix + f"_discriminator_epoch_{epoch+1}")

# Run training
EPOCHS = 50
train(train_images, EPOCHS)


Epoch 1, Gen Loss: 0.9256, Disc Loss: 1.1652, Time: 7.38 sec
Epoch 2, Gen Loss: 0.8507, Disc Loss: 1.5089, Time: 5.85 sec
Epoch 3, Gen Loss: 1.0107, Disc Loss: 1.1905, Time: 5.70 sec
Epoch 4, Gen Loss: 1.1526, Disc Loss: 0.9104, Time: 5.87 sec
Epoch 5, Gen Loss: 0.9645, Disc Loss: 1.0319, Time: 5.84 sec
Epoch 6, Gen Loss: 1.2793, Disc Loss: 0.8223, Time: 6.04 sec
Epoch 7, Gen Loss: 1.2788, Disc Loss: 0.7165, Time: 5.77 sec
Epoch 8, Gen Loss: 1.1006, Disc Loss: 1.1412, Time: 5.64 sec
Epoch 9, Gen Loss: 1.0005, Disc Loss: 1.1039, Time: 5.72 sec
Epoch 10, Gen Loss: 1.4514, Disc Loss: 0.7428, Time: 5.46 sec
Epoch 11, Gen Loss: 1.6699, Disc Loss: 0.5825, Time: 5.41 sec
Epoch 12, Gen Loss: 1.4090, Disc Loss: 0.6642, Time: 5.48 sec
Epoch 13, Gen Loss: 1.1749, Disc Loss: 0.9800, Time: 6.26 sec
Epoch 14, Gen Loss: 1.1192, Disc Loss: 0.9754, Time: 5.59 sec
Epoch 15, Gen Loss: 0.7504, Disc Loss: 1.8639, Time: 5.94 sec
Epoch 16, Gen Loss: 1.4822, Disc Loss: 0.7013, Time: 5.62 sec
Epoch 17, Gen Los