In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import os
import time
import numpy as np
import matplotlib.pyplot as plt
from IPython import display

In [2]:
import tensorflow as tf
import os
from glob import glob


# Target size (in pixels) that every skeleton/target image is resized to.
# These values are passed to resize() by load_train_image(); the networks
# defined further down hard-code the same 512x512 input size.
IMG_HEIGHT = 512
IMG_WIDTH = 512

# Helper function to load an image
def load_image(image_file, is_target=False):
    """Read a PNG file and return its pixels as a float32, 3-channel tensor.

    Args:
        image_file: Path of the PNG file to read.
        is_target: Accepted so call sites can label which image they load;
            it is not used, so skeleton and target files are decoded
            identically.

    Returns:
        The decoded image cast to ``tf.float32``. No rescaling is applied
        here; see ``normalize`` for that step.
    """
    raw_bytes = tf.io.read_file(image_file)
    # Always request 3 channels, regardless of how the PNG was stored.
    decoded = tf.image.decode_png(raw_bytes, channels=3)
    return tf.cast(decoded, tf.float32)

# Resize both input (skeleton) and target images to a common size
def resize(input_image, target_image, height, width):
    """Resize the skeleton and target images to ``height`` x ``width``.

    Args:
        input_image: Skeleton image tensor.
        target_image: Target image tensor.
        height: Output height in pixels.
        width: Output width in pixels.

    Returns:
        Tuple ``(input_image, target_image)`` with both images resized by
        ``tf.image.resize`` using its default interpolation method.
    """
    new_size = [height, width]
    resized_input = tf.image.resize(input_image, new_size)
    resized_target = tf.image.resize(target_image, new_size)
    return resized_input, resized_target

# Normalize both images to the range [-1, 1]
def normalize(input_image, target_image):
    """Rescale both images with ``x / 127.5 - 1``.

    For pixel values in [0, 255] this maps 0 -> -1, 127.5 -> 0 and 255 -> 1,
    which matches the ``tanh`` output range of the generator.

    Args:
        input_image: Skeleton image (tensor, array or scalar).
        target_image: Target image (tensor, array or scalar).

    Returns:
        Tuple ``(input_image, target_image)`` after rescaling.
    """
    def _rescale(pixels):
        return (pixels / 127.5) - 1

    return _rescale(input_image), _rescale(target_image)

# Preprocessing function to load and preprocess one skeleton/target pair
def load_train_image(skeleton_path, target_path):
    """Load one (skeleton, target) pair and prepare it for training.

    Both files go through the same steps: decode via ``load_image`` (which
    yields 3-channel float32 data for either file), resize to
    ``IMG_HEIGHT`` x ``IMG_WIDTH``, then rescale with ``normalize``.

    Args:
        skeleton_path: Path of the skeleton (network input) PNG.
        target_path: Path of the target PNG.

    Returns:
        Tuple ``(input_image, target_image)`` of preprocessed tensors.
    """
    skeleton = load_image(skeleton_path, is_target=False)
    photo = load_image(target_path, is_target=True)

    # Bring both images to the configured training resolution.
    skeleton, photo = resize(skeleton, photo, IMG_HEIGHT, IMG_WIDTH)

    # Final step: rescale the pixel values.
    return normalize(skeleton, photo)

# Function to load dataset as TensorFlow Dataset object
def load_dataset(dataset_path, batch_size, num_folders=10):
    """Build a shuffled, batched ``tf.data`` pipeline of (skeleton, target) pairs.

    ``dataset_path`` is expected to contain sub-folders ``train_<i>_img``
    (target frames) and ``train_<i>_label`` (skeleton frames) for
    ``i = 1 .. num_folders``. Inside each folder pair the PNG files are
    matched by their sorted order.

    Args:
        dataset_path: Directory holding the numbered folder pairs.
        batch_size: Number of image pairs per batch.
        num_folders: How many numbered folder pairs to scan. Defaults to 10,
            the value that used to be hard-coded.

    Returns:
        A ``tf.data.Dataset`` yielding batches of
        ``(input_image, target_image)`` as produced by ``load_train_image``.

    Raises:
        AssertionError: If a folder pair holds a different number of skeleton
            and target images (the pairs would otherwise be misaligned).
        ValueError: If no PNG files are found at all.
    """
    skeleton_images = []
    target_images = []

    for i in range(1, num_folders + 1):
        target_dir = os.path.join(dataset_path, f'train_{i}_img/')
        skeleton_dir = os.path.join(dataset_path, f'train_{i}_label/')

        folder_skeletons = sorted(glob(os.path.join(skeleton_dir, '*.png')))
        folder_targets = sorted(glob(os.path.join(target_dir, '*.png')))

        # Check every folder pair on its own: comparing only the grand totals
        # lets two opposite mismatches cancel out and silently shifts the
        # skeleton/target pairing for everything that follows.
        assert len(folder_skeletons) == len(folder_targets), (
            "Mismatch in the number of skeleton and target images "
            f"(train_{i}_label: {len(folder_skeletons)}, train_{i}_img: {len(folder_targets)})"
        )

        skeleton_images.extend(folder_skeletons)
        target_images.extend(folder_targets)

    if not skeleton_images:
        raise ValueError(f"No PNG image pairs found under {dataset_path!r}")

    # Create a TensorFlow Dataset from the file paths
    dataset = tf.data.Dataset.from_tensor_slices((skeleton_images, target_images))

    # Shuffle the (cheap) path strings rather than decoded images: this gives a
    # full shuffle of the dataset without buffering hundreds of 512x512 float
    # tensors in memory.
    dataset = dataset.shuffle(buffer_size=len(skeleton_images))

    # load_train_image only uses TensorFlow ops, so it can be traced directly.
    # Avoiding tf.py_function keeps static shapes and allows parallel decoding.
    dataset = dataset.map(load_train_image, num_parallel_calls=tf.data.AUTOTUNE)

    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    print("dataset created")

    return dataset


In [3]:
import tensorflow as tf
from tensorflow.keras import layers, Model

# Residual Block
def residual_block(x, filters):
    """Apply conv-BN-ReLU-conv-BN and add the block input back onto the result.

    Args:
        x: Input feature map. It is added element-wise to the output of the
            second convolution, so it must already have ``filters`` channels.
        filters: Number of filters for both 3x3 convolutions.

    Returns:
        The output tensor of the ``Add`` layer (skip connection + residual).
    """
    conv_kwargs = dict(kernel_size=3, strides=1, padding='same')
    skip = x

    out = layers.Conv2D(filters, **conv_kwargs)(x)
    out = layers.BatchNormalization()(out)
    out = layers.ReLU()(out)

    out = layers.Conv2D(filters, **conv_kwargs)(out)
    out = layers.BatchNormalization()(out)

    return layers.Add()([skip, out])

# Global Generator Network
def global_generator(input_shape=(512, 512, 3), filters=64, n_residual_blocks=9):
    """Build the encoder / residual / decoder generator as a Keras ``Model``.

    Layout: one 7x7 stem convolution, two stride-2 convolutions that double
    the filter count each time, ``n_residual_blocks`` residual blocks, two
    stride-2 transposed convolutions that halve the filter count again, and a
    final 7x7 convolution with ``tanh`` producing 3 output channels.

    Args:
        input_shape: Shape of one input image, without the batch dimension.
        filters: Filter count of the stem convolution.
        n_residual_blocks: Number of residual blocks at the bottleneck.

    Returns:
        A ``Model`` named ``"GlobalGenerator"``.
    """
    def norm_relu(tensor):
        # Batch normalisation followed by ReLU, shared by every conv stage.
        tensor = layers.BatchNormalization()(tensor)
        return layers.ReLU()(tensor)

    inputs = layers.Input(shape=input_shape)
    x = norm_relu(layers.Conv2D(filters, kernel_size=7, strides=1, padding='same')(inputs))

    width = filters

    # Downsampling: halve the spatial size, double the channels (twice).
    for _ in range(2):
        width = width * 2
        x = norm_relu(layers.Conv2D(width, kernel_size=3, strides=2, padding='same')(x))

    # Residual blocks at the lowest resolution.
    for _ in range(n_residual_blocks):
        x = residual_block(x, width)

    # Upsampling: mirror of the downsampling path.
    for _ in range(2):
        width = width // 2
        x = norm_relu(
            layers.Conv2DTranspose(width, kernel_size=3, strides=2, padding='same', output_padding=1)(x)
        )

    outputs = layers.Conv2D(3, kernel_size=7, strides=1, padding='same', activation='tanh')(x)
    return Model(inputs, outputs, name="GlobalGenerator")

# Create the generator model
# NOTE(review): this instance is separate from the `generator` that the
# training cell builds with its own global_generator() call; it appears to be
# used only for the summary printed below.
generator_model = global_generator()

# Summary of the generator
generator_model.summary()


In [4]:
def downsample(filters, size, apply_batchnorm=True):
    """Build a stride-2 convolution stage wrapped in a ``Sequential`` model.

    Args:
        filters: Number of convolution filters.
        size: Convolution kernel size.
        apply_batchnorm: Whether to insert ``BatchNormalization`` between the
            convolution and the ``LeakyReLU`` activation.

    Returns:
        A ``tf.keras.Sequential`` of Conv2D (no bias, normal(0, 0.02) kernel
        initialiser) -> optional BatchNormalization -> LeakyReLU.
    """
    stages = [
        layers.Conv2D(filters, size, strides=2, padding='same',
                      kernel_initializer=tf.random_normal_initializer(0., 0.02),
                      use_bias=False)
    ]
    if apply_batchnorm:
        stages.append(layers.BatchNormalization())
    stages.append(layers.LeakyReLU())

    return tf.keras.Sequential(stages)

def Discriminator():
    """Build the conditional discriminator over (input, target) image pairs.

    Both inputs are 512x512 images with 3 channels. They are concatenated
    along the channel axis, reduced by three stride-2 ``downsample`` stages
    and then passed through two zero-padded 4x4 convolutions.

    Returns:
        A ``tf.keras.Model`` that takes ``[input_image, target_image]`` and
        returns a single-channel map of raw logits (the last layer has no
        activation).
    """
    weight_init = tf.random_normal_initializer(0., 0.02)

    condition = layers.Input(shape=[512, 512, 3], name='input_image')
    candidate = layers.Input(shape=[512, 512, 3], name='target_image')

    features = layers.concatenate([condition, candidate])  # (bs, 512, 512, 6)

    # Three stride-2 stages; only the first one skips batch normalisation.
    # Spatial size: 512 -> 256 -> 128 -> 64.
    for n_filters, use_batchnorm in ((64, False), (128, True), (256, True)):
        features = downsample(n_filters, 4, use_batchnorm)(features)
    # features: (bs, 64, 64, 256)

    features = layers.ZeroPadding2D()(features)  # (bs, 66, 66, 256)
    features = layers.Conv2D(512, 4, strides=1, kernel_initializer=weight_init,
                             use_bias=False)(features)  # (bs, 63, 63, 512)

    features = layers.BatchNormalization()(features)
    features = layers.LeakyReLU()(features)

    features = layers.ZeroPadding2D()(features)  # (bs, 65, 65, 512)

    logits = layers.Conv2D(1, 4, strides=1, kernel_initializer=weight_init)(features)  # (bs, 62, 62, 1)

    return tf.keras.Model(inputs=[condition, candidate], outputs=logits)

# Build a throwaway discriminator just to print its layer summary.
# NOTE(review): the training cell creates its own `discriminator`; `dis` does
# not appear to be used anywhere else in the visible cells.
dis = Discriminator()
dis.summary()

In [5]:
# Disabled code: an earlier pair of loss functions (binary cross-entropy GAN
# loss plus 100 * L1) kept as a string literal so it is not executed. The
# functions named generator_loss / discriminator_loss that are actually used
# are defined in the following cell.
'''loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (100 * l1_loss)
    return total_gen_loss, gan_loss, l1_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss'''


'loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)\n\ndef generator_loss(disc_generated_output, gen_output, target):\n    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)\n    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))\n    total_gen_loss = gan_loss + (100 * l1_loss)\n    return total_gen_loss, gan_loss, l1_loss\n\ndef discriminator_loss(disc_real_output, disc_generated_output):\n    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)\n    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)\n    total_disc_loss = real_loss + generated_loss\n    return total_disc_loss'

In [6]:
@tf.function
def generator_loss(disc_generated_output, fake_images, real_images, feature_maps_real, feature_maps_fake, lambda_l1=10):
    """Total generator loss: adversarial + feature L1 + weighted pixel L1.

    Args:
        disc_generated_output: Discriminator logits for the generated images.
        fake_images: Images produced by the generator.
        real_images: Ground-truth target images.
        feature_maps_real: Feature tensor computed from the real images.
        feature_maps_fake: Feature tensor computed from the generated images.
            It is subtracted from ``feature_maps_real`` directly, so both are
            single tensors of compatible shape.
        lambda_l1: Weight of the pixel-wise L1 term.

    Returns:
        Scalar tensor ``adversarial + feature_l1 + lambda_l1 * pixel_l1``.
    """
    # Adversarial term: the generator wants its samples labelled "real" (1).
    real_labels = tf.ones_like(disc_generated_output)
    per_logit_bce = tf.nn.sigmoid_cross_entropy_with_logits(
        labels=real_labels, logits=disc_generated_output
    )
    adversarial_term = tf.reduce_mean(per_logit_bce)

    # Mean absolute difference between the two feature tensors.
    feature_term = tf.reduce_mean(tf.abs(feature_maps_real - feature_maps_fake))

    # Mean absolute difference between real and generated pixels.
    pixel_term = tf.reduce_mean(tf.abs(real_images - fake_images))

    return adversarial_term + feature_term + lambda_l1 * pixel_term
@tf.function
def discriminator_loss(disc_real_output, disc_fake_output):
    """Discriminator loss: sigmoid cross-entropy on real and fake logits.

    Args:
        disc_real_output: Discriminator logits for real (input, target) pairs.
        disc_fake_output: Discriminator logits for (input, generated) pairs.

    Returns:
        Scalar tensor: mean cross-entropy of the real logits against label 1
        plus mean cross-entropy of the fake logits against label 0.
    """
    def _mean_bce(logits, labels):
        return tf.reduce_mean(
            tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
        )

    loss_on_real = _mean_bce(disc_real_output, tf.ones_like(disc_real_output))
    loss_on_fake = _mean_bce(disc_fake_output, tf.zeros_like(disc_fake_output))

    return loss_on_real + loss_on_fake

In [7]:

# Check if GPU is available and print the devices
# NOTE(review): the output recorded for this cell is "Physical devices cannot
# be modified after being initialized", i.e. set_memory_growth raised the
# RuntimeError caught below. Models are built in earlier cells, so the GPU was
# presumably already initialised by then; this cell likely needs to run before
# any model is created for memory growth to take effect - confirm.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Enable dynamic memory growth
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"{len(gpus)} GPU(s) available: {[gpu.name for gpu in gpus]}")
    except RuntimeError as e:
        # The error is only printed; execution continues with the default
        # memory configuration.
        print(e)
else:
    print("No GPU available. Running on CPU.")

Physical devices cannot be modified after being initialized


In [8]:
# Separate Adam optimizers (learning rate 2e-4, beta_1 = 0.5) for the two
# networks; train_step reads both as module-level globals.
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
# Convolutional part of VGG16 with ImageNet weights, used by train_step to
# compute the feature maps passed to generator_loss. Frozen so that it is
# never updated.
vgg_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet')
vgg_model.trainable = False

@tf.function
def train_step(input_image, target, generator, discriminator):
    """Run one optimisation step for both the generator and the discriminator.

    Args:
        input_image: Batch of conditioning (skeleton) images in [-1, 1].
        target: Batch of ground-truth images in [-1, 1].
        generator: Keras model mapping ``input_image`` to a generated image.
        discriminator: Keras model taking ``[input_image, image]`` and
            returning logits.

    Returns:
        Tuple ``(gen_total_loss, disc_loss)`` of scalar tensors for this batch.

    Note:
        The optimizers and ``vgg_model`` are read from module-level globals.
    """
    def to_vgg_input(images):
        # The ImageNet VGG16 weights expect 0-255 images passed through
        # vgg16.preprocess_input (RGB->BGR + per-channel mean subtraction).
        # Pipeline images and the generator's tanh output live in [-1, 1], so
        # map them back to 0-255 first. Every op here is differentiable, so
        # gradients still reach the generator.
        return tf.keras.applications.vgg16.preprocess_input((images + 1.0) * 127.5)

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        # NOTE(review): feeding VGG correctly scaled inputs changes the
        # magnitude of the feature term relative to the previous behaviour;
        # re-check the loss weighting if training was tuned around it.
        y_true_features = vgg_model(to_vgg_input(target), training=False)
        y_pred_features = vgg_model(to_vgg_input(gen_output), training=False)
        gen_total_loss = generator_loss(disc_generated_output, gen_output, target, y_true_features, y_pred_features)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    # Each tape is used exactly once, for its own network's variables.
    generator_gradients = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))

    return gen_total_loss, disc_loss


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m58889256/58889256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [9]:
def fit(generator, discriminator, dataset, epochs):
    """Train the generator/discriminator pair for ``epochs`` passes over ``dataset``.

    Prints one progress line per batch and, after every epoch, the losses of
    the last batch together with the epoch's wall-clock time.

    Args:
        generator: Generator model, forwarded to ``train_step``.
        discriminator: Discriminator model, forwarded to ``train_step``.
        dataset: ``tf.data.Dataset`` yielding ``(input_image, target)`` batches.
        epochs: Number of passes over the dataset.

    Returns:
        None.
    """
    # cardinality() yields a negative sentinel for infinite/unknown sizes;
    # show "?" instead of printing that sentinel as the batch total.
    num_batches = int(dataset.cardinality().numpy())
    total_label = str(num_batches) if num_batches >= 0 else "?"

    for epoch in range(epochs):
        start = time.time()

        # Reset per epoch so an empty dataset is detected instead of hitting
        # an unbound variable in the summary below.
        gen_loss = disc_loss = None

        for batch, (input_image, target) in enumerate(dataset, start=1):
            # Epoch is printed 1-based, consistent with the summary line.
            print("Epoch no.: " + str(epoch + 1) + " Batch no.: " + str(batch) + "/" + total_label)
            gen_loss, disc_loss = train_step(input_image, target, generator, discriminator)

        if gen_loss is None:
            print(f"Epoch {epoch+1}/{epochs} | dataset produced no batches, nothing was trained")
        else:
            print(f"Epoch {epoch+1}/{epochs} | Generator Loss: {gen_loss} | Discriminator Loss: {disc_loss}")
        print(f'Time taken for epoch {epoch+1} is {time.time()-start} sec\n')


In [10]:
# Number of image pairs per training batch.
BATCH_SIZE = 8  # Adjust based on your GPU memory

# Load dataset
# Root directory handed to load_dataset(), which looks for the numbered
# train_<i>_img / train_<i>_label sub-folders inside it.
DATASET_PATH = "/kaggle/input/frame-pose-dataset/video-pose dataset/data-meta"
train_dataset = load_dataset(DATASET_PATH, batch_size=BATCH_SIZE)

dataset created


In [None]:
# Create generator and discriminator
# Fresh, untrained instances with the builders' default 512x512x3 input shape.
generator = global_generator()
discriminator = Discriminator()

# Train for 10 epochs on `train_dataset`, which must already have been
# created by the load_dataset() cell.
fit(generator, discriminator, train_dataset, 10)

Epoch no.: 0 Batch no.: 1/3487
Epoch no.: 0 Batch no.: 2/3487
Epoch no.: 0 Batch no.: 3/3487


KeyboardInterrupt: 

In [93]:
# Save both networks to the Kaggle working directory as .h5 files.
# NOTE(review): the recorded training run above was interrupted after a few
# batches; confirm the models saved here come from a completed run.
generator.save('/kaggle/working/generator-pretrained-pix2pix.h5')
discriminator.save('/kaggle/working/discriminator-pretrained-pix2pix.h5')

In [95]:
# Reload the two networks from the .h5 files written by the save cell,
# rebinding `generator` and `discriminator` to the loaded models.
generator = tf.keras.models.load_model('/kaggle/working/generator-pretrained-pix2pix.h5')
discriminator = tf.keras.models.load_model('/kaggle/working/discriminator-pretrained-pix2pix.h5')