# Обучение модели детектора дефектов транспортерных ленты конвейеров (прямоугольные рамки)

*Импорт всех необходимых библиотек (ноутбук запускаю локально)*

In [None]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install torch

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install torchvision

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install tqdm




In [None]:
import torch
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.feature_extraction import create_feature_extractor
import torchvision.transforms.functional as F
import pandas as pd
import os
from PIL import Image
from collections import Counter
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from torchvision.models.detection import fasterrcnn_resnet50_fpn
import warnings
import random
from torchvision.transforms import ToTensor, Compose
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor, RandomHorizontalFlip, Compose
from matplotlib.patches import Rectangle

*Загружаю исходные изображения участков лент и их разметку в формате (.xml)*

In [None]:
# Путь к изображениям
#img_path = '/content/images'
# Загрузка DataFrame с аннотациями
#df = pd.read_csv('/content/new_annotations.csv')
# Путь к изображениям
img_path = 'AppV/Untitled Folder/images'
# Загрузка DataFrame с аннотациями
df = pd.read_csv('AppV/Untitled Folder/new_annotations.csv')
#df = pd.read_csv('/content/df.csv')



*Функция создания модели обучения на базе предобученной модели Resnet50 (создаем генератор якорей для предсказания bounding boxes объектов)*

In [None]:
# Функция для получения модели обнаружения объектов
def get_object_detection_model(num_classes):
    backbone = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V1)
    backbone = create_feature_extractor(backbone, return_nodes={"layer4": "0"})
    backbone.out_channels = 2048

    rpn_anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),), aspect_ratios=((0.5, 1.0, 2.0),)
    )

    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=["0"], output_size=7, sampling_ratio=2
    )

    model = FasterRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=rpn_anchor_generator,
        box_roi_pool=roi_pooler,
    )

    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model


*Класс для создания кастомного датасета на основе датафрейма разметки*

In [None]:
class CustomDataset(Dataset):
    def __init__(self, root, df, transforms=None):
        self.root = root
        self.transforms = transforms
        self.dataframe = df
        self.imgs = list(self.dataframe['filename'].unique())

    def __getitem__(self, idx):
        img_name = self.imgs[idx]
        img_path = os.path.join(self.root, img_name)
        img = Image.open(img_path).convert("RGB")

        current_annotations = self.dataframe[self.dataframe['filename'] == img_name]
        num_objs = len(current_annotations)

        boxes = []
        labels = []
        for _, row in current_annotations.iterrows():
            boxes.append([row["xmin"], row["ymin"], row["xmax"], row["ymax"]])
            labels.append(row["class"])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        target = {"boxes": boxes, "labels": labels, "image_id": image_id, "area": area, "iscrowd": iscrowd}

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.imgs)


*Блок классов и функций для трансформаций датасета*

In [None]:
# Классы преобразований
class ComposeTransforms(object):
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for transform in self.transforms:
            image, target = transform(image, target)
        return image, target

class ToTensor(object):
    def __call__(self, image, target):
        image = F.to_tensor(image)
        return image, target

class CustomRandomHorizontalFlip(object):
    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, image, target):
        if torch.rand(1).item() < self.p:
            image = F.hflip(image)
            if "boxes" in target:
                bbox = target["boxes"]
                image_width = image.shape[2]  # Assuming image shape is [C, H, W]
                bbox[:, [0, 2]] = image_width - bbox[:, [2, 0]]
                target["boxes"] = bbox
        return image, target

# Функция для трансформаций
def get_transform(train):
    transforms = []
    transforms.append(ToTensor())
    return Compose(transforms)

* Определение функции collate_fn, которая позволяет корректно объединить батчи при обработке данных*

In [None]:
# Функция для объединения батчей
def collate_fn(batch):
    return tuple(zip(*batch))

*Функция для балансировки классов датасета*

In [None]:
# Функция для вычисления весов классов
def calculate_class_weights(df):
    class_counts = df['class'].value_counts().to_dict()
    total_counts = sum(class_counts.values())
    class_weights = {cls: total_counts / count for cls, count in class_counts.items()}
    weights = np.zeros(len(class_counts))
    for cls, weight in class_weights.items():
        weights[cls] = weight
    return torch.tensor(weights, dtype=torch.float32)


*Основная функция обучения модели детекции объектов на изображениях*

In [None]:
# Обучение модели
def train_model(num_classes, num_epochs, batch_size, device):
    dataset = CustomDataset(img_path, df, transforms=get_transform(train=True))
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=collate_fn)

    class_weights = calculate_class_weights(df)
    criterion = torch.nn.CrossEntropyLoss(weight=class_weights)

    weights = FasterRCNN_ResNet50_FPN_Weights.COCO_V1
    model = fasterrcnn_resnet50_fpn(weights=weights)

    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    model = model.to(device)

    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        progress_bar = tqdm(data_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False)

        for images, targets in progress_bar:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            epoch_loss += losses.item()

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            progress_bar.set_postfix(loss=epoch_loss / (progress_bar.n + 1))

        lr_scheduler.step()
        print(f"Epoch {epoch + 1} completed with loss {epoch_loss / len(data_loader)}")

    print("Training completed.")
    return model

*Выбор устройства для выполнения вычислений*

In [None]:
# Основной поток выполнения
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

*Основной блок обучения модели детекции*

In [None]:
# Правильное количество классов (включая фоновый класс)
num_classes = 3

# Обучение модели (например, 10 эпох)
num_epochs = 10
batch_size = 5
model = train_model(num_classes, num_epochs, batch_size, device)





Epoch 1 completed with loss 0.27409427435625167




Epoch 2 completed with loss 0.11017622022579114




Epoch 3 completed with loss 0.08927402184123084




Epoch 4 completed with loss 0.06798725260332936




Epoch 5 completed with loss 0.06102835134203945




Epoch 6 completed with loss 0.057962850002305846




Epoch 7 completed with loss 0.055938851176982836




Epoch 8 completed with loss 0.055427752945217346




Epoch 9 completed with loss 0.055741946501213877


                                                                                                                                                                                                                                                                                                                            

Epoch 10 completed with loss 0.055269097887156976
Training completed.




# **Тестирование модели**

*Функция визуализации объектов на тестовой выборке изображений (результат работы модели)*

In [None]:
def visualize_predictions(image, boxes, labels, scores, threshold=0.5):
    def generate_color_map(all_labels):
        color_map = {}
        random.seed(42)  # For reproducibility
        for label in all_labels:
            color_map[int(label)] = (random.random(), random.random(), random.random())
        return color_map

    # Печать всех уникальных меток для отладки
    unique_labels = np.unique(labels)
    print(f"Unique labels in batch: {unique_labels}")

    color_map = generate_color_map([0, 1, 2])
    print(f"Generated color_map keys: {list(color_map.keys())}")

    drawn_boxes = {label: [] for label in np.unique(labels)}

    image = image.permute(1, 2, 0).cpu().numpy()
    image = np.clip(image, 0, 1)
    fig, ax = plt.subplots(1, figsize=(12, 9))
    ax.imshow(image)

    for box, label, score in zip(boxes, labels, scores):
        if score >= threshold:
            xmin, ymin, xmax, ymax = box
            box_coords = (xmin, ymin, xmax, ymax)

            intersects = False
            for drawn_box in drawn_boxes[label]:
                if (xmin < drawn_box[2] and xmax > drawn_box[0] and ymin < drawn_box[3] and ymax > drawn_box[1]):
                    intersects = True
                    break

            if not intersects:
                # Оповещение о метке для отладки
                print(f"Drawing box for label: {label} with confidence score: {score}")

                rect = Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color=color_map[int(label)], linewidth=2)
                ax.add_patch(rect)
                label_text = f"{label}: {score:.2f}"
                ax.text(xmin, ymin, label_text, bbox={"facecolor": "yellow", "alpha": 0.5})

                drawn_boxes[label].append(box_coords)

    plt.show()



Функция оценки работы модели на валидационных данных

In [None]:
# Проверка модели на валидационных данных
def evaluate_model(model, data_loader_test):
    model.eval()
    with torch.no_grad():
        for images, targets in data_loader_test:
            images = list(img.to(device) for img in images)
            outputs = model(images)

            for img, output in zip(images, outputs):
                boxes = output['boxes'].cpu().numpy()
                labels = output['labels'].cpu().numpy()
                scores = output['scores'].cpu().numpy()
                visualize_predictions(img, boxes, labels, scores)

*Создание валидационного датасета*

In [None]:
# Проверка модели на валидационных данных
validate_dataset = CustomDataset(img_path, df, transforms=get_transform(train=False))
data_loader_test = torch.utils.data.DataLoader(
    validate_dataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=collate_fn
)
evaluate_model(model, data_loader_test)

# **Заключение**

Модель уверенно распознает объекты 1 и 2 класса на изображениях, что может быть использовано для создания автоматизированного комплекса по детекции дефектов поточно-транспортного оборудования промышленных предприятий: ленточных конвейеров различной протяженности.

Вопросы, требующие доработки: не распознается класс 0 на объектах - порывы лент.