<a href="https://colab.research.google.com/github/wakamenori/txt2imghd-colab/blob/master/txt2imghd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Check GPU

!ln -sf /opt/bin/nvidia-smi /usr/bin/nvidia-smi
!pip install -qq gputil psutil humanize
import psutil
import humanize
import os
from IPython.display import clear_output
import GPUtil as GPU

GPUs = GPU.getGPUs()
# XXX: only one GPU on Colab and isn’t guaranteed
gpu = GPUs[0]
def printm():
    process = psutil.Process(os.getpid())
    print("Gen RAM Free: " + humanize.naturalsize( psutil.virtual_memory().available ), " | Proc size: " + humanize.naturalsize( process.memory_info().rss))
    print("GPU RAM Free: {0:.0f}MB | Used: {1:.0f}MB | Util {2:3.0f}% | Total {3:.0f}MB".format(gpu.memoryFree, gpu.memoryUsed, gpu.memoryUtil*100, gpu.memoryTotal))
clear_output()
!nvidia-smi -L
print()
printm()

GPU 0: Tesla P100-PCIE-16GB (UUID: GPU-d17781ce-e905-a2a0-64fe-8fd1b62eb0e2)

Gen RAM Free: 25.7 GB  | Proc size: 95.6 MB
GPU RAM Free: 16280MB | Used: 0MB | Util   0% | Total 16280MB


# README

## Introduction
This notebook is a Colab port of [txt2imghd](https://github.com/jquesnelle/txt2imghd) with the NSFW safety checker disabled.  
It reuses code from the original repo, adapted to run on Colab, so that people with no coding background can use it.

`txt2imghd` generates high-resolution images using txt2img and img2img.
> txt2imghd is a port of the GOBIG mode from progrockdiffusion applied to Stable Diffusion, with Real-ESRGAN as the upscaler.  
> It creates detailed, higher-resolution images by first generating an image from a prompt, upscaling it, and then running img2img on smaller pieces of the upscaled image, and blending the result back into the original image.  

(quoted from [original repo](https://github.com/jquesnelle/txt2imghd))

## How it works
1. Generate an image from a prompt using txt2img, or load an existing image file.
2. Scale up the image using Real-ESRGAN.
3. Run img2img on smaller pieces of the up-scaled image.
4. Blend the result back into the upscaled image.

### Detailed explanation
#### Step.1 Generate an image from a prompt using txt2img
> Let's say you generate an image in pixel size of 512x512 using txt2img.
![Original image](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/Original.png?raw=true)

#### Step.2 Scale up the image using Real-ESRGAN
> Scale up the image to 1024x1024.
![up-scaled](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/up-scaled_2x.png?raw=true)

#### Step.3 Run img2img on smaller pieces of the up-scaled image
> Chop the up-scaled image into slices the same pixel size as the original image.  
In this example, the up-scaled image is chopped into 9 overlapping slices of 512x512 pixels each.  
Then run img2img on every slice, using the same prompt as for the original image.  
This produces more detailed and cleaner images than up-scaling alone.
![img2img-1](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/slice_img2img.png?raw=true)
![img2img-2](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/slice_img2img(2).png?raw=true)

#### Step.4 Blend the result back into the upscaled image
> Finally, blend the slices of images back into the upscaled image.  
![txt2imghd](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/up-scaled_2x_img2img.png?raw=true)
![txt2imghd_diff](https://github.com/wakamenori/txt2imghd-colab/blob/master/gallery/diff.png?raw=true)
#### Step.3 and Step.4 again
> If you set `SCALEUP_RATIO` to 4x or 8x and check `SCALEUP_STEP_BY_STEP`, Step.3 and Step.4 run again after each further 2x up-scale.  
For example, the 1024x1024 image is up-scaled to 2048x2048 and chopped into 36 slices, img2img runs on each of the 36 slices, and the results are blended back.

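The slice counts mentioned above (9 for a 1024x1024 image, 36 for 2048x2048) can be reproduced with a small sketch. It mirrors, along a single axis, the outward-from-center walk that the notebook's `grid_coords` helper performs. The function name `tile_origins_1d` and the overlap of 128 pixels are assumptions made for illustration; the overlap the notebook actually uses is not stated in this section.

```python
from typing import List


def tile_origins_1d(target: int, tile: int, overlap: int) -> List[int]:
    """Tile start offsets along one axis, growing outward from a centered tile."""
    start = target // 2 - tile // 2
    origins = [start]
    pos = start
    while pos > 0:  # walk toward the top/left edge
        pos = pos - tile + overlap
        origins.append(pos)
    pos = start
    while pos + tile <= target:  # walk toward the bottom/right edge
        pos = pos + tile - overlap
        origins.append(pos)
    return sorted(origins)


for size in (1024, 2048):
    per_axis = len(tile_origins_1d(size, tile=512, overlap=128))
    # 1024 -> 3 per axis, 9 tiles; 2048 -> 6 per axis, 36 tiles
    print(size, per_axis, per_axis ** 2)
```

Some offsets are negative or extend past the edge, which is why the slices overlap the image border as well as each other.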
## If you hit `CUDA out of memory`
To avoid running out of GPU memory, try the following:
- Use a smaller image size
- Lower `SCALEUP_RATIO`
- Uncheck `FP32`
- Uncheck `GFPGAN`

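To get a feel for how image size and `SCALEUP_RATIO` interact, it helps to look at how the output size grows. The sketch below is plain arithmetic; the helper name `upscaled_size` is hypothetical and not part of the notebook.

```python
from typing import Tuple


def upscaled_size(width: int, height: int, ratio: str) -> Tuple[int, int]:
    """Return the output size for a ratio string such as "4x"."""
    factor = int(ratio.rstrip("x"))
    return width * factor, height * factor


for ratio in ("1x", "2x", "4x", "8x"):
    w, h = upscaled_size(512, 512, ratio)
    print(f"{ratio}: {w}x{h} ({w * h / 1e6:.1f} megapixels)")
```

The pixel count grows with the square of the ratio, so 8x produces 64 times as many pixels as the original image.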
## Credits
Colab
- [NSFW Disabled: NOP & WAS's Stable Diffusion Colab v0.35 (1.4 Weights)](https://colab.research.google.com/drive/1jUwJ0owjigpG-9m6AI_wEStwimisUE17#scrollTo=Ucr5_i21xSjv)

Reddit
- [txt2imghd: Generate high-res images with Stable Diffusion](https://www.reddit.com/r/StableDiffusion/comments/wxm0cf/txt2imghd_generate_highres_images_with_stable/)

GitHub
- [txt2imghd](https://github.com/jquesnelle/txt2imghd)
- [Real-ESRGAN](https://github.com/xinntao/Real-ESRGAN)
- [GFPGAN](https://github.com/TencentARC/GFPGAN)



# Install dependencies

Running this section crashes the session. **This is intended.**
After the session restarts, you don't need to run this section again.

In [2]:
#@title Install dependencies
!pip install -qq transformers scipy ftfy transformers gradio datasets "ipywidgets>=7,<8" diffusers==0.2.4
!pip install -qq --ignore-installed Pillow==9.0.0


In [None]:
#@title Restart session
#@markdown Running this cell crashes the session. This is intended.<br>You don't need to re-run the cells above.
import os
os._exit(0)  # kill the Python process so Colab restarts the session

# Setup pipelines and util functions
Read the Hugging Face access token from a file in Google Drive.<br>
Make sure you have saved the token in a text file and uploaded it to Google Drive.

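A minimal sketch of the token-reading step, assuming the token is stored alone in a plain text file. The helper name `read_token` is hypothetical; it strips surrounding whitespace and fails early if the file is empty.

```python
from pathlib import Path


def read_token(path: str) -> str:
    """Read an access token from a text file, ignoring surrounding whitespace."""
    token = Path(path).read_text(encoding="utf-8").strip()
    if not token:
        raise ValueError(f"No token found in {path}")
    return token


# Example (default path from this notebook's form field):
# access_token = read_token("/content/drive/MyDrive/token.txt")
```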
In [2]:
# gc, os, random, sys and plt are used by helper functions defined later in this cell
import gc
import inspect
import os
import random
import sys
import warnings
from io import BytesIO
from typing import List, Optional, Union

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import PIL
import requests
import torch
from diffusers import (
    AutoencoderKL,
    DDIMScheduler,
    DiffusionPipeline,
    LMSDiscreteScheduler,
    PNDMScheduler,
    UNet2DConditionModel,
)
from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker
from google.colab import drive
from google.colab.patches import cv2_imshow
from IPython.display import clear_output
from PIL import Image
from skimage.metrics import structural_similarity
from torch import autocast
from tqdm.auto import tqdm
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer


def preprocess(image):
    w, h = image.size
    w, h = map(lambda x: x - x % 32, (w, h))  # resize to integer multiple of 32
    image = image.resize((w, h), resample=PIL.Image.LANCZOS)
    image = np.array(image).astype(np.float32) / 255.0
    image = image[None].transpose(0, 3, 1, 2)
    image = torch.from_numpy(image)
    return 2.0 * image - 1.0


class StableDiffusionImg2ImgPipeline(DiffusionPipeline):
    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler: Union[DDIMScheduler, PNDMScheduler],
        safety_checker: StableDiffusionSafetyChecker,
        feature_extractor: CLIPFeatureExtractor,
    ):
        super().__init__()
        scheduler = scheduler.set_format("pt")
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
        )

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]],
        init_image: torch.FloatTensor,
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
    ):

        if isinstance(prompt, str):
            batch_size = 1
        elif isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if strength < 0 or strength > 1:
            raise ValueError(f"The value of strength should be in [0.0, 1.0] but is {strength}")

        # set timesteps
        accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys())
        extra_set_kwargs = {}
        offset = 0
        if accepts_offset:
            offset = 1
            extra_set_kwargs["offset"] = 1

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)

        # encode the init image into latents and scale the latents
        init_latents = self.vae.encode(init_image.to(self.device)).sample()
        init_latents = 0.18215 * init_latents

        # prepare init_latents noise to latents
        init_latents = torch.cat([init_latents] * batch_size)

        # get the original timestep using init_timestep
        init_timestep = int(num_inference_steps * strength) + offset
        init_timestep = min(init_timestep, num_inference_steps)
        timesteps = self.scheduler.timesteps[-init_timestep]
        timesteps = torch.tensor([timesteps] * batch_size, dtype=torch.long, device=self.device)

        # add noise to latents using the timesteps
        noise = torch.randn(init_latents.shape, generator=generator, device=self.device)
        init_latents = self.scheduler.add_noise(init_latents, noise, timesteps)

        # get prompt text embeddings
        text_input = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        text_embeddings = self.text_encoder(text_input.input_ids.to(self.device))[0]

        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0
        # get unconditional embeddings for classifier free guidance
        if do_classifier_free_guidance:
            max_length = text_input.input_ids.shape[-1]
            uncond_input = self.tokenizer(
                [""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt"
            )
            uncond_embeddings = self.text_encoder(uncond_input.input_ids.to(self.device))[0]

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        latents = init_latents
        t_start = max(num_inference_steps - init_timestep + offset, 0)
        for i, t in enumerate(self.scheduler.timesteps[t_start:]):
            # expand the latents if we are doing classifier free guidance
            latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents

            # predict the noise residual
            noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings)["sample"]

            # perform guidance
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # compute the previous noisy sample x_t -> x_t-1
            latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)["prev_sample"]

        # scale and decode the image latents with vae
        latents = 1 / 0.18215 * latents
        image = self.vae.decode(latents)

        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()

        # run safety checker
        # safety_cheker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(self.device)
        # image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_cheker_input.pixel_values)

        if output_type == "pil":
            image = self.numpy_to_pil(image)

        return {"sample": image, "nsfw_content_detected": False}


class StableDiffusionPipeline(DiffusionPipeline):
    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler],
        safety_checker: StableDiffusionSafetyChecker,
        feature_extractor: CLIPFeatureExtractor,
    ):
        super().__init__()
        scheduler = scheduler.set_format("pt")
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
        )

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]],
        height: Optional[int] = 512,
        width: Optional[int] = 512,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        **kwargs,
    ):
        if "torch_device" in kwargs:
            device = kwargs.pop("torch_device")
            warnings.warn(
                "`torch_device` is deprecated as an input argument to `__call__` and will be removed in v0.3.0."
                " Consider using `pipe.to(torch_device)` instead."
            )

            # Set device as before (to be removed in 0.3.0)
            if device is None:
                device = "cuda" if torch.cuda.is_available() else "cpu"
            self.to(device)

        if isinstance(prompt, str):
            batch_size = 1
        elif isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # get prompt text embeddings
        text_input = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        text_embeddings = self.text_encoder(text_input.input_ids.to(self.device))[0]

        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0
        # get unconditional embeddings for classifier free guidance
        if do_classifier_free_guidance:
            max_length = text_input.input_ids.shape[-1]
            uncond_input = self.tokenizer(
                [""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt"
            )
            uncond_embeddings = self.text_encoder(uncond_input.input_ids.to(self.device))[0]

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

        # get the initial random noise
        latents = torch.randn(
            (batch_size, self.unet.in_channels, height // 8, width // 8),
            generator=generator,
            device=self.device,
        )

        # set timesteps
        accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys())
        extra_set_kwargs = {}
        if accepts_offset:
            extra_set_kwargs["offset"] = 1

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)

        # if we use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas
        if isinstance(self.scheduler, LMSDiscreteScheduler):
            latents = latents * self.scheduler.sigmas[0]

        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        for i, t in tqdm(enumerate(self.scheduler.timesteps)):
            # expand the latents if we are doing classifier free guidance
            latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
            if isinstance(self.scheduler, LMSDiscreteScheduler):
                sigma = self.scheduler.sigmas[i]
                latent_model_input = latent_model_input / ((sigma**2 + 1) ** 0.5)

            # predict the noise residual
            noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings)["sample"]

            # perform guidance
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # compute the previous noisy sample x_t -> x_t-1
            if isinstance(self.scheduler, LMSDiscreteScheduler):
                latents = self.scheduler.step(noise_pred, i, latents, **extra_step_kwargs)["prev_sample"]
            else:
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)["prev_sample"]

        # scale and decode the image latents with vae
        latents = 1 / 0.18215 * latents
        image = self.vae.decode(latents)

        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()

        # run safety checker
        # safety_cheker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(self.device)
        # image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_cheker_input.pixel_values)

        if output_type == "pil":
            image = self.numpy_to_pil(image)

        return {"sample": image, "nsfw_content_detected": False}


PATH_TO_TOKEN_FILE = "/content/drive/MyDrive/token.txt"  # @param {type:'string'}

drive.mount('/content/drive')


with open(PATH_TO_TOKEN_FILE, 'r') as f:
    # strip the trailing newline that text editors usually append
    access_tokens = f.read().strip()


# lms = LMSDiscreteScheduler(
#     beta_start=0.00085,
#     beta_end=0.012,
#     beta_schedule="scaled_linear",
#     num_train_timesteps=1000
# )
ddim = DDIMScheduler(beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear",
                     clip_sample=False, set_alpha_to_one=False)

pipe = StableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    scheduler=ddim,
    revision="fp16",
    use_auth_token=access_tokens
).to("cuda")

pipeimg = StableDiffusionImg2ImgPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    scheduler=ddim,
    revision="fp16",
    torch_dtype=torch.float16,
    use_auth_token=access_tokens
).to("cuda")


def grid_coords(target, original, overlap):
    # generate a list of coordinate tuples for our sections, in order of how they'll be rendered
    # target should be the size for the gobig result, original is the size of each chunk being rendered
    center = []
    target_x, target_y = target
    center_x = int(target_x / 2)
    center_y = int(target_y / 2)
    original_x, original_y = original
    x = center_x - int(original_x / 2)
    y = center_y - int(original_y / 2)
    center.append((x, y))  # center chunk
    uy = y  # up
    uy_list = []
    dy = y  # down
    dy_list = []
    lx = x  # left
    lx_list = []
    rx = x  # right
    rx_list = []
    while uy > 0:  # center row vertical up
        uy = uy - original_y + overlap
        uy_list.append((lx, uy))
    while (dy + original_y) <= target_y:  # center row vertical down
        dy = dy + original_y - overlap
        dy_list.append((rx, dy))
    while lx > 0:
        lx = lx - original_x + overlap
        lx_list.append((lx, y))
        uy = y
        while uy > 0:
            uy = uy - original_y + overlap
            uy_list.append((lx, uy))
        dy = y
        while (dy + original_y) <= target_y:
            dy = dy + original_y - overlap
            dy_list.append((lx, dy))
    while (rx + original_x) <= target_x:
        rx = rx + original_x - overlap
        rx_list.append((rx, y))
        uy = y
        while uy > 0:
            uy = uy - original_y + overlap
            uy_list.append((rx, uy))
        dy = y
        while (dy + original_y) <= target_y:
            dy = dy + original_y - overlap
            dy_list.append((rx, dy))
    # calculate a new size that will fill the canvas, which will be optionally used in grid_slice and go_big
    last_coordx, last_coordy = dy_list[-1:][0]
    render_edgey = last_coordy + original_y  # outer bottom edge of the render canvas
    render_edgex = last_coordx + original_x  # outer side edge of the render canvas
    scalarx = render_edgex / target_x
    scalary = render_edgey / target_y
    if scalarx <= scalary:
        new_edgex = int(target_x * scalarx)
        new_edgey = int(target_y * scalarx)
    else:
        new_edgex = int(target_x * scalary)
        new_edgey = int(target_y * scalary)
    # now put all the chunks into one master list of coordinates (essentially reverse of how we calculated them so that the central slices will be on top)
    result = []
    for coords in dy_list[::-1]:
        result.append(coords)
    for coords in uy_list[::-1]:
        result.append(coords)
    for coords in rx_list[::-1]:
        result.append(coords)
    for coords in lx_list[::-1]:
        result.append(coords)
    result.append(center[0])
    return result, (new_edgex, new_edgey)


def grid_slice(source, overlap, og_size, maximize=False):
    width, height = og_size  # size of the slices to be rendered
    coordinates, new_size = grid_coords(source.size, og_size, overlap)
    if maximize:
        # Image.LANCZOS is used here because get_resampling_mode() is not defined in this notebook
        source = source.resize(new_size, Image.LANCZOS)  # minor concern that we're resizing twice
        # re-do the coordinates with the new canvas size
        coordinates, new_size = grid_coords(source.size, og_size, overlap)
    # loc_width and loc_height are the center point of the goal size, and we'll start there and work our way out
    slices = []
    for coordinate in coordinates:
        x, y = coordinate
        slices.append(((source.crop((x, y, x+width, y+height))), x, y))
    global slices_todo
    slices_todo = len(slices) - 1
    return slices, new_size


def convert_pil_img(image):
    w, h = image.size
    w, h = map(lambda x: x - x % 32, (w, h))  # resize to integer multiple of 32
    # Image.Resampling only exists in Pillow >= 9.1; this notebook pins Pillow 9.0.0
    image = image.resize((w, h), resample=PIL.Image.LANCZOS)
    image = np.array(image).astype(np.float32) / 255.0
    image = image[None].transpose(0, 3, 1, 2)
    image = torch.from_numpy(image)
    return 2.*image - 1.


def addalpha(im, mask):
    imr, img, imb, ima = im.split()
    mmr, mmg, mmb, mma = mask.split()
    # we want the RGB from the original, but the transparency from the mask
    im = Image.merge('RGBA', [imr, img, imb, mma])
    return (im)


def grid_merge(source, slices):
    source = source.convert("RGBA")  # convert() returns a new image; alpha_composite needs RGBA
    for slice, posx, posy in slices:  # composite in list order; later slices end up on top
        source.alpha_composite(slice, (posx, posy))
    return source


def display_images(images, figsize=(30, 15), columns=4, seed: Optional[int] = None, titles: Optional[List[str]] = None, label: Optional[str] = None):
    plt.figure(
        figsize=figsize,
        facecolor="white"
    )
    plt.axis("off")

    if seed is not None:
        plt.suptitle("seed is " + str(seed), y=0.65, x=0.1)

    for i, image in enumerate(images):
        plt.subplot(len(images) // columns + 1, columns, i + 1)  # subplot needs integer row/column counts
        plt.imshow(image)
        plt.xticks(color='w')
        plt.yticks(color='w')
        if titles is not None:
            plt.title(titles[i], fontsize=30)
        if label is not None and i == 0:
            plt.ylabel(label, fontsize=30)
    plt.tight_layout()


def pil2cv(image):
    ''' PIL -> OpenCV '''
    new_image = np.array(image, dtype=np.uint8)
    if new_image.ndim == 2:
        pass
    elif new_image.shape[2] == 3:
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR)
    elif new_image.shape[2] == 4:
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGBA2BGRA)
    return new_image


def image_diff(before_image, after_image):

    # Load images
    before = pil2cv(before_image)
    after = pil2cv(after_image)

    # Convert images to grayscale
    before_gray = cv2.cvtColor(before, cv2.COLOR_BGR2GRAY)
    after_gray = cv2.cvtColor(after, cv2.COLOR_BGR2GRAY)

    # Compute SSIM between the two images
    (score, diff) = structural_similarity(before_gray, after_gray, full=True)
    # print("Image Similarity: {:.4f}%".format(score * 100))

    # The diff image contains the actual image differences between the two images
    # and is represented as a floating point data type in the range [0,1]
    # so we must convert the array to 8-bit unsigned integers in the range
    # [0,255] before we can use it with OpenCV
    diff = (diff * 255).astype("uint8")
    diff_box = cv2.merge([diff, diff, diff])

    # Threshold the difference image, followed by finding contours to
    # obtain the regions of the two input images that differ
    thresh = cv2.threshold(diff, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]
    contours = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    contours = contours[0] if len(contours) == 2 else contours[1]

    mask = np.zeros(before.shape, dtype='uint8')
    filled_after = after.copy()

    for c in contours:
        area = cv2.contourArea(c)
        if area > 40:
            x, y, w, h = cv2.boundingRect(c)
            cv2.rectangle(before, (x, y), (x + w, y + h), (36, 255, 12), 2)
            cv2.rectangle(after, (x, y), (x + w, y + h), (36, 255, 12), 2)
            cv2.rectangle(diff_box, (x, y), (x + w, y + h), (36, 255, 12), 2)
            cv2.drawContours(mask, [c], 0, (255, 255, 255), -1)
            cv2.drawContours(filled_after, [c], 0, (0, 255, 0), -1)
    return filled_after, score


def clean_env():
    gc.collect()
    torch.cuda.empty_cache()


def setup_dirs(root_dir: str):
    os.makedirs(root_dir, exist_ok=True)
    results_dir = os.path.join(root_dir, "results")
    os.makedirs(results_dir, exist_ok=True)


def generate_image(prompt: str, width: int = 512, height: int = 512, seed: int = 0, steps: int = 50, guidance_scale: float = 7.5, eta: float = 0.0):
    with torch.autocast("cuda"):
        if seed == 0:
            seed = random.randint(0, sys.maxsize)
        print("Seed : ", seed)
        generator = torch.Generator("cuda").manual_seed(seed)
        image = pipe(
            prompt,
            height=height,
            width=width,
            generator=generator,
            num_inference_steps=steps,
            eta=eta,
            guidance_scale=guidance_scale
        )["sample"][0]
    return image


def install_gfpgan():
    if not os.path.exists('/content/GFPGAN'):
        %cd /content
        !git clone https://github.com/TencentARC/GFPGAN.git
        %cd GFPGAN
        # Set up the environment
        # Install basicsr - https://github.com/xinntao/BasicSR
        # We use BasicSR for both training and inference
        !pip install -qq basicsr
        # Install facexlib - https://github.com/xinntao/facexlib
        # We use face detection and face restoration helper in the facexlib package
        !pip install -qq facexlib
        # Install other dependencies
        !pip install -qq -r requirements.txt
        !python setup.py develop
        !pip install -qq realesrgan  # used for enhancing the background (non-face) regions
        # Download the pre-trained model
        # !wget https://github.com/TencentARC/GFPGAN/releases/download/v0.2.0/GFPGANCleanv1-NoCE-C2.pth -P experiments/pretrained_models
        # Now we use the V1.3 model for the demo
        !wget https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth -P experiments/pretrained_models
        %cd /content/
        clear_output()


def install_esrgan():
    if not os.path.exists('/content/Real-ESRGAN'):
        %cd /content
        !git clone https://github.com/xinntao/Real-ESRGAN.git
        %cd /content/Real-ESRGAN
        # Set up the environment
        !pip -qq install basicsr facexlib gfpgan
        !pip -qq install -r requirements.txt
        !python setup.py develop
        # Download the pre-trained model
        !wget https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth -P experiments/pretrained_models
        !wget https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.2.4/RealESRGAN_x4plus_anime_6B.pth -P experiments/pretrained_models
        %cd /content
    clear_output()


def install_swinir():
    if not os.path.exists("/content/SwinIR"):
        %cd /content
        !git clone https://github.com/JingyunLiang/SwinIR.git
        !pip install timm
        !wget https://github.com/JingyunLiang/SwinIR/releases/download/v0.0/003_realSR_BSRGAN_DFO_s64w8_SwinIR-M_x4_GAN.pth -P /content/SwinIR/models
        !wget https://github.com/JingyunLiang/SwinIR/releases/download/v0.0/003_realSR_BSRGAN_DFOWMFC_s64w8_SwinIR-L_x4_GAN.pth -P /content/SwinIR/models
    clear_output()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
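
The `preprocess` and `convert_pil_img` helpers above round each image dimension down to a multiple of 32 (by resizing) and map pixel values from [0, 255] to [-1, 1]. The same arithmetic in plain Python, with hypothetical helper names that are not part of the notebook:

```python
def round_down_to_multiple(value: int, multiple: int = 32) -> int:
    """Largest multiple of `multiple` that is <= value."""
    return value - value % multiple


def normalize_pixel(value: int) -> float:
    """Map an 8-bit channel value in [0, 255] to [-1.0, 1.0]."""
    return 2.0 * (value / 255.0) - 1.0


print(round_down_to_multiple(1000))  # 992
print(normalize_pixel(0), normalize_pixel(255))  # -1.0 1.0
```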

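How `strength` translates into the number of denoising steps in the img2img pipeline above can be checked with plain arithmetic. The sketch below copies the `init_timestep` and `t_start` formulas from `StableDiffusionImg2ImgPipeline.__call__`. The function name `denoising_steps` is hypothetical, and the sketch assumes that the scheduler's `set_timesteps` accepts an `offset` argument (so `offset=1`) and that the scheduler exposes exactly `num_inference_steps` timesteps.

```python
def denoising_steps(num_inference_steps: int, strength: float, offset: int = 1) -> int:
    """Number of loop iterations img2img runs for a given strength."""
    if not 0.0 <= strength <= 1.0:
        raise ValueError(f"strength should be in [0.0, 1.0] but is {strength}")
    init_timestep = min(int(num_inference_steps * strength) + offset, num_inference_steps)
    t_start = max(num_inference_steps - init_timestep + offset, 0)
    # the pipeline iterates over scheduler.timesteps[t_start:]
    return num_inference_steps - t_start


print(denoising_steps(50, 0.8))  # 40
print(denoising_steps(50, 0.5))  # 25
```

A lower `strength` adds less noise and runs fewer denoising steps, so more of the input slice survives.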

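Both pipelines combine the unconditional and text-conditioned noise predictions as `noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)`. A plain-Python sketch of that formula on lists of floats follows; the function name `apply_guidance` is hypothetical, and the pipelines apply the same expression to tensors.

```python
from typing import List


def apply_guidance(uncond: List[float], text: List[float], guidance_scale: float) -> List[float]:
    """Classifier-free guidance: move from the unconditional prediction toward the text one."""
    return [u + guidance_scale * (t - u) for u, t in zip(uncond, text)]


print(apply_guidance([0.0, 1.0], [1.0, 3.0], 1.0))  # [1.0, 3.0]
print(apply_guidance([0.0, 1.0], [1.0, 3.0], 7.5))  # [7.5, 16.0]
```

With `guidance_scale = 1` the result equals the text-conditioned prediction, which is why the pipelines only apply guidance when the scale is greater than 1.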
# txt2imghd

In [None]:
from tqdm import trange
import os
import gc
import datetime as dt
import sys
import random
import shutil

import matplotlib.pyplot as plt
from typing import List, Optional

import torch
from PIL import Image, ImageDraw

from termcolor import colored

#@markdown # <u>**【Basic options】**</u>
#@markdown You can use an image you have already generated as the base image.<br>
#@markdown Leave `BASEIMAGE_PATH` empty to generate a new image.
BASEIMAGE_PATH = ""  # @param {type:"string"}
PROMPT = "anime kyoto animation key by greg rutkowski night, single white hair girl from behind, in abandoned chapel with overgrown flowers and plants" # @param {type:'string'}
HEIGHT = 512  # @param {type:"slider", min:256, max:1920, step:64}
WIDTH = 512  # @param {type:"slider", min:256, max:1920, step:64}
STEPS = 50  # @param {type:"slider", min:5, max:500, step:5}
IMG_NUM = 1  # @param {type:"slider", min:1, max:20, step:1}
#@markdown #### `SEED`: set to 0 for a random seed
SEED = 0  # @param {type:'integer'}
if SEED is None:
    SEED = 0
TXT2IMG_GUIDANCE_SCALE = 7.5  # @param {type:"slider", min:0, max:25, step:0.1}
TXT2IMG_ETA = 0  # @param {type:"slider", min:0, max:1, step:0.01}
#@markdown #### Absolute path to the directory where images are saved (**no space characters**)<br>
#@markdown Subdirectories will be created:<br>
#@markdown `ROOT_DIR/original/` : images generated by txt2img<br>
#@markdown `ROOT_DIR/restored/restored_imgs/` : images generated by GFPGAN<br>
#@markdown `ROOT_DIR/upscaled/` : images generated by Real-ESRGAN and txt2imghd<br>
#@markdown `ROOT_DIR/results/` : Final result will be copied here

ROOT_DIR = "/content/drive/MyDrive/SD_PICTURES/txt2imghd"  # @param {type:"string"}

#@markdown ---
#@markdown # <u>**【Scaleup options】**</u>
# UPSCALE = True #@param{type:'boolean'}
SCALEUP_RATIO = "4x"  # @param ["1x", "2x", "4x", "8x"]
#@markdown `SCALEUP_STEP_BY_STEP` If checked, img2img runs after every 2x scale-up.<br>
#@markdown For example, if `SCALEUP_RATIO` is `4x`, the image is scaled up 2x and img2img runs, then it is scaled up 2x again and img2img runs again.
SCALEUP_STEP_BY_STEP = True  # @param{type:'boolean'}
ESRGAN_MODEL = "RealESRGAN_x4plus_anime_6B (Optimized for Anime)" # @param ["RealESRGAN_x4plus", "RealESRGAN_x4plus_anime_6B (Optimized for Anime)"]
#@markdown `FP_32` Use fp32 precision during inference. Default: fp16 (half precision).
FP_32 = False  # @param{type:'boolean'}
#@markdown `USE_IMG2IMG` If unchecked, image will be just scaled up without running img2img.
USE_IMG2IMG = True  # @param{type:'boolean'}
IMG2IMG_STRENGTH = 0.3  # @param {type:"slider", min:0, max:1, step:0.01}
IMG2IMG_STEPS = 65  # @param {type:"slider", min:5, max:500, step:5}
IMG2IMG_GUIDANCE_SCALE = 7.5  # @param {type:"slider", min:0, max:25, step:0.1}
IMG2IMG_ETA = 0  # @param {type:"slider", min:0, max:1, step:0.01}
#@markdown ---
#@markdown ## *Restore **realistic** face with [GFPGAN](https://github.com/TencentARC/GFPGAN)*
GFPGAN_BEFORE_SCALEUP = False  # @param{type:'boolean'}
GFPGAN_AFTER_SCALEUP = False  # @param{type:'boolean'}


def gfpgan_interface(input_path: str, output_dir: str) -> str:
    %cd /content/GFPGAN
    !python /content/GFPGAN/inference_gfpgan.py -i $input_path \
        -o $output_dir \
        -s 1 -v 1.3 --bg_upsampler realesrgan
    %cd /content
    filename = os.path.basename(input_path)
    restored_imgs_dir = os.path.join(output_dir, "restored_imgs")
    output_path = os.path.join(restored_imgs_dir, filename)
    new_path = os.path.join(restored_imgs_dir, filename.split(".")[0] + "_GFPGAN.png")
    os.rename(output_path, new_path)
    return new_path


def realesrgan_interface(input_path: str, output_dir: str, scale: int, model_name: str, fp32: bool) -> str:
    clean_env()
    os.makedirs(output_dir, exist_ok=True)
    fp32_opt = ""
    if fp32:
        fp32_opt = "--fp32"
    %cd /content/Real-ESRGAN/
    !python /content/Real-ESRGAN/inference_realesrgan.py \
        -i $input_path -o $output_dir -n $model_name --outscale $scale $fp32_opt
    %cd /content
    base_filename = os.path.basename(input_path).split(".")[0]
    output_path = os.path.join(output_dir, f"{base_filename}_out.png")
    new_output_path = os.path.join(output_dir, f"{base_filename}_{scale}x.png")
    os.rename(output_path, new_output_path)
    return new_output_path


def swinir_interface(input_path: str, output_dir: str, scale: int, model_name: str) -> str:
    %cd /content/SwinIR
    model_path = "/content/SwinIR/models/003_realSR_BSRGAN_DFO_s64w8_SwinIR-M_x4_GAN.pth"
    large = ""
    if model_name == "SwinIR-Large":
        model_path = "/content/SwinIR/models/003_realSR_BSRGAN_DFOWMFC_s64w8_SwinIR-L_x4_GAN.pth"
        large = "--large_model"
    image_dir = "/content/SwinIR/testsets/txt2imghd"
    filename = os.path.basename(input_path)
    if os.path.isdir(image_dir):
        shutil.rmtree(image_dir)
    os.makedirs(image_dir)
    os.makedirs("results", exist_ok=True)
    shutil.copy(input_path, os.path.join(image_dir, filename))
    print("model name ", model_name)
    print("path", model_path)
    !python main_test_swinir.py --task real_sr --model_path $model_path --folder_lq $image_dir --scale $scale $large
    %cd /content
    # NOTE: the output path is not returned here, although upscale() expects one.


def restore_face(input_path: str, output_dir: str) -> str:
    os.makedirs(output_dir, exist_ok=True)
    output_path = gfpgan_interface(input_path, output_dir)
    img_before_restoration = Image.open(input_path)
    img_after_restoration = Image.open(output_path)
    diff, score = image_diff(img_before_restoration, img_after_restoration)
    display_images(
        [img_before_restoration, diff, img_after_restoration],
        titles=["Before", f"Image Similarity: {score*100:.2f}%", "After"],
        label="GFPGAN",
    )
    print(colored("Face restored", "green"))
    print(colored(output_path, "green"))
    display(img_after_restoration)
    return output_path


def infer(prompt, strength: float, num_samples: int, steps: int, guidance_scale: float, eta: float, init_image, resize_height: Optional[int] = None, resize_width: Optional[int] = None):
    if resize_height is not None and resize_width is not None:
        # PIL's Image.resize takes a single (width, height) tuple.
        init_image = init_image.resize((resize_width, resize_height))
    init_image = preprocess(init_image)
    with autocast("cuda"):
        images = pipeimg(
            prompt,
            init_image=init_image,
            strength=strength,
            guidance_scale=guidance_scale,
            num_inference_steps=steps,
            eta=eta
        )["sample"]
    return images[0]


def upscale(input: str, output_dir: str, model: str, ratio: int, scaleup_step_by_step: bool, fp32: bool, prompt: Optional[str], strength: float, steps: int, guidance_scale: float, eta: float):
    os.makedirs(output_dir, exist_ok=True)
    shutil.copy(input, os.path.join(output_dir, os.path.basename(input)))
    original_image = Image.open(input)
    og_size = original_image.size
    current_filepath = input
    scaleup_times = 1
    if scaleup_step_by_step and ratio != 2:
        if ratio == 4:
            scaleup_times = 2
        if ratio == 8:
            scaleup_times = 3
        ratio = 2

    for i in range(scaleup_times):
        if "RealESRGAN_x4plus" in model:
            current_filepath = realesrgan_interface(
                current_filepath,
                output_dir,
                ratio,
                model,
                fp32,
            )
        elif "SwinIR" in model:
            current_filepath = swinir_interface(
                current_filepath,
                output_dir,
                ratio,
                model,
            )
        source_image = Image.open(current_filepath)
        print(colored(f"Upscaled Image with {model}", "green"))
        print(colored(current_filepath, "green"))
        display(source_image)
        if prompt is None:
            continue
        gobig_overlap = 128
        slices, _ = grid_slice(source_image, gobig_overlap, og_size, False)
        betterslices = []
        for _, chunk_w_coords in tqdm(enumerate(slices), "Slices"):
            chunk, coord_x, coord_y = chunk_w_coords
            inferred_img = infer(
                prompt=prompt,
                init_image=chunk,
                strength=strength,
                steps=steps,
                guidance_scale=guidance_scale,
                eta=eta,
                num_samples=1
            )
            betterslices.append((inferred_img.convert('RGBA'), coord_x, coord_y))

        alpha = Image.new('L', og_size, color=0xFF)
        alpha_gradient = ImageDraw.Draw(alpha)
        a = 0
        i = 0
        shape = (og_size, (0, 0))
        while i < gobig_overlap:
            alpha_gradient.rectangle(shape, fill=a)
            a += 4
            i += 1
            shape = ((og_size[0] - i, og_size[1] - i), (i, i))
        mask = Image.new('RGBA', og_size, color=0)
        mask.putalpha(alpha)
        finished_slices = []
        for betterslice, x, y in betterslices:
            finished_slice = addalpha(betterslice, mask)
            finished_slices.append((finished_slice, x, y))
        final_output = grid_merge(source_image.convert("RGBA"), finished_slices).convert("RGB")
        current_filename = os.path.basename(current_filepath).split(".")[0]
        current_filepath = os.path.join(output_dir, f"{current_filename}_img2img.png")
        final_output.save(current_filepath)
        diff, score = image_diff(source_image, final_output)
        display_images(
            [source_image, diff, final_output],
            titles=["Before", f"Image Similarity: {score*100:.2f}%", "After"],
            label="Real-ESRGAN with img2img",
        )
        print(colored(f"Img2Img (steps: {steps}, strength: {strength})", "green"))
        print(colored(current_filepath, "green"))
        display(final_output)
        torch.cuda.empty_cache()
        gc.collect()
    return current_filepath


def text_2_img():
    clean_env()
    if GFPGAN_BEFORE_SCALEUP or GFPGAN_AFTER_SCALEUP:
        install_gfpgan()

    if SCALEUP_RATIO != "1x":
        install_esrgan()

    setup_dirs(ROOT_DIR)
    original_dir = os.path.join(ROOT_DIR, "original")
    os.makedirs(original_dir, exist_ok=True)

    for i in range(IMG_NUM):
        clean_env()
        now = dt.datetime.now()
        filename = now.strftime("%Y%m%d-%H%M%S") + ".png"
        filepath = os.path.join(original_dir, filename)
        if BASEIMAGE_PATH == "":
            img = generate_image(PROMPT, WIDTH, HEIGHT, SEED, STEPS, TXT2IMG_GUIDANCE_SCALE, TXT2IMG_ETA)
            print(colored("Image by txt2img", "green"))
        else:
            img = Image.open(BASEIMAGE_PATH)
            print(colored(f"Existing image({BASEIMAGE_PATH})", "green"))
        img.save(filepath)
        print(colored(filepath, "green"))
        display(img)
        if GFPGAN_BEFORE_SCALEUP:
            restored_dir = os.path.join(ROOT_DIR, "restored")
            filepath = restore_face(filepath, restored_dir)
            img = Image.open(filepath)
        if SCALEUP_RATIO != "1x":
            upscaled_dir = os.path.join(ROOT_DIR, "upscaled")
            prompt = PROMPT if USE_IMG2IMG else None
            filepath = upscale(
                input=filepath,
                output_dir=upscaled_dir,
                ratio=int(SCALEUP_RATIO[0]),
                scaleup_step_by_step=SCALEUP_STEP_BY_STEP,
                model=ESRGAN_MODEL.split(" ")[0],
                prompt=prompt,
                steps=IMG2IMG_STEPS,
                strength=IMG2IMG_STRENGTH,
                guidance_scale=IMG2IMG_GUIDANCE_SCALE,
                eta=IMG2IMG_ETA,
                fp32=FP_32
            )
            img = Image.open(filepath)
        if GFPGAN_AFTER_SCALEUP:
            restored_dir = os.path.join(ROOT_DIR, "restored")
            filepath = restore_face(filepath, restored_dir)
            img = Image.open(filepath)
        result_dir = os.path.join(ROOT_DIR, "results")
        os.makedirs(result_dir, exist_ok=True)
        result_path = os.path.join(result_dir, os.path.basename(filepath))
        img.save(result_path)

        print(colored(f"Result in {result_path}", "green"))

        clean_env()


text_2_img()
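
The blending mask built in `upscale()` draws nested rectangles whose fill grows by 4 per pixel of inset, so each img2img tile fades in from its border. A simplified model of the resulting alpha value for a single pixel is sketched below. It assumes that fill values above 255 saturate at 255 and that the border distance is symmetric; `feather_alpha` is a hypothetical name, not a function in the notebook:

```python
def feather_alpha(x: int, y: int, width: int, height: int,
                  overlap: int = 128, step: int = 4) -> int:
    """Approximate mask alpha at pixel (x, y) of a width x height tile.

    Assumption: the alpha channel saturates at 255, so the ramp is
    step * distance_to_border, capped by both the overlap and 255.
    """
    # Distance (in pixels) to the nearest tile border.
    d = min(x, y, width - 1 - x, height - 1 - y)
    # One rectangle is drawn per inset 0 .. overlap - 1.
    return min(255, step * min(d, overlap - 1))


# Alpha along the middle row of a 512x512 tile.
for x in (0, 16, 32, 64, 256):
    print(x, feather_alpha(x, 256, 512, 512))
```

Under these assumptions the ramp reaches full opacity about 64 pixels from the border, so only the outer half of the 128-pixel `gobig_overlap` is actually blended with the upscaled image underneath.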
