# Разметка

Подготовка директорий

In [None]:
import os
import xml.etree.ElementTree as ET
import shutil
import cv2
import numpy as np
import json
import matplotlib.colors

def make_directory(path):
    """Create the directory ``path`` unless it already exists.

    Missing parent directories are created as well, so nested paths such as
    ``"masks/mask_label"`` work even when ``"masks"`` is absent.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok=True removes the race between a separate exists() check and
    # the creation call.
    os.makedirs(path, exist_ok=True)

# Working directories used by the annotation and mask helpers below.
for working_directory in ("images", "annotations", "annotated_images", "masks"):
    make_directory(working_directory)

Вспомогательные функции

In [None]:
def get_labels(path_to_xml):
    """Return the text of the <name> child of every <label> element.

    The XML file is parsed from ``path_to_xml`` and the names are returned in
    document order.
    """
    annotation_root = ET.parse(path_to_xml).getroot()
    return [label_node.find("name").text
            for label_node in annotation_root.findall(".//label")]

def get_colors(path_to_xml):
    """Return the text of the <color> child of every <label> element.

    The XML file is parsed from ``path_to_xml`` and the colours are returned
    in document order, i.e. in the same order as ``get_labels`` lists names.
    """
    annotation_root = ET.parse(path_to_xml).getroot()
    return [label_node.find("color").text
            for label_node in annotation_root.findall(".//label")]

def get_image_tag_attributes(image_tag):
    """Return the attributes of an <image> element with numeric fields parsed.

    Args:
        image_tag: XML element whose attributes include "id", "height" and
            "width" as integer strings.

    Returns:
        A new dict with every attribute of the element; "id", "height" and
        "width" are converted to ``int``, the rest stay strings.
    """
    # Work on a copy: converting values inside image_tag.attrib itself would
    # silently change the parsed XML tree for every later reader.
    image_attributes = dict(image_tag.attrib)
    for key in ("id", "height", "width"):
        image_attributes[key] = int(image_attributes[key])
    return image_attributes

def get_polygon_tag_attributes(polygon_tag):
    """Return the attributes of a <polygon> element with parsed values.

    Args:
        polygon_tag: XML element whose attributes include "occluded",
            "z_order" (integer strings) and "points" (an "x,y;x,y;..."
            string).

    Returns:
        A new dict with every attribute of the element; "occluded" and
        "z_order" are ``int`` and "points" is the list produced by
        ``string_points_to_array``.
    """
    # Copy first: mutating polygon_tag.attrib in place would make a second
    # call on the same element fail, because "points" would already be a list.
    polygon_attributes = dict(polygon_tag.attrib)
    polygon_attributes["occluded"] = int(polygon_attributes["occluded"])
    polygon_attributes["z_order"] = int(polygon_attributes["z_order"])
    polygon_attributes["points"] = string_points_to_array(polygon_attributes["points"])
    return polygon_attributes

def get_ellipse_tag_attributes(ellipse_tag):
    """Return the attributes of an <ellipse> element with integer geometry.

    Args:
        ellipse_tag: XML element whose attributes include "cx", "cy", "rx"
            and "ry" as numeric strings.

    Returns:
        A new dict with every attribute of the element; the centre ("cx",
        "cy") and radii ("rx", "ry") are parsed as floats and truncated to
        ``int``, the rest stay strings.
    """
    # Copy so the parsed XML tree keeps its original string attributes.
    ellipse_attributes = dict(ellipse_tag.attrib)
    for key in ("cx", "cy", "rx", "ry"):
        ellipse_attributes[key] = int(float(ellipse_attributes[key]))
    return ellipse_attributes

def get_mask_paths(path_to_masks_dir, labels):
    """Build one ``{"label", "path"}`` record per label.

    The path of each record is ``<path_to_masks_dir>/mask_<label>``; records
    are returned in the order of ``labels``.
    """
    return [
        {"label": label_name,
         "path": os.path.join(path_to_masks_dir, f"mask_{label_name}")}
        for label_name in labels
    ]

def string_points_to_array(string_points):
    """Parse an ``"x,y;x,y;..."`` string into a list of ``[x, y]`` lists.

    Each coordinate is read as a float and truncated to ``int``.
    """
    def parse_point(raw_point):
        # Only the first two components are converted; the list produced by
        # split() is otherwise returned unchanged.
        coordinates = raw_point.split(",")
        coordinates[0] = int(float(coordinates[0]))
        coordinates[1] = int(float(coordinates[1]))
        return coordinates

    return [parse_point(raw_point) for raw_point in string_points.split(";")]

def hex_to_rgb(hex_color):
    """Convert a hex colour string to a list of three 8-bit channel values.

    NOTE: despite the name, the returned list is ordered blue, green, red.
    In this file the result is passed to OpenCV drawing calls on images
    loaded with ``cv2.imread``.

    Args:
        hex_color: Colour specification accepted by
            ``matplotlib.colors.hex2color``, e.g. ``"#ff8000"``.

    Returns:
        ``[blue, green, red]`` with each channel an ``int``.
    """
    # hex2color yields floats in [0, 1]. Round instead of truncating so that
    # floating-point error in c / 255 * 255 cannot drop a channel by one level.
    red, green, blue = (int(round(float(channel) * 255))
                        for channel in matplotlib.colors.hex2color(hex_color))
    return [blue, green, red]

Разметка


In [None]:
def draw_annotation(path_to_xml, path_to_images_dir, path_to_annotated_images_dir):
    """Copy the images and draw the annotated shapes onto the copies.

    Every file in ``path_to_images_dir`` is copied to
    ``path_to_annotated_images_dir``. For each <image> element of the XML
    file, the outlines of its <polygon> and <ellipse> children are then drawn
    on the copied image in the colour declared for their label, and the copy
    is overwritten.

    Args:
        path_to_xml: Annotation XML file.
        path_to_images_dir: Directory holding the source images.
        path_to_annotated_images_dir: Existing directory receiving the
            annotated copies.

    Returns:
        None. Prints an error and returns early when any of the three paths
        does not exist.
    """
    if not (os.path.exists(path_to_xml) and
            os.path.exists(path_to_images_dir) and
            os.path.exists(path_to_annotated_images_dir)):
        print("Error! There is no such file/directory")
        return

    for image in os.listdir(path_to_images_dir):
        shutil.copy(os.path.join(path_to_images_dir, image),
                    os.path.join(path_to_annotated_images_dir, image))

    root = ET.parse(path_to_xml).getroot()
    image_tags = root.findall("image")
    labels = get_labels(path_to_xml)
    colors = get_colors(path_to_xml)
    thickness = 1

    # label -> hex colour, built once instead of a list search per shape.
    # setdefault keeps the first colour when a label name is repeated.
    hex_color_by_label = {}
    for label, hex_color in zip(labels, colors):
        hex_color_by_label.setdefault(label, hex_color)

    for image_tag in image_tags:
        image_name = image_tag.attrib.get("name")
        image_path = os.path.join(path_to_annotated_images_dir, image_name)
        image_as_array = cv2.imread(image_path)
        if image_as_array is None:
            # imread returns None for missing/unreadable files; drawing on
            # None would raise, so skip this image instead.
            print(f"Warning! Cannot read image {image_path}, skipping")
            continue

        for polygon_tag in image_tag.findall("polygon"):
            polygon_tag_attributes = get_polygon_tag_attributes(polygon_tag)
            color = hex_to_rgb(hex_color_by_label[polygon_tag_attributes["label"]])
            # OpenCV contour functions expect 32-bit integer points.
            contour = np.array(polygon_tag_attributes["points"], dtype=np.int32)
            cv2.drawContours(image_as_array, [contour], 0, color, thickness)

        for ellipse_tag in image_tag.findall("ellipse"):
            ellipse_tag_attributes = get_ellipse_tag_attributes(ellipse_tag)
            center = (ellipse_tag_attributes["cx"], ellipse_tag_attributes["cy"])
            axes = (ellipse_tag_attributes["rx"], ellipse_tag_attributes["ry"])
            color = hex_to_rgb(hex_color_by_label[ellipse_tag_attributes["label"]])
            cv2.ellipse(image_as_array, center, axes, 0, 0, 360, color, thickness)

        cv2.imwrite(image_path, image_as_array)


Маски

In [None]:
def draw_masks(path_to_xml, path_to_masks_dir):
    """Render one binary mask image per label for every annotated image.

    For each <image> element of the XML file and each label declared in it,
    a black ``height x width`` 3-channel image is created, every <polygon>
    and <ellipse> of that label is filled in white, and the result is written
    to ``<path_to_masks_dir>/mask_<label lower-cased>/<image name>``.

    Args:
        path_to_xml: Annotation XML file.
        path_to_masks_dir: Existing directory that receives the per-label
            sub-directories.

    Returns:
        None. Prints an error and returns early when either path is missing.
    """
    if not (os.path.exists(path_to_xml) and os.path.exists(path_to_masks_dir)):
        print("Error! There is no such file/directory")
        return

    root = ET.parse(path_to_xml).getroot()
    image_tags = root.findall("image")

    # The label list does not depend on the image, so parse it once. The
    # directory is created under the same lower-cased name that the masks are
    # written to; creating "mask_<label>" but writing to "mask_<label.lower()>"
    # would target a non-existent directory for labels with capital letters.
    labels = get_labels(path_to_xml)
    mask_dir_by_label = {}
    for label in labels:
        mask_dir = os.path.join(path_to_masks_dir, f"mask_{label.lower()}")
        make_directory(mask_dir)
        mask_dir_by_label[label] = mask_dir

    white = (255, 255, 255)
    for image_tag in image_tags:
        image_tag_attributes = get_image_tag_attributes(image_tag)
        image_name = image_tag_attributes["name"]
        mask_shape = (image_tag_attributes["height"], image_tag_attributes["width"], 3)

        # All labels may start from the same zero array: masks are only ever
        # replaced by the result of bitwise_or, never modified in place.
        masks = dict.fromkeys(labels, np.zeros(mask_shape, dtype=np.uint8))

        for polygon_tag in image_tag.findall("polygon"):
            polygon_tag_attributes = get_polygon_tag_attributes(polygon_tag)
            label = polygon_tag_attributes["label"]

            current_mask = np.zeros(mask_shape, dtype=np.uint8)
            # OpenCV polygon functions expect 32-bit integer points.
            polygon = np.array(polygon_tag_attributes["points"], dtype=np.int32)
            cv2.fillPoly(current_mask, [polygon], white)
            masks[label] = cv2.bitwise_or(masks[label], current_mask)

        for ellipse_tag in image_tag.findall("ellipse"):
            ellipse_tag_attributes = get_ellipse_tag_attributes(ellipse_tag)
            label = ellipse_tag_attributes["label"]
            center = (ellipse_tag_attributes["cx"], ellipse_tag_attributes["cy"])
            axes = (ellipse_tag_attributes["rx"], ellipse_tag_attributes["ry"])

            current_mask = np.zeros(mask_shape, dtype=np.uint8)
            # thickness=-1 draws a filled ellipse.
            cv2.ellipse(current_mask, center, axes, 0, 0, 360, white, -1)
            masks[label] = cv2.bitwise_or(masks[label], current_mask)

        for label, mask in masks.items():
            mask_path = os.path.join(mask_dir_by_label[label], image_name)
            # imwrite reports failure through its return value only.
            if not cv2.imwrite(mask_path, mask):
                print(f"Warning! Cannot write mask {mask_path}")


JSON-файл

In [None]:
def create_masks_json_file(path_to_xml, path_to_images_dir, path_to_masks_dir):
    """Write ``mask_info.json`` describing the annotated images and defects.

    The JSON file is written into ``path_to_images_dir`` and contains the
    dataset path, one mask path per label, the number of images and, for each
    <image> element, its id, name, size and the list of its polygon and
    ellipse defects. Defect ids are numbered consecutively across the whole
    file.

    Args:
        path_to_xml: Annotation XML file.
        path_to_images_dir: Directory holding the images; also receives the
            JSON file.
        path_to_masks_dir: Directory holding the per-label mask directories.

    Returns:
        None. Prints an error and returns early when any path is missing.
    """
    if not (os.path.exists(path_to_xml) and
            os.path.exists(path_to_images_dir) and
            os.path.exists(path_to_masks_dir)):
        print("Error! There is no such file/directory")
        return

    items = []
    root = ET.parse(path_to_xml).getroot()
    defect_id = 0
    for image_tag in root.findall("image"):
        defects = []
        for polygon_tag in image_tag.findall("polygon"):
            polygon_tag_attributes = get_polygon_tag_attributes(polygon_tag)
            defects.append({"id" : defect_id,
                            "tag" : "polygon",
                            "label" : polygon_tag_attributes["label"],
                            "points" : polygon_tag_attributes["points"]})
            defect_id += 1

        for ellipse_tag in image_tag.findall("ellipse"):
            ellipse_tag_attributes = get_ellipse_tag_attributes(ellipse_tag)
            # Ellipses carry their own tag so readers can tell the centre and
            # radii record apart from a polygon's point list.
            defects.append({"id" : defect_id,
                            "tag" : "ellipse",
                            "label" : ellipse_tag_attributes["label"],
                            "cx" : ellipse_tag_attributes["cx"],
                            "cy" : ellipse_tag_attributes["cy"],
                            "rx" : ellipse_tag_attributes["rx"],
                            "ry" : ellipse_tag_attributes["ry"]})
            defect_id += 1

        image_tag_attributes = get_image_tag_attributes(image_tag)
        items.append({"id" : image_tag_attributes["id"],
                      "image" : image_tag_attributes["name"],
                      "width" : image_tag_attributes["width"],
                      "height" : image_tag_attributes["height"],
                      "defects" : defects})

    labels = get_labels(path_to_xml)
    mask_info = {
        "datasetPath" : path_to_images_dir,
        "maskPaths" : get_mask_paths(path_to_masks_dir, labels),
        "itemsCount" : len(items),
        "items" : items
        }

    path_to_json = os.path.join(path_to_images_dir, "mask_info.json")
    # Explicit encoding keeps the output independent of the platform default.
    with open(path_to_json, "w", encoding="utf-8") as file:
        json.dump(mask_info, file, indent = 2)


In [None]:
# Disabled steps kept for reference: drawing annotation outlines.
#draw_annotation("annotations/annotation1.xml", "images", "annotated_images")
#draw_annotation("annotations/annotation2.xml", "images", "annotated_images")

# Render the per-label masks for each annotation file in turn.
for annotation_xml_path in (
    "/kaggle/input/timbercv-dataset/annotation1.xml",
    "/kaggle/input/timbercv-dataset/annotation2.xml",
    "/kaggle/input/timbercv-dataset/annotation3.xml",
):
    draw_masks(annotation_xml_path, "masks")

# Disabled steps kept for reference: writing mask_info.json.
#create_masks_json_file("annotations/annotation1.xml", "images", "masks")
#create_masks_json_file("annotations/annotation1.xml", "images", "masks")

# Нейросеть

In [None]:
!pip install lightning

Модель

In [None]:
import lightning as L
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import albumentations as A
import gc
import matplotlib.pyplot as plt
import torchmetrics
import IPython
import torchvision.models as models

# Fix every random number generator used below so runs are repeatable.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
# Ask cuDNN to use deterministic algorithms.
torch.backends.cudnn.deterministic = True

class DownLayer(nn.Module):
    """Encoder stage: 2x2 max-pooling, then two 3x3 conv -> ReLU -> BatchNorm stages.

    The pooling halves the spatial size; the convolutions use padding 1 and
    map ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        stages = [nn.MaxPool2d(kernel_size=2, stride=2)]
        # First conv changes the channel count, the second one keeps it.
        for stage_in_channels in (in_channels, out_channels):
            stages += [
                nn.Conv2d(stage_in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.BatchNorm2d(out_channels),
            ]
        self.down = nn.Sequential(*stages)

    def forward(self, x):
        return self.down(x)

class UpLayer(nn.Module):
    """Decoder stage: bilinear 2x upsampling, skip concatenation, double conv.

    The upsampled input (``in_channels``) is concatenated with the skip
    tensor (``out_channels``) along the channel axis, passed through dropout
    and two 3x3 conv -> ReLU -> BatchNorm stages producing ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.up = nn.Sequential(nn.Upsample(scale_factor=2, mode = "bilinear"))

        merged_channels = in_channels + out_channels
        self.conv = nn.Sequential(
            nn.Dropout(p=0.25),
            nn.Conv2d(merged_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x, skip):
        upsampled = self.up(x)
        merged = torch.cat((upsampled, skip), dim=1)
        return self.conv(merged)

class DoubleConvLayer(nn.Module):
    """Two consecutive 3x3 conv -> ReLU -> BatchNorm stages.

    The convolutions use padding 1; the first maps ``in_channels`` to
    ``out_channels`` and the second keeps ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        for stage_in_channels in (in_channels, out_channels):
            stages += [
                nn.Conv2d(stage_in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.BatchNorm2d(out_channels),
            ]
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        return self.double_conv(x)

class DiceBCELoss(nn.Module):
    """Weighted combination of binary cross-entropy and a weighted Dice loss.

    For every sample, the target channel with the largest positive area
    (called the contour mask in this code) is excluded from the Dice term and
    treated as background when the per-pixel weights are computed. Per-pixel
    weights are ``|t - weight|`` where ``t`` is the target with that channel
    zeroed.

    Args:
        weight: Offset used to build the per-pixel weights.
        dice_ratio: Share of the Dice term in the result; the BCE term gets
            ``1 - dice_ratio``.
    """

    def __init__(self, weight = 0.005, dice_ratio = 0.1):
        super().__init__()
        self.weight = weight
        self.dice_ratio = dice_ratio

    @staticmethod
    def _contour_keep_mask(targets):
        """Return a mask of ones with each sample's largest-area channel zeroed."""
        batch_index = torch.arange(targets.shape[0], device=targets.device)
        contour_mask_index = torch.argmax(targets.sum(dim=(2, 3)), dim=1)
        keep = torch.ones_like(targets)
        # Pair each sample with its own channel. Indexing with
        # [:, contour_mask_index] would instead zero, for every sample, the
        # channels selected by all samples of the batch.
        keep[batch_index, contour_mask_index] = 0
        return keep

    def focal_loss(self, predictions, targets):
        """Return the mean weighted focal loss (gamma=2, weight offset 0.005)."""
        gamma = 2
        alpha = 0.005
        # Small epsilon keeps log() finite when p_t is exactly 0.
        p_t = targets * predictions + (1 - targets) * (1 - predictions) + 1.0e-7
        t = targets * self._contour_keep_mask(targets)
        weights = torch.abs(t - alpha)
        focal_loss = -weights * ((1 - p_t)**gamma) * torch.log(p_t)
        return focal_loss.mean()

    def forward(self, predictions, targets):
        """Return ``(1 - dice_ratio) * BCE + dice_ratio * Dice``.

        Args:
            predictions: Probabilities in [0, 1], indexed as
                (batch, channel, height, width).
            targets: Binary masks with the same shape as ``predictions``.
        """
        keep = self._contour_keep_mask(targets)
        t = targets * keep
        p = predictions * keep
        weights = torch.abs(t - self.weight)

        dice_loss = 1 - ((2. * (weights * p * t).sum()) /
                         ((weights * p).sum() + (weights * t).sum() + 1e-7))

        BCE = F.binary_cross_entropy(predictions,
                                     targets,
                                     weight=weights,
                                     reduction='mean')

        k = self.dice_ratio
        return (1 - k) * BCE + k * dice_loss
    
class Unet(L.LightningModule):
    """U-Net style segmentation model with four down and four up stages.

    The network takes a 1-channel image and returns 13 channels passed
    through a sigmoid. Training uses ``DiceBCELoss`` and Adam (lr=3e-4).
    Progress lines are shown through IPython display handles.
    """

    def __init__(self):
        super().__init__()
        # torch.zeros rather than torch.Tensor(...): the latter returns
        # uninitialised memory.
        self.example_input_array = torch.zeros(1, 1, 640, 1936)
        self.accuracy = torchmetrics.Accuracy(task="BINARY")
        # Display handles are created lazily on the first training step.
        self.train_output = None
        self.val_output = None
        self.loss = DiceBCELoss()

        self.double_conv1 = DoubleConvLayer(1, 64)
        self.down1 = DownLayer(64, 128)
        self.down2 = DownLayer(128, 256)
        self.down3 = DownLayer(256, 512)
        self.down4 = DownLayer(512, 1024)
        self.up1 = UpLayer(1024, 512)
        self.up2 = UpLayer(512, 256)
        self.up3 = UpLayer(256, 128)
        self.up4 = UpLayer(128, 64)
        self.double_conv2 = DoubleConvLayer(64, 13)

    def forward(self, x):
        """Return per-pixel probabilities for each of the 13 output channels."""
        skip1 = self.double_conv1(x)
        skip2 = self.down1(skip1)
        skip3 = self.down2(skip2)
        skip4 = self.down3(skip3)
        out = self.down4(skip4)
        out = self.up1(out, skip4)
        out = self.up2(out, skip3)
        out = self.up3(out, skip2)
        out = self.up4(out, skip1)
        out = self.double_conv2(out)
        return torch.sigmoid(out)

    def dice_loss(self, predictions, targets, epsilon=1e-7):
        """Return the unweighted Dice loss over the whole batch."""
        loss = 1. - ((torch.sum(2 * predictions * targets) + epsilon) /
                     (torch.sum(predictions) + torch.sum(targets) + epsilon))
        return loss.mean()

    @staticmethod
    def _contour_index(targets):
        """Return (batch indices, per-sample index of the largest-area channel)."""
        batch_index = torch.arange(targets.shape[0], device=targets.device)
        contour_mask_index = torch.argmax(targets.sum(dim=(2, 3)), dim=1)
        return batch_index, contour_mask_index

    def focal_loss(self, predictions, targets):
        """Return the mean weighted focal loss (gamma=2, weight offset 0.005).

        Each sample's largest-area target channel is treated as background
        when the per-pixel weights are computed.
        """
        gamma = 2
        alpha = 0.005
        # Small epsilon keeps log() finite when p_t is exactly 0.
        p_t = targets * predictions + (1 - targets) * (1 - predictions) + 1.0e-7
        batch_index, contour_mask_index = self._contour_index(targets)
        t = targets.clone()
        # Pair each sample with its own channel; [:, contour_mask_index] would
        # zero the union of the selected channels for every sample.
        t[batch_index, contour_mask_index] = 0
        weights = torch.abs(t - alpha)
        focal_loss = -weights * ((1 - p_t)**gamma) * torch.log(p_t)
        return focal_loss.mean()

    def custom_accuracy(self, x, y):
        """Return ``sum(x * y) / sum(y)`` with and without the contour channel.

        Returns:
            Tuple ``(ones_accuracy, ones_accuracy_without_contour)`` of
            Python floats; the second value leaves out each sample's
            largest-area target channel.
        """
        y_ones_amount = y.sum().item()
        x_right_ones_amount = (x * y).sum().item()
        ones_accuracy = x_right_ones_amount / (y_ones_amount + 1.0e-7)

        batch_index, contour_mask_index = self._contour_index(y)
        y_contour = y[batch_index, contour_mask_index]
        x_contour = x[batch_index, contour_mask_index]
        y_contour_ones_amount = y_contour.sum().item()
        x_contour_right_ones_amount = (x_contour * y_contour).sum().item()
        ones_accuracy_without_contour = ((x_right_ones_amount - x_contour_right_ones_amount) /
                                         (y_ones_amount - y_contour_ones_amount + 1.0e-7))
        return ones_accuracy, ones_accuracy_without_contour

    def _evaluate_batch(self, batch):
        """Run the model on a batch and return (loss, accuracy, custom_accuracy)."""
        x, y = batch
        predictions = self.forward(x)
        loss = self.loss(predictions, y)
        accuracy = self.accuracy(torch.round(predictions), y)
        custom_accuracy = self.custom_accuracy(predictions, y)
        return loss, accuracy, custom_accuracy

    def training_step(self, batch, batch_idx):
        # Imported explicitly so the module does not rely on the `display`
        # name that only notebooks inject into the global namespace.
        from IPython.display import display

        loss, accuracy, custom_accuracy = self._evaluate_batch(batch)

        if self.train_output is None:
            self.train_output = display("TRAIN.", display_id=True)
        if self.val_output is None:
            self.val_output = display("VAL.", display_id=True)
        self.train_output.update(f"TRAIN. acc: {accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")

        self.log("loss", loss, prog_bar=True)
        self.log("accuracy", accuracy, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        loss, accuracy, custom_accuracy = self._evaluate_batch(batch)

        print(f"TEST. acc:{accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")

        # Logged so that trainer.test() returns the metrics as well.
        self.log("test_loss", loss)
        self.log("test_accuracy", accuracy)

    def validation_step(self, batch, batch_idx):
        loss, accuracy, custom_accuracy = self._evaluate_batch(batch)

        # The handle exists only after the first training step.
        if self.val_output is not None:
            self.val_output.update(f"VAL. acc: {accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_accuracy", accuracy, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(),
                                     lr=3e-4)
        return optimizer


class TimberDataset(torch.utils.data.Dataset):
    """Dataset of grayscale images with one binary mask channel per mask directory.

    Every file in ``dataset_path`` whose name does not end with ".json" is an
    item. For each item, the file with the same name is read from every
    sub-directory of ``masks_path`` (in sorted order) and the masks are
    stacked as channels. A random shift/scale/rotate augmentation is applied
    to the image and its masks on every access.

    Args:
        dataset_path: Directory with the input images.
        masks_path: Directory whose entries hold the per-label masks.

    Raises:
        FileNotFoundError: If either directory does not exist.
    """

    def __init__(self, dataset_path, masks_path):
        super().__init__()
        if not os.path.isdir(dataset_path):
            raise FileNotFoundError(f"Error! There is no {dataset_path} directory")
        if not os.path.isdir(masks_path):
            raise FileNotFoundError(f"Error! There is no {masks_path} directory")

        # Keep the paths on the instance; the methods must not depend on
        # module-level variables that happen to share these names.
        self.dataset_path = dataset_path
        self.masks_path = masks_path
        self.transform = A.Compose([
            A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=1),
        ])
        self.sorted_mask_dirs = sorted(os.listdir(masks_path))
        # List the images once; sorting makes the index -> image mapping
        # independent of the arbitrary order returned by os.listdir.
        self.image_names = sorted(
            file for file in os.listdir(dataset_path) if not file.endswith(".json")
        )

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        """Return ``(image, masks)`` as float32 tensors.

        ``image`` has shape (1, H, W) and ``masks`` has shape (C, H, W), where
        C is the number of mask directories that contain this image.
        """
        image_name = self.image_names[idx]

        image_masks = []
        for mask_dir in self.sorted_mask_dirs:
            mask_path = os.path.join(self.masks_path, mask_dir, image_name)
            # Directories without a mask for this image are skipped, so they
            # contribute no channel.
            if not os.path.exists(mask_path):
                continue
            mask = cv2.imread(mask_path)
            if mask is None:
                raise RuntimeError(f"Cannot read mask {mask_path}")
            mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
            # Binarise: values of 127.5 and above become 1, the rest 0.
            mask = np.round(mask / 255.0)
            image_masks.append(mask[:, :, np.newaxis])
        if not image_masks:
            raise FileNotFoundError(f"No masks found for image {image_name} in {self.masks_path}")

        image_path = os.path.join(self.dataset_path, image_name)
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            raise RuntimeError(f"Cannot read image {image_path}")

        # Augmentation: the same geometric transform is applied to the image
        # and to all of its mask channels.
        image_masks = np.concatenate(image_masks, axis=2)
        transformed = self.transform(image=image, mask=image_masks)
        image = transformed["image"]
        image_masks = transformed["mask"]

        image = torch.tensor(image, dtype = torch.float32)
        image_masks = torch.tensor(image_masks, dtype = torch.float32)

        # (H, W) -> (1, H, W) and (H, W, C) -> (C, H, W)
        image = image.unsqueeze(0)
        image_masks = image_masks.permute(2, 0, 1)

        return (image, image_masks)


# Locations of the training images and of the masks rendered by draw_masks.
dataset_path = "/kaggle/input/timbercv-dataset/train"
masks_path = "/kaggle/working/masks"

dataset = TimberDataset(dataset_path, masks_path)
# Print the mask directories in the order used for the mask channels.
for directory in dataset.sorted_mask_dirs:
    print(directory)

# 85% train / 10% validation / remainder test.
train_dataset_size = int(len(dataset) * 0.85)
val_dataset_size = int(len(dataset) * 0.10)
test_dataset_size = len(dataset) - train_dataset_size - val_dataset_size

# NOTE(review): all three subsets wrap the same TimberDataset, so the random
# augmentation is applied to validation and test samples too - confirm this
# is intended.
train_dataset, val_dataset, test_dataset = random_split(dataset,
 [train_dataset_size, val_dataset_size, test_dataset_size])

# NOTE(review): the training loader does not shuffle between epochs - confirm.
train_loader = DataLoader(train_dataset, batch_size = 1)
valid_loader = DataLoader(val_dataset, batch_size = 1)

# Model training: resume from a saved checkpoint instead of a fresh model.
#model = Unet()
model = Unet.load_from_checkpoint("/kaggle/input/timbercv/pytorch/new1/2/epoch16-step4726.ckpt")

model.train()
trainer = L.Trainer(max_epochs = 200)
trainer.fit(model = model,
            train_dataloaders = train_loader,
            val_dataloaders = valid_loader)

# Model testing on the held-out subset.
result = trainer.test(model, dataloaders=DataLoader(test_dataset))

# TensorBoard
#%reload_ext tensorboard
#%tensorboard --logdir=lightning_logs/


In [None]:
# Drop unreachable Python objects first, then release the cached GPU memory
# held by PyTorch's allocator.
gc.collect()
torch.cuda.empty_cache()

Проверка

In [None]:
# Alternative sample images kept for reference.
#image_path = "/kaggle/input/timbercv-dataset/train/2023-12-11_062607_52_2.jpg"
#image_path = "/kaggle/input/timbercv-dataset/train/2023-12-20_065243_97_2.jpg"
#image_path = "/kaggle/input/timbercv-dataset/test.jpg"
#image_path = "/kaggle/input/timbercv-dataset/train/2023-10-27_133517_70_2.jpg"

image_path = "/kaggle/input/timbercv-dataset/test/2023-12-16_072036_19_2.jpg"

np_image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
# imread signals failure by returning None rather than raising.
if np_image is None:
    raise FileNotFoundError(f"Cannot read image: {image_path}")
#image = cv2.resize(image, (1936, 640))
plt.imshow(np_image, cmap='gray')
plt.show()
image = torch.tensor(np_image, dtype = torch.float32)
plt.imshow(image.numpy(), cmap='gray')
plt.show()
plt.imshow(image.numpy()/255., cmap='gray')
plt.show()
# (H, W) -> (1, 1, H, W): add the batch and channel axes directly instead of
# re-wrapping the tensor in torch.tensor(), which copies it and emits a warning.
image = image.unsqueeze(0).unsqueeze(0)

#model = Unet.load_from_checkpoint("/content/lightning_logs/version_45/checkpoints/epoch=6-step=980.ckpt")
# Inference on the CPU in evaluation mode.
model.eval()
model.cpu()

with torch.no_grad():
    result = model(image)

In [None]:
# Binarise the probabilities and scale them to 0/255 for display.
# NOTE: `result` is rebound here, so re-running this cell scales it again.
result = torch.round(result)*255

#del zip
# Show every predicted channel next to the name of its mask directory.
for mask, label in zip(result[0], dataset.sorted_mask_dirs):
    print(label)
    plt.imshow(mask, cmap='gray')
    plt.show()

Отрисовка масок на изображении

In [None]:
# Contour colours (RGB), one distinct colour per mask channel.
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255),
          (0, 255, 255), (128, 0, 0), (0, 128, 0), (0, 0, 128), (128, 128, 0),
          (128, 0, 128), (0, 128, 128), (255, 128, 0)]
masks_np = result.squeeze(0).cpu().numpy()
# np_image is single-channel; convert it to RGB so that the coloured contours
# keep their colour instead of being drawn as gray levels.
image_with_contours = cv2.cvtColor(np_image, cv2.COLOR_GRAY2RGB)

# Show the image before the contours are drawn.
plt.imshow(image_with_contours)
plt.show()

# Draw the outer contours of every mask in its own colour.
for idx, mask in enumerate(masks_np):
    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Wrap around if there are more masks than colours.
    color = colors[idx % len(colors)]
    cv2.drawContours(image_with_contours, contours, -1, color, 2)

# Show the image with the mask contours.
plt.imshow(image_with_contours)
plt.show()

Сохранение модели

In [None]:
# Save the weights only (state dict) in evaluation mode.
model.eval()
torch.save(model.state_dict(), 'model_weights.pth')

# NOTE(review): Lightning modules also provide to_torchscript(); confirm that
# scripting the module directly handles its metric and loss sub-modules.
model_scripted = torch.jit.script(model) # Export to TorchScript
model_scripted.save('unet.pt') # Save

In [None]:
import shutil
import os
import zipfile
# Directory to download.
directory_to_download = '/kaggle/working/lightning_logs/version_0'

zip_file_path = 'files_to_download.zip'
# Archive names are made relative to the parent directory, so the archive
# keeps the top-level folder name.
archive_base = os.path.join(directory_to_download, '..')
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
    for current_dir, _sub_dirs, file_names in os.walk(directory_to_download):
        for file_name in file_names:
            file_path = os.path.join(current_dir, file_name)
            zipf.write(file_path, os.path.relpath(file_path, archive_base))

from IPython.display import FileLink
FileLink(zip_file_path)

PyTorch to TFLite

In [None]:
!pip install onnx_tf

In [None]:
'''
class Unet(L.LightningModule):
    def __init__(self):
        super().__init__()
        self.example_input_array = torch.Tensor(1, 3, 640, 1936)
        self.accuracy = torchmetrics.Accuracy(task="BINARY")
        self.train_output = None
        self.val_output = None
        self.loss = DiceBCELoss()

        self.double_conv1 = DoubleConvLayer(3, 64)
        self.down1 = DownLayer(64, 128)
        self.down2 = DownLayer(128, 256)
        self.up1 = UpLayer(256, 128)
        self.up2 = UpLayer(128, 64)
        self.double_conv2 = DoubleConvLayer(64, 13)

    def forward(self, x):
        x1 = self.double_conv1(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x3 = self.up1(x3, x2)
        x3 = self.up2(x3, x1)
        x3 = self.double_conv2(x3)
        x3 = torch.sigmoid(x3)
        return x3

    
    def dice_loss(self, predictions, targets, epsilon=1e-7):
        loss = 1. - ((torch.sum(2 * predictions * targets) + epsilon) / 
                     (torch.sum(predictions) + torch.sum(targets) + epsilon))
        return loss.mean()
    
    def focal_loss(self, predictions, targets):
        gamma = 2
        alpha = 0.005
        p_t = targets * predictions + (1 - targets) * (1 - predictions) + 1.0e-7
        contour_mask_index = torch.argmax(targets.sum(dim=(2, 3)), dim=1)
        t = targets.clone()
        t[:, contour_mask_index, :, :] *= 0
        weights = abs(t-alpha)
        focal_loss = -weights * ((1 - p_t)**gamma) *torch.log(p_t)   #((1 - p_t)**gamma) *
        return focal_loss.mean()
    
    def custom_accuracy(self, x, y):
        y_ones_amount = y.sum().item()
        x_right_ones_amount = (x * y).sum().item()
        ones_accuracy = x_right_ones_amount / (y_ones_amount + 1.0e-7)

        contour_mask_index = torch.argmax(y.sum(dim=(2, 3)), dim=1)
        y_contour_ones_amount = y[:,contour_mask_index,:,:].sum().item()
        x_contour_right_ones_amount = (x[:,contour_mask_index,:,:] * y[:,contour_mask_index,:,:]).sum().item()
        ones_accuracy_without_contour = ((x_right_ones_amount - x_contour_right_ones_amount) /
                                        (y_ones_amount - y_contour_ones_amount + 1.0e-7))
        return ones_accuracy, ones_accuracy_without_contour

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)
        #loss = F.binary_cross_entropy(x, y) + self.dice_loss(x, y)
        loss = self.loss(x, y)
        
        result = torch.round(x)
        accuracy = self.accuracy(result, y)
        custom_accuracy = self.custom_accuracy(x, y)

        #plt.imshow(y[0, 10,:,:].cpu())
        #plt.show()

        if self.train_output == None:
            self.train_output = display("TRAIN.", display_id=True)
        if self.val_output == None:
            self.val_output = display("VAL.", display_id=True)
        self.train_output.update(f"TRAIN. acc: {accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")


        self.log("loss", loss, prog_bar=True)
        self.log("accuracy", accuracy, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)
        #loss = F.binary_cross_entropy(x, y) + self.dice_loss(x, y)
        loss = self.loss(x, y)

        result = torch.round(x)
        accuracy = self.accuracy(result, y)
        custom_accuracy = self.custom_accuracy(x, y)

        print(f"TEST. acc:{accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")

    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)
        #loss = F.binary_cross_entropy(x, y) + self.dice_loss(x, y)
        loss = self.loss(x, y)

        result = torch.round(x)
        accuracy = self.accuracy(result, y)
        custom_accuracy = self.custom_accuracy(x, y)

        if self.val_output != None:
            self.val_output.update(f"VAL. acc: {accuracy:.4f}, custom_acc: {custom_accuracy[0]:.4f}, {custom_accuracy[1]:.4f}, loss: {loss:.4f}")

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_accuracy", accuracy, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), 
                                     lr=3e-4)
        #scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
        return optimizer #[optimizer], [scheduler]
'''

In [None]:
import torch
import onnx
import tensorflow as tf
import onnx_tf

# Load the PyTorch model
#pytorch_model = model
#pytorch_model = Unet.load_from_checkpoint("/kaggle/input/timbercv_mobile/pytorch/0/1/epoch0-step288.ckpt")
pytorch_model = Unet.load_from_checkpoint("/kaggle/input/timbercv/pytorch/8/1/epoch54-step15840.ckpt")
pytorch_model.eval()
# The sample input below lives on the CPU, so the model must too.
pytorch_model.cpu()

# Export the PyTorch model to ONNX format.
# The sample input takes its shape from the model's own example input so the
# channel count always matches the Unet variant that is currently defined.
input_shape = tuple(pytorch_model.example_input_array.shape)
sample_input = torch.randn(input_shape)
onnx_model_path = 'unet.onnx'
torch.onnx.export(
    pytorch_model,          # the freshly loaded model, not the global `model`
    sample_input,           # Input tensor
    onnx_model_path,        # Output file (eg. 'output_model.onnx')
    opset_version=12,       # Operator support version
    input_names=['input'],  # Input tensor name (arbitrary)
    output_names=['output'] # Output tensor name (arbitrary)
)
# Load the ONNX model
onnx_model = onnx.load(onnx_model_path)

# Check that the IR is well formed
onnx.checker.check_model(onnx_model)

# Print a human readable representation of the graph; the helper only returns
# the text, so it has to be printed explicitly.
print(onnx.helper.printable_graph(onnx_model.graph))

# Convert the ONNX model to TensorFlow format
tf_model_path = 'unet.pb'
tf_rep = onnx_tf.backend.prepare(onnx_model)
tf_rep.export_graph(tf_model_path)

# NOTE(review): tf_model is not used below - confirm whether this load is
# still needed.
tf_model = tf.keras.models.load_model(tf_model_path)

# Convert the TensorFlow model to TensorFlow Lite format
converter = tf.compat.v1.lite.TFLiteConverter.from_saved_model(tf_model_path)
#converter.optimizations = [tf.lite.Optimize.DEFAULT]
#converter.target_spec.supported_ops = [tf.float32]
tflite_model = converter.convert()

# Save the TensorFlow Lite model to a file
with open('unet.tflite', 'wb') as f:
    f.write(tflite_model)

In [None]:
# Pack the converted TFLite model into a zip archive for download.
file_to_zip = '/kaggle/working/unet.tflite'
zip_file_path = 'TFLite_model.zip'

with zipfile.ZipFile(zip_file_path, 'w') as zipf:
    # Store the file under its base name only, without the directory part.
    zipf.write(file_to_zip, os.path.basename(file_to_zip))

# Last expression of the cell: renders a download link in the notebook.
FileLink(zip_file_path)

Тест модели TFLite

In [None]:
# Load the TensorFlow Lite model.
interpreter = tf.lite.Interpreter(model_path="/kaggle/working/unet.tflite")
interpreter.allocate_tensors()

# Query the input and output tensor descriptions.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Prepare the input data.
image_path = "/kaggle/input/timbercv-dataset/train/2023-12-11_062607_52_2.jpg"
#image_path = "/kaggle/input/timbercv-dataset/train/2023-12-20_065243_97_2.jpg"
#image_path = "/kaggle/input/timbercv-dataset/test.jpg"

image = cv2.imread(image_path)
# imread signals failure by returning None rather than raising.
if image is None:
    raise FileNotFoundError(f"Cannot read image: {image_path}")
#image = cv2.resize(image, (1936, 640))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
plt.imshow(image)
plt.show()
# (H, W, 3) -> (1, 3, H, W) float32. The interpreter takes NumPy arrays, so
# the input is built with NumPy directly instead of going through torch.
image = np.ascontiguousarray(image.transpose(2, 0, 1)[np.newaxis], dtype=np.float32)

expected_shape = tuple(int(dim) for dim in input_details[0]['shape'])
if image.shape != expected_shape:
    raise ValueError(f"Model expects input of shape {expected_shape}, got {image.shape}")

# Feed the input to the interpreter.
interpreter.set_tensor(input_details[0]['index'], image)

# Run the model.
interpreter.invoke()

# Fetch the result and binarise it to 0/255.
output_data = interpreter.get_tensor(output_details[0]['index'])
output_data = np.round(output_data)*255

for mask, label in zip(output_data[0], dataset.sorted_mask_dirs):
    print(label)
    plt.imshow(mask)
    plt.show()

Отрисовка масок

In [None]:
# Contour colours (RGB), one distinct colour per mask channel.
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255),
          (0, 255, 255), (128, 0, 0), (0, 128, 0), (0, 0, 128), (128, 128, 0),
          (128, 0, 128), (0, 128, 128), (255, 128, 0)]
# NOTE(review): this cell follows the TFLite test but still draws `result`
# and `np_image` from the PyTorch check - confirm whether `output_data` and
# the TFLite input image were intended here.
masks_np = result.squeeze(0).cpu().numpy()
# np_image is single-channel; convert it to RGB so that the coloured contours
# keep their colour instead of being drawn as gray levels.
image_with_contours = cv2.cvtColor(np_image, cv2.COLOR_GRAY2RGB)

# Show the image before the contours are drawn.
plt.imshow(image_with_contours)
plt.show()

# Draw the outer contours of every mask in its own colour.
for idx, mask in enumerate(masks_np):
    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Wrap around if there are more masks than colours.
    color = colors[idx % len(colors)]
    cv2.drawContours(image_with_contours, contours, -1, color, 2)

# Show the image with the mask contours.
plt.imshow(image_with_contours)
plt.show()