# Сжатие изображений 

Преобразование картинок размера [3,512,512] в тензоры [6, 128, 128].

In [None]:
# для использования оптимизатора RAdam
!pip install torch_optimizer

In [None]:
import warnings
warnings.filterwarnings('ignore')
 
import torch
import random
import numpy as np
from tqdm import tqdm
from torch.utils.data import Dataset,DataLoader
from pathlib import Path
from scipy.io import loadmat
import pandas as pd
from torch.autograd import Variable
import pickle
import argparse
import os
from threading import Thread
 
 
from torchvision import datasets, transforms
from PIL import Image
 
%pylab inline
import matplotlib.pylab as plt
 
import copy
import datetime
#import traceback
import logging
import torch_optimizer as optim
 

seed = 37
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

### Для обучения использовала dataset с Kaggle: 

---

https://www.kaggle.com/andrewmvd/animal-faces

---



In [4]:
!mkdir dog

In [None]:
!unzip ./data/dog.zip -d /content/dog

In [None]:
parser = argparse.ArgumentParser()

parser.add_argument("--epoches", type=int, default=100, help="Number of epochs")
parser.add_argument("--batch_size", type=int, default=12, help="Batch size of train")
parser.add_argument("--lambda_L1", type=float, default=0.02, help="lambda for L1-regularization")
parser.add_argument("--lambda_L2", type=float, default=0.01, help="lambda for L2-regularization")
parser.add_argument("--learn_rate", type=float, default=0.001, help="Learning rate")
parser.add_argument("--shed_patience", type=int, default=5, help="Sheduler patience")
parser.add_argument("--optimizer", type=str, default="Adam", help="Type optimizer")
parser.add_argument("--fast", type=bool, default=True, help="cudnn.benchmark")

try:
    opt = parser.parse_args()
    opt.colab=False    
except:
    opt = parser.parse_args(args=[])
    opt.colab=True
if opt.colab:
    output = print
    path = "..."    
    path_result = "..."
    opt.workers = 4
else:
    opt.starttimestr = datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S")
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s",
                        handlers=[logging.FileHandler(os.path.join("logs",opt.starttimestr+".log")),
                                  logging.StreamHandler()])
    output = logging.info
    path = "input"    
    path_result = "models"
    opt.workers = min(opt.batch_size, 16)
np.set_printoptions(precision=3, linewidth=np.inf, edgeitems=5)
torch.set_default_tensor_type(torch.FloatTensor)

In [7]:
# путь к данным
path = ["/content/dog/dog/afhq/train"]

# путь к папке, куда сохраняются модели, картинки
path_result = "./saved_models"

### Класс *DogDataset* формирует тренировочную, валидационную и тестовую выборки.

In [8]:
class DogDataset(Dataset):
  def __init__(self,path,flag,cache=False):
    # если flag=0 - обучение     ----60%
    # если flag=1 - валидация    ----20%
    # если flag=2 - тестирование ----20%
    mas = []
    for p in path:
      for root, dirs, files in os.walk(p):
        mas += list(map(lambda y:os.path.join(root,y),files))
    mas = sorted(mas)
    random.shuffle(mas)
    if flag==0:
      self.my_path = mas[:int(0.61518*len(mas))]
    elif flag==1:
      self.my_path = mas[int(0.61518*len(mas)):int(0.82024*len(mas))]
    else:
      self.my_path = mas[int(0.82024*len(mas)):]

    print("Количество файлов:", len(self.my_path))

    self.cache = cache
    if self.cache:
       self.cdata = [None]*self.__len__()
       thread = [None]*opt.workers
       for index in range(self.__len__()+len(thread)):
           i = index%len(thread)
           if thread[i] is not None:
              thread[i].join()
              thread[i]=None
           if index < self.__len__():
              thread[i] = Thread(target=self.__getitem__, args=(index,))
              thread[i].start()
  
  def __len__(self):
    return(len(self.my_path))
  
  def __getitem__(self, index):
    if self.cache and self.cdata[index] is not None:
      return self.cdata[index]

    
    file = self.my_path[index % len(self.my_path)]
    #все полученные картинки получат один размер 512*512
    trans = transforms.Compose([transforms.Resize((512, 512)), transforms.ToTensor()])
    
    image = trans(Image.open(file)) # тензор размером [3,512,512], где значения от 0 до 1
    image = image[:3,:,:]
    if image.shape[0]==1:
      image = torch.cat((image,image,image))
    
    if self.cache:
        self.cdata[index] = image
    return image

### Архитектура нейронной сети

Архитектура нейронной сети - encoder-decoder.

In [9]:
class Layer(torch.nn.Module):
  def __init__(self,
               in_channels,         # количество входных канлов
               out_channels,        # количество каналов после применения слоя
               kernel_size,         # размер ядра свёртки
               stride,              # размер шага
               padding,             # размер паддинга 
               flag,                # "encoder" or "decoder"
               batch_norm,          # булевая переменная: надо ли применять батч нормализацию
               drop,                # вероятность в Dropout
               activation):         # функция активации
    
    super(Layer, self).__init__()
    lay=[]
    if flag== "encoder":
      lay.append(torch.nn.Conv2d(in_channels, out_channels, 
                                 kernel_size=kernel_size, stride=stride, padding=padding))
    else:
      lay.append(torch.nn.ConvTranspose2d(in_channels, out_channels, 
                                          kernel_size=kernel_size, stride=stride, padding=padding))
    
    if batch_norm==True:
      lay.append(torch.nn.BatchNorm2d(out_channels))
    
    if drop == True:
      lay.append(torch.nn.Dropout(0.5)) 
   
    if activation == "leakyReLU":
      lay.append(torch.nn.LeakyReLU(0.2))
    elif activation == "reLU":
      lay.append(torch.nn.ReLU())
    elif activation == "sigmoid":
      lay.append(torch.nn.Sigmoid())
    
    self.layer = torch.nn.Sequential(*lay)


  def forward(self,data):
      return self.layer(data)

In [10]:
sigmoida = torch.nn.Sigmoid()

class Net_encoder_decoder(torch.nn.Module):
  def __init__(self,
               k_s =3,    #kernel_size
               strid=1,   #stride
               pad=1,     #padding
               canal = 32):   #channals
    super(Net_encoder_decoder, self).__init__()

    d = canal
    self.encoder_0 = Layer(in_channels = 3, out_channels = d, 
                           kernel_size=k_s-1, stride=strid-1, padding=pad,
                           flag = "encoder", batch_norm = True, drop = False, activation = "leakyReLU")
    
    self.encoder_1 = Layer(in_channels = d, out_channels = 2*d,
                           kernel_size=k_s, stride=strid, padding=pad,
                           flag = "encoder", batch_norm = True, drop = False, activation = "leakyReLU")
    
    self.encoder_1_p = Layer(in_channels = 2*d, out_channels = 4*d,
                           kernel_size=k_s-1, stride=strid-1, padding=pad,
                           flag = "encoder", batch_norm = True, drop = False, activation = "leakyReLU")
    
    self.encoder_2 = Layer(in_channels = 4*d, out_channels = 2*d,
                           kernel_size=k_s, stride=strid, padding=pad,
                           flag = "encoder", batch_norm = True, drop = False, activation = "leakyReLU")
    
    self.encoder_2_p = Layer(in_channels = 2*d, out_channels = 6,
                           kernel_size=k_s-1, stride=strid-1, padding=pad,
                           flag = "encoder", batch_norm = True, drop = False, activation = "leakyReLU")
    
    

    
    self.decoder_3 = Layer(in_channels = 6, out_channels = d,
                           kernel_size=k_s-1, stride=strid-1, padding=pad,
                           flag = "decoder", batch_norm = True, drop = False, activation = "reLU")
    
    self.decoder_4 = Layer(in_channels = d, out_channels = 2*d,
                           kernel_size=k_s, stride=strid, padding=pad,
                           flag = "decoder", batch_norm = True, drop = False, activation = "reLU")
    
    self.decoder_5 = Layer(in_channels = 2*d, out_channels = d,
                           kernel_size=k_s, stride=strid, padding=pad,
                           flag = "decoder", batch_norm = True, drop = False, activation = "reLU")
    
    self.decoder_6 = Layer(in_channels = d, out_channels = 3,
                           kernel_size=k_s-1, stride=strid-1, padding=pad,
                           flag = "decoder", batch_norm = True, drop = False, activation = "sigmoida")



  def forward(self, x):
    x = self.encoder_0(x)
    x = self.encoder_1(x)
    x = self.encoder_1_p(x)
    x = self.encoder_2(x)
    x = self.encoder_2_p(x)
    
    x = self.decoder_3(x)
    x = self.decoder_4(x)
    x = self.decoder_5(x)
    x = self.decoder_6(x)

    return(x)


  def encoded_1(self,x):
    x = self.encoder_0(x)
    x = self.encoder_1(x)
    x = self.encoder_1_p(x)
    x = self.encoder_2(x)
    x = self.encoder_2_p(x)
    return(x)



Инициализация весов модели

In [11]:
def init_weight(x):
  if type(x) == torch.nn.Conv2d or type(x) == torch.nn.ConvTranspose2d:
    torch.nn.init.xavier_uniform(x.weight, gain = 0.95)

### L1 - регуляризация 

In [12]:
def reg_L1(model):
  s=-1;
  for name, param in model.named_parameters():
    s+=1
    if s==0:
      L1 = torch.sum(torch.abs(param))
    else:
      L1 += torch.sum(torch.abs(param))
  
  return(L1)

### Функция потерь

In [13]:
class My_Loss(torch.nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(My_Loss, self).__init__()

    def forward(self, preds, input, type_loss):

      if type_loss == "MSE":
        func_loss = torch.nn.MSELoss()
        res = func_loss(preds, input) 
      elif type_loss == 'MAE':
        func_loss = torch.nn.L1Loss()
        res = func_loss(preds, input)
      elif type_loss == 'MAE+MSE':
        func_loss_1 = torch.nn.L1Loss()
        func_loss = torch.nn.MSELoss()
        res = func_loss(preds, input) + func_loss_1(preds, input) 


      return(res)

### Класс-тренер:

А) элементы:
1. модель
2. оптимизатор
3. планировщик
4. функция потерь
5. история значений функции потерь по эпохам на обучении
6. история значений функции потерь по эпохам на валидации
7. лучшая модель
8. тип архитектуры нейронной сети
9. коэффициент при L1-регуляризации
10. коэффициент при L2-регуляризации
11. количество каналов(НОК)
12. параметр для планировщика
13. начальное значение шага обучения
14. размер ядра свёртки
15. шаг при свёртке
16. размер паддинга
17. вид датасета
18. номер модели

Б) функции:

1. init, принимающий dict, описывающий конфигурацию модели, оптимизатора и планировщика, инициализация модели, инициализация оптимизатора и планировщика
2. обучение на 1 батче
3. валидация на 1 батче
4. тест на 1 батче
5. переход с следующей эпохе: сохраняем значение функции потерь, лучшую модель
6. отрисовка кривых обучения на обучении и валидации


In [22]:
class Trainer(object):
  def __init__(self, parametr):
    
    self.model = Net_encoder_decoder(k_s =parametr['k_size'],    #kernel_size
                                     strid=parametr['stride'],   #stride
                                     pad=parametr['padd'],       #padding
                                     canal = parametr['chanal']) #channals
    self.model.apply(init_weight)

    for param in self.model.parameters():
      param.requires_grad = True
    
    if parametr['optim'] == "Adam":
      self.optimizer = torch.optim.Adam((self.model).parameters(), lr=parametr['learn_rate'],
                                        weight_decay = parametr['lambda_L2'])
    elif parametr['optim'] == "RAdam":
      self.optimizer = optim.RAdam((self.model).parameters(), lr=parametr['learn_rate'],
                                   weight_decay = parametr['lambda_L2'])
    
    if parametr['sheduler'] ==True:
      self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, patience=parametr['shed_patience'], 
                                                                  factor=0.5, verbose=True)
    

    self.lambda_L1 = parametr['lambda_L1']

    self.lambda_L2 = parametr['lambda_L2']

    self.learn_rate = parametr['learn_rate']

    self.shed_patience = parametr['shed_patience']

    self.optimizer_type = parametr['optim']

    self.kernel_size = parametr['k_size']

    self.stride = parametr['stride']

    self.padding = parametr['padd']

    self.channals = parametr['chanal']

    self.type_data = parametr['type_data']

    self.arсhitecture = parametr['arсhitecture']

    self.loss = parametr['loss']

    self.train_loss =[]
    
    self.valid_loss =[]

    self.train_loss_batch =[]
    
    self.valid_loss_batch =[]

    self.test_loss_batch =[]

    self.best_model = copy.deepcopy(self.model)

    self.best_val_loss = float('inf')

    self.index = parametr['index'] ## идентификатор модели


  def train(self,data,device):
    loss_fun = My_Loss() 
    
    self.model.train()
    train_running_loss = 0
    
    input = data

    preds = (self.model).forward(input)
    loss_value = loss_fun(preds,input, self.loss) + self.lambda_L1*reg_L1(self.model)

    self.optimizer.zero_grad()
    loss_value.backward()
    self.optimizer.step()

    train_running_loss = loss_value.item()
    
    self.train_loss_batch.append(train_running_loss)


  def valid(self,data, device):

    loss_fun = My_Loss() 

    self.model.eval()
    valid_running_loss = 0
    
    with torch.no_grad():
      input = data
      preds = (self.model).forward(input)
      loss_value = loss_fun(preds,input, self.loss) + self.lambda_L1*reg_L1(self.model)
      valid_running_loss = loss_value.item()
        
    self.valid_loss_batch.append(valid_running_loss)

  def jump(self,epoch):  ## переход с следующей эпохе: сохраняем значение функции потерь, лучшую модель
    train_epoch_loss = sum(self.train_loss_batch) / len(self.train_loss_batch)
    self.train_loss.append(train_epoch_loss)
    output('Epoch '+str(epoch)+' for model' +str(self.index) + ' train loss='+str(train_epoch_loss))

    valid_epoch_loss = sum(self.valid_loss_batch) / len(self.valid_loss_batch)
    self.valid_loss.append(valid_epoch_loss)

    self.valid_loss_batch =[]
    self.test_loss_batch =[]
    
    if valid_epoch_loss < self.best_val_loss:
      self.best_val_loss = valid_epoch_loss
      self.best_model = copy.deepcopy(self.model)

      output('Epoch '+str(epoch)+' Новая лучшая модель '+str(self.index) +'! loss='+str(valid_epoch_loss))

      torch.save(self.model, 
                 os.path.join(path_result, "e0_e3_d3_d6__{}.pth".format(self.index)))
      
      pickle.dump(self.scheduler.state_dict(),
                  open(os.path.join(path_result,"schedular_dict_e0_e3_d3_d6__{}.p".format(self.index)),
                       "wb"))
    else:
      output('Epoch '+str(epoch)+'for model'+str(self.index) +'valid loss='+str(valid_epoch_loss))

    self.scheduler.step(valid_epoch_loss)

  def plot_res(self,epoches):
    plt.plot(range(epoches), self.train_loss,'b', label = 'train')
    plt.plot(range(epoches), self.valid_loss,'r', label = 'valid')
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(path_result,"e0_e3_d3_d6__{}__loss.png".format(self.index)))
    plt.show()

  def tested(self,data, device,k):

    loss_fun = My_Loss() 

    self.best_model.eval()
    test_running_loss = 0

    with torch.no_grad():
      input = data

      preds = self.best_model.forward(input)
      loss_value = loss_fun(preds,input, self.loss) + self.lambda_L1*reg_L1(self.model)
      test_running_loss = loss_value.item()

      unloader = transforms.ToPILImage() 
      plt.figure(figsize = (12,5))

      plt.subplot(1,2,1)
      plt.imshow(unloader(input.reshape(3,512,512)))
      plt.title("In")

      plt.subplot(1,2,2)
      plt.imshow(unloader(preds.reshape(3,512,512)))
      plt.title("Out")
       
      if k==0 or k==40:
        plt.savefig(os.path.join(path_result,"e0_e3_d3_d6__{}__res__{}.png".format(self.index,k)),bbox_inches="tight")
      if k<100:
        plt.show()
    self.test_loss_batch.append(test_running_loss)

### Создание датасетов 

In [24]:
train_dataset = DogDataset(path,0)
valid_dataset = DogDataset(path,1)
test_dataset = DogDataset(path,2)

Количество файлов: 9000
Количество файлов: 3000
Количество файлов: 2630


### Задание параметров модели

In [25]:
df_Info = pd.read_csv(os.path.join(path_result,"INFO_e0_e3_d3_d6.csv"), header = 0, index_col = 0) 

In [26]:
epoches = 50
batch_size = 20

parametr_1 = {'index': df_Info.shape[0]+1, 'k_size': 4, 'stride':2, 'padd':1, 'chanal': 32, 'optim' : "RAdam", 
              'lambda_L1': 0.00001, 'lambda_L2' : 0.0001,  'learn_rate': 1.0e-3, 'sheduler':True, 'shed_patience': 5,
              'arсhitecture': '3---6', 'loss':"MSE","type_data": "dogs_cats_wild"}

model_1 = Trainer(parametr_1)

models = [model_1]

In [27]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [28]:
for m in models:
  m.model = m.model.to(device)

### Обучение модели

In [23]:
def train_epoch(models, train_dataset,valid_dataset, num_epochs, batch_size, device):

  train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, drop_last=True)
  
  valid_loader = torch.utils.data.DataLoader(
    valid_dataset, batch_size=batch_size, shuffle=False, num_workers=4, drop_last=True)
  
  output('Start train '+str(num_epochs)+' epochs')

  for epoch in (tqdm(range(num_epochs)) if opt.colab else range(num_epochs)):
    for input in train_loader:
      input = input.to(device)

      for m in models:
        m.train(input, device)
    for input in valid_loader:
      input = input.to(device)

      for m in models:
        m.valid(input, device)

    for m in models:
      m.jump(epoch)

In [None]:
train_epoch(models, train_dataset,valid_dataset, epoches, batch_size, device)

### Кривая обучения

In [None]:
model_1.plot_res(epoches)

### Тестирование модели

In [32]:
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=1, shuffle=False, num_workers=4, drop_last=True)

In [33]:
def pred(model, loader,device, s):

  k=0;
  for input in loader:
    input = input.to(device)
    model.tested(input, device,k)
    k+=1
    if k>100:
      break
  
  return(sum(model.test_loss_batch) / len(loader))

In [34]:
for i in range(len(models)):
  models[i].model = torch.load(os.path.join(path_result, "e0_e3_d3_d6__{}.pth".format(models[i].index)))

In [37]:
def save_param(model,epoches, batch_size):
    df_Info = pd.read_csv(os.path.join(path_result,"INFO_e0_e3_d3_d6.csv"), header = 0, index_col = 0)

    test_loss = pred(model, test_loader,device, [0,40])
    
    last_lr_1 = (pickle.load(open(os.path.join(path_result,"schedular_dict_e0_e3_d3_d6__{}.p".format(model.index)),
                                  "rb")))['_last_lr'][0]

    df_Info.loc[model.index] = [epoches, batch_size, model.lambda_L1, model.lambda_L2, model.learn_rate, last_lr_1,
                                test_loss, model.shed_patience, model.optimizer_type, model.kernel_size,
                                model.stride, model.padding, model.channals, model.arсhitecture, model.loss, 
                                model.type_data]

    df_Info.to_csv(os.path.join(path_result,"INFO_e0_e3_d3_d6.csv"))

In [None]:
save_param(model_1,epoches, batch_size)

### Сжатие 1 картинки

In [40]:
def pred_from_model(model, loader,device):

  k=0;
  for input in loader:
    input = input.to(device)
    pred_for_1_foto(model,input, device)
    k+=1
    if k>100:
      break
  
  return(sum(model.test_loss_batch) / len(loader))

In [41]:
def pred_for_1_foto(model,input, device):
    
    trans = transforms.Compose([transforms.Resize((512, 512)), transforms.ToTensor()])
    
    input = trans(Image.open(input)) # тензор размером [3,512,512], где значения от 0 до 
    
    print(input.shape)

    input = input.reshape(1,3,512,512)

    input = input.to(device)
    
    with torch.no_grad():

      for i in range(len(model)):
        if i ==0:
          preds = model[i].forward(input)
        else:
          preds += model[i].forward(input)

      preds /= len(model)
      
      unloader = transforms.ToPILImage() 
      plt.figure(figsize = (12,5))

      plt.subplot(1,2,1)
      plt.imshow(unloader(input.reshape(3,512,512)))
      plt.title("In")
      
      plt.subplot(1,2,2)
      plt.imshow(unloader(preds.reshape(3,512,512)))
      plt.title("Out")
      plt.show()

In [None]:
index_model_1_foto = [51]
model_for_1_foto = []
for i in index_model_1_foto:
  model_for_1_foto.append(torch.load(os.path.join(path_result, "e0_e3_d3_d6__{}.pth".format(i))))
pred_for_1_foto(model_for_1_foto,"./1.jpg", device)


### Сжатие видео

In [None]:
import cv2

In [None]:
!mkdir new_foto_4

In [None]:
score = []

def pred_for_foto(model,input, device,s):
    
    trans = transforms.Compose([transforms.Resize((512, 512)), transforms.ToTensor()])
    
    input = trans(Image.open(input)) # тензор размером [3,512,512], где значения от 0 до 
    
    input = input.reshape(1,3,512,512)

    input = input.to(device)
    
    with torch.no_grad():

      preds = model.forward(input)

      func_loss = torch.nn.MSELoss()
      score.append(func_loss(preds, input))

      unloader = transforms.ToPILImage() 
      plt.figure(figsize = (12,5))

      plt.subplot(1,2,1)
      plt.imshow(unloader(input.reshape(3,512,512)))
      plt.title("In")
      
      plt.subplot(1,2,2)
      plt.imshow(unloader(preds.reshape(3,512,512)))
      plt.title("Out")
      plt.savefig("/content/new_foto_4/new_foto__{}.png".format(s),bbox_inches="tight")


In [None]:
# Читать видео по указанному пути

cam = cv2.VideoCapture("./1.mp4")


try:

    # создание папки с именем data

    if not os.path.exists('/content/foto_4'):

        os.makedirs('foto_4')

# если не создано, то выдайте ошибку

except OSError:
    print ('Error: Creating directory of data')

  
# Рамка

currentframe = 1000

  

while(True):
    # чтение из кадра

    ret,frame = cam.read()

    if ret:

        # если видео еще осталось, продолжайте создавать изображения

        name = '/content/foto_4/frame' + str(currentframe) + '.jpg'

        print ('Creating...' + name)

        # запись извлеченных изображений

        cv2.imwrite(name, frame)

  

        # увеличение счетчика, чтобы оно было

        # показать, сколько кадров создано

        currentframe += 1

    else:

        break

  
# Освободить все пространство и окна, как только сделано
cam.release()
cv2.destroyAllWindows()

In [None]:
index_model_1_foto = 51
model_for_1_foto = torch.load(os.path.join(path_result, "e0_e3_d3_d6__{}.pth".format(index_model_1_foto)))

mas = sorted(Path("/content/foto_4").glob('*.jpg'))
s=1000
for im in tqdm(mas):
  pred_for_foto(model_for_1_foto,im, device,s)
  s+=1

In [None]:
print(len(score))
print("Точность сжатия-разжатия: ", (1-torch.mean(torch.tensor(score)))*100)

In [None]:
import imageio
fileList = []
filenames = sorted(Path("/content/new_foto_6").glob('*.png'))
for file in filenames:
    fileList.append(file)

writer = imageio.get_writer('/content/3.mp4', fps=20)

for im in fileList:
    writer.append_data(imageio.imread(im))
writer.close()