In [None]:
# Импорт необходимых библиотек
import os
import json
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

# Установка путей к директориям и файлам JSON
# Замените 'path_to_dataset' на фактический путь к вашему набору данных
DATA_DIR = 'FashionPedia'
TRAIN_IMAGES_DIR = os.path.join(DATA_DIR, 'train')
TEST_IMAGES_DIR = os.path.join(DATA_DIR, 'test')

ATTRIBUTES_TRAIN_JSON = os.path.join(DATA_DIR, 'attributes_train2020.json')
ATTRIBUTES_VAL_JSON = os.path.join(DATA_DIR, 'attributes_val2020.json')
INSTANCES_TRAIN_JSON = os.path.join(DATA_DIR, 'instances_attributes_train2020.json')
INSTANCES_VAL_JSON = os.path.join(DATA_DIR, 'instances_attributes_val2020.json')
INFO_TEST_JSON = os.path.join(DATA_DIR, 'info_test2020.json')

# Функция для загрузки JSON файла
def load_json(file_path):
    with open(file_path, 'r') as f:
        return json.load(f)

# Загрузка данных из JSON файлов
print("Загрузка JSON файлов...")
attributes_train = load_json(ATTRIBUTES_TRAIN_JSON)
attributes_val = load_json(ATTRIBUTES_VAL_JSON)
instances_train = load_json(INSTANCES_TRAIN_JSON)
instances_val = load_json(INSTANCES_VAL_JSON)
print("Загрузка завершена.\n")

# Преобразование данных изображений в DataFrame
def images_to_df(images_data):
    images = images_data['images']
    df = pd.DataFrame(images)
    return df

train_images_df = images_to_df(attributes_train)
val_images_df = images_to_df(attributes_val)

# Загрузка тестовых изображений, если файл существует
if os.path.exists(INFO_TEST_JSON):
    test_images_df = images_to_df(load_json(INFO_TEST_JSON))
else:
    test_images_df = pd.DataFrame()

print(f"Количество обучающих изображений: {len(train_images_df)}")
print(f"Количество валидационных изображений: {len(val_images_df)}")
print(f"Количество тестовых изображений: {len(test_images_df)}\n")

# Преобразование аннотаций в DataFrame
def annotations_to_df(annotations_data):
    annotations = annotations_data['annotations']
    ann_df = pd.DataFrame(annotations)
    return ann_df

train_annotations_df = annotations_to_df(instances_train)
val_annotations_df = annotations_to_df(instances_val)

print(f"Количество аннотаций в обучающем наборе: {len(train_annotations_df)}")
print(f"Количество аннотаций в валидационном наборе: {len(val_annotations_df)}\n")

# Объединение изображений с аннотациями
def merge_images_annotations(images_df, annotations_df):
    merged_df = images_df.merge(annotations_df, left_on='id', right_on='image_id', how='left')
    return merged_df

train_merged_df = merge_images_annotations(train_images_df, train_annotations_df)
val_merged_df = merge_images_annotations(val_images_df, val_annotations_df)

print("Объединение изображений с аннотациями завершено.\n")

# Загрузка атрибутов
attributes_info = pd.concat([
    pd.DataFrame(attributes_train['attributes']),
    pd.DataFrame(attributes_val['attributes'])
]).drop_duplicates(subset=['id']).reset_index(drop=True)

print(f"Общее количество уникальных атрибутов: {len(attributes_info)}\n")

# Пример вывода данных
print("Пример данных из обучающего набора:")
display(train_merged_df.head())

# Функция для отображения изображения с аннотациями
def display_image_with_annotations(df, index=0, dataset_type='train'):
    try:
        row = df.iloc[index]
    except IndexError:
        print(f"Индекс {index} выходит за пределы DataFrame.")
        return
    
    if dataset_type == 'train':
        images_dir = TRAIN_IMAGES_DIR
    elif dataset_type == 'test':
        images_dir = TEST_IMAGES_DIR
    else:
        images_dir = TRAIN_IMAGES_DIR  # По умолчанию
    
    image_path = os.path.join(images_dir, row['file_name'])
    
    if not os.path.exists(image_path):
        print(f"Изображение {image_path} не найдено.")
        return
    
    try:
        image = Image.open(image_path)
    except Exception as e:
        print(f"Ошибка при открытии изображения {image_path}: {e}")
        return
    
    plt.figure(figsize=(8, 8))
    plt.imshow(image)
    plt.axis('off')
    
    # Используем 'id_x' как Image ID и 'id_y' как Annotation ID
    plt.title(f"Image ID: {row['id_x']} | Annotation ID: {row['id_y']}")
    
    # Проверяем наличие сегментаций
    if 'segmentation' in row and isinstance(row['segmentation'], list):
        for seg in row['segmentation']:
            if isinstance(seg, list) and len(seg) >= 6:
                try:
                    # Преобразуем плоский список в список кортежей (x, y)
                    xy = list(zip(seg[::2], seg[1::2]))
                    polygon = plt.Polygon(xy, edgecolor='r', fill=False, linewidth=2)
                    plt.gca().add_patch(polygon)
                except Exception as e:
                    print(f"Ошибка при обработке сегментации: {e}")
            else:
                print(f"Некорректный формат сегментации: {seg}")
    
    plt.show()

print("Отображение примера изображения с аннотациями:")
if not train_merged_df.empty:
    display_image_with_annotations(train_merged_df, index=0, dataset_type='train')
else:
    print("Обучающий DataFrame пуст.")


In [None]:
# Импорт стандартных библиотек
import os
import json
import time
import numpy as np
import pandas as pd
from collections import defaultdict, deque

# Импорт PyTorch и torchvision
import torch
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from torchvision import transforms as T
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

# Импорт дополнительных библиотек
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt

# Для обработки RLE
# from pycocotools import mask as maskUtils

In [None]:
# Определение именованной функции collate_fn для DataLoader
def collate_fn(batch):
    return tuple(zip(*batch))


In [None]:
# Кастомные трансформации для обработки как изображений, так и целей

class ToTensorTransform:
    def __call__(self, image, target):
        image = T.ToTensor()(image)
        return image, target

class RandomHorizontalFlipTransform:
    def __init__(self, flip_prob):
        self.flip_prob = flip_prob

    def __call__(self, image, target):
        if np.random.rand() < self.flip_prob:
            image = T.functional.hflip(image)
            # Отражаем координаты боксов и масок
            width = image.shape[2]
            boxes = target["boxes"]
            if boxes.numel() > 0:
                boxes[:, [0, 2]] = width - boxes[:, [2, 0]]
                target["boxes"] = boxes

            # Отражение масок
            masks = target["masks"]
            if masks.numel() > 0:
                masks = masks.flip(-1)
                target["masks"] = masks

        return image, target

class ComposeTransforms:
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for t in self.transforms:
            image, target = t(image, target)
        return image, target

In [None]:
import os
from PIL import Image, ImageDraw
import torch
from torch.utils.data import Dataset
import numpy as np
import pandas as pd

class CustomDataset(Dataset):
    def __init__(self, dataframe, images_dir, attributes_info, transforms=None):
        """
        Args:
            dataframe (pd.DataFrame): DataFrame с аннотациями и информацией об изображениях.
            images_dir (str): Путь к директории с изображениями.
            attributes_info (pd.DataFrame): DataFrame с информацией об атрибутах.
            transforms (callable, optional): Трансформации, применяемые к изображениям и целевым данным.
        """
        self.images_dir = images_dir
        self.attributes_info = attributes_info
        self.transforms = transforms

        # Фильтруем строки, у которых существует соответствующий файл изображения
        initial_len = len(dataframe)
        dataframe['full_path'] = dataframe['file_name'].apply(lambda x: os.path.join(images_dir, x))
        dataframe = dataframe[dataframe['full_path'].apply(os.path.exists)].reset_index(drop=True)
        filtered_len = len(dataframe)
        if filtered_len < initial_len:
            print(f"Пропущено {initial_len - filtered_len} изображений, так как файлы не найдены.")
        
        self.dataframe = dataframe

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Получаем строку DataFrame
        row = self.dataframe.iloc[idx]
        
        # Загружаем изображение с уже проверенным путем
        img_path = row['full_path']
        try:
            img = Image.open(img_path).convert("RGB")
        except Exception as e:
            print(f"Ошибка при открытии изображения {img_path}: {e}. Пропуск.")
            raise RuntimeError(f"Не удалось загрузить изображение: {img_path}")

        # Инициализируем целевые данные
        target = {}
        image_id = row['id_x']
        target["image_id"] = torch.tensor([image_id])
        
        # Получаем аннотации для изображения
        annotations = self.dataframe[self.dataframe['id_x'] == image_id]
        
        boxes = []
        labels = []
        masks = []
        areas = []
        iscrowd = []
        
        for _, ann in annotations.iterrows():
            # Боксы: преобразуем из [x_min, y_min, width, height] в [x_min, y_min, x_max, y_max]
            x_min, y_min, width, height = ann['bbox']
            x_max = x_min + width
            y_max = y_min + height

            # Проверка и коррекция координат BX
            if x_max <= x_min or y_max <= y_min:
                print(f"Некорректный BBox: [x_min={x_min}, y_min={y_min}, x_max={x_max}, y_max={y_max}]. Пропуск аннотации.")
                continue  # Пропускаем некорректные боксы

            boxes.append([x_min, y_min, x_max, y_max])
            
            # Метки (category_id)
            labels.append(ann['category_id'])
            
            # Сегментации (преобразуем в маски)
            segmentation = ann['segmentation']
            mask = self.polygons_to_mask(segmentation, row['height'], row['width'])
            masks.append(mask)
            
            # Площадь
            areas.append(ann['area'])
            
            # iscrowd
            iscrowd.append(ann['iscrowd'])
        
        if len(boxes) == 0:
            # Если аннотаций нет, создаём пустые тензоры
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
            masks = torch.zeros((0, row['height'], row['width']), dtype=torch.uint8)
            areas = torch.zeros((0,), dtype=torch.float32)
            iscrowd = torch.zeros((0,), dtype=torch.int64)
        else:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)
            masks = torch.stack(masks)
            areas = torch.as_tensor(areas, dtype=torch.float32)
            iscrowd = torch.as_tensor(iscrowd, dtype=torch.int64)
        
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["area"] = areas
        target["iscrowd"] = iscrowd
        
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        
        return img, target

    def polygons_to_mask(self, polygons, height, width):
        mask = Image.new('L', (width, height), 0)
        for idx, polygon in enumerate(polygons):
            # Проверяем, что polygon является списком
            if not isinstance(polygon, list):
                # print(f"Проблема: Сегментация не является списком: {polygon}")
                continue  # Пропускаем некорректную сегментацию

            # Проверяем, что все элементы polygon являются числами
            if not all(isinstance(coord, (int, float)) for coord in polygon):
                print(f"Проблема: Некорректные координаты в сегментации: {polygon}")
                continue  # Пропускаем сегментацию с некорректными координатами

            # Убедимся, что количество координат чётное
            if len(polygon) % 2 != 0:
                print(f"Проблема: Нечётное количество координат в сегментации: {polygon}")
                continue  # Пропускаем сегментацию с нечётным числом координат

            # Преобразуем плоский список координат в список кортежей
            xy = list(zip(polygon[::2], polygon[1::2]))

            # Проверяем, что длина xy достаточна для формирования полигона
            if len(xy) < 3:
                print(f"Проблема: Недостаточно точек для полигона: {xy}")
                continue  # Пропускаем полигоны с недостаточным количеством точек

            try:
                ImageDraw.Draw(mask).polygon(xy=xy, outline=1, fill=1)
            except Exception as e:
                print(f"Ошибка при рисовании полигона {xy}: {e}")
    
        return torch.as_tensor(np.array(mask), dtype=torch.uint8)


In [None]:
def get_transform(train):
    transforms = []
    transforms.append(ToTensorTransform())
    if train:
        transforms.append(RandomHorizontalFlipTransform(flip_prob=0.5))
    return ComposeTransforms(transforms)

In [None]:
# Создание обучающего и валидационного наборов
dataset_train = CustomDataset(train_merged_df, TRAIN_IMAGES_DIR, attributes_info, transforms=get_transform(train=True))
dataset_val = CustomDataset(val_merged_df, TEST_IMAGES_DIR, attributes_info, transforms=get_transform(train=False))

from torch.utils.data import Subset

def get_subset(dataset, fraction=0.1):
    subset_size = int(len(dataset) * fraction)
    subset_indices = list(range(len(dataset)))
    np.random.shuffle(subset_indices)
    return Subset(dataset, subset_indices[:subset_size])

subset_train = get_subset(dataset_train, fraction=0.2)  # Используем 20% от оригинала
subset_val = get_subset(dataset_val, fraction=0.2)      # Используем 20% от оригинала

# Создание подмножеств для тестового прогона (например, 1% от тренировочного и валидационного наборов)
subset_train_test = get_subset(dataset_train, fraction=0.01)
subset_val_test = get_subset(dataset_val, fraction=0.01)


data_loader_train = DataLoader(
    subset_train_test,
    batch_size=8,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
    drop_last=True
)

data_loader_val = DataLoader(
    subset_val_test,
    batch_size=8,
    shuffle=False,
    num_workers=4,
    collate_fn=collate_fn,
    drop_last=True
)

print("DataLoader-ы созданы успешно.")


In [None]:
import torchvision

# Функция для инициализации модели Mask R-CNN
def get_instance_segmentation_model(num_classes):
    # Загружаем предобученную модель Mask R-CNN
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
    
    # Получаем количество входных признаков для классификатора
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    
    # Замещаем классификатор на новый с num_classes выходами
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)
    
    # Получаем количество входных признаков для масочного предиктора
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    
    # Замещаем масочный предиктор новым
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)
    
    return model

# Определяем количество классов (например, количество категориний + фон)
unique_categories = train_annotations_df['category_id'].unique()
print(unique_categories)
print(f"Максимальный category_id: {unique_categories.max()}")

num_classes = unique_categories.max() + 1

# Инициализируем модель
model = get_instance_segmentation_model(num_classes)

# Перемещаем модель на GPU, если доступен
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

print(f"Модель инициализирована и перемещена на устройство: {device}")


In [None]:
# Настройка оптимизатора
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.01, momentum=0.9, weight_decay=0.0005)

# Использование ReduceLROnPlateau
lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)

print("Оптимизатор и планировщик обучения настроены.")


In [None]:
import time
import numpy as np
from collections import defaultdict, deque
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=100):
    model.train()
    metric_logger = defaultdict(deque)
    progress_bar = tqdm(enumerate(data_loader), total=len(data_loader), desc=f"Epoch {epoch}")

    for i, (images, targets) in progress_bar:
        if images is None or targets is None:
            print(f"Пропуск батча {i} из-за некорректных данных.")
            continue

        try:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)

            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            for k, v in loss_dict.items():
                metric_logger[k].append(v.item())

            if i % print_freq == 0:
                metrics_str = ' | '.join([f"{k}: {np.mean(v):.4f}" for k, v in metric_logger.items()])
                progress_bar.set_postfix_str(f"{metrics_str}")
        except Exception as e:
            print(f"Ошибка на батче {i}: {e}")
            raise e  # Прерываем обучение для отладки


    return metric_logger

def evaluate(model, data_loader, device):
    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for i, (images, targets) in enumerate(data_loader):
            if images is None or targets is None:
                print(f"Пропуск батча {i} из-за некорректных данных.")
                continue

            try:
                images = list(img.to(device) for img in images)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())

                total_loss += losses.item()
                num_batches += 1
            except Exception as e:
                print(f"Ошибка на батче {i}: {e}")
                raise e  # Прерываем оценку для отладки

    average_loss = total_loss / num_batches if num_batches > 0 else 0.0
    return average_loss

In [None]:
num_epochs = 1

for epoch in range(num_epochs):
    start_time = time.time()

    print(f"Запуск эпохи {epoch}")
    train_metrics = train_one_epoch(model, optimizer, data_loader_train, device, epoch, print_freq=100)

    # Вычисляем валидационные метрики
    val_loss = evaluate(model, data_loader_val, device)
    print(f"Валидационная потеря: {val_loss:.4f}")

    # Передаём валидационную потерю в планировщик
    lr_scheduler.step(val_loss)

    end_time = time.time()
    epoch_time = end_time - start_time
    print(f"Epoch #{epoch} завершено за {epoch_time:.2f} секунд.")

    # Выводим потери на обучении
    train_losses = {k: f"{np.mean(v):.4f}" for k, v in train_metrics.items()}
    print(f"Потери на обучении: {train_losses}")

    print("-" * 50)

In [None]:
# Сохранение state_dict модели
torch.save(model.state_dict(), "mask_rcnn_state_dict.pth")


In [None]:
import torch
import coremltools as ct


In [None]:
# Загрузка модели
model = get_instance_segmentation_model(num_classes)
model.load_state_dict(torch.load("mask_rcnn_state_dict.pth", map_location='cpu'))
model.eval()

# Перемещение модели на CPU, если она была обучена на GPU
device = torch.device('cpu')
model.to(device)


In [None]:
# Создание фиктивного ввода, соответствующего размеру входных данных модели
dummy_input = torch.randn(1, 3, 800, 800)  # Обновите размер в соответствии с вашими данными


In [None]:
# Трассировка модели
scripted_model = torch.jit.trace(model, dummy_input)

# Сохранение TorchScript модели (опционально)
scripted_model.save("mask_rcnn_scripted.pt")


In [None]:
# Конвертация TorchScript модели в Core ML
mlmodel = ct.convert(
    scripted_model,
    inputs=[ct.ImageType(name="input_image", shape=dummy_input.shape)]
)

# Сохранение Core ML модели
mlmodel.save("mask_rcnn.mlmodel")

print("Модель успешно сконвертирована в формат Core ML и сохранена как mask_rcnn.mlmodel")


In [None]:
import torch
import torchvision

print(f"Версия PyTorch: {torch.__version__}")
print(f"Версия torchvision: {torchvision.__version__}")
