<a href="https://colab.research.google.com/github/LILICONDA/Diplom/blob/main/Prototypical_Netwoks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Импортируем библиотеки

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
import multiprocessing as mp
import os
import cv2
from PIL import Image
from tqdm import tqdm_notebook
from tqdm import tnrange

import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

In [3]:
#Проверим поддержку графического процессора
print(torch.cuda.is_available())

True


Картинки для обучения и тестирования уже распределены по соответствующим папкам (TRAIN и TEST).

In [4]:
def image_to_array(folder_path):
  """
  Функция для перебора каждого файла изображения, преобразовывания его в массив 
  и сохранения массива в списке
  Args:
      folder_path: путь к папке с изображениями
  Returns:
      image_arrays: список массивов
  """
  image_files = [f for f in os.listdir(folder_path) if f.endswith(".jpg") or f.endswith(".png")]
  image_arrays = []
  for file_name in image_files:
    file_path = os.path.join(folder_path, file_name)
    image = Image.open(file_path)
    image_array = np.array(image)
    image_arrays.append(image_array)
  return image_arrays

In [44]:
train_x=np.array(image_to_array(r'/content/TRAIN'))
train_x.shape

(32, 288, 298, 4)

In [45]:
test_x=np.array(image_to_array(r'/content/TEST'))
test_x.shape

(8, 288, 298, 4)

In [7]:
def test_size(n):
  """ Функция для создания "правильных ответов" для тренировочного и тестового набора
  Args:
      n: количество ответов
  Returns:
      arr: список ответов
  """
  arr = [i % 2 for i in range(n)]
  return arr

In [46]:
train_y=np.array(test_size(32))
train_y.shape

(32,)

In [47]:
test_y=np.array(test_size(8))
test_y.shape

(8,)

### Создание выборки

In [10]:
def extract_sample(n_way, n_support, n_query, datax, datay):
  """
  Выбирает случайную выборку размером n_support+n_query для n_way классов
   Args:
       n_way (int): количество классов в задаче классификации
       n_support (int): количество помеченных примеров на класс в наборе поддержки
       n_query (int): количество помеченных примеров на класс в наборе запросов
       datax (np.array): набор изображений
       datay (np.array): набор меток
   Returns:
      (dict) of:
         (torch.Tensor): примеры изображений. Размер (n_way, n_support+n_query, (dim))
         (int): n_way
         (int): n_support
         (int): n_query
  """
  sample = []
  K = np.random.choice(np.unique(datay), n_way, replace=False)
  for cls in K:
    datax_cls = datax[datay == cls]
    perm = np.random.permutation(datax_cls)
    sample_cls = perm[:(n_support+n_query)]
    sample.append(sample_cls)
  sample = np.array(sample)
  sample = torch.from_numpy(sample).float()
  sample = sample.permute(0,1,4,2,3)
  return({
      'images': sample,
      'n_way': n_way,
      'n_support': n_support,
      'n_query': n_query
      })

### Построение модели

In [11]:
def euclidean_dist(x, y):
  """
  Вычисляет евклидово расстояние между x и y
   Args:
       x (torch.Tensor): shape (n, d). n обычно n_way*n_query
       y (torch.Tensor): shape (m, d). m обычно n_way
  Returns:
      torch.Tensor: shape(n, m). Для каждого запроса расстояния до каждого центроида
  """
  n = x.size(0)
  m = y.size(0)
  d = x.size(1)
  assert d == y.size(1)

  x = x.unsqueeze(1).expand(n, m, d)
  y = y.unsqueeze(0).expand(n, m, d)

  return torch.pow(x - y, 2).sum(2)

In [13]:
class ProtoNet(nn.Module):
  def __init__(self, encoder):
    """
    Args:
        encoder : CNN, кодирующий изображения в образце
        n_way (int): количество классов в задаче классификации
        n_support (int): количество помеченных примеров на класс в наборе поддержки
        n_query (int): количество помеченных примеров на класс в наборе запросов
    """
    super(ProtoNet, self).__init__()
    self.encoder = encoder.cuda()

  def set_forward_loss(self, sample):
    """
    Вычисляет потери, точность и выходные данные для задачи классификации
    Args:
        sample (torch.Tensor): shape (n_way, n_support+n_query, (dim)) 
    Returns:
        torch.Tensor: shape(2), loss, accuracy and y_hat
    """
    sample_images = sample['images'].cuda()
    n_way = sample['n_way']
    n_support = sample['n_support']
    n_query = sample['n_query']
    
    x_support = sample_images[:, :n_support]
    x_query = sample_images[:, n_support:]
   
    #целевые показатели 0 ... n_way-1
    target_inds = torch.arange(0, n_way).view(n_way, 1, 1).expand(n_way, n_query, 1).long()
    target_inds = Variable(target_inds, requires_grad=False)
    target_inds = target_inds.cuda()
   
    #кодировка изображений support and the query наборов
    x = torch.cat([x_support.contiguous().view(n_way * n_support, *x_support.size()[2:]),
                   x_query.contiguous().view(n_way * n_query, *x_query.size()[2:])], 0)
   
    z = self.encoder.forward(x)
    z_dim = z.size(-1) #обычно 64
    z_proto = z[:n_way*n_support].view(n_way, n_support, z_dim).mean(1)
    z_query = z[n_way*n_support:]

    #вычисляем расстояния
    dists = euclidean_dist(z_query, z_proto)
    
    #вычисляем вероятности
    log_p_y = F.log_softmax(-dists, dim=1).view(n_way, n_query, -1)
   
    loss_val = -log_p_y.gather(2, target_inds).squeeze().view(-1).mean()
    _, y_hat = log_p_y.max(2)
    acc_val = torch.eq(y_hat, target_inds.squeeze()).float().mean()
   
    return loss_val, {
        'loss': loss_val.item(),
        'acc': acc_val.item(),
        'y_hat': y_hat
        }

In [48]:
class Flatten(nn.Module):
  def __init__(self):
    super(Flatten, self).__init__()

  def forward(self, x):
    return x.view(x.size(0), -1)

def load_protonet_conv(**kwargs):
  """
  Загружает прототип модели сети
  Arg:
      x_dim (tuple): Размерность входной картинки
      hid_dim (int): Размер скрытых слоев в сверточных блоках
      z_dim (int): Размер встроенного изображения
  Returns:
      Model (Class ProtoNet)
  """
  x_dim = kwargs['x_dim']
  hid_dim = kwargs['hid_dim']
  z_dim = kwargs['z_dim']

  def conv_block(in_channels, out_channels):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
        nn.MaxPool2d(2)
        )
    
  encoder = nn.Sequential(
    conv_block(x_dim[0], 2*hid_dim),
    conv_block(2*hid_dim, 4*hid_dim),
    conv_block(4*hid_dim, 8*hid_dim),
    conv_block(8*hid_dim, 16*hid_dim),
    Flatten()
    )
    
  return ProtoNet(encoder)

### TRAIN

In [49]:
def train(model, optimizer, train_x, train_y, n_way, n_support, n_query, max_epoch, epoch_size):
  """
  Обучаем protonet
  Args:
      model
      optimizer
      train_x (np.array): изображения тренировочного сета
      train_y(np.array): разметка для тренировочного сета
      n_way (int): количество классов в задаче классификации
      n_support (int): количество помеченных примеров на класс в наборе поддержки
      n_query (int): количество помеченных примеров на класс в наборе запросов
      max_epoch (int): максимальное количество эпох для тренировки
      epoch_size (int): количество эпизодов в эпоху
  """
  #разделим скорость обучения на 2 в каждую эпоху, как предлагается в 
  #статье https://arxiv.org/pdf/1703.05175v2.pdf

  scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.5, last_epoch=-1)
  epoch = 0 #эпохи, сделанные до сих пор
  stop = False #статус, чтобы знать, когда остановиться

  while epoch < max_epoch and not stop:
    running_loss = 0.0
    running_acc = 0.0

    for episode in tnrange(epoch_size, desc="Epoch {:d} train".format(epoch+1)):
      sample = extract_sample(n_way, n_support, n_query, train_x, train_y)
      optimizer.zero_grad()
      loss, output = model.set_forward_loss(sample)
      running_loss += output['loss']
      running_acc += output['acc']
      loss.backward()
      optimizer.step()
    epoch_loss = running_loss / epoch_size
    epoch_acc = running_acc / epoch_size
    print('Epoch {:d} -- Loss: {:.4f} Acc: {:.4f}'.format(epoch+1,epoch_loss, epoch_acc))
    epoch += 1
    scheduler.step()

In [50]:
%%time

model = load_protonet_conv(
    x_dim=(4, 288, 298),
    hid_dim=4,
    z_dim=4,
    )

optimizer = optim.Adam(model.parameters(), lr = 0.001)

n_way = 2
n_support = 8
n_query = 8

train_x = train_x
train_y = train_y

max_epoch = 5
epoch_size = 200

train(model, optimizer, train_x, train_y, n_way, n_support, n_query, max_epoch, epoch_size)

  for episode in tnrange(epoch_size, desc="Epoch {:d} train".format(epoch+1)):


Epoch 1 train:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 1 -- Loss: 25.2230 Acc: 0.6784


Epoch 2 train:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 2 -- Loss: 0.1066 Acc: 0.9637


Epoch 3 train:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 3 -- Loss: 0.0175 Acc: 0.9928


Epoch 4 train:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 4 -- Loss: 0.0079 Acc: 0.9972


Epoch 5 train:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 5 -- Loss: 0.0032 Acc: 0.9988
CPU times: user 36.2 s, sys: 21.2 s, total: 57.4 s
Wall time: 58.4 s


###TEST

In [51]:
def test(model, test_x, test_y, n_way, n_support, n_query, test_episode):
  """
  Тестируем protonet
  Args:
      model: обученная модель
      test_x (np.array): картинки для тестового набора
      test_y (np.array): разметка для тестового набора
      n_way (int): количество классов в задаче классификации
      n_support (int): количество помеченных примеров на класс в наборе поддержки
      n_query (int): количество помеченных примеров на класс в наборе запросов
      test_episode (int): количество эпизодов для тестирования
  """
  running_loss = 0.0
  running_acc = 0.0
  for episode in tnrange(test_episode):
    sample = extract_sample(n_way, n_support, n_query, test_x, test_y)
    loss, output = model.set_forward_loss(sample)
    running_loss += output['loss']
    running_acc += output['acc']
  avg_loss = running_loss / test_episode
  avg_acc = running_acc / test_episode
  print('Test results -- Loss: {:.4f} Acc: {:.4f}'.format(avg_loss, avg_acc))
     

In [52]:
n_way = 2
n_support = 2
n_query = 2

test_x = test_x
test_y = test_y

test_episode = 100

test(model, test_x, test_y, n_way, n_support, n_query, test_episode)

  for episode in tnrange(test_episode):


  0%|          | 0/100 [00:00<?, ?it/s]

Test results -- Loss: 0.0000 Acc: 1.0000
