In [1]:
!pip install tensorflow matplotlib opencv-python-headless pycocotools wget -q

  Preparing metadata (setup.py) ... done
  Building wheel for wget (setup.py) ... done


In [2]:
import os
import wget
import zipfile

# Directory where COCO data will be stored
dataset_dir = 'coco_dataset'
os.makedirs(dataset_dir, exist_ok=True)

# Download COCO dataset annotations, but only when the archive is not already
# on disk, so that re-running the cell does not fetch it a second time.
ann_file = 'http://images.cocodataset.org/annotations/annotations_trainval2017.zip'
ann_zip_path = os.path.join(dataset_dir, 'annotations_trainval2017.zip')
if not os.path.exists(ann_zip_path):
    wget.download(ann_file, out=dataset_dir)

# Extract annotations (skipped when the extracted folder already exists)
annotations_dir = os.path.join(dataset_dir, 'annotations')
if not os.path.isdir(annotations_dir):
    with zipfile.ZipFile(ann_zip_path, 'r') as zip_ref:
        zip_ref.extractall(dataset_dir)

In [3]:
import cv2
import numpy as np
from pycocotools.coco import COCO
import random
import matplotlib.pyplot as plt

def load_and_preprocess_image(image_path, target_size=(256, 256)):
    """Load an image and build a (grayscale input, color target) training pair.

    Args:
        image_path: Path of the image file to read.
        target_size: Size handed to ``cv2.resize``, which expects
            ``(width, height)``.

    Returns:
        Tuple ``(gray_image, image)`` of float32 arrays scaled to [-1, 1].
        ``gray_image`` is the grayscale version of the resized image
        replicated over 3 channels; ``image`` is the resized RGB image.

    Raises:
        OSError: If OpenCV cannot read ``image_path``.
    """
    # Load image. cv2.imread reports failure by returning None instead of
    # raising, so check explicitly rather than crashing later in cvtColor.
    image = cv2.imread(image_path)
    if image is None:
        raise OSError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Resize image
    image = cv2.resize(image, target_size)

    # Convert to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    gray_image = np.stack([gray_image]*3, axis=-1)  # Stack grayscale to create 3-channel image

    # Normalize from [0, 255] to [-1, 1]. The division yields float64; store
    # the result as float32, the dtype the loss functions cast to anyway.
    image = (image / 127.5 - 1).astype(np.float32)
    gray_image = (gray_image / 127.5 - 1).astype(np.float32)

    return gray_image, image

def download_coco_images(data_type='val2017', num_images=100):
    """Download a random subset of COCO images and return their local paths.

    Reads ``instances_{data_type}.json`` from the annotations folder inside the
    module-level ``dataset_dir`` and stores images in
    ``{dataset_dir}/{data_type}``. Images already on disk are not downloaded
    again.

    Args:
        data_type: COCO split name used in the annotation file name and as the
            image sub-directory.
        num_images: Number of images to sample. If the split holds fewer
            images, all of them are used.

    Returns:
        List of local image paths, one per sampled image.
    """
    # COCO dataset path
    ann_file = f'{dataset_dir}/annotations/instances_{data_type}.json'
    coco = COCO(ann_file)

    # random.sample raises ValueError when asked for more items than exist,
    # so cap the request at the number of available images.
    img_ids = coco.getImgIds()
    img_ids = random.sample(img_ids, min(num_images, len(img_ids)))

    # The target directory is the same for every image: create it once.
    image_dir = f"{dataset_dir}/{data_type}"
    os.makedirs(image_dir, exist_ok=True)

    img_paths = []
    for img_id in img_ids:
        img_info = coco.loadImgs(img_id)[0]
        img_path = f"{image_dir}/{img_info['file_name']}"
        if not os.path.exists(img_path):
            wget.download(img_info['coco_url'], out=image_dir)
        img_paths.append(img_path)
    return img_paths

# Build (grayscale input, color target) training pairs from a random COCO subset
data_type = 'val2017'
num_images = 100

img_paths = download_coco_images(data_type, num_images)

# Preprocess every image once, then split the pairs into inputs and targets
image_pairs = [load_and_preprocess_image(path) for path in img_paths]
train_images = np.array([gray for gray, _ in image_pairs])
train_labels = np.array([color for _, color in image_pairs])

print(f"Train images shape: {train_images.shape}")
print(f"Train labels shape: {train_labels.shape}")


loading annotations into memory...
Done (t=0.75s)
creating index...
index created!
Train images shape: (100, 256, 256, 3)
Train labels shape: (100, 256, 256, 3)


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, LeakyReLU, Activation, Input
from tensorflow.keras.models import Model

def build_generator():
    """Build the encoder-decoder generator.

    Two stride-2 convolutions downsample the 256x256x3 input, and two stride-2
    transposed convolutions upsample it back to a 3-channel image. The final
    tanh keeps the output in [-1, 1].

    Returns:
        An uncompiled Keras ``Model`` mapping the input image to the generated
        image.
    """
    kernel = (4, 4)
    source = Input(shape=(256, 256, 3))

    # Encoder: two downsampling stages
    x = Conv2D(64, kernel, strides=2, padding='same')(source)
    x = LeakyReLU(alpha=0.2)(x)
    x = Conv2D(128, kernel, strides=2, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)

    # Decoder: two upsampling stages, the last one producing 3 channels
    x = Conv2DTranspose(64, kernel, strides=2, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Conv2DTranspose(3, kernel, strides=2, padding='same')(x)

    return Model(source, Activation('tanh')(x))

def build_discriminator():
    """Build the convolutional discriminator.

    Two stride-2 convolutions downsample the 256x256x3 input; a final
    single-filter convolution turns the features into a map of real/fake
    scores.

    The scores are returned as raw logits. Both loss functions in this
    notebook use ``BinaryCrossentropy(from_logits=True)``, which applies the
    sigmoid itself; a sigmoid here as well would squash the scores twice.

    Returns:
        An uncompiled Keras ``Model`` mapping an image to a map of logits.
    """
    inputs = Input(shape=(256, 256, 3))
    down1 = Conv2D(64, (4, 4), strides=2, padding='same')(inputs)
    down1 = LeakyReLU(alpha=0.2)(down1)

    down2 = Conv2D(128, (4, 4), strides=2, padding='same')(down1)
    down2 = LeakyReLU(alpha=0.2)(down2)

    # No activation: the output is consumed as logits by the losses.
    outputs = Conv2D(1, (4, 4), strides=1, padding='valid')(down2)

    return Model(inputs, outputs)

# Instantiate both networks (generator first, then discriminator)
generator, discriminator = build_generator(), build_discriminator()

In [None]:
def generator_loss(disc_generated_output, gen_output, target, l1_weight=100):
    """Compute the generator loss: adversarial term plus weighted L1 term.

    Args:
        disc_generated_output: Discriminator output for the generated image.
            It is interpreted as logits (``from_logits=True``).
        gen_output: Image produced by the generator.
        target: Ground-truth color image.
        l1_weight: Factor applied to the L1 reconstruction term before it is
            added to the adversarial term. Defaults to 100.

    Returns:
        Scalar tensor ``gan_loss + l1_weight * l1_loss``.
    """
    # Cast tensors to float32 so the subtraction below has matching dtypes
    target = tf.cast(target, tf.float32)
    gen_output = tf.cast(gen_output, tf.float32)

    # Adversarial term: the generator wants its output to be labelled "real" (1)
    gan_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(disc_generated_output), disc_generated_output)
    # Reconstruction term: mean absolute error against the target image
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (l1_weight * l1_loss)
    return total_gen_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    """Compute the discriminator loss over one real and one generated image.

    Both arguments are discriminator outputs and are interpreted as logits
    (``from_logits=True``).

    Args:
        disc_real_output: Discriminator output for the target image.
        disc_generated_output: Discriminator output for the generated image.

    Returns:
        Scalar tensor: cross-entropy of the real output against ones plus
        cross-entropy of the generated output against zeros.
    """
    bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)

    real_scores = tf.cast(disc_real_output, tf.float32)
    fake_scores = tf.cast(disc_generated_output, tf.float32)

    # Real images should be scored as 1, generated images as 0
    loss_on_real = bce(tf.ones_like(real_scores), real_scores)
    loss_on_fake = bce(tf.zeros_like(fake_scores), fake_scores)
    return loss_on_real + loss_on_fake


In [None]:
# Optimizers used by train_step. They must exist before the first call and are
# created outside the tf.function so they are built only once.
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)


@tf.function
def train_step(input_image, target):
    """Run one optimization step for the generator and the discriminator.

    Args:
        input_image: Batch of generator inputs.
        target: Batch of ground-truth images matching ``input_image``.

    Returns:
        Tuple ``(gen_loss, disc_loss)`` with the losses of this step.
    """
    # One tape per network: each records the shared forward pass so that both
    # gradients can be computed from it.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Forward pass
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator(target, training=True)
        disc_generated_output = discriminator(gen_output, training=True)

        # Calculate losses
        gen_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    # Compute gradients
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    # Update weights
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss


In [None]:
# Define the number of epochs
EPOCHS = 100

# Load the dataset (assuming train_images and train_labels are already loaded and preprocessed).
# The arrays are stored as float32, the dtype the loss functions cast to;
# keeping float64 copies in the dataset would only double its memory use.
train_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images.astype(np.float32), train_labels.astype(np.float32))
)
# Use a shuffle buffer as large as the dataset so every epoch is a full
# shuffle, whatever number of images was loaded.
train_dataset = train_dataset.shuffle(buffer_size=max(1, len(train_images))).batch(1)

# Training loop
for epoch in range(EPOCHS):
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    for input_image, target in train_dataset:
        train_step(input_image, target)

    # Save model every 10 epochs
    if (epoch + 1) % 10 == 0:
        generator.save(f'cvg_generator_epoch_{epoch + 1}.h5')
        discriminator.save(f'cvg_discriminator_epoch_{epoch + 1}.h5')

    # Generate and display one example per epoch (shown only, not written to disk)
    for input_image, target in train_dataset.take(1):
        generated_image = generator(input_image, training=False)
        plt.figure(figsize=(15, 15))

        display_list = [input_image[0], generated_image[0], target[0]]
        title = ['Input Image', 'Generated Image', 'Target Image']

        for i in range(3):
            plt.subplot(1, 3, i + 1)
            plt.title(title[i])
            # Map pixel values from [-1, 1] back to [0, 1] for imshow
            plt.imshow((display_list[i] * 0.5 + 0.5))
            plt.axis('off')
        plt.show()

In [None]:
import matplotlib.pyplot as plt

# Variables to store loss values (one entry per training step)
generator_losses = []
discriminator_losses = []

# Fresh optimizers for this training run. Defining them here keeps the cell
# runnable on its own; train_step looks them up as globals.
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

def train_step(input_image, target):
    """Run one optimization step and record both losses.

    This version runs eagerly (no ``@tf.function``): ``.numpy()`` below needs
    concrete tensor values.

    Args:
        input_image: Batch of generator inputs.
        target: Batch of ground-truth images matching ``input_image``.

    Returns:
        Tuple ``(gen_loss, disc_loss)`` with the losses of this step. The same
        values are appended to ``generator_losses`` and
        ``discriminator_losses``.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Forward pass
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator(target, training=True)
        disc_generated_output = discriminator(gen_output, training=True)

        # Calculate losses
        gen_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    # Compute gradients
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    # Update weights
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    # Store losses
    generator_losses.append(gen_loss.numpy())
    discriminator_losses.append(disc_loss.numpy())

    return gen_loss, disc_loss

# Training loop with a checkpoint and a visual sample every 10 epochs
for epoch in range(EPOCHS):
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    for input_image, target in train_dataset:
        train_step(input_image, target)

    # Everything below happens only on every 10th epoch
    if (epoch + 1) % 10 != 0:
        continue

    # Save both models
    generator.save(f'generator_epoch_{epoch + 1}.h5')
    discriminator.save(f'discriminator_epoch_{epoch + 1}.h5')

    # Show input, generated and target image side by side for one batch
    for input_image, target in train_dataset.take(1):
        generated_image = generator(input_image, training=False)
        plt.figure(figsize=(15, 15))

        panels = (
            ('Input Image', input_image[0]),
            ('Generated Image', generated_image[0]),
            ('Target Image', target[0]),
        )
        for position, (caption, picture) in enumerate(panels, start=1):
            plt.subplot(1, 3, position)
            plt.title(caption)
            # Map pixel values from [-1, 1] back to [0, 1] for imshow
            plt.imshow(picture * 0.5 + 0.5)
            plt.axis('off')
        plt.show()

# Plot the per-step generator and discriminator losses on shared axes
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(generator_losses, label='Generator Loss')
ax.plot(discriminator_losses, label='Discriminator Loss')
ax.set_xlabel('Batch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title('Training Losses')
plt.show()