UNet for perfect sinogram data.

In [None]:
import os
import torch
from torch import nn, optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import ToTensor, Resize, Compose
from PIL import Image

# UNet model
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        self.encoder1 = DoubleConv(in_channels, 64)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder2 = DoubleConv(64, 128)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder3 = DoubleConv(128, 256)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder4 = DoubleConv(256, 512)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.bottleneck = DoubleConv(512, 1024)

        self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.decoder1 = DoubleConv(1024, 512)
        self.upconv2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder2 = DoubleConv(512, 256)
        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder3 = DoubleConv(256, 128)
        self.upconv4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder4 = DoubleConv(128, 64)

        self.output = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        # Encoder
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool1(enc1))
        enc3 = self.encoder3(self.pool2(enc2))
        enc4 = self.encoder4(self.pool3(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool4(enc4))

        # Decoder
        dec1 = self.upconv1(bottleneck)
        dec1 = self.decoder1(torch.cat([enc4, dec1], dim=1))
        dec2 = self.upconv2(dec1)
        dec2 = self.decoder2(torch.cat([enc3, dec2], dim=1))
        dec3 = self.upconv3(dec2)
        dec3 = self.decoder3(torch.cat([enc2, dec3], dim=1))
        dec4 = self.upconv4(dec3)
        dec4 = self.decoder4(torch.cat([enc1, dec4], dim=1))

        # Output
        output = self.output(dec4)
        return output

# dataset class
class SinogramDataset(Dataset):
    def __init__(self, input_folder, output_folder, transform=None):
        self.input_folder = input_folder
        self.output_folder = output_folder
        self.transform = transform

        self.input_images = sorted(os.listdir(input_folder))
        self.output_images = sorted(os.listdir(output_folder))

        # Ensure that the input and output images are paired correctly based on order
        assert len(self.input_images) == len(self.output_images), "Mismatch in the number of input and output images"
        self.num_images = len(self.input_images)

    def __len__(self):
        return self.num_images

    def __getitem__(self, index):
        input_image_path = os.path.join(self.input_folder, self.input_images[index])
        output_image_path = os.path.join(self.output_folder, self.output_images[index])

        input_image = Image.open(input_image_path).convert('RGB')
        output_image = Image.open(output_image_path).convert('RGB')

        if self.transform:
            input_image = self.transform(input_image)
            output_image = self.transform(output_image)

        return input_image, output_image


# data transformations
data_transform = Compose([Resize((128, 128)), ToTensor()])

input_folder = '/home/hinata/code/fyp/images/ml_images/rec/lsd'
output_folder = '/home/hinata/code/fyp/images/ml_images/rec/hsd'
dataset = SinogramDataset(input_folder, output_folder, transform=data_transform)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

#UNet model, loss function, and optimizer
model = UNet(in_channels=3, out_channels=3)

# # Move the model to TPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# if str(device) == "cuda":
#     model = model.cuda()

# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.00001)

# # Move the model to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # Move the criterion to GPU if available (optional)
# criterion = nn.MSELoss().to(device)

num_epochs = 250

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    print(f"Epoch {epoch+1}/{num_epochs}")

    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss}")


torch.save(model.state_dict(), 'sngrm_to_sngrm_model.pth')


Testing my unet on new sinograms.

In [None]:
import os
import random
import torch
from skimage.metrics import structural_similarity as ssim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from PIL import Image
import numpy as np
from skimage.filters import gaussian
from torchvision.transforms import ToTensor, Resize
import matplotlib.pyplot as plt

def list_images(folder_path):
    return [os.path.join(folder_path, filename) for filename in os.listdir(folder_path) if filename.endswith('.png')]

input_sinogram_folder = '/content/drive/MyDrive/Colab Notebooks/fyp/ml_images/sinogram_images/4a'
ground_truth_sinogram_folder = '/content/drive/MyDrive/Colab Notebooks/fyp/ml_images/sinogram_images/16a'

input_sinogram_paths = list_images(input_sinogram_folder)

ground_truth_sinogram_paths = list_images(ground_truth_sinogram_folder)

num_images_to_select = 20  # Change this value as needed

# Randomly select a specific number of images
random_input_sinogram_paths = random.sample(input_sinogram_paths, num_images_to_select)
random_ground_truth_sinogram_paths = random.sample(ground_truth_sinogram_paths, num_images_to_select)

model = UNet(in_channels=3, out_channels=3)  # Assuming UNet is defined in the same module

state_dict = torch.load('sinogram_enhancement_model.pth', map_location=torch.device('cpu'))

model.load_state_dict(state_dict)

model.eval()

# store PSNR and SSIM values
psnr_values = []
ssim_values = []

# resize transform
resize_transform = Resize((128, 128))

for input_path, gt_path in zip(input_sinogram_paths, ground_truth_sinogram_paths):
    input_sinogram = Image.open(input_path).convert('L') 
    input_sinogram = resize_transform(input_sinogram)
    input_sinogram = ToTensor()(input_sinogram)  
    input_sinogram = torch.cat([input_sinogram] * 3, dim=0)  

    with torch.no_grad():
        output_sinogram = model(input_sinogram.unsqueeze(0))

    # denoise the output sinogram
    denoised_output = gaussian(output_sinogram.numpy()[0, 0], sigma=1)

    # convert tensors to numpy arrays
    denoised_output_np = denoised_output
    input_sinogram_np = input_sinogram[0].numpy()

    # save denoised output as image
    output_dir = '/content/drive/MyDrive/Colab Notebooks/fyp/img/output/3'
    output_filename = os.path.join(output_dir, f"predicted_sinogram_{os.path.basename(input_path)}")
    plt.imsave(output_filename, denoised_output_np, cmap='gray')

    # PSNR
    mse_loss = F.mse_loss(torch.from_numpy(np.expand_dims(denoised_output_np, axis=0)), input_sinogram.unsqueeze(0))
    psnr = 20 * torch.log10(1.0 / torch.sqrt(mse_loss))
    psnr_values.append(psnr.item())

    # SSIM
    ssim_value, _ = ssim(denoised_output_np, input_sinogram_np, full=True, data_range=1.0)
    ssim_values.append(ssim_value)

    # Load ground truth sinogram
    gt_sinogram = Image.open(gt_path).convert('L')  
    gt_sinogram = resize_transform(gt_sinogram)
    gt_sinogram = ToTensor()(gt_sinogram)  

    fig, axs = plt.subplots(1, 3, figsize=(8, 3))
    axs[0].imshow(input_sinogram_np, cmap='gray')
    axs[0].set_title('Original')
    axs[0].axis('off')
    axs[1].imshow(denoised_output_np, cmap='gray')
    axs[1].set_title('Predicted (Denoised Output)')
    axs[1].axis('off')
    axs[2].imshow(gt_sinogram.squeeze().numpy(), cmap='gray') 
    axs[2].set_title('Ground Truth')
    axs[2].axis('off')
    plt.show()


for i, input_path in enumerate(input_sinogram_paths):
    print(f"Input: {input_path}, PSNR: {psnr_values[i]:.4f}, SSIM: {ssim_values[i]:.4f}")


unet after hyperparameters were tuned and regularisation added as well as early stopping to prevent overfitting


In [None]:
import os
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt
from skimage.metrics import mean_squared_error, structural_similarity
from tensorflow.image import ssim
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import layers, regularizers

class unet_sinogram(tf.keras.Model):
    def __init__(self, width=128, depth=5, dropout_rate=0.1, l2_lambda=0.001):
        super(unet_sinogram, self).__init__()
        self.width = width
        self.depth = depth
        self.dropout_rate = dropout_rate
        self.l2_lambda = l2_lambda
        self.history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

        # U-Net model architecture for sinograms
        self.inputs = tf.keras.Input(shape=(128, 128, 1))
        x = self.inputs

        # Encoder
        for _ in range(depth):
            x = layers.Conv2D(width, 3, activation='relu', padding='same', kernel_regularizer=regularizers.l2(l2_lambda))(x)
            x = layers.BatchNormalization()(x)
            x = layers.MaxPooling2D(pool_size=(2, 2))(x)

        # Bottleneck
        x = layers.Conv2D(width, 3, activation='relu', padding='same', kernel_regularizer=regularizers.l2(l2_lambda))(x)

        # Decoder
        for _ in range(depth):
            x = layers.Conv2DTranspose(width, 2, strides=(2, 2), padding='same')(x)
            x = layers.Conv2D(width, 3, activation='relu', padding='same', kernel_regularizer=regularizers.l2(l2_lambda))(x)
            x = layers.BatchNormalization()(x)
            x = layers.Dropout(self.dropout_rate)(x)

        self.outputs = layers.Conv2D(1, 1, activation='sigmoid', padding='same')(x)

        self.model = tf.keras.Model(inputs=self.inputs, outputs=self.outputs, name=f"sinogram_unet_{width}_{depth}")

    def call(self, inputs):
        return self.model(inputs)

    def compile_model(self, optimizer, loss_function, metrics):
        self.model.compile(optimizer=optimizer, loss=loss_function, metrics=metrics)

    def train_model(self, train_input_images, train_output_images, epochs, batch_size, validation_split):
        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)  # Define early stopping callback
        history = self.model.fit(train_input_images, train_output_images, epochs=epochs, batch_size=batch_size, validation_split=validation_split, callbacks=[early_stopping], verbose=1)
        self.history['loss'].extend(history.history['loss'])
        self.history['accuracy'].extend(history.history['accuracy'])
        self.history['val_loss'].extend(history.history['val_loss'])
        self.history['val_accuracy'].extend(history.history['val_accuracy'])

        self.save_metrics()

    def evaluate_model(self, test_input_images, test_output_images):
        return self.model.evaluate(test_input_images, test_output_images, verbose=0)

    def save_metrics(self):
        with open("training_metrics.txt", "w") as file:
            file.write("Epoch\tLoss\tAccuracy\tVal_Loss\tVal_Accuracy\n")
            for i in range(len(self.history['loss'])):
                file.write(f"{i+1}\t{self.history['loss'][i]}\t{self.history['accuracy'][i]}\t{self.history['val_loss'][i]}\t{self.history['val_accuracy'][i]}\n")

    def calculate_metrics(self, test_input_images, test_output_images):
        mse_values = []
        ssim_values = []
        psnr_values = []

        for i in range(len(test_input_images)):
            input_image = test_input_images[i]
            output_image = test_output_images[i]
            predicted_image = self.model.predict(np.expand_dims(input_image, axis=0))[0]

            mse = mean_squared_error(output_image, predicted_image)
            ssim_score = structural_similarity(output_image, predicted_image, data_range=predicted_image.max() - predicted_image.min())
            psnr = tf.image.psnr(output_image, predicted_image, max_val=1.0)

            mse_values.append(mse)
            ssim_values.append(ssim_score)
            psnr_values.append(psnr)

        return mse_values, ssim_values, psnr_values

# custom loss functions
def ssim_loss(y_true, y_pred):
    return 1 - tf.reduce_mean(ssim(y_true, y_pred, max_val=1.0))

def psnr_loss(y_true, y_pred):
    return -tf.image.psnr(y_true, y_pred, max_val=1.0)

def load_and_preprocess_sinograms(input_folder, output_folder):
    input_images = []
    output_images = []
    
    input_filenames = sorted(os.listdir(input_folder))
    output_filenames = sorted(os.listdir(output_folder))
    
    for input_filename, output_filename in zip(input_filenames, output_filenames):
        input_img_path = os.path.join(input_folder, input_filename)
        output_img_path = os.path.join(output_folder, output_filename)
        
        input_img = cv2.imread(input_img_path, cv2.IMREAD_GRAYSCALE)
        output_img = cv2.imread(output_img_path, cv2.IMREAD_GRAYSCALE)
        
        if input_img is not None and output_img is not None:
            input_img = cv2.resize(input_img, (128, 128))  
            output_img = cv2.resize(output_img, (128, 128)) 
            
            input_img = input_img.astype('float32') / 255.0  
            output_img = output_img.astype('float32') / 255.0 
            
            input_images.append(input_img)
            output_images.append(output_img)
    
    return np.array(input_images), np.array(output_images)

input_folder = '/jupyter/work/fyp/data/sinograms/3rd_set/4'
output_folder = '/jupyter/work/fyp/data/sinograms/3rd_set/16'

input_images, output_images = load_and_preprocess_sinograms(input_folder, output_folder)

train_input_images, test_input_images, train_output_images, test_output_images = train_test_split(
    input_images, output_images, test_size=0.2, random_state=42
)

unet_model = unet_sinogram()
unet_model.compile_model(optimizer=Adam(learning_rate=0.00001), loss_function=psnr_loss, metrics=['accuracy'])

unet_model.train_model(train_input_images, train_output_images, epochs=200, batch_size=32, validation_split=0.2)

unet_model.model.save("unet_sinogram_model.h5")

# Evaluate the model and calculate metrics
# mse_values, ssim_values, psnr_values = unet_model.calculate_metrics(test_input_images, test_output_images)


PSNR graph

In [None]:
# evaluations

test_loss, _ = unet_model.evaluate_model(test_input_images, test_output_images)
print(f"Test Loss: {test_loss}")

# calculate PSNR
psnr = -test_loss  # No need to subscript test_loss
print(f"PSNR: {psnr}")

# plot loss and PSNR over epochs
plt.plot(unet_model.history['loss'], label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

Accuracy and Loss graphs

In [None]:
import matplotlib.pyplot as plt

train_losses = unet_model.history['loss']
val_losses = unet_model.history['val_loss']
train_accs = unet_model.history['accuracy']  
val_accs = unet_model.history['val_accuracy']  

# Loss Curves
def plot_loss_curves(train_losses, val_losses, task_name, save_dir):
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title(f'{task_name} Loss Curves')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(os.path.join(save_dir, f'{task_name}_loss_curves.png'))
    plt.show()

# # Accuracy Curves
def plot_accuracy_curves(train_accs, val_accs, task_name, save_dir):
    plt.plot(train_accs, label='Training Accuracy')
    plt.plot(val_accs, label='Validation Accuracy')
    plt.title(f'{task_name} Accuracy Curves')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.savefig(os.path.join(save_dir, f'{task_name}_accuracy_curves.png'))
    plt.show()

results_folder = '/jupyter/work/fyp/code/results'

plot_loss_curves(train_losses, val_losses, 'Image Reconstruction', results_folder)
plot_accuracy_curves(train_accs, val_accs, 'Image Reconstruction', results_folder)

Test regularised model.

In [None]:
import os
import random
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from skimage.metrics import peak_signal_noise_ratio
import tensorflow.keras.backend as K

# PSNR loss function
def psnr_loss(y_true, y_pred):
    return -K.mean(10.0 * K.log(K.square(1.0) / (K.square(y_pred - y_true) + K.epsilon())) / K.log(10.0))

# trained model
model = load_model("unet_sinogram_model.h5", custom_objects={'psnr_loss': psnr_loss})

# Define function to load and preprocess a single image
def load_and_preprocess_image(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is not None:
        image = cv2.resize(image, (128, 128))  
        image = image.astype('float32') / 255.0 
    return image

input_folder = '/jupyter/work/fyp/data/sinograms/3rd_set/4'
output_folder = '/jupyter/work/fyp/data/sinograms/3rd_set/16'

image_filenames = os.listdir(input_folder)

num_images_to_test = 20  # Adjust this number as needed
random_image_filenames = random.sample(image_filenames, num_images_to_test)

images = []

for random_image_filename in random_image_filenames:
    input_image_path = os.path.join(input_folder, random_image_filename)
    input_image = load_and_preprocess_image(input_image_path)

    # reshape input image for model prediction
    input_image = np.expand_dims(input_image, axis=0)

    # predict the output image using the trained model
    output_image = model.predict(input_image)

    ground_truth_image_path = os.path.join(output_folder, random_image_filename)
    
    try:
        ground_truth_image = load_and_preprocess_image(ground_truth_image_path)

        # PSNR (Peak Signal-to-Noise Ratio)
        psnr_value = peak_signal_noise_ratio(ground_truth_image, output_image[0, :, :, 0])

        # Plot original, predicted, and ground truth images
        plt.figure(figsize=(10, 5))

        # Original input image
        plt.subplot(1, 3, 1)
        plt.imshow(input_image[0], cmap='gray')
        plt.title('Original Input Image')
        plt.axis('off')

        # Predicted output image
        plt.subplot(1, 3, 2)
        plt.imshow(output_image[0, :, :, 0], cmap='gray')
        plt.title('Predicted Output Image')
        plt.axis('off')

        # Ground truth output image
        plt.subplot(1, 3, 3)
        plt.imshow(ground_truth_image, cmap='gray')
        plt.title('Ground Truth Output Image')
        plt.axis('off')

        plt.show()

        # PSNR value
        print(f"PSNR for {random_image_filename}: {psnr_value}")

        images.append(np.concatenate([input_image[0], output_image[0, :, :, 0], ground_truth_image], axis=1))
    except Exception as e:
        print(f"Error loading or processing {ground_truth_image_path}: {e}")
        continue

grid_image = np.concatenate(images, axis=0)

cv2.imwrite("grid_image.png", grid_image)

plt.imshow(grid_image, cmap='gray')
plt.axis('off')
plt.show()