In [None]:
from libtiff import TIFF
from keras.utils import Sequence
import cv2
import numpy as np
import os 
import natsort
import random
import imgaug.augmenters as iaa
import imgaug as ia
from tqdm import tqdm
from PIL import Image
import configparser


<h2> Data Generator for multi-label batch generation </h2>




In [None]:


class MultiLabelGenerator(Sequence):
    """Batch generator of images and multi-channel masks for segmentation models.

    Each item is a batch of ``batch_size`` images read from ``image_path``
    (``<name>.jpg``) and, when ``to_fit`` is true, the matching masks read from
    ``mask_path`` (``<name>.tif`` or ``<name>.npy``). Every mask carries
    ``n_classes`` channels. Trailing samples that do not fill a whole batch are
    dropped (see ``__len__``).
    """

    def __init__(self,list_IDs,labels, image_path, mask_path,
                 to_fit=True, batch_size=2, dim=(512, 512),
                 n_channels=3, n_classes=5, augment_flag = False ,shuffle=True,tiff_flag=False):
        """Initialization
        :param list_IDs: list of all 'label' ids to use in the generator
        :param labels: list of image labels (file names without extension)
        :param image_path: path to images location
        :param mask_path: path to masks location
        :param to_fit: True to return X and y, False to return X only
        :param batch_size: batch size at each iteration
        :param dim: tuple indicating image dimension; used as the (rows, cols)
            part of the batch array shape
        :param n_channels: number of image channels
        :param n_classes: number of output masks (channels per mask)
        :param augment_flag: True to apply one randomly chosen augmenter to
            every batch returned while fitting
        :param shuffle: True to shuffle label indexes after every epoch
        :param tiff_flag: True to read masks from multi-page ``.tif`` files,
            False to read them from ``.npy`` arrays
        """
        # Harmless for the legacy keras Sequence and expected by newer Keras
        # versions, which warn when the base initializer is skipped.
        super().__init__()

        self.list_IDs = list_IDs
        self.labels = labels
        self.image_path = image_path
        self.mask_path = mask_path
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.tiff_flag = tiff_flag
        self.augment_flag = augment_flag

        # Always defined so augment_data() never hits a missing attribute;
        # it is only populated when augmentation was requested.
        self.augment_dict = {}
        if augment_flag:
            # Keys are the strings "1".."N" because augment_data() draws a
            # random integer in [1, N] and looks it up by its string form.
            self.augment_dict = {
                "1": iaa.OneOf([iaa.Fliplr(1)]),
                "2": iaa.OneOf([iaa.Affine(scale={"x": (0.6, 1.2)}, order=[0])]),
                "3": iaa.OneOf([iaa.Affine(rotate=(-20, 20), order=[0])]),
                "4": iaa.OneOf([iaa.PerspectiveTransform(scale=(0.01, 0.2))]),
                "5": iaa.OneOf([iaa.CropAndPad(percent=(-0.30, 0.30))]),
                "6": iaa.OneOf([iaa.ElasticTransformation(alpha=(0.5, 2), sigma=0.25)]),
                # add gaussian noise to images
                "7": iaa.OneOf([iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05*255), per_channel=0.5)]),
                "8": iaa.OneOf([iaa.AddToHueAndSaturation((-10, 10))]),
                "9": iaa.OneOf([iaa.GaussianBlur((0, 1.0))]),
                "10": iaa.OneOf([iaa.Sharpen(alpha=(0, 1.0), lightness=(0.75, 1.5))]),
                "11": iaa.OneOf([iaa.LinearContrast((0.5, 1.0), per_channel=0.5)]),
            }

        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch
        :return: number of complete batches per epoch (a trailing partial
            batch is not counted)
        """
        return len(self.list_IDs) // self.batch_size

    def __getitem__(self, index):
        """Generate one batch of data
        :param index: index of the batch
        :return: X and y when fitting. X only when predicting
        """
        # Positions (into list_IDs) that make up this batch
        start = index * self.batch_size
        batch_positions = self.indexes[start:start + self.batch_size]

        # Translate positions into the ids used to look up file names
        list_IDs_temp = [self.list_IDs[k] for k in batch_positions]

        X = self._generate_X(list_IDs_temp)
        if not self.to_fit:
            return X

        y = self._generate_y(list_IDs_temp)
        if self.augment_flag:
            X, y = self.augment_data(X, y)
        return X, y

    def augment_data(self,X,y):
        """Augments a batch of data by randomly choosing one
        of the augmenters defined in augment_dict.
        :param X: batch of images
        :param y: batch of masks, passed to the augmenter as segmentation maps
        :return: the transformed images and segmentation maps; the inputs are
            returned unchanged when no augmenters are configured
        """
        if not self.augment_dict:
            return X, y

        random_choice = random.randint(1, len(self.augment_dict))
        augmenter = self.augment_dict[str(random_choice)]
        X_aug, y_aug = augmenter(images=X, segmentation_maps=y)

        return X_aug, y_aug

    def on_epoch_end(self):
        """Updates indexes after each epoch"""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_X(self, list_IDs_temp):
        """Generates data containing batch_size images
        :param list_IDs_temp: list of label ids to load
        :return: batch of images
        """
        # Initialization
        X = np.empty((len(list_IDs_temp), *self.dim, self.n_channels), dtype=np.uint8)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            X[i,] = self._load_image(self.image_path + "/" + self.labels[ID] + ".jpg")

        return X

    def _generate_y(self, list_IDs_temp):
        """Generates data containing batch_size masks
        :param list_IDs_temp: list of label ids to load
        :return: batch of masks
        """
        y = np.empty((len(list_IDs_temp), *self.dim, self.n_classes), dtype=np.int8)

        # The two mask formats differ only in file extension and loader.
        if self.tiff_flag:
            extension, loader = ".tif", self._load_tif
        else:
            extension, loader = ".npy", self._load_npy

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            y[i,] = loader(self.mask_path + "/" + self.labels[ID] + extension)

        return y

    def _resize(self, array):
        """Nearest-neighbour resize of ``array`` to the generator's ``dim``.

        ``dim`` is used as (rows, cols) when the batch arrays are allocated,
        whereas ``cv2.resize`` expects its target size as (width, height), so
        the tuple is reversed here. For square ``dim`` this makes no difference.
        """
        rows, cols = self.dim
        return cv2.resize(array, (cols, rows), interpolation=cv2.INTER_NEAREST)

    def _load_image(self, image_path):
        """Loads image
        :param image_path: path to image to load
        :return: image resized to ``dim``
        :raises FileNotFoundError: if the file is missing or cannot be decoded
        """
        img = cv2.imread(image_path)
        # cv2.imread signals failure by returning None instead of raising.
        if img is None:
            raise FileNotFoundError("Could not read image: {}".format(image_path))
        return self._resize(img)

    def _load_tif(self, mask_path):
        """Load a multi-page TIF mask and binarise every page.

        Pixels above 125 become 1 and all others 0; each page is then resized
        to ``dim`` and the first ``n_classes`` pages are stacked as channels.
        :param mask_path: path to mask to load
        :return: mask image with ``n_classes`` channels
        :raises ValueError: if the file holds fewer than ``n_classes`` pages
        """
        mask_tif = TIFF.open(mask_path, mode='r')
        try:
            channels = []
            for page in mask_tif.iter_images():
                binary = (page > 125).astype(page.dtype)
                channels.append(self._resize(binary))
        finally:
            # Release the file handle even if decoding or resizing fails.
            mask_tif.close()

        if len(channels) < self.n_classes:
            raise ValueError(
                "Expected at least {} mask pages in {}, found {}".format(
                    self.n_classes, mask_path, len(channels)))

        return np.stack(channels[:self.n_classes], axis=-1)

    def _load_npy(self, mask_path_npy):
        """Load a mask stored as a ``.npy`` array.

        The first ``n_classes`` channels along the last axis are each resized
        to ``dim`` and stacked back together.
        :param mask_path_npy: path to the ``.npy`` mask to load
        :return: mask image with ``n_classes`` channels
        """
        mask = np.load(mask_path_npy)
        channels = [self._resize(mask[:, :, i]) for i in range(self.n_classes)]

        return np.stack(channels, axis=-1)

<h2> Preparing list of files for segmentation </h2>

In [None]:
def prepare_file_names(image_dir,masks_dir):
    """Collect the extension-less file names found in two directories.

    The entries of each directory are listed, ordered with
    ``natsort.natsorted`` and stripped of their file extension.
    :param image_dir: directory holding the images
    :param masks_dir: directory holding the masks
    :return: tuple of two numpy arrays, image names first, then mask names
    """
    def _sorted_stems(directory):
        # Natural-sort the directory entries, then drop each extension.
        entries = natsort.natsorted(os.listdir(directory))
        return [os.path.splitext(entry)[0] for entry in entries]

    image_names = _sorted_stems(image_dir)
    mask_names = _sorted_stems(masks_dir)

    return np.asarray(image_names), np.asarray(mask_names)

In [None]:
# Training configuration file, resolved relative to the working directory.
CONFIG_FILE = 'segmentation_training.ini'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Checking it turns a missing file into a clear
# error here instead of a KeyError on 'image_folders' further down.
if not config.read(CONFIG_FILE):
    raise FileNotFoundError(
        "Could not read configuration file: {!r}".format(CONFIG_FILE))

# All dataset locations live in the [image_folders] section.
image_folders = config['image_folders']

TRAIN_IMAGES_PATH = image_folders['train_images_path']
TRAIN_MASKS_PATH = image_folders['train_masks_path']
VAL_IMAGES_PATH = image_folders['val_images_path']
VAL_MASKS_PATH = image_folders['val_masks_path']


In [None]:
# Naturally sorted, extension-less file names from the training image and mask
# folders; these are arrays of names, not the pixel data itself.
X_train,y_train = prepare_file_names(TRAIN_IMAGES_PATH,TRAIN_MASKS_PATH)

In [None]:
# Naturally sorted, extension-less file names from the validation image and
# mask folders; these are arrays of names, not the pixel data itself.
X_val,y_val = prepare_file_names(VAL_IMAGES_PATH,VAL_MASKS_PATH)