In [1]:
import os
import numpy as np
import cv2
from tensorflow.keras.utils import Sequence
import tensorflow as tf

class DataLoader(Sequence):
    def __init__(self, image_paths, mask_paths, batch_size=32, image_size=(256, 256), shuffle=True):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.indices = np.arange(len(self.image_paths))
        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch."""
        return int(np.floor(len(self.image_paths) / self.batch_size))

    def __getitem__(self, index):
        """Generate one batch of data."""
        batch_indices = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
        
        batch_images = [self.load_image(self.image_paths[i]) for i in batch_indices]
        batch_masks = [self.load_mask(self.mask_paths[i]) for i in batch_indices]
        
        batch_images = np.array(batch_images) / 255.0  # Normalize image pixel values to [0, 1]
        batch_masks = np.array(batch_masks)  # Masks should be binary
        
        return batch_images, batch_masks

    def load_image(self, path):
        """Load and preprocess an image."""
        image = cv2.imread(path)
        image = cv2.resize(image, self.image_size)  # Resize to (256, 256)
        return image

    def load_mask(self, path):
        """Load and preprocess a mask."""
        mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)  # Read in grayscale (single channel)
        mask = cv2.resize(mask, self.image_size)  # Resize to (256, 256)
        mask = mask.astype(np.float32)  # Convert mask to float32
        mask = mask / 255.0  # Normalize to [0, 1]
        return mask

    def on_epoch_end(self):
        """Shuffle the data at the end of each epoch."""
        if self.shuffle:
            np.random.shuffle(self.indices)


In [2]:
from tensorflow.keras import layers, models

def build_unet(input_size=(256, 256, 3)):
    inputs = layers.Input(input_size)

    # Encoder (Contracting Path)
    conv1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    conv1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D((2, 2))(conv1)

    conv2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D((2, 2))(conv2)

    conv3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(conv3)
    pool3 = layers.MaxPooling2D((2, 2))(conv3)

    conv4 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(pool3)
    conv4 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(conv4)
    pool4 = layers.MaxPooling2D((2, 2))(conv4)

    # Bottleneck
    conv5 = layers.Conv2D(1024, (3, 3), activation='relu', padding='same')(pool4)
    conv5 = layers.Conv2D(1024, (3, 3), activation='relu', padding='same')(conv5)

    # Decoder (Expansive Path)
    up6 = layers.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same')(conv5)
    concat6 = layers.concatenate([up6, conv4], axis=-1)
    conv6 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(concat6)
    conv6 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(conv6)

    up7 = layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv6)
    concat7 = layers.concatenate([up7, conv3], axis=-1)
    conv7 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(concat7)
    conv7 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(conv7)

    up8 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv7)
    concat8 = layers.concatenate([up8, conv2], axis=-1)
    conv8 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(concat8)
    conv8 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv8)

    up9 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv8)
    concat9 = layers.concatenate([up9, conv1], axis=-1)
    conv9 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(concat9)
    conv9 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv9)

    # Output layer (single channel binary mask)
    output = layers.Conv2D(1, (1, 1), activation='sigmoid')(conv9)

    # Create the model
    model = models.Model(inputs, output)

    return model


In [3]:

image_folder = '/Users/naomimalange/Desktop/M1/AI Lab/Ongoing projects/DCM/resized_images'
mask_folder = '/Users/naomimalange/Desktop/M1/AI Lab/Ongoing projects/DCM/resized_masks'

# Collect all image and mask paths
image_paths = [os.path.join(image_folder, fname) for fname in os.listdir(image_folder) if fname.endswith('.png')]
mask_paths = [os.path.join(mask_folder, fname) for fname in os.listdir(mask_folder) if fname.endswith('.png')]


image_paths.sort()
mask_paths.sort()

batch_size = 16
image_size = (256, 256)  

data_loader = DataLoader(image_paths, mask_paths, batch_size=batch_size, image_size=image_size)

unet_model = build_unet(input_size=(256, 256, 3))


unet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
unet_model.fit(data_loader, epochs=10)


Epoch 1/10


  self._warn_if_super_not_called()


[1m187/187[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6287s[0m 34s/step - accuracy: 0.9092 - loss: 0.2706
Epoch 2/10
[1m 10/187[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m2:01:47[0m 41s/step - accuracy: 0.9667 - loss: 0.0780