In [1]:
import numpy as np
import pandas as pd
import cv2
import os
# from patchify import patchify
from PIL import Image
from matplotlib import pyplot as plt
import tifffile
import pickle as pkl
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Conv2D
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
# from keras.models import Model
# from keras.models import load_model

# Reading Stacked Images

In [88]:
DIRECTORY = (r"C:\Users\tause\Advance_ANN\Stacked")

CATEGORIES = []
for classes in os.listdir(DIRECTORY):
    CATEGORIES += [classes]

# print(CATEGORIES)

data = []
i=0
for category in CATEGORIES:
    img_path = os.path.join(DIRECTORY, category)
    arr = cv2.imread(img_path,0)
    data.append(arr)

stacked = np.array(data)

In [89]:
stacked = np.transpose(stacked, (1, 2, 0))
stacked.shape

(512, 512, 32)

# Making Patches

In [90]:
def extract_blocks(image, block_size, stride):
    blocks = []
    image_height, image_width = image.shape[:2]
    num_blocks_x = image_width // stride
    num_blocks_y = image_height // stride
    for y in range(0,image_height-block_size[1]+1,stride):
        for x in range(0,image_width-block_size[0]+1,stride):
            block=image[y:y+block_size[1],x:x+block_size[0]]
            blocks.append(block)
    return blocks

In [91]:
def reconstruct_image(patches, image_size, block_size, stride):
    # Get patch size
    patch_size = block_size[0]

    # Calculate the number of patches in each direction
    n_patches_x = (image_size[1] - patch_size) // stride + 1
    n_patches_y = (image_size[0] - patch_size) // stride + 1

    # Initialize reconstructed image
    reconstructed_image = np.zeros(image_size)

    # Place patches in the reconstructed image
    patch_idx = 0
    for y in range(n_patches_y):
        for x in range(n_patches_x):
            patch = patches[patch_idx]
            reconstructed_image[y*stride:y*stride+patch_size, x*stride:x*stride+patch_size] = patch
            patch_idx += 1

    return reconstructed_image

In [92]:
patches = np.array(extract_blocks(stacked,(64,64),24))
len(patches)

361

# Labels

In [93]:
y = np.array(patches)
y.shape

(361, 64, 64, 32)

In [94]:
y = y.astype(float)

# Making Dataset

In [95]:
x1 = tf.keras.layers.AveragePooling2D(pool_size=(8, 8),strides=(8, 8), padding='valid',dtype='float64')
x1 = x1(y)
x1.shape

TensorShape([361, 8, 8, 32])

In [96]:
x2_1 = y[:,:,:,0:11].mean(axis = 3)
x2_2 = y[:,:,:,11:22].mean(axis = 3)
x2_3 = y[:,:,:,22:].mean(axis = 3)

In [97]:
x2 = np.concatenate([np.expand_dims(x2_1,3),np.expand_dims(x2_2,3),np.expand_dims(x2_3,3)],axis = 3)
x2.shape

(361, 64, 64, 3)

# PreProcessing for Model

In [98]:
print("Labels-shape",y.shape)
print("Input_Feature-1 shape",x1.shape)
print("Input_Feature-2 shape",x2.shape)

Labels-shape (361, 64, 64, 32)
Input_Feature-1 shape (361, 8, 8, 32)
Input_Feature-2 shape (361, 64, 64, 3)


# Reshaping

In [99]:
X_interploated = tf.image.resize(x1, (64, 64), method='bilinear')
X_interploated.shape

TensorShape([361, 64, 64, 32])

In [100]:
X = tf.concat([X_interploated, x2], axis=-1)
X.shape

TensorShape([361, 64, 64, 35])

In [101]:
X = X/255
y = y/255

# --------------------------------------------------------------------------------------------------

# Generator

In [74]:
discriminator = keras.Sequential(
    [
        layers.Input((64, 64, 35)),

        layers.Conv2D(filters = 32,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #shape(batch_size, 62, 62, 32).
        
        layers.Conv2D(filters = 64,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 60, 60, 64).

        layers.Conv2D(filters = 64,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 58, 58, 64).

        layers.Conv2D(filters = 128,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 56, 56, 128).
        
        layers.Reshape((112 ,112 ,32)),
        
        layers.Conv2D(filters = 32,kernel_size = (49,49),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 64, 64, 32).
    ],
    name="discriminator",
)

# Create the generator.
generator = keras.Sequential(
    [
        layers.Input((64, 64, 35)),

        layers.Conv2D(filters = 32,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #shape(batch_size, 62, 62, 32).
        
        layers.Conv2D(filters = 64,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 60, 60, 64).

        layers.Conv2D(filters = 64,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 58, 58, 64).

        layers.Conv2D(filters = 128,kernel_size = (3,3),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 56, 56, 128).
        
        layers.Reshape((112 ,112 ,32)),
        
        layers.Conv2D(filters = 32,kernel_size = (49,49),strides=1,padding="valid",activation="LeakyReLU"),
        #(batch_size, 64, 64, 32).
    ],
    name="generator",
)

In [75]:
# Define the discriminator loss function
def discriminator_loss(real_output, synthetic_output):
    real_loss = tf.kerasbinary_crossentropy(tf.ones_like(real_output), real_output)
    synthetic_loss = tf.kerasbinary_crossentropy(tf.zeros_like(synthetic_output), synthetic_output)
    total_loss = real_loss + synthetic_loss
    return total_loss

In [76]:
# Define the generator loss function
def generator_loss(synthetic_output):
    return tf.kerasbinary_crossentropy(tf.ones_like(synthetic_output), synthetic_output)

In [77]:
# Define the discriminator and generator optimizers
discriminator_optimizer = keras.optimizers.Adam(learning_rate=0.001)
generator_optimizer = keras.optimizers.Adam(learning_rate=0.001)

In [78]:
# Define the training loop
@tf.function
def train_step(images):
    noise = tf.random.normal([64,64,64,35])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        synthetic_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        synthetic_output = discriminator(synthetic_images, training=True)

        gen_loss = generator_loss(synthetic_output)
        disc_loss = discriminator_loss(real_output, synthetic_output)

        gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
        gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

        generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
        discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))


In [81]:
EPOCHS = 50
# BATCH_SIZE = 64
# noise_dim = [64,64,32]
# Train the GAN

for epoch in range(EPOCHS):
    for images in X:
        print(images.shape)
        break
    break
#         np.expand_dims(images,axis=0)
#         train_step((np.expand_dims(images,axis=0)))

(64, 64, 35)


In [83]:
import tensorflow as tf
import tensorflow.keras as keras

class Generator(keras.Model):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.conv1 = keras.layers.Conv2D(64, 3, padding='same')
        self.conv2 = keras.layers.Conv2D(128, 3, padding='same')
        self.conv3 = keras.layers.Conv2D(256, 3, padding='same')
        self.conv4 = keras.layers.Conv2D(512, 3, padding='same')
        self.conv5 = keras.layers.Conv2D(out_channels, 3, padding='same')
        self.bn1 = keras.layers.BatchNormalization()
        self.bn2 = keras.layers.BatchNormalization()
        self.bn3 = keras.layers.BatchNormalization()
        self.bn4 = keras.layers.BatchNormalization()

    def call(self, x):
        x = tf.nn.relu(self.bn1(self.conv1(x)))
        x = tf.nn.relu(self.bn2(self.conv2(x)))
        x = tf.nn.relu(self.bn3(self.conv3(x)))
        x = tf.nn.relu(self.bn4(self.conv4(x)))
        x = tf.nn.tanh(self.conv5(x))
        return x

class Discriminator(keras.Model):
    def __init__(self, in_channels):
        super().__init__()

        self.conv1 = keras.layers.Conv2D(64, 3, padding='same')
        self.conv2 = keras.layers.Conv2D(128, 3, padding='same')
        self.conv3 = keras.layers.Conv2D(256, 3, padding='same')
        self.conv4 = keras.layers.Conv2D(512, 3, padding='same')
        self.conv5 = keras.layers.Conv2D(1, 3, padding='same')
        self.bn1 = keras.layers.BatchNormalization()
        self.bn2 = keras.layers.BatchNormalization()
        self.bn3 = keras.layers.BatchNormalization()
        self.bn4 = keras.layers.BatchNormalization()
    def call(self, x):
        x = tf.nn.leaky_relu(self.bn1(self.conv1(x)), alpha=0.2)
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x)), alpha=0.2)
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x)), alpha=0.2)
        x = tf.nn.leaky_relu(self.bn4(self.conv4(x)), alpha=0.2)
        x = tf.nn.sigmoid(self.conv5(x))
        return x

In [86]:
# define the input shapes for the generator and discriminator
generator_input_shape = (None, 64, 64, 35)
discriminator_input_shape = (None, 64, 64, 32)

# create instances of the generator and discriminator
generator = Generator(35, 32)
discriminator = Discriminator(32)

# define the optimizers for the generator and discriminator
generator_optimizer = keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = keras.optimizers.Adam(2e-4, beta_1=0.5)

# define the loss functions for the generator and discriminator
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)

# define the training loop
@tf.function
def train_step(inputs, labels):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(inputs, training=True)

        real_output = discriminator(labels, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = loss_fn(tf.ones_like(fake_output), fake_output)
        disc_loss = loss_fn(tf.ones_like(real_output), real_output) + loss_fn(tf.zeros_like(fake_output), fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

# define a function to generate and save images during training
def generate_and_save_images(model, epoch, test_input):
    predictions = model(test_input, training=False)

    def generate_and_save_images(model, epoch, test_input):
        predictions = model(test_input, training=False)

        fig = plt.figure(figsize=(4, 4))

        for i in range(predictions.shape[0]):
            plt.subplot(4, 4, i+1)
            plt.imshow(predictions[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
            plt.axis('off')

        plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()



In [104]:
# define the number of epochs and the batch size
num_epochs = 100
batch_size = 16
test_input = tf.random.normal([16, 64, 64, 35])
# create a dataset and a data iterator
dataset = tf.data.Dataset.from_tensor_slices((X, y)).batch(batch_size)
data_iterator = iter(dataset)

# train the model
for epoch in range(num_epochs):
    # get a batch of data
    inputs, labels = next(data_iterator)

    # train the model on the batch
    train_step(inputs, labels)

    # generate and save images every 10 epochs
    if epoch % 10 == 0:
        generate_and_save_images(generator, epoch, test_input)



StopIteration: 