In [1]:
from tensorflow.keras import Sequential,optimizers
import tensorflow as tf 
from tensorflow.keras.layers import Input, Dense,BatchNormalization,Conv2DTranspose,Conv2D,Dropout,Flatten, Reshape, Activation 
from tensorflow_addons.layers import GroupNormalization
import numpy as np
import time
from IPython.display import display, clear_output
import pickle

print(tf.config.list_physical_devices('GPU'))


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.10.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [2]:
NOISE_DIM = 100
BATCH_SIZE = 32
INSTANCE_NUM = 10000
BATCHES = INSTANCE_NUM//BATCH_SIZE

In [3]:
def convTBlock(output_channels,kernal_size, strides, padding='same', Inst_Norm=True, activation = "relu"):
    model = Sequential()
    model.add(Conv2DTranspose(output_channels, kernal_size, strides=strides, padding=padding, use_bias=False,kernel_regularizer=tf.keras.regularizers.l2(0.0005)))
    if Inst_Norm:
        model.add(GroupNormalization(groups = 1))

    model.add(Activation(activation))
    return model

def generator_model(noise_dim):
    model = Sequential()
    
    model.add(Dense(2048*8, use_bias=False, input_shape=(noise_dim,)))
    model.add(Reshape((16,16 , 64)))
    
    model.add(convTBlock(64     , 4, 1))
    model.add(convTBlock(32     , 4, 2))
    model.add(convTBlock(16     , 4, 2))
    model.add(convTBlock(1      , 4, 2, Inst_Norm=False, activation = "leaky_relu"))
    
    return model

generator = generator_model(NOISE_DIM)
generator.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 16384)             1638400   
                                                                 
 reshape (Reshape)           (None, 16, 16, 64)        0         
                                                                 
 sequential_1 (Sequential)   (None, 16, 16, 64)        65664     
                                                                 
 sequential_2 (Sequential)   (None, 32, 32, 32)        32832     
                                                                 
 sequential_3 (Sequential)   (None, 64, 64, 16)        8224      
                                                                 
 sequential_4 (Sequential)   (None, 128, 128, 1)       256       
                                                                 
Total params: 1,745,376
Trainable params: 1,745,376
Non-

In [4]:
def convBlock(output_channels,kernal_size, strides, padding='same', batch_normalization=False, activation = "leaky_relu",dropout = 0.2):
    model = Sequential()
    model.add(Conv2D(output_channels, kernal_size, strides=strides, padding=padding, use_bias=False,kernel_regularizer=tf.keras.regularizers.l2(0.0005)))
    if batch_normalization:
        model.add(BatchNormalization())
    model.add(Activation(activation))
    if dropout:
        model.add(Dropout(dropout))
    return model

def discriminator_model():
    model = Sequential()
    model.add(Input(shape=[128, 128, 1]))

    model.add(convBlock(16,  4, 2, padding='same', dropout=0.3))
    model.add(convBlock(32,  4, 2, padding='same', dropout=0.3))
    model.add(convBlock(64, 4, 2, padding='same', dropout=0.3))
    model.add(convBlock(128,4, 2, padding='same', batch_normalization=False))

    model.add(Flatten())
    model.add(Dense(1,activation ="sigmoid"))

    return model
discriminator = discriminator_model()
discriminator.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 sequential_6 (Sequential)   (None, 64, 64, 16)        256       
                                                                 
 sequential_7 (Sequential)   (None, 32, 32, 32)        8192      
                                                                 
 sequential_8 (Sequential)   (None, 16, 16, 64)        32768     
                                                                 
 sequential_9 (Sequential)   (None, 8, 8, 128)         131072    
                                                                 
 flatten (Flatten)           (None, 8192)              0         
                                                                 
 dense_1 (Dense)             (None, 1)                 8193      
                                                                 
Total params: 180,481
Trainable params: 180,481
Non-tr

In [5]:
with open("data.pkl", "rb") as file:
    df = pickle.load(file)[:INSTANCE_NUM]
df = (np.asarray(df, dtype = np.float16)).reshape(-1, 128, 128,1).astype(np.float16)
df = df/255
dataset = tf.data.Dataset.from_tensor_slices(df)
dataset = dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)

In [6]:
del df

In [7]:
import math

loss_func = tf.keras.losses.BinaryCrossentropy(from_logits=True)

epsilon = 1e-8
def discRealLoss(real_output):
    return -tf.reduce_mean(tf.math.log(real_output + epsilon))
def discFakeLoss(fake_output):
    return -tf.reduce_mean(tf.math.log(1 - fake_output + epsilon))
def genRealLoss(fake_output):
    return -tf.reduce_mean(tf.math.log(fake_output + epsilon))

class CustomDecaySchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, initial_learning_rate, decay_steps, decay_rate, min_learning_rate):
        self.initial_learning_rate = initial_learning_rate
        self.decay_steps = decay_steps
        self.decay_rate = decay_rate
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        # Calculate the decayed learning rate
        decayed_learning_rate = self.initial_learning_rate * (self.decay_rate ** (step // self.decay_steps))
        # Ensure the learning rate does not go below the minimum
        return tf.maximum(decayed_learning_rate, self.min_learning_rate)
    
lr_schedule = CustomDecaySchedule(
    1e-4,
    decay_steps=BATCHES//100,
    decay_rate=0.96,
    min_learning_rate=1e-6)


gen_opt= optimizers.Adam(
    learning_rate=lr_schedule,
    )
dis_opt = optimizers.Adam(
    learning_rate=1e-4,
    )

def show_info(batch_number, epoch,epochs, fake_loss, dis_loss, start):
    clear_output(wait=True)
    display(f"epoch: {epoch}/{epochs} | batch: {batch_number}/{BATCHES}")
    display(f"gen_loss: {fake_loss:.4f}")
    display(f"disc_loss: {dis_loss:.4f}")
    display(f"time: {time.time() - start:.2f}s") 

log_dir = "log/"+time.strftime("%Y%m%d-%H%M%S")+"/"
summary_writer = tf.summary.create_file_writer(log_dir)

In [8]:
@tf.function
def train_discriminator(images, generated_images):
    with tf.GradientTape() as disc_tape:
        real = discriminator(images, training=True)
        fake = discriminator(generated_images, training=True)
        
        real_loss = discRealLoss(real)
        fake_loss = discFakeLoss(fake)
        disc_loss = real_loss + fake_loss
    
    disc_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    dis_opt.apply_gradients(zip(disc_gradients, discriminator.trainable_variables))
    return disc_loss,real,fake

@tf.function
def train_generator(noise):
    with tf.GradientTape() as gen_tape:
        generated_images = generator(noise, training=True)
        fake = discriminator(generated_images, training=True)
        gen_loss = genRealLoss(real,fake)
    
    gen_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gen_opt.apply_gradients(zip(gen_gradients, generator.trainable_variables))
    return gen_loss

In [9]:
constant_noise = tf.random.normal([BATCH_SIZE, NOISE_DIM],seed=1)
last_loss = math.inf
from filelock import FileLock

def train_epoch(images, batch_size, noise_dim,epoch,batch_number,epochs):
    global last_loss
    
    start = time.time()
    noise = tf.random.normal([BATCH_SIZE, noise_dim])


    generated = generator(noise, training=True)

    disc_loss,real,fake = train_discriminator(images, generated)
    gen_loss = train_generator(noise)
    

    show_info(batch_number, epoch,epochs, gen_loss, disc_loss, start)
    
    with summary_writer.as_default():
        tf.summary.scalar('Loss/gen', gen_loss,     step=(INSTANCE_NUM*(epoch-1)//BATCH_SIZE)+batch_number)
        tf.summary.scalar('Loss/dis' , disc_loss,    step=(INSTANCE_NUM*(epoch-1)//BATCH_SIZE)+batch_number)
        tf.summary.scalar('Loss/real' , int(sum(real.numpy()>.5)),    step=(INSTANCE_NUM*(epoch-1)//BATCH_SIZE)+batch_number)
        tf.summary.scalar('Loss/fake' , int(sum(fake.numpy()>.5)),    step=(INSTANCE_NUM*(epoch-1)//BATCH_SIZE)+batch_number)
        
    if gen_loss < last_loss:
        generator.save("generator.keras")
        discriminator.save("discriminator.keras")
        last_loss= gen_loss
        
    lock = FileLock("image.pkl.lock")
    with lock:
        with open("image.pkl", "wb") as file:
            pickle.dump([epoch,batch_number,np.asarray(generator(constant_noise)[0])], file)
    
def train(dataset, epochs= 20,batch_size=32):
    start = time.time()
    for epoch_number in range(epochs):
        batch_number = 1
        for batch in dataset:
            batch_number+=1
            train_epoch(images = batch,
                        batch_size = batch_size,
                        noise_dim=NOISE_DIM,
                        epoch=epoch_number+1,
                        batch_number=batch_number,
                        epochs = epochs)
            display(f"Total time: {time.time() - start:.2f}s")
        

In [10]:
try:
    train(dataset, 50, BATCH_SIZE)
except KeyboardInterrupt:
    print("Manually Interrupted")

TypeError: in user code:

    File "C:\Users\ahmad\AppData\Local\Temp\ipykernel_6748\2480240856.py", line 20, in train_generator  *
        gen_loss = genRealLoss(fake)

    TypeError: tf__genRealLoss() missing 1 required positional argument: 'fake_output'
