#### Загружаем все необходимые библиотеки

In [None]:
import os
import json
import cv2
from PIL import Image
import numpy as np
from skimage.io import imread, imshow, imsave
from enum import Enum
from keras.utils.all_utils import to_categorical
from sklearn.model_selection import train_test_split
from keras.layers import Input, Conv2D, MaxPooling2D, concatenate, Conv2DTranspose, Dropout
from keras.models import Model
from keras import backend as K
import matplotlib.pyplot as plt
from keras.optimizers import adam_v2
from keras.applications.resnet import ResNet50
from skimage.color import gray2rgb
from keras.callbacks import ModelCheckpoint
import sys
import bbox_visualizer as bbv
import segmentation_models as sm
from keras.optimizers import adam_v2
import keras
import wandb

#### Вспомогательные функции

In [None]:
def load_images_and_patchify(directory_path, patch_size, add_reflected = False):
    """
    Load every image below a directory and cut it into square, non-overlapping patches.

    Directories and files are visited in sorted order, so two parallel folders
    holding files with the same names (e.g. images and their masks) yield their
    patches in the same order regardless of how the file system lists them.

    :param directory_path: root directory that is walked recursively for images
    :param patch_size: side length, in pixels, of each square patch
    :param add_reflected: when true, every patch is followed by its horizontal mirror
    :return: list of patches (numpy arrays in RGB channel order)
    :raises ValueError: if a file with an image extension cannot be decoded
    """
    # NOTE(review): `patchify` is not imported anywhere in the visible part of
    # this file; it presumably comes from the `patchify` package - confirm.
    image_extensions = (".jpg", ".png", ".bmp")

    instances = []

    for root, dirs, files in os.walk(directory_path):
        # sorting in place also fixes the order in which os.walk descends
        dirs.sort()
        for file in sorted(files):
            # accept upper-case extensions such as ".JPG" as well
            if os.path.splitext(file)[1].lower() not in image_extensions:
                continue

            filepath = os.path.join(root, file)

            # cv2.imread returns None instead of raising when decoding fails
            image = cv2.imread(filepath)
            if image is None:
                raise ValueError(f"Could not read image file: {filepath}")

            # OpenCV loads BGR; convert to RGB
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # largest height/width that is a whole multiple of the patch size
            size_y = (image.shape[0] // patch_size) * patch_size
            size_x = (image.shape[1] // patch_size) * patch_size

            # crop from the top-left corner so the image splits into whole patches
            image = np.ascontiguousarray(image[:size_y, :size_x])

            # step == patch_size means the patches do not overlap
            patch_img = patchify(image, (patch_size, patch_size, 3), step=patch_size)

            # patches form a grid: first axis is vertical, second is horizontal
            for j in range(patch_img.shape[0]):
                for k in range(patch_img.shape[1]):
                    # drop the singleton dimension added by patchify
                    single_patch_img = np.squeeze(patch_img[j, k])
                    instances.append(single_patch_img)
                    if add_reflected:
                        instances.append(cv2.flip(single_patch_img, 1))

    return instances

In [None]:
def load_images(directory_path, size, add_reflected = False):
    """
    Load every image below a directory and resize it to a square.

    Directories and files are visited in sorted order, so two parallel folders
    holding files with the same names (e.g. images and their masks) yield their
    images in the same order regardless of how the file system lists them.

    :param directory_path: root directory that is walked recursively for images
    :param size: side length, in pixels, of the resized square image
    :param add_reflected: when true, every image is followed by its horizontal mirror
    :return: list of resized images (numpy arrays in RGB channel order)
    :raises ValueError: if a file with an image extension cannot be decoded
    """
    image_extensions = (".jpg", ".png", ".bmp")

    instances = []

    for root, dirs, files in os.walk(directory_path):
        # sorting in place also fixes the order in which os.walk descends
        dirs.sort()
        for file in sorted(files):
            # accept upper-case extensions such as ".JPG" as well
            if os.path.splitext(file)[1].lower() not in image_extensions:
                continue

            filepath = os.path.join(root, file)

            # cv2.imread returns None instead of raising when decoding fails
            image = cv2.imread(filepath)
            if image is None:
                raise ValueError(f"Could not read image file: {filepath}")

            # OpenCV loads BGR; convert to RGB
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # uses cv2.resize's default interpolation
            image = cv2.resize(image, dsize=(size, size))

            instances.append(image)
            if add_reflected:
                instances.append(cv2.flip(image, 1))

    return instances

In [None]:
def one_hot_encode_masks(masks, num_classes):
    """
    Turn color masks into one-hot encoded class maps.

    Each pixel is given the position (in MaskColorMap iteration order) of the
    member whose value equals the pixel's channels; pixels that match no member
    keep label 0.

    :param masks: iterable of mask arrays with three dimensions (height, width, channels)
    :param num_classes: number of classes passed on to to_categorical
    :return: result of to_categorical applied to the integer label maps
    """
    label_maps = []

    for color_mask in masks:
        height, width, _ = color_mask.shape

        # every pixel starts out as class 0
        label_map = np.zeros((height, width, 1), dtype=int)

        for class_id, color_class in enumerate(MaskColorMap):
            # a pixel belongs to the class only if all its channels match
            is_class = np.all(color_mask == color_class.value, axis=-1)
            label_map[is_class] = class_id

        label_maps.append(label_map)

    return to_categorical(y=label_maps, num_classes=num_classes)

In [None]:
def dice_index(y_true, y_pred, smooth=1):
    """
    Smoothed Dice coefficient between target and prediction.

    Only index 1 of the last axis of the 4-D inputs is compared.

    :param y_true: ground-truth tensor
    :param y_pred: predicted tensor
    :param smooth: constant added to numerator and denominator
    :return: scalar Dice coefficient
    """
    target = K.flatten(y_true[:, :, :, 1])
    predicted = K.flatten(y_pred[:, :, :, 1])

    overlap = K.sum(target * predicted)
    total = K.sum(target) + K.sum(predicted)

    return (2. * overlap + smooth) / (total + smooth)

In [None]:
def jaccard_index(y_true, y_pred, smooth=1):
    """
    Smoothed Jaccard index (intersection over union) between target and prediction.

    Only index 1 of the last axis of the 4-D inputs is compared.

    :param y_true: ground-truth tensor
    :param y_pred: predicted tensor
    :param smooth: constant added to numerator and denominator
    :return: scalar Jaccard index
    """
    target = K.flatten(y_true[:, :, :, 1])
    predicted = K.flatten(y_pred[:, :, :, 1])

    overlap = K.sum(target * predicted)
    # sum of both areas; the overlap is subtracted below to obtain the union
    total = K.sum(target) + K.sum(predicted)

    return (overlap + smooth) / (total - overlap + smooth)

In [None]:
def jaccard_loss(y_true, y_pred):
    """
    Loss that decreases as the Jaccard index grows.

    :param y_true: ground-truth tensor
    :param y_pred: predicted tensor
    :return: 1 minus jaccard_index(y_true, y_pred)
    """
    return 1 - jaccard_index(y_true, y_pred)

In [None]:
def get_training_data(root_directory, size, patching = False, add_reflected = False):
    """
    Collect input images and label masks from "images" and "masks" sub-directories.

    The tree below root_directory is walked; every directory named "images"
    contributes to the image set and every directory named "masks" to the mask
    set. Images and masks are loaded with the same size so that they line up.

    :param root_directory: directory whose sub-tree is searched
    :param size: patch side length when patching, otherwise the side length
        every image and mask is resized to
    :param patching: when true, split files into patches instead of resizing them
    :param add_reflected: when true, add a horizontally mirrored copy of each sample
    :return: tuple (images, masks) of numpy arrays
    """
    # NOTE(review): without patching, masks go through the same resize as the
    # images; confirm interpolated mask colors are acceptable downstream.
    image_dataset, mask_dataset = [], []

    def _load(folder):
        # use the `size` argument for both loaders rather than a module global
        if patching:
            return load_images_and_patchify(folder, patch_size=size, add_reflected=add_reflected)
        return load_images(folder, size=size, add_reflected=add_reflected)

    for path, directories, files in os.walk(root_directory):
        for subdirectory in directories:
            if subdirectory == "images":
                image_dataset.extend(_load(os.path.join(path, subdirectory)))
            elif subdirectory == "masks":
                mask_dataset.extend(_load(os.path.join(path, subdirectory)))

    return np.array(image_dataset), np.array(mask_dataset)

In [None]:
def display_images(instances, rows=2, titles=None):
    """
    Show a list of images in a grid of matplotlib subplots.

    :param instances: list of images
    :param rows: number of rows in the grid
    :param titles: optional list of subplot titles, one per image
    :return: None
    """
    n = len(instances)
    # ceiling division: the smallest column count that fits n images in `rows` rows
    cols = -(-n // rows)

    # place the images row by row
    for j, image in enumerate(instances):
        plt.subplot(rows, cols, j + 1)
        plt.title('' if titles is None else titles[j])
        plt.axis("off")
        plt.imshow(image)

    # show the figure
    plt.show()

In [None]:
def rgb_encode_mask(mask):
    """
    Convert a map of class indices into a color image.

    Every position whose value equals the index of a MaskColorMap member (in
    iteration order) gets that member's color scaled by 1/255; other positions
    stay zero.

    :param mask: array of class indices; its first two dimensions give the output size
    :return: float array of shape (height, width, 3)
    """
    height, width = mask.shape[0], mask.shape[1]
    colored = np.zeros((height, width, 3))

    for class_id, color_class in enumerate(MaskColorMap):
        scaled_color = np.array(color_class.value) / 255.
        colored[mask == class_id] = scaled_color

    return colored

#### Сохраняем маски дверей из json файлов в формате bbox для YOLO

In [None]:
# Convert the door annotations stored in JSON files into YOLO label files
# (one "<class> <x_center> <y_center> <width> <height>" line per door) and
# copy the matching images next to them.
json_dir = 'C:/ML/Floor/train_dataset_train/object_detection'
im_dir = 'C:/ML/Floor/train_dataset_train/train_window/images'
out_dir_labels = 'C:/ML/Floor/train_dataset_train/Detection/dataset/labels'
out_dir_images = 'C:/ML/Floor/train_dataset_train/Detection/dataset/images'


def shape_corners(shape):
    """
    Return the integer bounding-box corners of one annotated shape.

    :param shape: annotation dict with 'shape_type' and 'points' entries
    :return: (top_left, bottom_right) arrays of [x, y], or None when the shape
        has fewer than two points or is neither a polygon nor a rectangle
    """
    raw_points = shape['points']
    if len(raw_points) < 2:
        return None
    if shape['shape_type'] == 'polygon':
        corners = np.int32(raw_points)
    elif shape['shape_type'] == 'rectangle':
        corners = np.around(raw_points).astype(int)
    else:
        return None
    # min/max per axis also handles rectangles stored in any corner order
    return corners.min(axis=0), corners.max(axis=0)


for path, directories, files in os.walk(json_dir):
    for file in files:
        # only annotation files can be parsed
        if not file.lower().endswith('.json'):
            continue

        # read the file from the directory currently being walked
        with open(os.path.join(path, file), "r") as read_file:
            data = json.load(read_file)

        image_name = data['imagePath']
        try:
            img = imread(im_dir + '/' + image_name)
        except Exception:
            # the recorded image path is unusable: fall back to a PNG named
            # after the annotation file
            image_name = file.split('.')[0] + '.png'
            img = imread(im_dir + '/' + image_name)
        imsave(out_dir_images + '/' + image_name, img)

        imageHeight = data['imageHeight']
        imageWidth = data['imageWidth']

        # Coordinates are normalised against a square whose side is the longer
        # image side, with the shorter side centred inside it.
        # NOTE(review): the image itself is saved unpadded above - confirm the
        # training images are square, otherwise the labels will be offset.
        max_size = max(imageWidth, imageHeight)
        pad_x = pad_y = 0
        if imageHeight < imageWidth:
            pad_y = round((imageWidth - imageHeight) / 2)
        elif imageHeight > imageWidth:
            pad_x = round((imageHeight - imageWidth) / 2)

        with open(out_dir_labels + '/' + image_name.split('.')[0] + '.txt', 'w') as label_file:
            for obj in data['shapes']:
                if obj['label'] != 'door':
                    continue

                corners = shape_corners(obj)
                if corners is None:
                    # nothing usable to write; never reuse the previous box
                    continue

                pt1 = corners[0] + [pad_x, pad_y]
                pt2 = corners[1] + [pad_x, pad_y]

                x = (pt1[0] + (pt2[0] - pt1[0])/2) / max_size
                y = (pt1[1] + (pt2[1] - pt1[1])/2) / max_size
                w = (pt2[0] - pt1[0]) / max_size
                h = (pt2[1] - pt1[1]) / max_size

                # class id 0 is the only class written (door)
                label_file.write(' '.join(['0', str(x), str(y), str(w), str(h)]) + '\n')

#### Сегментируем стены

In [None]:
# number of segmentation classes; passed as num_classes when one-hot encoding the masks
n_classes = 2
# side length in pixels: passed as `size` when loading the data and used to
# resize test images before prediction
patch_size = 512

class MaskColorMap(Enum):
    """
    RGB color that marks each class in the mask images.

    Member order matters: the position of a member in iteration order is the
    integer class label used by the mask encoding/decoding helpers.
    """
    background = (0, 0, 0)
    wall = (255, 255, 255)
    
# Load the wall dataset (whole images resized to patch_size, no mirroring)
data_dir = r"C:/ML/Floor/train_dataset_train/train_wall"

X, Y = get_training_data(
    data_dir,
    patch_size,
    patching=False,
    add_reflected=False,
)

# number of samples and the shape of a single input image
m, img_height, img_width, img_channels = X.shape

# preview a few randomly chosen samples, each image followed by its mask
display_count = 6
random_index = [np.random.randint(0, m) for _ in range(display_count)]
sample_images = [sample for idx in random_index for sample in (X[idx], Y[idx])]
display_images(sample_images, rows=2)

In [None]:
# replace the color masks with their one-hot encoded class maps
Y = one_hot_encode_masks(Y, num_classes=n_classes)

In [None]:
# hold out 25% of the samples for validation; the fixed seed makes the split reproducible
X_train_wall, X_test_wall, Y_train_wall, Y_test_wall = train_test_split(X, Y, test_size=0.25, random_state=42)

In [None]:
# the full arrays are not referenced again after the split (presumably deleted to free memory)
del X

In [None]:
del Y

In [None]:
# Train a U-Net from segmentation_models to segment walls
sm.set_framework('tf.keras')
sm.framework()

# where the best weights are written
BEST_DIR = 'C:/ML/Floor/Results/Attempt_11/wall_segmentation.h5'

# network
BACKBONE = 'resnext50'
ACTIVATION = 'softmax'
N_CLASSES = 2
SIZE = 512

# optimisation
LR = 1e-4
BATCH_SIZE = 4
EPOCHS = 20
LOSS = jaccard_loss
METRICS = jaccard_index

# validation quantity that decides which epoch counts as "best"
MONITOR = 'val_jaccard_index'

model_wall = sm.Unet(
    BACKBONE,
    input_shape=(SIZE, SIZE, 3),
    classes=N_CLASSES,
    activation=ACTIVATION,
    encoder_freeze=True,
    encoder_weights='imagenet',
)
model_wall.compile(optimizer=adam_v2.Adam(learning_rate=LR), loss=LOSS, metrics=METRICS)

# keep only the weights of the epoch with the highest monitored value
# NOTE(review): nothing in this file loads these weights back; model_wall keeps
# the weights of the last epoch after fit() - confirm this is intended.
callbacks = keras.callbacks.ModelCheckpoint(
    BEST_DIR,
    monitor=MONITOR,
    save_weights_only=True,
    save_best_only=True,
    mode='max',
)

history = model_wall.fit(
    X_train_wall,
    Y_train_wall,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_test_wall, Y_test_wall),
    verbose=1,
    callbacks=callbacks,
)

In [None]:
# Predict a wall mask for every test image and save it under the same file name
for path, directories, files in os.walk('C:/ML/Floor/test_dataset_test'):
    for file in files:
        img = imread(os.path.join(path, file))

        if img.ndim == 2:
            # single-channel image: expand to three channels
            img = gray2rgb(img)
        elif img.shape[2] == 4:
            # keep the first three channels, dropping the fourth
            img = img[:, :, :3]

        # remember the original size once, so the mask can be scaled back to it
        h, w = img.shape[:2]

        img = cv2.resize(img, dsize=(patch_size, patch_size), interpolation=cv2.INTER_NEAREST)

        # drop the color information but keep a three-channel input
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)

        # add the batch dimension expected by predict()
        img_input = np.expand_dims(img, 0)

        prediction_wall = np.squeeze(model_wall.predict(img_input))

        # most likely class per pixel, stored as 0 / 255; nearest-neighbour
        # resizing only copies values, so scaling before resizing gives the
        # same mask as scaling afterwards
        predicted_wall_img = (np.argmax(prediction_wall, axis=-1) * 255).astype(np.uint8)
        predicted_wall_img = cv2.resize(predicted_wall_img, dsize=(w, h), interpolation=cv2.INTER_NEAREST)

        imsave('C:/ML/Floor/Results/Attempt_11/walls/' + file, predicted_wall_img)

#### Сегментируем окна

In [None]:
# Train a U-Net from segmentation_models to segment windows
sm.set_framework('tf.keras')
sm.framework()

# where the best weights are written
BEST_DIR = 'C:/ML/Floor/Results/Attempt_11/window_segmentation.h5'

# network
BACKBONE = 'resnext50'
ACTIVATION = 'softmax'
N_CLASSES = 2
SIZE = 512

# optimisation
LR = 1e-4
BATCH_SIZE = 4
EPOCHS = 20
LOSS = jaccard_loss
METRICS = jaccard_index

# validation quantity that decides which epoch counts as "best"
MONITOR = 'val_jaccard_index'

model_window = sm.Unet(
    BACKBONE,
    input_shape=(SIZE, SIZE, 3),
    classes=N_CLASSES,
    activation=ACTIVATION,
    encoder_freeze=True,
    encoder_weights='imagenet',
)
model_window.compile(optimizer=adam_v2.Adam(learning_rate=LR), loss=LOSS, metrics=METRICS)

# keep only the weights of the epoch with the highest monitored value
callbacks = keras.callbacks.ModelCheckpoint(
    BEST_DIR,
    monitor=MONITOR,
    save_weights_only=True,
    save_best_only=True,
    mode='max',
)

# NOTE(review): X_train_window / Y_train_window / X_test_window / Y_test_window
# are not defined in the visible part of this file - confirm the cell that
# loads and splits the window dataset exists.
history = model_window.fit(
    X_train_window,
    Y_train_window,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_test_window, Y_test_window),
    verbose=1,
    callbacks=callbacks,
)

In [None]:
# Predict a window mask for every test image and save it under the same file name
for path, directories, files in os.walk('C:/ML/Floor/test_dataset_test'):
    for file in files:
        img = imread(os.path.join(path, file))

        if img.ndim == 2:
            # single-channel image: expand to three channels
            img = gray2rgb(img)
        elif img.shape[2] == 4:
            # keep the first three channels, dropping the fourth
            img = img[:, :, :3]

        # remember the original size once, so the mask can be scaled back to it
        h, w = img.shape[:2]

        img = cv2.resize(img, dsize=(patch_size, patch_size), interpolation=cv2.INTER_NEAREST)

        # drop the color information but keep a three-channel input
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)

        # add the batch dimension expected by predict()
        img_input = np.expand_dims(img, 0)

        # use the window model trained above (the wall cell uses model_wall the same way)
        prediction_window = np.squeeze(model_window.predict(img_input))

        # most likely class per pixel, stored as 0 / 255; nearest-neighbour
        # resizing only copies values, so scaling before resizing gives the
        # same mask as scaling afterwards
        predicted_window_img = (np.argmax(prediction_window, axis=-1) * 255).astype(np.uint8)
        predicted_window_img = cv2.resize(predicted_window_img, dsize=(w, h), interpolation=cv2.INTER_NEAREST)

        imsave('C:/ML/Floor/Results/Attempt_11/windows/' + file, predicted_window_img)

#### Выполним детекцию дверей

In [None]:
# Switch to the working directory of the YOLO v5 model
%cd C:/ML/Object_detection/Yolo_5/yolov5

In [None]:
# initialise Weights & Biases (presumably so the YOLO training run is logged - confirm)
wandb.init()

In [None]:
# Train starting from the pretrained YOLO v5 weights (yolov5l.pt)
!python train.py --img 512 --batch 2 --epochs 300 --data floor.yaml --weights yolov5l.pt --cache

#### Объединяем результаты сегментации и детекции

In [None]:
# Merge the wall, window and door masks of each plan into one label image:
# 0 = background, 1 = wall, 2 = window, 3 = door. Classes are written in that
# order, so a later class overwrites an earlier one where masks overlap.
# NOTE(review): nothing in the visible part of this file writes the masks in
# the 'doors' folder - confirm they are produced by a separate detection step.
for path, directories, files in os.walk('C:/ML/Floor/Results/Attempt_11/walls'):
    for file in files:
        wall = imread(os.path.join(path, file))
        door = imread('C:/ML/Floor/Results/Attempt_11/doors/' + file)
        window = imread('C:/ML/Floor/Results/Attempt_11/windows/' + file)

        # label image with the same height and width as the wall mask
        combined = np.zeros(wall.shape[:2], dtype=np.uint8)
        combined[wall == 255] = 1
        combined[window == 255] = 2
        combined[door == 255] = 3

        imsave('C:/ML/Floor/Results/Attempt_11/images/' + file, combined)