In [4]:

import cv2 as cv
import pandas as pd
import matplotlib.pyplot as plt
import math
import numpy as np
import random
import os

import keras.backend as K
import keras.utils

from keras.callbacks import CSVLogger, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.layers import Activation, Add, Conv2D, Conv2DTranspose, concatenate, Cropping2D, MaxPooling2D, Reshape,UpSampling2D
from keras.models import Input, Model
from tensorflow.keras.optimizers import SGD, Adam, RMSprop
from keras.regularizers import l2
from keras.metrics import RootMeanSquaredError

import tensorflow as tf

IndentationError: unexpected indent (925392395.py, line 14)

In [None]:
# Number of landmarks per image (presumably the four hip landmarks annotated in the CSV files).
Nkeypoints = 4
# Width and height, in pixels, of the images used throughout the notebook.
W = H = 224

# Dataset placeholders; they are rebound to NumPy arrays once the data has been loaded.
train_images, train_keypoints = [], []
test_images, test_keypoints = [], []

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields ``(images, heatmaps)`` batches.

    Every sample's keypoints (a flat ``[x0, y0, x1, y1, ...]`` vector) are turned
    into one Gaussian heatmap per keypoint.  The heatmaps are stacked along the
    channel axis and flattened to ``(height * width * n_keypoints, 1)``, which is
    the layout produced by the model's final ``Reshape`` layer.

    Args:
        imgs: indexable collection of images.
        kps: indexable collection of keypoint vectors, aligned with ``imgs``.
        batch_size: number of samples per batch.
        shuffle: whether to reshuffle the sample order at the end of every epoch.
        height: heatmap height in pixels.
        width: heatmap width in pixels.
    """

    def __init__(self, imgs, kps, batch_size=4, shuffle=True, height=224, width=224):
        # Newer Keras versions expect the base-class initialiser to be called.
        super().__init__()
        self.imgs = imgs
        self.kps = kps
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.height = height
        self.width = width
        self.on_epoch_end()

    def on_epoch_end(self):
        """Rebuild the sample order (shuffled when ``self.shuffle`` is set)."""
        self.indexes = np.arange(len(self.imgs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _gaussian(self, xL, yL, H, W, sigma=5):
        """Return an ``(H, W)`` float32 Gaussian bump centred on column ``xL``, row ``yL``."""
        cols = np.arange(W)
        rows = np.arange(H)[:, np.newaxis]
        # Broadcasting a row vector against a column vector evaluates the whole
        # grid at once instead of looping over every pixel in Python.
        squared_distance = (cols - xL) ** 2 + (rows - yL) ** 2
        return np.exp(-squared_distance / (2 * sigma ** 2)).astype(np.float32)

    def _to_heatmap(self, keypoints, sigma=5):
        """Convert a flat keypoint vector into a flattened stack of heatmaps.

        Returns:
            float32 array of shape ``(height * width * n_keypoints, 1)`` where
            ``n_keypoints == len(keypoints) // 2``.
        """
        n_keypoints = len(keypoints) // 2
        img_hm = np.zeros(shape=(self.height, self.width, n_keypoints), dtype=np.float32)

        for i in range(n_keypoints):
            x = keypoints[2 * i]
            y = keypoints[2 * i + 1]
            img_hm[:, :, i] = self._gaussian(x, y, self.height, self.width, sigma)

        return img_hm.reshape(-1, 1)

    def __data_generation(self, idxs):
        """Assemble the image and heatmap arrays for the given sample indices."""
        x = []
        y = []
        for idx in idxs:
            x.append(self.imgs[idx])
            y.append(self._to_heatmap(self.kps[idx]))
        return np.array(x, dtype=np.float32), np.array(y, dtype=np.float32)

    def __len__(self):
        """Number of batches per epoch; the last batch may be smaller than ``batch_size``."""
        return math.ceil(len(self.imgs) / self.batch_size)

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(images, heatmaps)`` tuple."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        return self.__data_generation(indexes)

Rescale the keypoint annotations so that they match the images resized to 224x224

In [None]:

# Target resolution (in pixels) that annotation coordinates are rescaled to.
newX = 224
newY = 224


def getNewValueAfterResize(coordinate, originalSize, newLength=newX):
    """Map a coordinate from an axis of length ``originalSize`` onto one of length ``newLength``.

    Args:
        coordinate: position along the original axis (number or numeric string).
        originalSize: length of the original axis (number or numeric string).
        newLength: length of the target axis; defaults to ``newX``.

    Returns:
        The rescaled coordinate as a float.

    Raises:
        ValueError: if ``coordinate`` or ``originalSize`` is not numeric.
        ZeroDivisionError: if ``originalSize`` is zero.
    """
    # float() instead of int(): sub-pixel coordinates keep their fractional part
    # and decimal strings such as "12.5" are accepted instead of raising.
    return newLength / float(originalSize) * float(coordinate)


def getNewValueAfterResizeAsString(coordinate, originalSize, newLength=newX):
    """Same as ``getNewValueAfterResize`` but returns the result formatted with ``str``."""
    return str(getNewValueAfterResize(coordinate, originalSize, newLength))


def writeAnnotationFile(original_file, new_file):
    """Rescale a landmark annotation file to the ``newX`` x ``newY`` resolution.

    The input is read in groups of four consecutive lines (one landmark per
    line).  In every comma-separated line, field 1 is used as x and field 2 as
    y; fields 4 and 5 of the first line of a group are used as the original
    image width and height.  Each group becomes one CSV row of eight values.

    Args:
        original_file: path of the annotation file to read.
        new_file: path of the CSV file to write (overwritten if it exists).

    Raises:
        ValueError: if the number of non-blank lines is not a multiple of four.
    """
    # Columns are listed in the order the landmarks are written below
    # (left acetabular, left femoral, right femoral, right acetabular), which is
    # also the symmetric ordering flip_images relies on when mirroring.
    header = ('left_acetabular_x,left_acetabular_y,left_femural_x,left_femural_y,'
              'right_femural_x,right_femural_y,right_acetabular_x,right_acetabular_y')

    with open(original_file, 'r') as source:
        # Blank lines (e.g. a trailing newline) carry no landmark and would
        # break the four-lines-per-image grouping.
        lines = [line for line in source if line.strip()]

    if len(lines) % 4 != 0:
        raise ValueError(
            "Expected 4 landmark lines per image in {!r}, found {} lines".format(original_file, len(lines)))

    newlines = [header]
    for i in range(0, len(lines), 4):
        landmarks = [line.split(",") for line in lines[i:i + 4]]
        original_image_size_x = int(landmarks[0][4])
        original_image_size_y = int(landmarks[0][5])
        scaled = []
        for fields in landmarks:
            # x is scaled against the target width, y against the target height.
            scaled.append(getNewValueAfterResizeAsString(fields[1], original_image_size_x, newX))
            scaled.append(getNewValueAfterResizeAsString(fields[2], original_image_size_y, newY))
        newlines.append(','.join(scaled))

    with open(new_file, "w") as target:
        target.write('\n'.join(newlines))

In [None]:
def writeTest():
    """Convert the test-split annotations into ``newAnnotationsTest224Res.csv``."""
    source_csv = 'AnnotationsAllImages512/annotations_test.csv'
    target_csv = "newAnnotationsTest224Res.csv"
    writeAnnotationFile(source_csv, target_csv)


def writeTraining():
    """Convert the training-split annotations into ``newAnnotationsTraining224Res.csv``."""
    source_csv = 'AnnotationsAllImages512/annotations_training.csv'
    target_csv = "newAnnotationsTraining224Res.csv"
    writeAnnotationFile(source_csv, target_csv)


def writeInternetTest():
    """Convert the internet test labels into ``newAnnotationsTestInternet224Res.csv``."""
    source_csv = 'InternetImages/labels_internet_test.csv'
    target_csv = "newAnnotationsTestInternet224Res.csv"
    writeAnnotationFile(source_csv, target_csv)


def writeInternetTraining():
    """Convert the internet training labels into ``newAnnotationsTrainingInternet224Res.csv``."""
    source_csv = 'InternetImages/labels_internet_training.csv'
    target_csv = "newAnnotationsTrainingInternet224Res.csv"
    writeAnnotationFile(source_csv, target_csv)

In [None]:
# Regenerate the four rescaled annotation CSVs (each call overwrites its output file).
writeTest()
writeTraining()
writeInternetTest()
writeInternetTraining()
# Read two of the generated files back as DataFrames.
training_annotations = pd.read_csv("newAnnotationsTraining224Res.csv")
test_annotations = pd.read_csv("newAnnotationsTest224Res.csv")

Load images from folder

In [None]:
def convertImage(img):
    """Convert a colour image to a 224x224 single-channel array.

    Args:
        img: colour image as returned by ``cv.imread`` (OpenCV stores the
            channels in B, G, R order).

    Returns:
        Array of shape ``(224, 224, 1)`` holding the resized grayscale image.

    Raises:
        ValueError: if ``img`` is ``None`` (``cv.imread`` returns ``None`` when
            a file cannot be read).
    """
    if img is None:
        raise ValueError("convertImage received None instead of image data")
    # OpenCV is imported as `cv` in this notebook; imread output is BGR, so the
    # BGR conversion code applies the correct per-channel weights.
    gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    resized = cv.resize(gray, dsize=(224, 224), interpolation=cv.INTER_CUBIC)
    return np.reshape(resized, (224, 224, 1))


In [None]:
def load_images(directory):
    """Load every file in ``directory`` as a normalised 224x224 grayscale image.

    Files are read in sorted file-name order so the result is reproducible;
    ``os.listdir`` alone returns entries in arbitrary order.
    NOTE(review): images are later paired with annotation rows by position —
    confirm that sorted file-name order matches the row order of the CSV files.

    Args:
        directory: folder containing the image files.

    Returns:
        Float array of shape ``(n_files, 224, 224, 1)`` with values divided by 255.

    Raises:
        ValueError: if a directory entry cannot be decoded as an image.
    """
    images = []
    for file_name in sorted(os.listdir(directory)):
        path = os.path.join(directory, file_name)
        image = cv.imread(path)
        if image is None:
            raise ValueError("Could not read image file: {}".format(path))
        images.append(convertImage(image))
    # The explicit reshape keeps a 4-D result even when the directory is empty.
    return np.array(images).reshape(-1, 224, 224, 1) / 255.

In [None]:
def load_keypoints(keypoint_data):
    """Read a keypoint CSV file into a float array.

    Args:
        keypoint_data: path (or file-like object) of a CSV file with a header
            row and one row of numeric values per sample.

    Returns:
        Float array of shape ``(n_rows, n_columns)``; an empty file with only a
        header yields shape ``(0, n_columns)``.
    """
    keypoint_data_csv = pd.read_csv(keypoint_data)
    # to_numpy converts the whole frame at once and keeps the result 2-D even
    # when there are no rows.
    return keypoint_data_csv.to_numpy(dtype=float)

In [None]:
# Un-augmented training data: loaded from disk once and used as the input of
# every augmentation step below.
images = np.concatenate((load_images('AnnotationsAllImages512/training'), load_images('InternetImages/train')))
keypoints = np.concatenate(
    (load_keypoints('newAnnotationsTraining224Res.csv'), load_keypoints("newAnnotationsTrainingInternet224Res.csv")))

# The training set starts as an independent copy of the base data and grows as
# augmented samples are appended to it.
train_images = images.copy()
train_keypoints = keypoints.copy()

test_images = np.concatenate((load_images('AnnotationsAllImages512/test'), load_images('InternetImages/test')))
test_keypoints = np.concatenate(
    (load_keypoints('newAnnotationsTest224Res.csv'), load_keypoints('newAnnotationsTestInternet224Res.csv')))

# Images and keypoints are paired purely by position, so a count mismatch means
# the pairing is broken.
if len(images) != len(keypoints):
    raise ValueError("Training set: {} images but {} keypoint rows".format(len(images), len(keypoints)))
if len(test_images) != len(test_keypoints):
    raise ValueError("Test set: {} images but {} keypoint rows".format(len(test_images), len(test_keypoints)))

Rotation

In [None]:
def rotate_augmentation(images, keypoints, rotation_angles, image_size=224):
    """Create rotated copies of images and their keypoints.

    Every angle in ``rotation_angles`` is applied twice, once as ``+angle`` and
    once as ``-angle``, rotating about the image centre.

    Args:
        images: iterable of images accepted by ``cv.warpAffine``.
        keypoints: iterable of flat ``[x0, y0, x1, y1, ...]`` keypoint vectors,
            aligned with ``images``.
        rotation_angles: iterable of angles in degrees.
        image_size: side length in pixels of the (square) images.

    Returns:
        Tuple ``(rotated_images, rotated_keypoints)``: an array of shape
        ``(n, image_size, image_size, 1)`` and a list of ``n`` keypoint vectors,
        in matching order.
    """
    center = (image_size / 2., image_size / 2.)
    rotated_images = []
    rotated_keypoints = []
    for base_angle in rotation_angles:
        for angle in (base_angle, -base_angle):
            M = cv.getRotationMatrix2D(center, angle, 1.)
            for image in images:
                rotated_image = cv.warpAffine(image, M, (image_size, image_size), flags=cv.INTER_CUBIC)
                rotated_images.append(rotated_image)
            for keypoint in keypoints:
                points = np.asarray(keypoint, dtype=float).reshape(-1, 2)
                # Apply the same 2x3 affine matrix used for the image, so x'
                # and y' are both computed from the original coordinates.
                rotated_points = points @ M[:, :2].T + M[:, 2]
                rotated_keypoints.append(rotated_points.reshape(-1))

    return np.reshape(rotated_images, (-1, image_size, image_size, 1)), rotated_keypoints


# Append rotated copies of the base training data to the training set.
# NOTE(review): `aug_config` is not defined in any cell visible in this notebook —
# confirm where it is created before running this cell.
rotated_train_images, rotated_train_keypoints = rotate_augmentation(
    images=images, keypoints=keypoints, rotation_angles=aug_config.rotation_angles)
train_images = np.concatenate([train_images, rotated_train_images], axis=0)
train_keypoints = np.concatenate([train_keypoints, rotated_train_keypoints], axis=0)

Brightness alteration

In [None]:
def alter_brightness(images, keypoints):
    """Return brightened and darkened copies of ``images`` with matching keypoints.

    Pixel values are scaled by 1.2 and by 0.6 and clipped to ``[0, 1]``.

    Returns:
        Tuple ``(altered_images, altered_keypoints)``: a list holding all
        brightened images followed by all darkened images, and ``keypoints``
        concatenated with itself so both halves keep their labels.
    """
    brighter = np.clip(images * 1.2, 0.0, 1.0)
    darker = np.clip(images * 0.6, 0.0, 1.0)
    # Scaling intensities does not move any landmark, so the labels are reused as-is.
    return [*brighter, *darker], np.concatenate((keypoints, keypoints))


# Append the brightened and darkened copies of the base data to the training set.
altered_brightness_images, altered_brightness_keypoints = alter_brightness(images=images, keypoints=keypoints)
train_images = np.concatenate([train_images, altered_brightness_images], axis=0)
train_keypoints = np.concatenate([train_keypoints, altered_brightness_keypoints], axis=0)

Shift

In [None]:
def shift_images(images, keypoints, pixel_shifts):
    """Create translated copies of images and their keypoints.

    Every shift ``s`` in ``pixel_shifts`` is applied along the four diagonals
    ``(-s, -s)``, ``(-s, s)``, ``(s, -s)`` and ``(s, s)``.  A shifted sample is
    kept only if all of its keypoints stay strictly inside the image.

    Args:
        images: array of shape ``(n, height, width, 1)``.
        keypoints: array of shape ``(n, 2 * n_keypoints)`` with flat
            ``[x0, y0, x1, y1, ...]`` rows, aligned with ``images``.
        pixel_shifts: iterable of shift magnitudes in pixels.

    Returns:
        Tuple ``(shifted_images, shifted_keypoints)`` of arrays with shapes
        ``(m, height, width, 1)`` and ``(m, 2 * n_keypoints)``; ``m`` may be 0.
    """
    images = np.asarray(images)
    keypoints = np.asarray(keypoints, dtype=float)
    height, width = images.shape[1:3] if images.ndim >= 3 else (224, 224)
    n_coords = keypoints.shape[1] if keypoints.ndim == 2 else 0
    is_x = np.arange(n_coords) % 2 == 0

    shifted_images = []
    shifted_keypoints = []
    for shift in pixel_shifts:
        for (shift_x, shift_y) in [(-shift, -shift), (-shift, shift), (shift, -shift), (shift, shift)]:
            M = np.float32([[1, 0, shift_x], [0, 1, shift_y]])
            # Even positions hold x coordinates, odd positions hold y coordinates.
            offsets = np.where(is_x, shift_x, shift_y)
            for image, keypoint in zip(images, keypoints):
                shifted_keypoint = keypoint + offsets
                xs = shifted_keypoint[0::2]
                ys = shifted_keypoint[1::2]
                inside = np.all((0.0 < xs) & (xs < width)) and np.all((0.0 < ys) & (ys < height))
                if not inside:
                    # Skip the warp entirely for samples that would be discarded.
                    continue
                shifted_image = cv.warpAffine(image, M, (width, height), flags=cv.INTER_CUBIC)
                shifted_images.append(shifted_image.reshape(height, width, 1))
                shifted_keypoints.append(shifted_keypoint)

    # Explicit shapes keep the outputs concatenable even when nothing was kept.
    return (np.array(shifted_images, dtype=images.dtype).reshape(len(shifted_images), height, width, 1),
            np.array(shifted_keypoints, dtype=float).reshape(len(shifted_keypoints), n_coords))


# Append translated copies of the base training data to the training set.
# NOTE(review): `aug_config` is not defined in any cell visible in this notebook —
# confirm where it is created before running this cell.
shifted_train_images, shifted_train_keypoints = shift_images(
    images=images, keypoints=keypoints, pixel_shifts=aug_config.pixel_shifts)
train_images = np.concatenate([train_images, shifted_train_images], axis=0)
train_keypoints = np.concatenate([train_keypoints, shifted_train_keypoints], axis=0)

Random noise

In [None]:
def add_noise(images, sigma=0.018, rng=None):
    """Return copies of ``images`` with additive Gaussian noise.

    Args:
        images: iterable of image arrays with values in ``[0, 1]``.
        sigma: standard deviation of the zero-mean normal noise.
        rng: optional ``numpy.random.Generator``; when omitted the global
            ``np.random`` state is used.

    Returns:
        List with one noisy image per input image.  Each keeps the shape of its
        source and is clipped back to ``[0, 1]``.
    """
    sampler = np.random if rng is None else rng
    noisy_images = []
    for image in images:
        image = np.asarray(image, dtype=float)
        noise = sigma * sampler.standard_normal(image.shape)
        # Clip so the noise cannot push pixels outside the normalised range.
        noisy_images.append(np.clip(image + noise, 0.0, 1.0))
    return noisy_images


# Append noisy copies of the base images; noise does not move landmarks, so the
# base keypoints are reused unchanged.
noisy_train_images = add_noise(images=images)
train_images = np.concatenate([train_images, noisy_train_images], axis=0)
train_keypoints = np.concatenate([train_keypoints, keypoints], axis=0)

Horizontal flips

In [None]:
def flip_images(images, keypoints):
    """Mirror images horizontally and update their keypoints accordingly.

    Args:
        images: iterable of arrays of shape ``(height, width, channels)``.
        keypoints: iterable of flat ``[x0, y0, x1, y1, ...]`` keypoint vectors,
            aligned with ``images``.

    Returns:
        Tuple ``(flipped_images, flipped_keypoints)`` of lists in input order.
        In each keypoint vector the x coordinates are mirrored and the order of
        the points is reversed (first <-> last, second <-> second-to-last).
        Reversing the order assumes the points are listed symmetrically, left
        side first and right side mirrored, as in the 4-landmark vectors used
        in this notebook.
    """
    flipped_keypoints = []
    flipped_images = []
    for image, keypoint in zip(images, keypoints):
        image = np.asarray(image)
        width = image.shape[1]
        points = np.array(keypoint, dtype=float).reshape(-1, 2)
        # Reversing the column axis moves column j to column (width - 1 - j).
        points[:, 0] = (width - 1) - points[:, 0]
        flipped_keypoints.append(points[::-1].reshape(-1))
        flipped_images.append(image[:, ::-1].copy())
    return flipped_images, flipped_keypoints


# Append horizontally mirrored copies of the base training data to the training set.
flipped_images, flipped_keypoints = flip_images(images=images, keypoints=keypoints)
train_images = np.concatenate([train_images, flipped_images], axis=0)
train_keypoints = np.concatenate([train_keypoints, flipped_keypoints], axis=0)

Generators

In [None]:
# Training batches are reshuffled every epoch (the generator's default).
train_gen = DataGenerator(train_images,
                          train_keypoints,
                          batch_size=4)

# Validation batches keep the dataset order, so batch i always holds samples
# 4*i .. 4*i+3 of test_images / test_keypoints.
val_gen = DataGenerator(test_images, test_keypoints, batch_size=4, shuffle=False)

UNET Model

In [2]:
def UNET(input_shape, n_keypoints=4):
    """Build a U-Net that predicts one heatmap per keypoint.

    The network has four pooling stages in the encoder, a bottleneck, and four
    transposed-convolution stages with skip connections in the decoder.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.  Height
            and width must be divisible by 16 so that the four 2x poolings and
            the four 2x upsamplings produce matching skip-connection shapes.
        n_keypoints: number of output heatmap channels.

    Returns:
        An uncompiled Keras ``Model`` whose output has shape
        ``(batch, height * width * n_keypoints, 1)``.

    Raises:
        ValueError: if height or width is not divisible by 16.
    """
    height, width = input_shape[0], input_shape[1]
    if height % 16 != 0 or width % 16 != 0:
        raise ValueError("UNET input height and width must be divisible by 16, got {}".format(input_shape))

    def downsample_block(x, n_filters, pooling_on=True):
        # Two 3x3 convolutions; the pre-pooling activation is kept as the skip connection.
        x = Conv2D(n_filters, kernel_size=(3, 3), strides=1, padding='same', activation='relu')(x)
        x = Conv2D(n_filters, kernel_size=(3, 3), strides=1, padding='same', activation='relu')(x)
        skip = x

        if pooling_on:
            x = MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid')(x)

        return x, skip

    def upsample_block(x, skip, n_filters):
        # Upsample by 2, merge with the matching encoder output, then two 3x3 convolutions.
        x = Conv2DTranspose(n_filters, kernel_size=(2, 2), strides=2, padding='valid', activation='relu')(x)
        x = concatenate([x, skip], axis=-1)
        x = Conv2D(n_filters, kernel_size=(3, 3), strides=1, padding='same', activation='relu')(x)
        x = Conv2D(n_filters, kernel_size=(3, 3), strides=1, padding='same', activation='relu')(x)

        return x

    inputs = Input(input_shape, name="Input")

    # downsampling
    x, skip1 = downsample_block(inputs, 64)
    x, skip2 = downsample_block(x, 128)
    x, skip3 = downsample_block(x, 256)
    x, skip4 = downsample_block(x, 512)
    x, _ = downsample_block(x, 1024, pooling_on=False)

    # upsampling
    x = upsample_block(x, skip4, 512)
    x = upsample_block(x, skip3, 256)
    x = upsample_block(x, skip2, 128)
    x = upsample_block(x, skip1, 64)

    # One linear heatmap channel per keypoint, flattened to a single column.
    output = Conv2D(n_keypoints, kernel_size=(1, 1), strides=1, padding='valid', activation='linear')(x)
    output = Reshape(target_shape=(height * width * n_keypoints, 1))(output)

    model = Model(inputs=inputs, outputs=output)

    return model


unet = UNET(input_shape=(224, 224, 1))
# summary() prints the table itself and returns None, so wrapping it in print()
# only adds a stray "None" line.
unet.summary()

NameError: name 'Input' is not defined

In [None]:
# Save the weights whenever the validation loss improves.
save_model = ModelCheckpoint(filepath='UnetModel.h5', monitor='val_loss', verbose=1, save_weights_only=True,
                             save_best_only=True)
unet.compile(optimizer='adam', loss='mse', metrics=[RootMeanSquaredError()])
# Divide the learning rate by 10 after 10 epochs without validation improvement.
rlp = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-15, mode='min', verbose=1)
# Both callbacks must be passed to fit(); a callback that is only created has no effect.
history = unet.fit(x=train_gen, validation_data=val_gen, epochs=40, callbacks=[save_model, rlp])

Find coordinates given mask

In [None]:
def find_coordinates_for_mask(mask):
    """Return the intensity-weighted centroid of a 2-D heatmap.

    Args:
        mask: 2-D array of heatmap values (rows x columns).

    Returns:
        Tuple ``(keypoint_x, keypoint_y)``: the weighted mean column index and
        the weighted mean row index.  Both are ``nan`` when the mask sums to
        zero, because the centroid is undefined in that case.
    """
    mask = np.asarray(mask, dtype=float)
    summ = np.sum(mask)
    if summ == 0:
        return float("nan"), float("nan")
    # rows[i, j] == i and cols[i, j] == j for every pixel of the mask.
    rows, cols = np.indices(mask.shape)
    keypoint_x = np.sum(mask * cols) / summ
    keypoint_y = np.sum(mask * rows) / summ
    return keypoint_x, keypoint_y

def find_coordinates_for_prediction(masks):
    """Convert one flattened model output into a flat list of keypoint coordinates.

    ``masks`` is reshaped to ``(224, 224, 4)`` and every channel is reduced to
    an ``(x, y)`` pair with ``find_coordinates_for_mask``.

    Returns:
        List ``[x0, y0, x1, y1, x2, y2, x3, y3]``.
    """
    heatmaps = np.reshape(masks, newshape=(224, 224, 4))
    preds = []
    for channel in range(heatmaps.shape[-1]):
        # extend() appends x first and then y, keeping the x/y interleaving.
        preds.extend(find_coordinates_for_mask(heatmaps[:, :, channel]))
    return preds

Plot predicted and ground-truth keypoints on the images and save the figures

In [None]:
import itertools
# Factor by which images and keypoint coordinates are enlarged before plotting.
mulBy = 2


def save_keypoints_plots(batch_imgs, batch_labels, predictions, folder):
    """Save one PNG per image showing predicted and labelled keypoints.

    Predicted keypoints are drawn as crosses cycling through red, blue, green
    and magenta; labelled keypoints are drawn as cyan crosses.  Files are
    written to ``folder`` as ``IndividualImg<idx>.png``.

    Args:
        batch_imgs: sequence of images, each reshapeable to ``(224, 224)``.
        batch_labels: sequence of flat ``[x0, y0, ...]`` ground-truth vectors.
        predictions: sequence of flat ``[x0, y0, ...]`` predicted vectors.
        folder: output directory; created if it does not exist.
    """
    name = "IndividualImg"
    os.makedirs(folder, exist_ok=True)
    for idx in range(len(batch_imgs)):
        crtLabel = batch_labels[idx]
        crtPred = predictions[idx]
        img = np.reshape(batch_imgs[idx], (224, 224))
        img = cv.resize(img, (224 * mulBy, 224 * mulBy))

        fig, ax = plt.subplots()
        ax.imshow(img, cmap='gray')
        colors = itertools.cycle(["r", "b", "g", "m"])

        for i in range(len(crtPred) // 2):
            kpx = crtPred[i * 2] * mulBy
            kpy = crtPred[i * 2 + 1] * mulBy
            ax.scatter(kpx, kpy, color=next(colors), marker='x', s=2)

        for i in range(len(crtLabel) // 2):
            kpx = crtLabel[i * 2] * mulBy
            kpy = crtLabel[i * 2 + 1] * mulBy
            ax.scatter(kpx, kpy, color='c', marker='x', s=2)

        fig.savefig(os.path.join(folder, name + str(idx) + ".png"), dpi=800)
        # Close each figure after saving so figures do not accumulate in memory.
        plt.close(fig)

RMS Error on validation set

In [None]:
# Predict heatmaps for the test images in dataset order and convert every
# prediction to keypoint coordinates so they can be compared with test_keypoints.
eval_batch_size = 4
predicted_keypoints = []
for start in range(0, len(test_images), eval_batch_size):
    batch_images = np.asarray(test_images[start:start + eval_batch_size], dtype=np.float32)
    batch_heatmaps = unet.predict_on_batch(batch_images)
    for sample_heatmaps in batch_heatmaps:
        predicted_keypoints.append(find_coordinates_for_prediction(sample_heatmaps))
predicted_keypoints = np.array(predicted_keypoints, dtype=float)

# Root-mean-square error over all coordinates, in pixels of the 224x224 images.
rms_error = float(np.sqrt(np.mean((np.asarray(test_keypoints, dtype=float) - predicted_keypoints) ** 2)))
print("RMS error on the validation set:", rms_error)