In [1]:
!pip install python-telegram-bot
!pip install -U ipykernel

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from scipy import misc
import copy
import numpy as np 

import os
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler, ConversationHandler, CallbackQueryHandler
from telegram.ext.dispatcher import run_async
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import logging


In [3]:
class ContentLoss(nn.Module):

    def __init__(self, target, ):
        super(ContentLoss, self).__init__()
        self.target = target.detach()  
        self.loss = F.mse_loss(self.target, self.target)  

    def forward(self, input):
        self.loss = F.mse_loss(input, self.target)
        return input

In [4]:
class StyleLoss(nn.Module):
    def __init__(self, target_feature):
        super(StyleLoss, self).__init__()
        self.target = self.gram_matrix(target_feature).detach()
        self.loss = F.mse_loss(self.target, self.target)  

    def forward(self, input):
        G = self.gram_matrix(input)
        self.loss = F.mse_loss(G, self.target)
        return input

    def gram_matrix(self,input):
        batch_size, h, w, f_map_num = input.size()  


        features = input.view(batch_size * h, w * f_map_num)  

        G = torch.mm(features, features.t()) 


        return G.div(batch_size * h * w * f_map_num)

In [5]:
class Normalization(nn.Module):
    def __init__(self, mean, std):
        super(Normalization, self).__init__()
        self.mean = torch.tensor(mean).view(-1, 1, 1)
        self.std = torch.tensor(std).view(-1, 1, 1)

    def forward(self, img):
        return (img - self.mean) / self.std

In [6]:
class StyleTransferModel:
    def __init__(self):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.cnn = models.vgg19(pretrained=True).features.to(device).eval()
        pass


    def transfer_style(self, content_img_stream, style_img_stream, num_steps=500,
                        style_weight=100000000, content_weight=1):
        content_img = self.process_image(content_img_stream)
        style_img = self.process_image(style_img_stream)
        input_img = content_img.clone().detach()
        """Run the style transfer."""
        print('Building the style transfer model..')
        model, style_losses, content_losses = self.get_style_model_and_losses(style_img,
                                                                         content_img,)
        optimizer = self.get_input_optimizer(input_img)

        print('Optimizing..')
        run = [0]
        while run[0] <= num_steps:

            def closure():

                input_img.data.clamp_(0, 1)

                optimizer.zero_grad()

                model(input_img)

                style_score = 0
                content_score = 0

                for sl in style_losses:
                    style_score += sl.loss
                for cl in content_losses:
                    content_score += cl.loss

                style_score *= style_weight
                content_score *= content_weight

                loss = style_score + content_score
                loss.backward()

                run[0] += 1
                if run[0] % 50 == 0:
                    print("run {}:".format(run))
                    print('Style Loss : {:4f} Content Loss: {:4f}'.format(
                        style_score.item(), content_score.item()))
                    print()

                return style_score + content_score

            optimizer.step(closure)

        input_img.data.clamp_(0, 1)
        image = input_img.cpu().clone().detach()
        unloader = transforms.ToPILImage()
        return unloader(image[0])


    def get_input_optimizer(self, input_img):
        optimizer = optim.LBFGS([input_img.requires_grad_()]) 
        return optimizer


    def process_image(self, img_stream):
        imsize = 720
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(device)
        loader = transforms.Compose([
            transforms.Resize(imsize),  
            transforms.CenterCrop(imsize),
            transforms.ToTensor()])  

        image = Image.open(img_stream)
        image = loader(image).unsqueeze(0)
        return image.to(device, torch.float)

    def get_style_model_and_losses(self, style_img, content_img):
        content_layers = ['conv_4']
        style_layers = ['conv_1', 'conv_2', 'conv_3', 'conv_4', 'conv_5']
        cnn = copy.deepcopy(self.cnn)


        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        normalization_mean = torch.tensor([0.485, 0.456, 0.406]).to(device)
        normalization_std = torch.tensor([0.229, 0.224, 0.225]).to(device)
        normalization = Normalization(normalization_mean, normalization_std).to(device)

        content_losses = []
        style_losses = []


        model = nn.Sequential(normalization)

        i = 0  
        for layer in cnn.children():
            if isinstance(layer, nn.Conv2d):
                i += 1
                name = 'conv_{}'.format(i)
            elif isinstance(layer, nn.ReLU):
                name = 'relu_{}'.format(i)
                layer = nn.ReLU(inplace=False)
            elif isinstance(layer, nn.MaxPool2d):
                name = 'pool_{}'.format(i)
            elif isinstance(layer, nn.BatchNorm2d):
                name = 'bn_{}'.format(i)
            else:
                raise RuntimeError('Unrecognized layer: {}'.format(layer.__class__.__name__))

            model.add_module(name, layer)

            if name in content_layers:
                target = model(content_img).detach()
                content_loss = ContentLoss(target)
                model.add_module("content_loss_{}".format(i), content_loss)
                content_losses.append(content_loss)

            if name in style_layers:
                target_feature = model(style_img).detach()
                style_loss = StyleLoss(target_feature)
                model.add_module("style_loss_{}".format(i), style_loss)
                style_losses.append(style_loss)


        for i in range(len(model) - 1, -1, -1):
            if isinstance(model[i], ContentLoss) or isinstance(model[i], StyleLoss):
                break

        model = model[:(i + 1)]

        return model, style_losses, content_losses

In [7]:
!git clone https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix

fatal: destination path 'pytorch-CycleGAN-and-pix2pix' already exists and is not an empty directory.


In [8]:
os.chdir('/content/pytorch-CycleGAN-and-pix2pix/')

In [9]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [10]:
!bash ./scripts/download_cyclegan_model.sh winter2summer_yosemite

Note: available models are apple2orange, orange2apple, summer2winter_yosemite, winter2summer_yosemite, horse2zebra, zebra2horse, monet2photo, style_monet, style_cezanne, style_ukiyoe, style_vangogh, sat2map, map2sat, cityscapes_photo2label, cityscapes_label2photo, facades_photo2label, facades_label2photo, iphone2dslr_flower
Specified [winter2summer_yosemite]
for details.

--2022-07-24 15:49:51--  http://efrosgans.eecs.berkeley.edu/cyclegan/pretrained_models/winter2summer_yosemite.pth
Resolving efrosgans.eecs.berkeley.edu (efrosgans.eecs.berkeley.edu)... 128.32.244.190
Connecting to efrosgans.eecs.berkeley.edu (efrosgans.eecs.berkeley.edu)|128.32.244.190|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45575747 (43M)
Saving to: ‘./checkpoints/winter2summer_yosemite_pretrained/latest_net_G.pth’


2022-07-24 15:49:52 (67.3 MB/s) - ‘./checkpoints/winter2summer_yosemite_pretrained/latest_net_G.pth’ saved [45575747/45575747]



In [11]:
from io import BytesIO
import subprocess
import shutil

In [12]:
model = StyleTransferModel()
first_image_file = {}

  f"The parameter '{pretrained_param}' is deprecated since 0.13 and will be removed in 0.15, "


In [13]:
def send_prediction_on_photo(update, context):
    chat_id = update.message.chat_id
    print("Got image from {}".format(chat_id))
    

    image_file = update.message.photo[-1].get_file()

    if chat_id in first_image_file:
        update.message.reply_text('Принял вторую фотографию. Ожидайте несколько минут')
        content_image_stream = BytesIO()
        first_image_file[chat_id].download(out=content_image_stream)
        del first_image_file[chat_id]

        style_image_stream = BytesIO()
        image_file.download(out=style_image_stream)
        del image_file

        output = model.transfer_style(content_image_stream, style_image_stream)


        output_stream = BytesIO()
        output.save(output_stream, format='PNG')
        output_stream.seek(0)
        update.message.reply_photo(photo=output_stream)
        print("Sent Photo to user")
        update.message.reply_text('Пока! Приятно было пообщаться, если захочешь снова поменять стиль, напиши "/start".')
        return ConversationHandler.END
    else:
        update.message.reply_text('Принял первое фото')
        first_image_file[chat_id] = image_file
        return PHOTO_1
    
    

In [14]:
def send_prediction_on_photo_CycleGAN(update, context):
    os.chdir('/content/pytorch-CycleGAN-and-pix2pix/')
    chat_id = update.message.chat_id
    print("Got image from {}".format(chat_id))
    update.message.reply_text('Принял зимнюю фотографию, подожди минутку')

    os.mkdir(str(chat_id))

    image_file = update.message.photo[-1].get_file()
    first_image_file[chat_id] = image_file
    
    image_path = str(chat_id)
    first_image_file[chat_id].download(image_path+'/image.png')
    del first_image_file[chat_id]

    subprocess.call(['python','/content/pytorch-CycleGAN-and-pix2pix/test.py',
    '--dataroot',image_path,'--name','winter2summer_yosemite_pretrained','--model','test',
    '--no_dropout','--results','/content/'+image_path,'--load_size','720','--crop_size','720'])

    shutil.rmtree('/content/pytorch-CycleGAN-and-pix2pix/' + image_path)
    
    output = Image.open('/content/'+image_path+ '/winter2summer_yosemite_pretrained/test_latest/images/image_fake.png')

    output_stream = BytesIO()
    output.save(output_stream, format='PNG')
    output_stream.seek(0)
    update.message.reply_photo(photo=output_stream)

    shutil.rmtree('/content/' +image_path)
    print("Sent Photo to user")
    update.message.reply_text('Пока! Приятно было пообщаться, если захочешь снова поменять стиль, напиши "/start".')
    return ConversationHandler.END

In [15]:
def start(update, context):
    """Send a message when the command /start is issued."""
    update.message.reply_text('Напиши мне "/style", и отправь две картинки .А я перенесу стиль второй картинки на первую')
    update.message.reply_text('Напиши мне "/summer", и отправь одну зимнюю картинку. Я сделаю её теплее и превращу зиму в лето!)')
    update.message.reply_text('Остановить бота - "/stop"')
    return NEURON_CHOISE

In [16]:
def cancel(update, context):
    update.message.reply_text('Пока! Приятно было пообщаться, если захочешь снова поменять стиль, напиши "/start".')

    return ConversationHandler.END

In [17]:
NEURON_CHOISE,PHOTO_1,PHOTO_2 = range(3)

In [18]:
if __name__ == '__main__':

    token = "5545169028:AAHBwjySmQu3l2QBkL_jaSmDJoC2jH0gS60"
    # Включим самый базовый логгинг, чтобы видеть сообщения об ошибках
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO)
    
    updater = Updater(token,use_context=True)
    dp = updater.dispatcher

    conv_handler = ConversationHandler(
        entry_points=[CommandHandler('start', start)],

        states={
            NEURON_CHOISE: [CommandHandler('style', lambda x,y: PHOTO_1),CommandHandler('summer', lambda x,y: PHOTO_2)],
            PHOTO_1: [MessageHandler(Filters.photo, send_prediction_on_photo),],
            PHOTO_2: [MessageHandler(Filters.photo, send_prediction_on_photo_CycleGAN),],
            
        },

        fallbacks=[CommandHandler('stop', cancel)]
    )

    start_button = InlineKeyboardButton('start')
    InlineKeyboardMarkup(start_button)
    
    dp.add_handler(conv_handler)
    run_async(conv_handler)
    updater.start_polling()

    updater.idle()

2022-07-24 15:49:56,335 - apscheduler.scheduler - INFO - Scheduler started


Got image from 141412337
----------------- Options ---------------
             aspect_ratio: 1.0                           
               batch_size: 1                             
          checkpoints_dir: ./checkpoints                 
                crop_size: 720                           	[default: 256]
                 dataroot: 141412337                     	[default: None]
             dataset_mode: single                        
                direction: AtoB                          
          display_winsize: 256                           
                    epoch: latest                        
                     eval: False                         
                  gpu_ids: 0                             
                init_gain: 0.02                          
                init_type: normal                        
                 input_nc: 3                             
                  isTrain: False                         	[default: None]
                

2022-07-24 15:50:59,714 - telegram.ext.updater - INFO - Received signal 2 (SIGINT), stopping...
2022-07-24 15:50:59,716 - apscheduler.scheduler - INFO - Scheduler has been shut down
