In [59]:
import os
import numpy as np
from tqdm import tqdm
from PIL import Image
import shutil
import torch
from diffusers import (
    StableDiffusionImg2ImgPipeline,
    ControlNetModel,
    StableDiffusionControlNetPipeline,
    UniPCMultistepScheduler,
)
from diffusers.utils import load_image

In [60]:
img_type = "river"
conditioning_img_path = f"training/{img_type}/train/conditioning_images/"
img_path = f"training/{img_type}/train/images/"
conditioning_img_list = os.listdir(conditioning_img_path)
conditioning_img_list.sort()
conditioning_img_list = [file for file in conditioning_img_list if "_flip" not in file]
img_list = os.listdir(img_path)
img_list.sort()
img_list = [file for file in img_list if "_flip" not in file]

test_path = (
    "testing/label_img/river/"
    if img_type == "river"
    else "testing/rotate_img/road/"
)
conditioning_test_img_list = os.listdir(test_path)
conditioning_test_img_list.sort()

src_path = f"training/{img_type}/train/images/"
dst_path = "output/"

src_img_list = os.listdir(src_path)
src_img_list.sort()

retrieval_imgs = [
    np.array(
        Image.open(conditioning_img_path + file)
        .resize((107, 60))
        .resize((428, 240), Image.BILINEAR)
    )[:, :, 0]
    for file in conditioning_img_list
]
retrieval_imgs = [((img > 0).astype(np.uint8) * 255) for img in retrieval_imgs]

In [61]:
ROAD_SCORE = 0.6
RIVER_SCORE = 0.57
SCORE = ROAD_SCORE if img_type == "road" else RIVER_SCORE
image_construct_type = []

In [62]:
for file in tqdm(conditioning_test_img_list):
    img = Image.open(test_path + file)
    img = np.array(img)[:, :, 0]
    b = img > 128
    img_total = b.sum()
    score = 0
    pos = 0
    for i in range(len(retrieval_imgs)):
        a = retrieval_imgs[i] > 128
        score_current = (np.logical_and(a, b)).sum() / img_total
        if score_current > score:
            score = score_current
            pos = i
    if score > SCORE:
        src = src_path + src_img_list[pos]
        img_target = load_image(src)
        if "flip" in src_img_list[pos]:
            img_target = img_target.rotate(180)
        construct_type = "img2img"
    else:
        src = test_path + file
        img_target = load_image(src)
        construct_type = "controlnet"
    image_construct_type.append((construct_type, file, img_target))

  0%|          | 0/360 [00:00<?, ?it/s]

100%|██████████| 360/360 [00:40<00:00,  8.90it/s]


In [63]:
c = 0
for i in image_construct_type:
    if i[0] == "img2img":
        c += 1
c

339

In [64]:
device = "cuda" if torch.cuda.is_available() else "cpu"
controlnet_path = (
    "output_sd15_2/checkpoint-19200/controlnet/"
    if img_type == "river"
    else "output_sd15_road_2/checkpoint-21600/controlnet/"
)
controlnet = ControlNetModel.from_pretrained(
    controlnet_path, torch_dtype=torch.float16
).to(device)

pipe_controlnet = StableDiffusionControlNetPipeline.from_single_file(
    "beautifulrealityv3_full.safetensors",
    controlnet=controlnet,
    torch_dtype=torch.float16,
    use_safetensors=True,
)

Some weights of the model checkpoint were not used when initializing CLIPTextModel: 
 ['text_model.embeddings.position_ids']
You have disabled the safety checker for <class 'diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


In [65]:
pipe_img2img = StableDiffusionImg2ImgPipeline.from_single_file(
    "beautifulrealityv3_full.safetensors",
    controlnet=controlnet,
    torch_dtype=torch.float16,
    use_safetensors=True,
)

Some weights of the model checkpoint were not used when initializing CLIPTextModel: 
 ['text_model.embeddings.position_ids']
You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


In [66]:
pipe_controlnet.scheduler = UniPCMultistepScheduler.from_config(pipe_controlnet.scheduler.config)
pipe_controlnet.enable_xformers_memory_efficient_attention()
pipe_controlnet.enable_model_cpu_offload()

pipe_controlnet.set_progress_bar_config(disable=True)
pipe_controlnet = pipe_controlnet.to(device)

pipe_img2img.scheduler = UniPCMultistepScheduler.from_config(pipe_img2img.scheduler.config)
pipe_img2img.enable_xformers_memory_efficient_attention()
pipe_img2img.enable_model_cpu_offload()

pipe_img2img.set_progress_bar_config(disable=True)
pipe_img2img = pipe_img2img.to(device)

It seems like you have activated model offloading by calling `enable_model_cpu_offload`, but are now manually moving the pipeline to GPU. It is strongly recommended against doing so as memory gains from offloading are likely to be lost. Offloading automatically takes care of moving the individual components vae, text_encoder, tokenizer, unet, controlnet, scheduler, safety_checker, feature_extractor, image_encoder to GPU when needed. To make sure offloading works as expected, you should consider moving the pipeline back to CPU: `pipeline.to('cpu')` or removing the move altogether if you use offloading.
It seems like you have activated model offloading by calling `enable_model_cpu_offload`, but are now manually moving the pipeline to GPU. It is strongly recommended against doing so as memory gains from offloading are likely to be lost. Offloading automatically takes care of moving the individual components vae, text_encoder, tokenizer, unet, scheduler, safety_checker, feature_extractor, 

In [67]:
def controlnet_generate(origin_img):
    generator = torch.manual_seed(0)
    prompt = (
        "river with muddy and light earth color water, aerial view, field, lush shore, gress, masterpiece, best quality, high resolution"
        if img_type == "river"
        else "road with lush grove, masterpiece, best quality, high resolution"
    )
    negative_prompt = (
        "bridge, stone, sand, tall trees, sandbank, worst quality, jpeg artifacts, mutation, duplicate"
        if img_type == "river"
        else "bridge, sand, tall trees, sandbank, worst quality, jpeg artifacts, normal quality, low quality, mutation, duplicate, car, flower"
    )
    images = pipe_controlnet(
        prompt=prompt,
        negative_prompt=negative_prompt,
        image=origin_img,
        generator=generator,
        num_inference_steps=30,
        strength=0.5,
        guidance_scale=8.0,
        controlnet_conditioning_scale=1.8,
        height=240,
        width=428,
    ).images
    return images[0]

In [68]:
def img2img_generate(origin_img):
    prompt = img_type
    prompt = (
        "river with muddy and light earth color water, aerial view, field, lush shore, gress"
        if img_type == "river"
        else "road with lush grove"
    )
    generator = torch.manual_seed(0)
    negative_prompt = "worst quality, jpeg artifacts, mutation, duplicate"
    images = pipe_img2img(
        prompt=prompt,
        negative_prompt=negative_prompt,
        image=origin_img,
        generator=generator,
        strength=0.1,
        guidance_scale=5.0,
        height=240,
        width=428,
    ).images
    return images[0]

In [69]:
image_construct_type

[('img2img',
  'PUB_RI_1000000.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000001.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000002.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000003.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000004.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000005.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000006.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000007.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000008.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000009.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000010.png',
  <PIL.Image.Image image mode=RGB size=428x240>),
 ('img2img',
  'PUB_RI_1000011.png',
  <PIL

: 

In [23]:
for img_info in tqdm(image_construct_type):
    if img_info[0] == "img2img":
        output_img = img2img_generate(img_info[2])
    else:
        output_img = img2img_generate(controlnet_generate(img_info[2]))
    file_name = img_info[1]
    if "png" in file_name:
        file_name = file_name.split(".")[0] + ".jpg"
    if "flip" in file_name:
        file_name = file_name.split("_flip")[0] + ".jpg"
        output_img = output_img.rotate(180)
    output_img.save(dst_path + file_name)

100%|██████████| 360/360 [11:13<00:00,  1.87s/it]
