In [1]:
from google.colab import drive
drive.mount('/content/drive')




Mounted at /content/drive


In [10]:

import tensorflow as tf
import numpy as np
from PIL import Image
import glob
import os
from skimage.transform import resize
import matplotlib.pyplot as plt

# Instance Normalization class as used in your model
class InstanceNormalization(tf.keras.layers.Layer):
    def __init__(self, epsilon=1e-5):
        super(InstanceNormalization, self).__init__()
        self.epsilon = epsilon

    def build(self, input_shape):
        self.scale = self.add_weight(name='scale', shape=input_shape[-1:],
                                     initializer=tf.random_normal_initializer(1., 0.02),
                                     trainable=True)
        self.offset = self.add_weight(name='offset', shape=input_shape[-1:],
                                      initializer='zeros', trainable=True)

    def call(self, x, training=False):
        mean, variance = tf.nn.moments(x, axes=[1, 2], keepdims=True)
        inv = tf.math.rsqrt(variance + self.epsilon)
        normalized = (x - mean) * inv
        return self.scale * normalized + self.offset

def downsample(filters, size, apply_norm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    down_layers = tf.keras.Sequential()
    #adding Conv2D layer
    down_layers.add(tf.keras.layers.Conv2D(filters, size, strides = 2, padding = 'same',
                                        kernel_initializer = initializer, use_bias = False))
    #adding normalization
    if apply_norm:
        down_layers.add(InstanceNormalization())
    #adding activation function as Leaky Relu
    down_layers.add(tf.keras.layers.LeakyReLU())
    return down_layers


def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    up_layers = tf.keras.Sequential()
    # Add Transposed Conv2d layer
    up_layers.add(tf.keras.layers.Conv2DTranspose(filters, size, strides=2, padding='same',
                                               kernel_initializer=initializer, use_bias=False))
    # Add Normalization Layer
    up_layers.add(InstanceNormalization())
    # Conditionally add Dropout layer
    if apply_dropout:
        up_layers.add(tf.keras.layers.Dropout(0.5))
    # Add Relu Activation Layer
    up_layers.add(tf.keras.layers.ReLU())
    return up_layers


def unet_generator():
    down_stack = [
        downsample(64, 4, False), #(bs, 128, 128, 64)
        downsample(128, 4), #(bs, 64, 64, 128)
        downsample(256, 4), #(bs, 32, 32, 256)
        downsample(512, 4), #(bs, 16, 16, 512)
        downsample(512, 4), #(bs, 8, 8, 512)
        downsample(512, 4), #(bs, 4, 4, 512)
        downsample(512, 4), #(bs, 2, 2, 512)
        downsample(512, 4) #(bs, 1, 1, 512)
    ]

    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (bs, 2, 2, 1024)
        upsample(512, 4, apply_dropout=True),  # (bs, 4, 4, 1024)
        upsample(512, 4, apply_dropout=True),  # (bs, 8, 8, 1024)
        upsample(512, 4),  # (bs, 16, 16, 1024)
        upsample(256, 4),  # (bs, 32, 32, 512)
        upsample(128, 4),  # (bs, 64, 64, 256)
        upsample(64, 4),  # (bs, 128, 128, 128)
    ]
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(1, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh')  # (bs, 256, 256, 1)
    inputs = tf.keras.layers.Input(shape=[256, 256, 1])
    concat = tf.keras.layers.Concatenate()
    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = concat([x, skip])

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

# Load the pre-trained model
model = unet_generator()
checkpoint_path = '/content/drive/My Drive/mia/trainedmodel2'
ckpt = tf.train.Checkpoint(generator_g=model)
status = ckpt.restore(tf.train.latest_checkpoint(checkpoint_path)).expect_partial()  # Using expect_partial to ignore optimizer states

# Function to process new images and save them
def process_and_save_images(input_dir, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    all_t1_images = glob.glob(f"{input_dir}/*.png")
    for filename in all_t1_images:
        img = Image.open(filename).convert('L')
        img = img.resize((256, 256))
        img_array = np.array(img) / 127.5 - 1
        img_array = np.expand_dims(img_array, axis=0)  # Add batch dimension
        img_array = np.expand_dims(img_array, axis=-1)  # Add channel dimension
        prediction = model(img_array, training=False)
        prediction = (prediction[0, :, :, 0].numpy() + 1) * 127.5  # Rescale to 0-255
        prediction = np.clip(prediction, 0, 255).astype(np.uint8)
        result_path = os.path.join(output_dir, os.path.basename(filename))
        Image.fromarray(prediction).save(result_path)

# Example usage
input_directory = '/content/drive/My Drive/mia/mri_taskc/taskc_slices2/subject_61_t1w.nii'
output_directory = '/content/drive/My Drive/mia/mri_taskc/taskc_slices2_2/subject_61_t1w.nii'
process_and_save_images(input_directory, output_directory)
