In [1]:
# Загружаем необходимые библиотеки 
# Библиотека для работы с функциями операционной системы
import os

# Даталоадер
from torch.utils.data import DataLoader
from torch.utils.data import Subset

# Загрузка изображений 
from torchvision.datasets import ImageFolder
# Функции трансформации изображений, сохранения и компоновки 
import torchvision.transforms as tt
from torchvision.utils import save_image
from torchvision.utils import make_grid

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter

# Общего применения 
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set(style='darkgrid', font_scale=1.2)
import itertools
from tqdm import tqdm, tqdm_notebook
from IPython.display import clear_output

# from torch.autograd import Variable
# import cv2


In [2]:
# Смотрим видеокарты установленные в системе 
for i in range(torch.cuda.device_count()):
   print(torch.cuda.get_device_properties(i).name)

NVIDIA GeForce RTX 3060
NVIDIA GeForce GTX 1050 Ti


In [3]:
#  Определяемся с устройством 
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cuda')

In [4]:
# Параметры для обработки изображения
image_size = 256
# Константы нормализации изображения 
stats = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)

In [5]:
# Денормализация для вывода изображения, в генераторе на выходе гиперболтческий тангенс 
# Денормализация отобразит его выход [-1,1] в [0,1] для вывода
def denorm(img_tensors):
    return torch.clamp(img_tensors * stats[1][0] + stats[0][0],0,1)

In [6]:
# Функция вывода изображения
def show_images(images, nmax=4, title=''):
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_title(title)
    ax.imshow(make_grid(denorm(images.detach()[:nmax]), nrow=2).permute(1, 2, 0))

In [7]:
# загрузка состояния модели и птимизатора
def load_state(path, model, optimizer):
    checkpoint = torch.load(path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    model.train()

In [8]:
# Блок Реснет 
class ResNetBlock(nn.Module):
    def __init__(self, channels):
        super(ResNetBlock, self).__init__()
        self.resnet  = nn.Sequential(
               
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_channels=channels, out_channels=channels, kernel_size=3, bias = True),
            nn.InstanceNorm2d(channels),
            nn.Dropout(0.5),
            nn.ReLU(inplace=True), 
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_channels=channels, out_channels=channels, kernel_size=3, bias = True),
            nn.InstanceNorm2d(channels),

        )
        
    def forward(self, x):
        return x + self.resnet(x)

In [9]:
# Генератор
class GeneratorResNet(nn.Module):
    def __init__(self):
        super(GeneratorResNet, self).__init__()
        # базовое количество карт
        self.chanel = 64
        
        # Сеть энкодера  
        
        self.input = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(in_channels=3, out_channels=self.chanel, kernel_size=7, stride=1, padding=0, bias = True),
            nn.ReLU(inplace=True)  
        ) #256 -> 256
        
        self.down1 = nn.Sequential(
            nn.Conv2d(in_channels=self.chanel, out_channels=self.chanel*2, kernel_size=4, stride=2, padding=1, bias = True),
            nn.InstanceNorm2d(self.chanel*2),
            nn.ReLU(inplace=True) 
        ) #256 -> 128 
        
        self.down2 = nn.Sequential(
            nn.Conv2d(in_channels=self.chanel*2, out_channels=self.chanel*4, kernel_size=4, stride=2, padding=1, bias = True),
            nn.InstanceNorm2d(self.chanel*4),
            nn.ReLU(inplace=True)    
        ) #128 -> 64
        
      
        self.resnet1 = ResNetBlock(self.chanel*4)
        self.resnet2 = ResNetBlock(self.chanel*4)
        self.resnet3 = ResNetBlock(self.chanel*4)
        self.resnet4 = ResNetBlock(self.chanel*4)
        self.resnet5 = ResNetBlock(self.chanel*4)
        self.resnet6 = ResNetBlock(self.chanel*4)
        self.resnet7 = ResNetBlock(self.chanel*4)
        self.resnet8 = ResNetBlock(self.chanel*4)
        self.resnet9 = ResNetBlock(self.chanel*4)
        
        
         # Сеть декодера 
        
        self.up1 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.chanel*4, out_channels=self.chanel*2, kernel_size=4, stride=2, padding=1, bias = True),
            nn.InstanceNorm2d(self.chanel*2),
            nn.ReLU(inplace=True)  
       
        ) # 64 -> 128
        
        self.up2 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.chanel*2, out_channels=self.chanel, kernel_size=4, stride=2, padding=1, bias = True),
            nn.InstanceNorm2d(self.chanel),
            nn.ReLU(inplace=True)  
        ) # 128 -> 256
        
        
        
        self.out = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(in_channels=self.chanel, out_channels=3, kernel_size=7, stride=1, padding=0, bias = True),  
            nn.Tanh()     
        )
        
    
    def forward(self, x):
        
        x = self.input(x)   # 256x256
        x = self.down1(x)   # 128x128
        x = self.down2(x)   # 64x64
      
        x = self.resnet1(x)
        x = self.resnet2(x)
        x = self.resnet3(x)
        x = self.resnet4(x)
        x = self.resnet5(x)
        x = self.resnet6(x)
        x = self.resnet7(x)
        x = self.resnet8(x)
        x = self.resnet9(x) # 64x64
        
        
        x = self.up1(x)  # 128x128
        x = self.up2(x)  # 256x256

        return self.out(x)

In [10]:
# Создаем генераторы  и тестируем 
generator_AB = GeneratorResNet()
generator_BA = GeneratorResNet()
generator_AB = generator_AB.to(device)
generator_BA = generator_BA.to(device)

In [11]:
opt_g = torch.optim.Adam(itertools.chain(generator_AB.parameters(), generator_BA.parameters()), lr=2e-4, betas=(0.5, 0.999))

In [12]:
# Загрузка модели
load_state('/home/master/dls/CycleGAN_foto2maps_pix_idn_g_BA_epoch_499.tsm', generator_BA, opt_g)
load_state('/home/master/dls/CycleGAN_foto2maps_pix_idn_g_AB_epoch_499.tsm', generator_AB, opt_g)

In [13]:
import torchvision.io as io

In [14]:
import telebot

In [15]:
bot = telebot.TeleBot('')

In [16]:
@bot.message_handler(content_types=['photo'])
def photo(message):
    print('message.photo =', message.photo)
    fileID = message.photo[-1].file_id
    print('fileID =', fileID)
    file_info = bot.get_file(fileID)
    print('file.file_path =', file_info.file_path)
    downloaded_file = bot.download_file(file_info.file_path)
    with open("image.jpg", 'wb') as new_file:
        new_file.write(downloaded_file)
        test_image = io.read_image('/home/master/dls/image.jpg')
    x1 = test_image.shape[1]
    x2 = test_image.shape[2]
    test_image = test_image/255
    test_image =  tt.Normalize(*stats)(test_image)
    test_image = tt.Resize((image_size, image_size))(test_image)
    with torch.no_grad():
        test_result  = generator_BA(test_image.to(device)).to('cpu')
    test_result = tt.Resize((x1, x2),  interpolation=tt.InterpolationMode.BICUBIC)(test_result)
    plt.imsave('/home/master/dls/image_out.jpg', denorm(test_result.detach()).permute(1, 2, 0).numpy())
    print('Transform: OK')
    file = open('/home/master/dls/image_out.jpg', 'rb')
    bot.send_photo(message.chat.id, file)
    print('Send: OK')

In [None]:
bot.polling(none_stop=True, interval=0)

message.photo = [<telebot.types.PhotoSize object at 0x7eff493d1400>, <telebot.types.PhotoSize object at 0x7eff493d1580>, <telebot.types.PhotoSize object at 0x7eff493d16a0>, <telebot.types.PhotoSize object at 0x7eff493d18b0>]
fileID = AgACAgIAAxkBAAO5ZKUPE3cJ303b6OckLK9CXvdBCMIAAijGMRsGfylJFUiYXHb3Mz4BAAMCAAN5AAMvBA
file.file_path = photos/file_72.jpg




Transform: OK
Send: OK
