In [None]:
# Import dependencies
import os
import numpy as np
import cv2
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras import Model
from tensorflow.keras.layers import Dropout, BatchNormalization, Conv2D, MaxPooling2D, Conv2DTranspose, \
    concatenate, Input, Add, Concatenate, GlobalAveragePooling2D, Activation, GaussianNoise, Input, Softmax


In [None]:
# Settings
classNames = ["normal", "sick"]
datapathTrain = '../data_plants/plants_binary/train/'
datapathTest =  '../data_plants/plants_binary/test/'

# Input settings
imageWidth = 240
imageHeight = 240
imageChannels = 3

# Training parameter
learningrate = 1e-4
nepoches = 5
batchSize = 10
intermediateResults = 600
patience = 3

# Lets build a data pipeline generator

The data pipeline generator is a helper class which builds tf.data.set objects tailor made for your model needs. Clearly, Keras offeres these functionality already built in, but its good to know how things work under the hood. Especially when you start working with non-standard models.

The generated tf.data.set object prepares the raw data for the neural network. It also carries out image augmentation etc..

In [None]:
class DatapipeGenerator:
    def __init__(self, datapath: str, classNames:list=[]):

        # Save datapath and classnames
        self.datapath = datapath
        self.classNames = classNames

        # Find all image files in datapath
        self.filenames = []
        for root, dirs, files in os.walk(datapath):
            for file in files:
                if file.endswith(".png") or file.endswith(".jpg") or file.endswith(".jpeg"):
                    name = str(os.path.join(root, file))
                    self.filenames.append(name)

        self.iw, self.ih, self.ic = None, None, None

    # ============================
    @property
    def cdict(self):
        return {k:v for v,k in enumerate(sorted(self.classNames))}


    # ============================
    def create(
        self, iw: int, ih: int, ic:int, batchSize: int,
        shuffle_buffer_size:int=10000,
        augmentations:list=['fliph', 'flipv', 'color', 'crop', 'noise'],
        nrepeat:int=1
    ):

        """Creates the datapipe"""

        self.iw, self.ih, self.ic = iw, ih, ic

        # Let's build the pipeline
        dataset = tf.data.Dataset.from_tensor_slices(self.filenames)

        dataset = dataset.shuffle(buffer_size=shuffle_buffer_size)
        dataset = dataset.repeat(nrepeat)

        # Load the image
        dataset = dataset.map(self._processLoadImage)

        # Add Gaussian nOise to image
        dataset = dataset.map(self._processAddNoise)

        # Augment the image
        if 'noise' in augmentations:
            dataset = dataset.map(self._processAddNoise)
        if 'fliph' in augmentations:
            dataset = dataset.map(self._processAugmentFlip)
        if 'flipv' in augmentations:
            dataset = dataset.map(self._processAugmentFlipVertically)
        if 'color' in augmentations:
            dataset = dataset.map(self._processAugmentColor)
        if 'crop' in  augmentations:
            dataset = dataset.map(self._processAugmentCrop)

        # Apply batching
        dataset = dataset.batch(batchSize)

        # Prefetching
        dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

        return dataset

    
    # ============================
    def _lookUpClass(self, imgpath):
        # Split the string and take the second last element as class name
        imgClass = tf.strings.split(imgpath, '/')[-2]
        imgClass = imgClass.numpy().decode("utf-8")
        idx = self.cdict[imgClass]
        return idx

    # ============================
    def _processLoadImage(self, imgpath):

        img = tf.io.read_file(imgpath)
        img = tf.image.decode_jpeg(img, channels=self.ic)
        img = tf.image.convert_image_dtype(img, tf.float32)
        img = tf.image.resize(img, (self.ih, self.iw))

        idx = tf.py_function(self._lookUpClass, [imgpath], tf.int32)
        label = tf.one_hot(idx, len(self.classNames))
        return img, label, imgpath


    # ============================
    def _processAddNoise(self, img, label, imgpath, mean=0.0, stddev=0.1):

        def addnoise(img):
            weight = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)
            gnoise = tf.random.normal(shape=tf.shape(img), mean=mean, stddev=stddev, dtype=tf.float32)
            return tf.add(img, gnoise*weight)

        def nonoise(img):
            return img

        choice = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)
        img = tf.cond(
            choice < 0.5,
            lambda: addnoise(img),
            lambda: nonoise(img)
        )

        return img, label, imgpath

    # ============================
    def _processAugmentColor(self, img, label, imgpath,
                             rand_hue=0.01, rand_saturation=[0.8,1.2],
                             rand_brightness=0.01, rand_contrast=[0.8,1.1], **kwargs):

        if self.ic == 3:
            img = tf.image.random_hue(img, rand_hue)
            img = tf.image.random_saturation(img, rand_saturation[0], rand_saturation[1])
            
        img = tf.image.random_brightness(img, rand_brightness)
        img = tf.image.random_contrast(img, rand_contrast[0], rand_contrast[1])
        
        return img, label, imgpath


   # ============================
    def _processAugmentCrop(self, img, label, imgpath, rand_scales=[0.7, 1.0, 0.01], **kwargs):

        # Generate 20 crop settings, ranging from a 1% to 20% crop.
        def cropimage(img, width, height):
            scales = np.arange(rand_scales[0], rand_scales[1], rand_scales[2])
            cropboxes = np.zeros((len(scales), 4))
            for i, scale in enumerate(scales):
                cx1 = cy1 = 0.5 - (0.5 * scale)
                cx2 = cy2 = 0.5 + (0.5 * scale)
                cropboxes[i] = [cx1, cy1, cx2, cy2]

            cropboxes = tf.convert_to_tensor(cropboxes, dtype=tf.float32)

            # Create different crops for an image
            crops = tf.image.crop_and_resize(
                [img],
                boxes=cropboxes,
                box_indices=np.zeros(cropboxes.shape[0]),
                crop_size=(height, width)
            )

            # Return a random crop
            idx = tf.random.uniform(shape=[], minval=0, maxval=cropboxes.shape[0], dtype=tf.int32)
            return crops[idx,:,:,:]

        def nocrop(img):
            return img

        # =======================
        choice = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)
        img = tf.cond(
            choice < 0.5,
            lambda: nocrop(img),
            lambda: cropimage(img, width=self.iw, height=self.ih)
        )

        return img, label, imgpath


    # ============================
    def _processAugmentFlip(self, img, label, imgpath):

        # Flip
        def flip(img):
            img = tf.image.flip_left_right(img)
            return img

        def noflip(img):
            return img

        choice = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)
        img = tf.cond(choice < 0.5,
            lambda: noflip(img),
            lambda: flip(img)
        )

        return img, label, imgpath

    # ============================
    def _processAugmentFlipVertically(self, img, label, imgpath):

        # Flip
        def flip(img):
            img = tf.image.flip_up_down(img)
            return img

        def noflip(img):
            return img

        choice = tf.random.uniform(shape=[], minval=0., maxval=1., dtype=tf.float32)
        img = tf.cond(choice < 0.5,
            lambda: noflip(img),
            lambda: flip(img)
        )

        return img, label, imgpath


In [None]:
# Initialize both Generators

datapipeGenTrain = DatapipeGenerator(datapath=datapathTrain, classNames=classNames)
datapipeGenTest  = DatapipeGenerator(datapath=datapathTest, classNames=classNames)


dpTrain = datapipeGenTrain.create(
    iw=imageWidth,
    ih=imageHeight,
    ic=imageChannels,
    batchSize=batchSize,
    augmentations=['fliph', 'flipv', 'color', 'crop', 'noise']
)

dpTest = datapipeGenTest.create(
    iw=imageWidth,
    ih=imageHeight,
    ic=imageChannels,
    batchSize=batchSize,
    augmentations=[] # No augmentations on the test data set!
)

In [None]:
# Let's show how to use and test our pipeline

for it, (imgs, labels, paths) in enumerate(dpTrain):
    
    fig, axs = plt.subplots(1,4, figsize=(15,15))
    for b in range(4):
        axs[b].imshow(imgs[b,...].numpy())
        
    plt.show()
    
    break # Break or wait until the end of days...

# Let's build a wrapper for the binary classifier

Wrap the keras model and all methods required for training in one class

In [None]:
class ClassifierWrapper:
    def __init__(self, nc, iw, ih, ic=3, learnRate=0.001):
        self.nc = nc
        self.iw = iw
        self.ih = ih
        self.ic = ic

        self.model = None

        # Build the model
        self._buildModel()
        
        # Pick an optimizer
        self.optimizer = tf.keras.optimizers.Adam(learnRate)

    def _buildModel(self):
        """Build the modelWithoutPosthead"""
        
        # Feature extractor
        inputs = Input((self.ih, self.iw, self.ic))
        
        c1 = Conv2D(64, (3, 3), activation='relu', padding='same', name="Conv1", strides=(2, 2))(inputs)
        p1 = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), name="pool1")(c1)

        f2s1 = Conv2D(16, (1, 1), activation='relu', padding='same', name="Fire2s1")(p1)
        f2e1 = Conv2D(64, (1, 1), activation='relu', padding='same', name="Fire2e1")(f2s1)
        f2e3 = Conv2D(64, (3, 3), activation='relu', padding='same', name="Fire2e3")(f2s1)
        f2 = Concatenate(name="Fire2cat")([f2e1, f2e3])

        f3s1 = Conv2D(16, (1, 1), activation='relu', padding='same', name="Fire3s1")(f2)
        f3e1 = Conv2D(64, (1, 1), activation='relu', padding='same', name="Fire3e1")(f3s1)
        f3e3 = Conv2D(64, (3, 3), activation='relu', padding='same', name="Fire3e3")(f3s1)
        f3 = Concatenate(name="Fire3cat")([f3e1, f3e3])

        
        # Classification head
        x = Dropout(0.5, name='drop9')(f3)
        heatmap = Conv2D(self.nc, (1, 1), padding='valid', name='heatmap', activation='linear')(x)
        logits = GlobalAveragePooling2D()(heatmap)

        # Model with final heatmap classification
        self.model = Model(inputs=[inputs], outputs=logits)


    def getHeatmapDetector(self):
        """Double headed module for anomaly detection"""
        heatmap = self.model.layers[-2]
        logits = self.model.layers[-1]
        return Model(inputs=[self.model.input], outputs=[heatmap.output, logits.output])


    def annoteTraining(self, imgs, ytrue):
        """For training only"""

        # Calculate Heatmap
        model = self.getHeatmapDetector()
        hm, ypred = model.predict(imgs)

        # Normalize heatmap
        probs = tf.expand_dims(tf.expand_dims(tf.one_hot(tf.argmax(ypred, axis=-1), self.nc),1),1)
        anomalies = tf.image.resize(hm, (self.ih, self.iw))
        anomalies = tf.multiply(anomalies, probs)
        anomalies = tf.reduce_sum(anomalies, axis=-1)
        amin = tf.expand_dims(tf.expand_dims(tf.reduce_min(anomalies, axis=[1,2]),1), -1)
        amax = tf.expand_dims(tf.expand_dims(tf.reduce_max(anomalies, axis=[1,2]),1), -1)
        anomalies = (anomalies - amin) / (amax - amin)
        anomalies = tf.constant((1.0,0.0,0.0))*tf.tile(tf.expand_dims(anomalies,-1), [1,1,1,3])


        if not isinstance(imgs, np.ndarray):
            imgs = imgs.numpy()

        x0, y0 = int(0.1*imgs.shape[2]), 20

        for n in range(imgs.shape[0]):
            
            labelTrue = f"{tf.argmax(ytrue[n, ...]).numpy()}"
            labelPred = f"{tf.argmax(ypred[n, ...]).numpy()}"

            imgs[n,...] = cv2.addWeighted(imgs[n,...], 1.0, anomalies[n,...].numpy(), 1.0, 0.0)

            imgs[n,...] = cv2.putText(imgs[n,...], f"True: {labelTrue}", (x0, y0), cv2.FONT_HERSHEY_SIMPLEX,
                              fontScale=0.5, color=(0, 255, 0), thickness=1)

            imgs[n,...] = cv2.putText(imgs[n,...], f"Pred: {labelPred}", (x0, y0+20), cv2.FONT_HERSHEY_SIMPLEX,
                              fontScale=0.5, color=(255, 0, 0), thickness=1)


        return imgs


    @tf.function
    def loss(self, ytrue, logits):
        return tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(ytrue, logits))
  
    @tf.function
    def trainStep(self, imgs, ytrue):
        
        # Calculate gradients of loss wrt to imgs
        with tf.GradientTape() as t:
            ypred = self.model(imgs)
            loss = self.loss(ytrue, ypred)

        # Change weightsPretrainedImageNet
        grads = t.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

        return ypred, loss

    @tf.function
    def testStep(self, imgs, ytrue):
        ypred = self.model(imgs)
        loss = self.loss(ytrue, ypred)
        return ypred, loss

In [None]:
# Instantiate our Model Wrap
classyWrap = ClassifierWrapper(
    nc = len(classNames),
    iw = imageWidth,
    ih = imageHeight,
    ic = imageChannels,
    learnRate = learningrate
)

# print model summary
print(classyWrap.model.summary())


# Let's train our model



In [None]:
# Trainingsloop starts here

lossTrain, lossTest = [[],[]], [[],[]]
bestTestLoss, earlyStoppingCtr = 1e+4, 0

# Load weights if exist
if os.path.isfile("./weightsBest.h5"):
    classyWrap.model.load_weights("./weightsBest.h5")

for e in range(nepoches):
    print(f"Starting epoche {e}")


    # =====================
    # Run Training data set
    print("Start Traing")
    
    # Loop through dataset
    for it, (imgs, labels, paths) in enumerate(dpTrain):
        
        # Run a train step
        pred, loss = classyWrap.trainStep(imgs=imgs, ytrue=labels)
        
        # Log results
        lossTrain[0].append(e * len(dpTrain) + it)
        lossTrain[1].append(loss.numpy())
    
        # Write out intermediate results
        if (e * len(dpTrain) + it) % intermediateResults == 0:
                
            print(f"Train - Epoche: {e}, Iteration: {it}/{len(dpTrain)}, TrainLoss: {loss.numpy():.7f}")
    
    
    # =====================
    # Run test data set:
    print("Start Testing")
    
    lossMean = 0
    
    # Loop through dataset
    for it, (imgs, labels, paths) in enumerate(dpTest):
            
        # Run a test step
        pred, loss = classyWrap.testStep(imgs=imgs, ytrue=labels)
        lossMean += loss.numpy()/len(dpTest)

    lossTest[0].append((e+1) * len(dpTrain))
    lossTest[1].append(lossMean)
    
    
    # Finally plot some images
    imgsAnno = classyWrap.annoteTraining(imgs, ytrue=labels)
    imgs = imgs.numpy()

    fig, axs = plt.subplots(2,4, figsize=(15,7))
    for b in range(min([4,imgs.shape[0]])):
        axs[0,b].imshow(imgs[b,...])
        axs[1,b].imshow(imgsAnno[b,...])

    plt.show()
    
    # For early stopping
    if lossMean < bestTestLoss:
        bestTestLoss = lossMean
        earlyStoppingCtr = 0
        classyWrap.model.save_weights("./weightsBest.h5")
        
    else:
        earlyStoppingCtr += 1
    
    print(f"Test  - Epoche: {e}, TestLoss: {lossMean:.7f}, BestLoss: {bestTestLoss:.7f}, EarlyStoppingCtr {earlyStoppingCtr}/{patience}")
    
    
    # =====================
    # Early Stopping
    if earlyStoppingCtr >= patience:
        print("Maximum patience reached. Stopping training")
        break
        
    # =====================
    # Plot losses
    fig, axs = plt.subplots(1, figsize=(15,7))
    axs.plot(lossTrain[0],lossTrain[1],'b-')
    axs.plot(lossTest[0],lossTest[1],'r-')
    plt.show()

# Postprocessing

In [None]:
modelWithHeatMap = classyWrap.getHeatmapDetector()
modelWithHeatMap.load_weights("./weightsBest.h5")

print("Model outputs: ")
print(modelWithHeatMap.outputs)


# Loop through dataset
for it, (imgs, labels, paths) in enumerate(dpTest):

    
    heatmap, ypred = modelWithHeatMap.predict(imgs)
    
    # Normalize heatmap
    probs = tf.expand_dims(tf.expand_dims(tf.one_hot(tf.argmax(ypred, axis=-1), len(classNames)),1),1)
    heatmap = tf.image.resize(heatmap, (imageHeight, imageWidth))
    heatmap = tf.multiply(heatmap, probs)
    heatmap = tf.reduce_sum(heatmap, axis=-1)
    amin = tf.expand_dims(tf.expand_dims(tf.reduce_min(heatmap, axis=[1,2]),1), -1)
    amax = tf.expand_dims(tf.expand_dims(tf.reduce_max(heatmap, axis=[1,2]),1), -1)
    heatmap = (heatmap - amin) / (amax - amin)
    heatmap = tf.constant((1.0,0.0,0.0))*tf.tile(tf.expand_dims(heatmap,-1), [1,1,1,3])


    # Annotate it
    if not isinstance(imgs, np.ndarray):
        imgs = imgs.numpy()

    for n in range(imgs.shape[0]):
        if tf.argmax(labels[n,:]).numpy() == 0:
            continue
        
        mask = (255*(heatmap[n,:,:,0].numpy()>0.5)).astype(np.uint8)
        contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        contours_poly = [None]*len(contours)
        boundRect = [None]*len(contours)
        centers = [None]*len(contours)
        radius = [None]*len(contours)
        for i, c in enumerate(contours):
            contours_poly[i] = cv2.approxPolyDP(c, 3, True)
            boundRect[i] = cv2.boundingRect(contours_poly[i])
            centers[i], radius[i] = cv2.minEnclosingCircle(contours_poly[i])

        for i in range(len(contours)):
            color = (255, 0, 0)
            cv2.drawContours(imgs[n,...], contours_poly, i, color)
            cv2.rectangle(imgs[n,...], (int(boundRect[i][0]), int(boundRect[i][1])), \
              (int(boundRect[i][0]+boundRect[i][2]), int(boundRect[i][1]+boundRect[i][3])), color, 2)


    # Plot it
    fig, axs = plt.subplots(1,4, figsize=(15,7))
    for b in range(4):
        
        axs[b].imshow(imgs[b,...])
    plt.show()
