In [1]:
# Подключаем модули
import urllib.request
import shutil

from math import ceil
from io import BytesIO
from base64 import b64decode
from math import sqrt
from os import path, listdir, makedirs
from json import load
from numpy import zeros, array, int32, uint8
# noinspection PyProtectedMember,PyUnresolvedReferences
from cv2 import fillPoly, rectangle, circle, imwrite, imread, IMWRITE_PNG_COMPRESSION, resize, cvtColor, COLOR_BGR2RGB, COLOR_BGR2GRAY, COLOR_GRAY2BGR, countNonZero
from tqdm import tqdm
from PIL import Image
from pathlib import Path

In [2]:
# Все папки проекта и внешний сайт, на котором хранятся все нужные данные для решения
root = "/notebooks"

baseURL = "https://0v.ru/plan/"

previewTrainMasks = path.join(root, "preview_train_masks")
splittedTrain = path.join(root, "splitted_train_512")

testPath = path.join(root, "test")
splittedTest  = path.join(root, "splitted_test_512")

jsonDataPath = path.join(root, "object_detection")
objectsZipFilename = "object_detection.zip"
objectsZipFilenameAndPath = path.join(root, objectsZipFilename)

testZipFilename = "test.zip"
testZipFilenameAndPath = path.join(root, testZipFilename)

In [3]:
# Настройки деления изображений на кусочки
HEIGHT = 512
WIDTH  = 512

CHANNELS = 3

In [4]:
# Настройки классов. Синонимы, минимальный процент площади, чтобы маска считалась значимой, список отобранных корректных файлов класса.
labels = \
{
    "wall":   {"classNumber": 1, "synonyms": ["wall_common_empty", "wall_bearing_empty"],   "minPercentForMask": 0.05, "goodListFile": "good_walls.json"},
    "window": {"classNumber": 2, "synonyms": ["window_common", "windon", "window_balcony"], "minPercentForMask": 0.01, "goodListFile": "good_windows_doors.json"},
    "door":   {"classNumber": 3, "synonyms": ["door_common", "door_balcony"],               "minPercentForMask": 0.01, "goodListFile": "good_windows_doors.json"},
}

countClasses = len(labels)

In [5]:
# Если папок для формирования разделения на кусочки и предпросмотра масок нет, то создаём их
if not path.exists(previewTrainMasks):
    makedirs(previewTrainMasks)

if not path.exists(splittedTrain):
    makedirs(splittedTrain)
    
if not path.exists(splittedTest):
    makedirs(splittedTest)

In [6]:
# Подготавливаем список классов. Формируем словари, загружаем с сайта список правильных изображений.
synonyms = {}
for label, data in labels.items():
    fullNameGoodListFile = path.join(root, data["goodListFile"])
    if not path.exists(fullNameGoodListFile):
        urllib.request.urlretrieve(baseURL + data["goodListFile"], fullNameGoodListFile)

    with open(fullNameGoodListFile) as file:
        labels[label]["goodList"] = load(file)

    for synonym in data["synonyms"]:
        synonyms[synonym] = label

    if not path.exists(path.join(splittedTrain, label)):
        makedirs(path.join(splittedTrain, label))

In [7]:
# Загружаем все JSON-файлы для тренировки
if not path.exists(objectsZipFilenameAndPath):
    urllib.request.urlretrieve(baseURL + objectsZipFilename, objectsZipFilenameAndPath)

if not path.exists(jsonDataPath):
    makedirs(jsonDataPath)

    shutil.unpack_archive(objectsZipFilenameAndPath, jsonDataPath)

In [8]:
# Функция разбиения изображения и её маски на кусочки. Проверятся также наличие на маске площади, большей, чем указанный процент
def splitMaskAndImage(image, mask, filename):
    imageHeight, imageWidth = image.shape[:2]

    # Идём по высоте и ширине и разбиваем картинку на части
    # Если высота или ширина не делится на 256, то дополняем размер
    for indexRow in range(int(ceil(imageHeight / HEIGHT))):
        for indexColumn in range(int(ceil(imageWidth / WIDTH))):
            startX = indexColumn * WIDTH
            startY = indexRow * HEIGHT

            croppedMask = zeros((HEIGHT, WIDTH, countClasses), dtype = uint8)

            destinationHeight, destinationWidth, sourceBottom, sourceRight = getBounds(imageHeight, imageWidth, startX, startY)

            croppedMask[0:destinationHeight, 0:destinationWidth, :] = mask[startY:sourceBottom, startX:sourceRight, :]

            croppedImage = None

            for indexLabel, (labelName, labelData) in enumerate(labels.items()):
                if filename + ".json" in labelData["goodList"]:
                    channelCropped = croppedMask[:, :, indexLabel]
                    if countNonZero(channelCropped) / channelCropped.size >= labelData["minPercentForMask"]:
                        if croppedImage is None:
                            croppedImage = zeros((HEIGHT, WIDTH, CHANNELS), dtype = uint8)
                            croppedImage[0:destinationHeight, 0:destinationWidth, :] = image[startY:sourceBottom, startX:sourceRight, :]

                        # Получаем имя кусочка
                        croppedName = path.join(splittedTrain, labelName, filename + "_" + str(startX) + "_" + str(startY))

                        # Сохраняем маску и картинку
                        imwrite(croppedName + ".png", channelCropped, [IMWRITE_PNG_COMPRESSION, 9])
                        imwrite(croppedName + "_preview.png", croppedImage, [IMWRITE_PNG_COMPRESSION, 9])

In [9]:
# Функция разбиения изображения на кусочки (для файлов test)
def splitImage(image, filename):
    image = cvtColor(image, COLOR_BGR2GRAY)
    image = cvtColor(image, COLOR_GRAY2BGR)

    imageHeight, imageWidth = image.shape[:2]

    # Идём по высоте и ширине и разбиваем картинку на части
    # Если высота или ширина не делится на 256, то дополняем размер
    for indexRow in range(int(ceil(imageHeight / HEIGHT))):
        for indexColumn in range(int(ceil(imageWidth / WIDTH))):
            startX = indexColumn * WIDTH
            startY = indexRow * HEIGHT

            cropped = zeros((HEIGHT, WIDTH, CHANNELS), dtype=uint8)

            destinationHeight, destinationWidth, sourceBottom, sourceRight = getBounds(imageHeight, imageWidth, startX, startY)
            part = image[startY:sourceBottom, startX:sourceRight, :]
            part[part == 0] = 1
            cropped[0:destinationHeight, 0:destinationWidth, :] = part

            # Получаем имя кусочка
            croppedName = path.join(splittedTest, filename + "_" + str(startX) + "_" + str(startY) + ".png")

            # Сохраняем
            imwrite(croppedName, cropped, [IMWRITE_PNG_COMPRESSION, 9])

In [10]:
# Определение границ кусочка, с учётом выхода за пределы границ большого изображения
def getBounds(imageHeight, imageWidth, startX, startY):
    if startX + WIDTH > imageWidth and startY + HEIGHT > imageHeight:
        destinationHeight = imageHeight - startY
        destinationWidth = imageWidth - startX
        sourceBottom = imageHeight
        sourceRight = imageWidth

    elif startX + WIDTH > imageWidth:
        destinationHeight = HEIGHT
        destinationWidth = imageWidth - startX
        sourceBottom = startY + HEIGHT
        sourceRight = imageWidth

    elif startY + HEIGHT > imageHeight:
        destinationHeight = imageHeight - startY
        destinationWidth = WIDTH
        sourceBottom = imageHeight
        sourceRight = startX + WIDTH

    # если полностью помещается
    else:
        destinationHeight = HEIGHT
        destinationWidth = WIDTH
        sourceBottom = startY + HEIGHT
        sourceRight = startX + WIDTH

    return destinationHeight, destinationWidth, sourceBottom, sourceRight

In [11]:
# Формирование и подготовка масок. Выкидываются ломаные линии, линии и точки. Также устраняется перекрытие классов
def prepareMask(filename, fullName):
    with open(fullName) as file:
        data = load(file)

    # noinspection PyTypeChecker
    image = array(Image.open(BytesIO(b64decode(data["imageData"]))))

    if len(image.shape) == 3:
        height, width, _ = image.shape
        image = cvtColor(image, COLOR_BGR2GRAY)
    else:
        height, width = image.shape

    image = cvtColor(image, COLOR_GRAY2BGR)

    maskWidth = data["imageWidth"]
    maskHeight = data["imageHeight"]

    if height != maskHeight or width != maskWidth:
        return None, None

    maskPreview = zeros((maskHeight, maskWidth, CHANNELS), dtype = uint8)


    def setMask(mask, classNumber, drawFunctionWhite, drawFunctionBlack):
        mask[:, :, classNumber - 1] = drawFunctionWhite(array(mask[:, :, classNumber - 1]))
        if classNumber != 1:
            for index in range(countClasses):
                if index != classNumber - 1:
                    mask[:, :, index] = drawFunctionBlack(array(mask[:, :, index]))


    for shape in data["shapes"]:
        label = shape["label"]

        if label in synonyms:
            label = synonyms[label]

        if label in labels:
            labelData = labels[label]
            classNumber = labelData["classNumber"]

            if len(shape["points"]) > 0:
                points = array(shape["points"], dtype=int32)
                typeShape = shape["shape_type"]
                if typeShape == "polygon":
                    setMask(maskPreview, classNumber, lambda mask: fillPoly(mask, [points], 255), lambda mask: fillPoly(mask, [points], 0))
                elif typeShape == "rectangle":
                    setMask(maskPreview, classNumber, lambda mask: rectangle(mask, points[0], points[1], 255, -1), lambda mask: rectangle(mask, points[0], points[1], 0, -1))
                elif typeShape == "circle":
                    (cx, cy), (px, py) = points[0], points[1]
                    center = ((cx + px) // 2, (cy + py) // 2)
                    diameter = round(sqrt((cx - px) ** 2 + (cy - py) ** 2))
                    setMask(maskPreview, classNumber, lambda mask: circle(mask, center, diameter, 255, -1), lambda mask: circle(mask, center, diameter, 0, -1))
                elif typeShape == "line" or typeShape == "linestrip" or typeShape == "point":
                    # Такие типы бессмысленны для сегментации, так как имеют нулевую площадь. Маски с ними заведомо неверные
                    return
                else:
                    print("Unknown shape type:", shape["shape_type"], filename)

    splitMaskAndImage(image, maskPreview, filename.replace(".json", ""))

    maskPreview = image * 0.5 + maskPreview * 0.9
    imwrite(path.join(previewTrainMasks, filename + ".png"), maskPreview, [IMWRITE_PNG_COMPRESSION, 9])

In [12]:
# Если в папках для предпросмотра или разбивки файлов train что-то есть, то очищаем эти файлы
_ = [file.unlink() for file in Path(previewTrainMasks).glob("*") if file.is_file()]
_ = [file.unlink() for file in Path(splittedTrain).glob("*") if file.is_file()]

In [13]:
# Получаем список всех файлов train
listTrainFiles = [filename for filename in listdir(jsonDataPath) if filename.endswith("json") and not filename.startswith(".")]

In [14]:
# Подготавливаем маски train
for filename in tqdm(listTrainFiles):
    prepareMask(filename, path.join(root, jsonDataPath, filename))

100%|██████████| 3500/3500 [1:47:40<00:00,  1.85s/it]  


In [15]:
# Скачиваем файлы test. Если для них нет папки, то создаём
if not path.exists(testZipFilename):
    urllib.request.urlretrieve(baseURL + testZipFilename, testZipFilenameAndPath)

if not path.exists(testPath):
    makedirs(testPath)

    shutil.unpack_archive(testZipFilenameAndPath, testPath)

In [16]:
# Если в папке кусочков test есть файлы, то удаляем их
_ = [file.unlink() for file in Path(splittedTest).glob("*") if file.is_file()]

In [17]:
# Формируем список файлов test
listTestFiles = [filename for filename in listdir(testPath) if filename.endswith(".png")]

In [18]:
Для всех файлов test выполняем разбиение
for filename in tqdm(listTestFiles):
    splitImage(imread(path.join(testPath, filename)), filename)

100%|██████████| 1500/1500 [07:08<00:00,  3.50it/s]
