[Базовая статья](https://learnopencv.com/multi-label-image-classification-with-pytorch-image-tagging/)

[Базовый код](https://github.com/spmallick/learnopencv/blob/master/PyTorch-Multi-Label-Image-Classification-Image-Tagging/Pipeline.ipynb])

In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, random_split, DataLoader
from torchvision import models
import torch
import torchvision.transforms as T
from sklearn.metrics import precision_score, recall_score, f1_score
from torch import nn
from torch.utils.data.dataloader import DataLoader
from matplotlib import pyplot as plt


In [2]:
data_train_dir = 'D:/EDUCATION/DATA/OTUS_ML_P/train_images'
data_test_dir = 'D:/EDUCATION/DATA/OTUS_ML_P/test_images'

In [3]:
classes = ['Fish', 'Flower', 'Gravel', 'Sugar']

In [4]:
def encode_target(labels, classes=classes):
    target = torch.zeros(len(classes))
    for label in labels:
        index = classes.index(label)
        target[index] = 1
    return target

def decode_target(target, classes=classes, threshold=0.5):
    result = []
    for index, value in enumerate(target):
        if value > threshold:
            result.append(classes[index])
    return ' '.join(result)

In [5]:
def get_file_names(path):
    result = []
    for _, _, file_names in os.walk(path):  
        for file_name in file_names:
                result.append(file_name)
    return result

def get_labels(file_path):
    labels = {}
    
    data = pd.read_csv(file_path)
    data['File_Name'] = data['Image_Label'].apply(lambda x: x.split('_')[0])
    data['Label'] = data['Image_Label'].apply(lambda x: x.split('_')[1])

    data = data[data['EncodedPixels'].notna()]

    for _, row in data.iterrows():
        if row['File_Name'] not in labels:
            labels[row['File_Name']] = []
        labels[row['File_Name']].append(row['Label'])

    return labels

In [6]:
class ImagesDataset(Dataset):
    def __init__(self, train_images_path='D:/EDUCATION/DATA/OTUS_ML_P/TRAIN_IMAGES', train_labels_file_path='D:/EDUCATION/DATA/OTUS_ML_P/TRAIN.CSV'):
        self.train_images_path = train_images_path
        self.train_labels_file_path = train_labels_file_path

        self.images = get_file_names(train_images_path)
        self.labels = get_labels(train_labels_file_path)
      

    def __len__(self):
        return len(self.images)       

    def __getitem__(self, index):
        transform = T.Compose([T.ToTensor()])

        image_file_name = self.images[index]
        image = transform(Image.open(os.path.join(self.train_images_path, image_file_name)))
        
        return image, encode_target(self.labels[image_file_name])       

In [7]:
data = ImagesDataset()
data[0]

(tensor([[[0.0000, 0.0000, 0.0000,  ..., 0.9098, 0.8902, 0.8588],
          [0.0000, 0.0000, 0.0000,  ..., 0.9176, 0.8784, 0.8314],
          [0.0000, 0.0000, 0.0000,  ..., 0.8314, 0.7569, 0.6902],
          ...,
          [0.0588, 0.0588, 0.0588,  ..., 0.3922, 0.4863, 0.5843],
          [0.0549, 0.0549, 0.0588,  ..., 0.4196, 0.5216, 0.5961],
          [0.0549, 0.0549, 0.0549,  ..., 0.4471, 0.5569, 0.5922]],
 
         [[0.0000, 0.0000, 0.0000,  ..., 0.9098, 0.8902, 0.8588],
          [0.0000, 0.0000, 0.0000,  ..., 0.9176, 0.8784, 0.8314],
          [0.0000, 0.0000, 0.0000,  ..., 0.8314, 0.7569, 0.6902],
          ...,
          [0.1059, 0.1059, 0.1059,  ..., 0.3961, 0.4902, 0.5882],
          [0.1020, 0.1020, 0.1059,  ..., 0.4235, 0.5255, 0.6000],
          [0.1020, 0.1020, 0.1020,  ..., 0.4510, 0.5608, 0.5961]],
 
         [[0.0000, 0.0000, 0.0000,  ..., 0.9020, 0.8824, 0.8510],
          [0.0000, 0.0000, 0.0000,  ..., 0.9098, 0.8706, 0.8235],
          [0.0000, 0.0000, 0.0000,  ...,

In [8]:
torch.cuda.is_available()

True

In [9]:
device = torch.device('cuda')
device

device(type='cuda')

In [10]:
train_size = len(data) - int(0.15 * len(data))
val_size = len(data) - train_size
train_size, val_size

(4715, 831)

In [11]:
train_ds, val_ds = random_split(data, [train_size, val_size])
len(train_ds), len(val_ds)

(4715, 831)

In [12]:
batch_size = 2
train_loader = DataLoader(train_ds, batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size * 2)

In [13]:
class Resnext50(nn.Module):
    def __init__(self, n_classes):
        super().__init__()
        resnet = models.resnext50_32x4d(pretrained=True)
        resnet.fc = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=resnet.fc.in_features, out_features=len(classes))
        )
        self.base_model = resnet
        self.sigm = nn.Sigmoid()

    def forward(self, x):
        return self.sigm(self.base_model(x))

In [14]:
def calculate_metrics(pred, target, threshold=0.5):
    pred = np.array(pred > threshold, dtype=float)
    return {'micro/precision': precision_score(y_true=target, y_pred=pred, average='micro'),
            'micro/recall': recall_score(y_true=target, y_pred=pred, average='micro'),
            'micro/f1': f1_score(y_true=target, y_pred=pred, average='micro'),
            'macro/precision': precision_score(y_true=target, y_pred=pred, average='macro'),
            'macro/recall': recall_score(y_true=target, y_pred=pred, average='macro'),
            'macro/f1': f1_score(y_true=target, y_pred=pred, average='macro'),
            'samples/precision': precision_score(y_true=target, y_pred=pred, average='samples'),
            'samples/recall': recall_score(y_true=target, y_pred=pred, average='samples'),
            'samples/f1': f1_score(y_true=target, y_pred=pred, average='samples'),
            }

In [15]:
lr = 1e-4 
test_freq = 200 # Test model frequency (iterations)
max_epoch_number = 4

In [16]:
model = Resnext50(len(classes))
model.train()
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss()



In [17]:
epoch = 0
iteration = 0
while True:
    batch_losses = []
    for imgs, targets in train_loader:
        imgs, targets = imgs.to(device), targets.to(device)

        optimizer.zero_grad()

        model_result = model(imgs)
        loss = criterion(model_result, targets.type(torch.float))

        batch_loss_value = loss.item()
        loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss_value)
        with torch.no_grad():
            result = calculate_metrics(model_result.cpu().numpy(), targets.cpu().numpy())

        if iteration % test_freq == 0:
            model.eval()
            with torch.no_grad():
                model_result = []
                targets = []
                for imgs, batch_targets in val_loader:
                    imgs = imgs.to(device)
                    model_batch_result = model(imgs)
                    model_result.extend(model_batch_result.cpu().numpy())
                    targets.extend(batch_targets.cpu().numpy())

            result = calculate_metrics(np.array(model_result), np.array(targets))
            print("epoch:{:2d} iter:{:3d} test: "
                  "micro f1: {:.3f} "
                  "macro f1: {:.3f} "
                  "samples f1: {:.3f}".format(epoch, iteration,
                                              result['micro/f1'],
                                              result['macro/f1'],
                                              result['samples/f1']))

            model.train()
        iteration += 1

    loss_value = np.mean(batch_losses)
    print("epoch:{:2d} iter:{:3d} train: loss:{:.3f}".format(epoch, iteration, loss_value))
    epoch += 1
    if max_epoch_number < epoch:
        break

  return F.conv2d(input, weight, bias, self.stride,
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  return F.conv2d(input, weight, bias, self.stride,
