In [1]:
!pip install pyTelegramBotAPI



In [2]:
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import torchvision.models as models

import time
import copy

from pathlib import Path
from scipy import ndimage
import numpy as np
from io import BytesIO

In [3]:
class StyleTransfer(nn.Module):
  def __init__(self):
    super(StyleTransfer, self).__init__()
    self.imsize = 256  

    self.loader = transforms.Compose([ #проводим стандартную подготовку изображения
        transforms.Resize(self.imsize),
        transforms.CenterCrop(self.imsize),
        transforms.ToTensor()])

    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.cnn = models.vgg19(pretrained=True).features.to(self.device).eval()# vgg19 показывает себя лучше других сетей
    self.unloader = transforms.ToPILImage()

    # зададим слои
    self.content_layers = ['conv_4']
    self.style_layers = ['conv_1', 'conv_2', 'conv_3', 'conv_4', 'conv_5']
    # параметры нормализации вынесли за пределы transforms дял наглядности
    self.normalization_mean = torch.tensor([0.485, 0.456, 0.406]).to(self.device)
    self.normalization_std = torch.tensor([0.229, 0.224, 0.225]).to(self.device)
    self.normalization = Normalization(self.normalization_mean, self.normalization_std).to(self.device)

  def start_train(self, content_img, style_img, num_steps=4000,
            style_weight=20000, content_weight=1):
      """В данной функции реализован цикл переноса стиля"""
      print('Начинаем построение модели')
      style_img = self.image_loader(style_img)
      content_img = self.image_loader(content_img)
      input_img = content_img.clone()
      model, style_losses, content_losses = self.get_style_model_and_losses(self.cnn,
          self.normalization_mean, self.normalization_std, style_img, content_img, self.content_layers, self.style_layers)
      optimizer = self.get_input_optimizer(input_img)

      print('Поехали..')
      run = [0]
      while run[0] <= num_steps:

          def closure():

              input_img.data.clamp_(0, 1)
              optimizer.zero_grad()
              model(input_img)

              style_score = 0
              content_score = 0

              for sl in style_losses:
                  style_score += sl.loss
              for cl in content_losses:
                  content_score += cl.loss
              
              style_score *= style_weight
              content_score *= content_weight

              loss = style_score + content_score
              loss.backward()

              run[0] += 1
              # функция для отладки
              if run[0] % 500 == 0:
                  print("run {}:".format(run))
                  print('Style Loss : {:4f} Content Loss: {:4f}'.format(
                      style_score.item(), content_score.item()))
                  print()

              return style_score + content_score

          optimizer.step(closure)

      input_img.data.clamp_(0, 1)

      return input_img

  def get_style_model_and_losses(self, cnn, normalization_mean, normalization_std,
                                  style_img, content_img,
                                  content_layers, style_layers):
      cnn = copy.deepcopy(cnn)
      # нормируем значения
      normalization = Normalization(normalization_mean, normalization_std).to(self.device)

      content_losses = []
      style_losses = []
      model = nn.Sequential(normalization)

      i = 0
      for layer in cnn.children():
          if isinstance(layer, nn.Conv2d):
              i += 1
              name = 'conv_{}'.format(i)
          elif isinstance(layer, nn.ReLU):
              name = 'relu_{}'.format(i)
              layer = nn.ReLU(inplace=False)
          elif isinstance(layer, nn.MaxPool2d):
              name = 'pool_{}'.format(i)
          elif isinstance(layer, nn.BatchNorm2d):
              name = 'bn_{}'.format(i)
          else:
              raise RuntimeError('Такого слоя в сети нет: {}'.format(layer.__class__.__name__))

          model.add_module(name, layer)

          if name in content_layers:
              # определим content loss:
              target = model(content_img).detach()
              content_loss = ContentLoss(target)
              model.add_module("content_loss_{}".format(i), content_loss)
              content_losses.append(content_loss)

          if name in style_layers:
              # определим style loss:
              target_feature = model(style_img).detach()
              style_loss = StyleLoss(target_feature)
              model.add_module("style_loss_{}".format(i), style_loss)
              style_losses.append(style_loss)

      # выбрасываем все уровни после последенего styel loss или content loss
      for i in range(len(model) - 1, -1, -1):
          if isinstance(model[i], ContentLoss) or isinstance(model[i], StyleLoss):
              break

      model = model[:(i + 1)]

      return model, style_losses, content_losses

  def get_input_optimizer(self, input_img):
      # задание оптимайзера
      # как обычно, Adam показывает себя лучше других
      optimizer = optim.Adam([input_img.requires_grad_()], lr=0.003)
      #optimizer = optim.LBFGS([input_img.requires_grad_()]) 
      return optimizer

  def image_loader(self, image_name):
    image = Image.open(image_name)
    image = self.loader(image).unsqueeze(0)
    return image.to(self.device, torch.float)

In [4]:
class ContentLoss(nn.Module):
        def __init__(self, target,):
            super(ContentLoss, self).__init__()

            self.target = target.detach()
            self.loss = F.mse_loss(self.target, self.target )

        def forward(self, input):
            self.loss = F.mse_loss(input, self.target)
            return input


class StyleLoss(nn.Module):
        def __init__(self, target_feature):
            super(StyleLoss, self).__init__()
            self.target = self.gram_matrix(target_feature).detach()
            self.loss = F.mse_loss(self.target, self.target)# to initialize with something

        def forward(self, input):
            G = self.gram_matrix(input)
            self.loss = F.mse_loss(G, self.target)
            return input

        def gram_matrix(self, input):
          #расчет матрицы Грама
          batch_size , h, w, f_map_num = input.size()

          features = input.view(batch_size * h, w * f_map_num)

          G = torch.mm(features, features.t())

          return G.div(batch_size * h * w * f_map_num)


class Normalization(nn.Module):
        def __init__(self, mean, std):
            super(Normalization, self).__init__()
            self.mean = torch.tensor(mean).view(-1, 1, 1)
            self.std = torch.tensor(std).view(-1, 1, 1)

        def forward(self, img):
            return (img - self.mean) / self.std

def unloader(image):
  
    unl = transforms.ToPILImage()
    image = image.cpu().clone()   
    image = image.squeeze(0)      
    image = unl(image)
    return image

In [None]:
import telebot
from telebot import types
import os

bot = telebot.TeleBot('вставьте свой токен')
model = StyleTransfer()

im_name_1 = {} #картинка для обработки
im_name_2 = {} #картинки со стилем
im_name_3 = {} #результат

In [None]:
@bot.message_handler(commands=['start'])
def start_message(message):
    bot.send_message(message.chat.id, 'Привет. Я бот, который переносит стиль с одного изображения на другое.\nВоспользуйтесь командой /help, чтобы узнать, как со мной работать.')

@bot.message_handler(commands=['help'])
def help_message(message):
    bot.send_message(message.chat.id, 'Отправьте мне сначала изображение - контент,\n а затем изображение - стиль..')

@bot.message_handler(func=lambda message: True, content_types=['photo'])
def photo(message):
    user_id = message.chat.id
    fileID = message.photo[-1].file_id
    #если пользователь ещё не отсылал картинок, записываем адрес первой картинки
    if user_id not in im_name_1:
        im_name_1[user_id] = message.photo[-1].file_id       
        bot.send_message(message.chat.id, 'Получили изображение с контентом\nЖдем изображение со стилем')
    else:
    	#если пользователь что-то отослал, записываем адрес второй картинки
        im_name_2[user_id] = message.photo[-1].file_id 
        #хендлер для выбора способа обработки
        user_answer(user_id)

def delete_user(user, n): #удаление отработанных файлов
    #переменную n создал, чтобы в дальнейшем сохранять контент
    #и загружать только изображение со стилем
    if n == 1: #удаляем начиная заново
        if user in im_name_1:
            im_name_1.pop(user) 
    if (n == 1) or (n == 2): #удаляем начиная заново или при смене стиля
        if user in im_name_2:    
            im_name_2.pop(user)


def delete_photo(user, n): #удаление отработанных файлов
    #переменную n создал, чтобы в дальнейшем сохранять контент
    #и загружать только изображение со стилем
    if n == 1: #удаляем начиная заново
        if os.path.exists(str(user)+"image1.jpg"):
            os.remove(str(user)+"image1.jpg") 
    if (n == 1) or (n == 2): #удаляем начиная заново или при смене стиля
        if os.path.exists(str(user)+"image2.jpg"):    
            os.remove(str(user)+"image2.jpg")

     
def user_answer(user_id):
    user = user_id
    with open(str(user)+"image1.jpg", 'wb') as new_file:
        if len(im_name_1[user]) <= 150:
            #при повторном запуске в ячейке может быть картинка, а не id файла
            new_file.write(bot.download_file(bot.get_file(im_name_1[user]).file_path))
        else:
            new_file.write(im_name_1[user])
    with open(str(user)+"image2.jpg", 'wb') as new_file:
        new_file.write(bot.download_file(bot.get_file(im_name_2[user]).file_path))
    bot.send_message(user, 'Начался процесс переноса стиля (может занять несколько минут)\n...')

    output = model.start_train(str(user)+"image1.jpg", str(user)+"image2.jpg")
    output = unloader(output)
    output_stream = BytesIO()
    output.save(output_stream, format='PNG')
    output_stream.seek(0)
    bot.send_photo(user, output_stream, caption='Готово!')

    delete_photo(user, 1) #удаляем все файлы
    delete_user(user, 1) #очищаем память



#if __name__ == '__main__':
#    bot.polling()
while True:
    try:
        bot.polling(none_stop=True, interval=0)
    except: 
        time.sleep(5)