Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions tests/segmentation_utils_tests.py/image_preprocessor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_image_onehot_encoder_squarematrix() -> None:
onehot_test[:, ::2, :,1] = 1
onehot_test[:, 1::2,:, 0] = 1

one_hot_image = ImagePreprocessor.onehot_encode(mask, output_size, n_classes)
one_hot_image = ImagePreprocessor.onehot_encode(mask, n_classes)

assert one_hot_image.shape == (
1,
Expand Down Expand Up @@ -80,8 +80,6 @@ def test_image_augmentation_pipeline_squarematrix() -> None:
image_new, mask_new = ImagePreprocessor.augmentation_pipeline(
image,
mask,
input_size,
output_size,
image_queue=image_queue,
mask_queue=mask_queue,
)
Expand Down
63 changes: 32 additions & 31 deletions utilities/segmentation_utils/ImagePreprocessor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from dataclasses import dataclass
from typing import Callable, Optional, Protocol
from typing import Callable, Protocol

import numpy as np
import tensorflow as tf


class IPreprocessor(Protocol):
"""
Interface of the preprocessing queue class
Parameters
----------
:queue list: list of functions to be applied
"""

queue: list[Callable]

def update_seed(self, seed: int) -> None:
Expand All @@ -16,6 +23,19 @@ def get_queue_length(self) -> int:


class PreFunction:
"""
Class that wraps a function and its arguments to be used in a preprocessing queue
enables functions to be defined with their parameters prior to being called.

To call the function, simply call the PreFunction object with a tf.Tensor as an argument

Parameters
----------
:function Callable: function to be wrapped
:args list: list of arguments to be passed to the function
:kwargs dict: dictionary of keyword arguments to be passed to the function
"""

def __init__(self, function: Callable, *args, **kwargs) -> None:
self.function = function
self.args = args
Expand All @@ -25,6 +45,13 @@ def __call__(self, image: tf.Tensor) -> tf.Tensor:
return self.function(image, *self.args, **self.kwargs)

def set_seed(self, seed: int) -> None:
"""
Changes the seed of the function

Parameters
----------
:seed int: seed to be changed to
"""
self.kwargs["seed"] = seed


Expand Down Expand Up @@ -127,7 +154,7 @@ def generate_default_queue(seed=0) -> tuple[PreprocessingQueue, PreprocessingQue
return image_queue, mask_queue


def onehot_encode(masks, output_size, num_classes) -> tf.Tensor:
def onehot_encode(masks, num_classes) -> tf.Tensor:
"""
Function that one-hot encodes masks

Expand All @@ -140,27 +167,19 @@ def onehot_encode(masks, output_size, num_classes) -> tf.Tensor:
:return tf.Tensor: Batch of one-hot encoded masks
"""
#!TODO: add support for 1D masks
encoded = np.zeros((masks.shape[0], output_size[0], output_size[1], num_classes))
encoded = np.zeros((masks.shape[0], masks.shape[1], masks.shape[2], num_classes))
for i in range(num_classes):
mask = (masks == i).astype(float)
encoded[:, :, :, i] = mask
if output_size[1] == 1:
encoded = encoded.reshape(
(masks.shape[0], output_size[0] * output_size[1], num_classes)
)
encoded = tf.convert_to_tensor(encoded)
return encoded


def augmentation_pipeline(
image,
mask,
input_size: tuple[int, int],
output_size: tuple[int, int],
image_queue: PreprocessingQueue,
mask_queue: PreprocessingQueue,
output_reshape: Optional[tuple[int, int]] = None,
channels: int = 3,
seed: int = 0,
) -> tuple[tf.Tensor, tf.Tensor]:
"""
Expand All @@ -172,37 +191,22 @@ def augmentation_pipeline(
----------
:tf.Tensor image: The image to be processed
:tf.Tensor mask: The mask to be processed
:tuple(int, int) input_size: Input size of the image
:tuple(int, int) output_size: Output size of the image


Keyword Arguments
-----------------
:tuple(int, int), optional output_reshape: In case the image is a column vector, \
this is the shape it should be reshaped to. Defaults to None.

:PreprocessingQueue, optional mask_queue image_queue: \
Augmentation processing queue for images, defaults to None

:PreprocessingQueue, optional mask_queue: Augmentation processing queue \
for masks, defaults to None

:int, optional channels: Number of bands in the image, defaults to 3 \
:int, optional seed: The seed to be used in the pipeline, defaults to 0

Raises
------
:raises ValueError: If only one of the queues is passed

Returns
-------
:return tuple(tf.Tensor, tf.Tensor): tuple of the processed image and mask
"""

# reshapes masks, such that transformations work properly
if output_reshape is not None and output_size[1] == 1:
mask = tf.reshape(mask, (output_reshape[0], output_reshape[1]))

mask = tf.expand_dims(mask, axis=-1)

image_queue.update_seed(seed)
Expand All @@ -212,12 +216,9 @@ def augmentation_pipeline(
image = fun_im(image)
mask = fun_mask(mask)

# flattens masks out to the correct output shape
if output_size[1] == 1:
mask = flatten(mask, output_size, channels=1)
else:
mask = tf.squeeze(mask, axis=-1)


mask = tf.squeeze(mask, axis=-1) # removes the last dimension
mask = tf.convert_to_tensor(mask)
# image = tf.convert_to_tensor(tf.clip_by_value(image, 0, 1))

Expand Down
76 changes: 36 additions & 40 deletions utilities/segmentation_utils/flowreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from utilities.segmentation_utils import ImagePreprocessor
from utilities.segmentation_utils.constants import ImageOrdering
from utilities.segmentation_utils.ImagePreprocessor import IPreprocessor


class FlowGenerator:
Expand Down Expand Up @@ -101,8 +102,8 @@ def get_dataset_size(self) -> int:

def set_preprocessing_pipeline(
self,
preprocessing_queue_image: ImagePreprocessor.IPreprocessor,
preprocessing_queue_mask: ImagePreprocessor.IPreprocessor,
preprocessing_queue_image: IPreprocessor,
preprocessing_queue_mask: IPreprocessor,
) -> None:
"""
Sets the preprocessing pipeline
Expand Down Expand Up @@ -200,9 +201,6 @@ def preprocess(self, generator_zip):
i_image, i_mask = ImagePreprocessor.augmentation_pipeline(
image=i_image,
mask=i_mask,
input_size=self.image_size,
output_size=self.output_size,
output_reshape=self.output_reshape,
seed=image_seed,
#!both preprocessing queues are assigned by this time
image_queue=self.preprocessing_queue_image, # type: ignore
Expand Down Expand Up @@ -273,8 +271,8 @@ def __init__(
preprocessing_enabled: bool = True,
seed: int = 909,
preprocessing_seed: Optional[int] = None,
preprocessing_queue_image: ImagePreprocessor.IPreprocessor = ImagePreprocessor.generate_image_queue(),
preprocessing_queue_mask: ImagePreprocessor.IPreprocessor = ImagePreprocessor.generate_mask_queue(),
preprocessing_queue_image: IPreprocessor = ImagePreprocessor.generate_image_queue(),
preprocessing_queue_mask: IPreprocessor = ImagePreprocessor.generate_mask_queue(),
read_weights: bool = False,
weights_path: Optional[str] = None,
shuffle_counter: int = 0,
Expand Down Expand Up @@ -307,9 +305,10 @@ def __init__(
self.shuffle_counter = shuffle_counter
self.image_ordering = image_ordering


self.image_filenames = np.array(sorted(os.listdir(self.image_path)))
self.mask_filenames = np.array(sorted(os.listdir(self.mask_path)))

# should be moved out as a strategy
if self.read_weights:
weights_df = pd.read_csv(self.weights_path, header=None)
weights_np = weights_df.to_numpy()
Expand All @@ -327,6 +326,7 @@ def __init__(
self.linked_data = [self.image_filenames, self.mask_filenames]
if self.read_weights:
self.linked_data.append(self.weights)

self.__shuffle_filenames()
self.dataset_size = self.__len__()

Expand All @@ -344,8 +344,10 @@ def __init__(
# such no need to define it otherwise
dimension = math.sqrt(self.output_size[0])
self.output_reshape = (int(dimension), int(dimension))
self.column_vector = True
else:
self.output_reshape = self.output_size
self.column_vector = False

print("Reading images from: ", self.image_path)

Expand Down Expand Up @@ -405,42 +407,31 @@ def __read_batch(self, start: int, end: int) -> None:
self.n_channels,
)
)
if self.output_size[1] == 1:
column = True
batch_masks = np.zeros(
(n, self.mini_batch, self.output_size[0], self.num_classes)
)
else:
column = False
batch_masks = np.zeros(
(
n,
self.mini_batch,
self.output_size[0],
self.output_size[1],
self.num_classes,
)

batch_masks = np.zeros(
(
n,
self.mini_batch,
self.output_reshape[0],
self.output_reshape[1],
self.num_classes,
)
)

# preprocess and assign images and masks to the batch
for i in range(n):
if column:
raw_masks = np.zeros(
(self.mini_batch, self.output_size[0] * self.output_size[1], 1)
)
else:
raw_masks = np.zeros(
(self.mini_batch, self.output_size[0], self.output_size[1])
)
raw_masks = np.zeros(
(self.mini_batch, self.output_reshape[0], self.output_reshape[1])
)

for j in range(self.mini_batch):
image_index = i * self.mini_batch + j

image = Image.open(
os.path.join(self.image_path, batch_image_filenames[image_index])
).resize(self.image_size, Image.ANTIALIAS)

image = np.array(image)
image = image / 255

mask = Image.open(
os.path.join(self.mask_path, batch_mask_filenames[image_index])
Expand All @@ -462,24 +453,19 @@ def __read_batch(self, start: int, end: int) -> None:
) = ImagePreprocessor.augmentation_pipeline(
image,
mask=mask,
input_size=self.image_size,
output_size=self.output_size,
output_reshape=self.output_reshape,
seed=image_seed,
#!both preprocessing queues are assigned by this time
image_queue=self.preprocessing_queue_image, # type: ignore
mask_queue=self.preprocessing_queue_mask, # type: ignore
)
if column:
mask = np.reshape(mask, self.output_size)

batch_images[i, j, :, :, :] = image
# NOTE: this provides the flexibility required to process both
# column and matrix vectors
raw_masks[j, ...] = mask
raw_masks[j, :, :] = mask

batch_masks[i, ...] = ImagePreprocessor.onehot_encode(
raw_masks, self.output_size, self.num_classes
batch_masks[i, :, :, :] = ImagePreprocessor.onehot_encode(
raw_masks, self.num_classes
)

# caches the batch
Expand Down Expand Up @@ -509,6 +495,16 @@ def __getitem__(self, index) -> tuple[np.ndarray, np.ndarray]:

batch_images = self.image_batch_store[store_index, ...] # type: ignore
batch_masks = self.mask_batch_store[store_index, ...] # type: ignore
if self.column_vector:
batch_masks = np.reshape(
batch_masks,
(
self.mini_batch,
batch_masks.shape[1] * batch_masks[2],
self.num_classes,
),
)

if self.image_ordering == ImageOrdering.CHANNEL_FIRST:
batch_images = np.moveaxis(batch_images, -1, 1)
batch_masks = np.moveaxis(batch_masks, -1, 1)
Expand Down