## Import

In [None]:
# Download the diffusers script that converts original-format ControlNet checkpoints,
# saving it locally as convert_controlnet_ckpt.py.
# NOTE(review): the URL points at the `main` branch, so the script may change between runs —
# consider pinning a tag or commit for reproducibility.
!wget https://raw.githubusercontent.com/huggingface/diffusers/main/scripts/convert_original_controlnet_to_diffusers.py -O convert_controlnet_ckpt.py

In [None]:
# Clone the original ControlNet repository; the conversion step below reads
# models/cldm_v21.yaml from it.
!git clone https://github.com/lllyasviel/ControlNet

In [None]:
# Convert the trained ControlNet checkpoint (stored on Google Drive) into a diffusers
# model directory at /content/diffusers_converted_model.
# NOTE(review): --extract_ema is passed, so presumably the EMA weights are the ones exported —
# confirm against the conversion script's --help.
!python convert_controlnet_ckpt.py \
  --checkpoint_path /content/drive/MyDrive/SD2.1_training_RGB_hsv+clip/test-epoch=02.ckpt \
  --original_config_file /content/ControlNet/models/cldm_v21.yaml \
  --dump_path /content/diffusers_converted_model \
  --extract_ema

In [None]:
import torch

In [None]:
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, DPMSolverMultistepScheduler

# Load the converted ControlNet in half precision.
# `use_safetensors=False` is passed explicitly; the original author flagged it as the key setting.
controlnet = ControlNetModel.from_pretrained(
    "/content/diffusers_converted_model",
    torch_dtype=torch.float16,
    use_safetensors=False,
)

In [None]:
from tqdm import tqdm
import random
import os
import zipfile
import json
import torch
import pandas as pd
import cv2          # ★ 후처리를 위해 추가
import numpy as np
from PIL import Image
from diffusers import UNet2DConditionModel, StableDiffusionControlNetPipeline, ControlNetModel
import open_clip
from compel import Compel
import warnings
warnings.filterwarnings('ignore')

In [None]:
from diffusers import DPMSolverMultistepScheduler

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

## Hyperparameter Setting

In [None]:
# Notebook-wide settings.
#   SUB_DIR: directory that receives the generated images and the embedding CSV.
#   SEED:    value passed to seed_everything().
CFG = dict(
    SUB_DIR='./submission',
    SEED=42,
)

## Fix Random Seed

In [None]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's `random`, NumPy's legacy global RNG and PyTorch (CPU and
    CUDA), sets the PYTHONHASHSEED environment variable, and switches cuDNN
    to deterministic mode with benchmarking disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    # Python-level randomness.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators (CPU first, then CUDA).
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

    # Trade cuDNN autotuning for run-to-run reproducibility.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_everything(CFG['SEED']) # fix all RNG seeds using the configured SEED

## Load Pre-trained Model (Stable Diffusion 2.1 base)

In [None]:
# Half-precision dtype shared by every model loaded below, and the base SD checkpoint id.
fp16 = torch.float16
base_sd = "stabilityai/stable-diffusion-2-1-base"

# Path to the fine-tuned weights (replace with your own folder name);
# this is the output of save_attn_procs().
lora_dir = "/content/drive/MyDrive/0730uhyun/lora_weights_epoch0"

# CSV used for the submission.
test_csv = "/content/test.csv"

# Output directory for predictions; created if it does not exist yet.
out_dir = "preds"
os.makedirs(out_dir, exist_ok=True)

In [None]:
# -------------------------------------------------
# 1) Load models
# -------------------------------------------------
# 1-a. UNet + LoRA
# The UNet comes from the "unet" subfolder of the base SD checkpoint, in fp16.
unet = UNet2DConditionModel.from_pretrained(
    base_sd, subfolder="unet", torch_dtype=fp16
)
unet.load_attn_procs(lora_dir)        # inject the LoRA weights stored in lora_dir
unet.to(device)

In [None]:
# Publicly hosted SD 2.1 ControlNets for Canny-edge and depth conditioning, both in fp16.
cnet_canny, cnet_depth = (
    ControlNetModel.from_pretrained(repo_id, torch_dtype=fp16)
    for repo_id in (
        "thibaud/controlnet-sd21-canny-diffusers",
        "thibaud/controlnet-sd21-depth-diffusers",
    )
)

In [None]:
from diffusers import MultiControlNetModel
# Bundle the three ControlNets in the order (custom converted, canny, depth); the inference
# loop supplies its control images and conditioning scales in this same order.
multi_cnet = MultiControlNetModel([controlnet, cnet_canny, cnet_depth])   # dimensions are fully compatible

In [None]:
# -------------------------------------------------
# 2) Assemble the pipeline
# -------------------------------------------------
# Start from the base SD checkpoint, swapping in the LoRA-patched UNet and the
# multi-ControlNet wrapper; the safety checker is disabled.
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    base_sd,
    unet=unet,
    controlnet=multi_cnet,
    safety_checker=None,
    torch_dtype=fp16,
)
pipe = pipe.to(device)

In [None]:
# Replace the pipeline's scheduler with DPMSolverMultistepScheduler, built from the
# current scheduler's config with two overrides.
pipe.scheduler = DPMSolverMultistepScheduler.from_config(
    pipe.scheduler.config,
    algorithm_type    = "dpmsolver++",   # key setting
    use_karras_sigmas = True
)

In [None]:
# Compel builds prompt embeddings from the pipeline's own tokenizer and text encoder.
# truncate_long_prompts=False asks it not to cut prompts at the tokenizer limit; the
# inference loop only uses Compel for prompts longer than MAX_TOKENS.
compel = Compel(
    tokenizer=pipe.tokenizer,
    text_encoder=pipe.text_encoder,
    truncate_long_prompts=False,
)

# Maximum sequence length reported by the tokenizer (the loop comments refer to it as 77).
MAX_TOKENS = pipe.tokenizer.model_max_length

In [None]:
print(MAX_TOKENS)  # sanity check of the tokenizer limit used to branch in the inference loop

## Pre-Processing Input Image (Controlnet)

In [None]:
def gray_image(pil, size=512):
    """Return a `size` x `size` grayscale copy of `pil` stored as a 3-channel RGB image.

    The image is resized with bilinear interpolation, converted to a single
    luminance band ("L"), and that band is replicated into R, G and B.
    """
    resized = pil.resize((size, size), Image.BILINEAR)
    luminance = resized.convert("L")
    return Image.merge("RGB", [luminance] * 3)

In [None]:
from controlnet_aux import CannyDetector
canny = CannyDetector()

def canny_edge(pil, size=512):
    """Return the Canny edge map of `pil` as an RGB PIL image.

    The input is resized to `size` x `size` (bilinear) and handed to the
    module-level `canny` detector as a NumPy array; the detector output is
    wrapped back into a PIL image and converted to RGB.
    """
    resized = np.array(pil.resize((size, size), Image.BILINEAR))
    edges = canny(resized)
    return Image.fromarray(edges).convert("RGB")

In [None]:
import torchvision.transforms as T, torch.hub

midas = torch.hub.load("intel-isl/MiDaS", "MiDaS_small").to(device).eval()

# Pre-processing for MiDaS_small:
#   * a fixed 256x256 resize keeps both sides a multiple of 32 even for non-square inputs
#     (T.Resize(256) only fixes the shorter side);
#   * ImageNet mean/std match the normalization of the upstream `small_transform`.
tfm   = T.Compose([T.Resize((256, 256)), T.ToTensor(),
                   T.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])])

def depth_image(pil, size=512):
    """Estimate a depth map for `pil` with MiDaS_small and return it as an RGB image.

    Args:
        pil:  Input PIL image (converted to RGB before inference).
        size: Side length of the square output image.

    Returns:
        A `size` x `size` RGB PIL image whose three channels all hold the
        prediction min-max scaled to 0..255. A constant prediction yields an
        all-black image instead of dividing by zero.
    """
    with torch.no_grad():
        pred = midas(tfm(pil.convert("RGB")).unsqueeze(0).to(device))

    # Per the upstream hub example MiDaS returns (batch, H, W); the previous `[0, 0]`
    # index therefore picked a single row instead of the full map. squeeze() drops the
    # batch axis (and a channel axis, should one be present).
    depth = pred.squeeze().float().cpu().numpy()

    lo, hi = float(depth.min()), float(depth.max())
    span = hi - lo
    if span > 0:
        scaled = (depth - lo) / span
    else:
        scaled = np.zeros_like(depth)
    d = (255 * scaled).astype("uint8")
    return Image.fromarray(d).resize((size,size)).convert("RGB")

## Inference

In [None]:
# Load the test metadata; the inference loop reads its "ID", "input_img_path" and "caption" columns.
# NOTE(review): `test_csv` ("/content/test.csv") is defined in the config cell but not used here —
# confirm that './test.csv' resolves to the same file.
test_df = pd.read_csv('./test.csv')

In [None]:
# =========================================================
import re

# Color vocabulary whose occurrences get wrapped in double parentheses.
COLOR_WORDS = {
    "beige",
    "black",
    "blue",
    "blue-and-white",
    "bronze",
    "brown",
    "gray",
    "green",
    "greenish",
    "greenish-blue",
    "grey",
    "maroon",
    "navy",
    "olive",
    "orange",
    "pink",
    "pink-purple",
    "purple",
    "red",
    "red-brown",
    "red-orange",
    "rust",
    "salmon",
    "silver",
    "sky",
    "stone",
    "tan",
    "turquoise",
    "violet",
    "white",
    "yellow",
}

# Regex alternation tries alternatives left to right, and a set has no stable order.
# Sorting longest-first (ties broken alphabetically) guarantees that compound words such
# as "blue-and-white" win over their prefix "blue" on every run. re.escape keeps the
# hyphens literal.
_color_pat = re.compile(
    r"\b("
    + "|".join(re.escape(word) for word in sorted(COLOR_WORDS, key=lambda w: (-len(w), w)))
    + r")\b",
    re.I,
)

def emphasize_colors(cap:str,max_emph:int=10):
    """Wrap color words found in `cap` in double parentheses.

    Matching is case-insensitive and restricted to whole words from
    COLOR_WORDS; the original casing of each match is preserved.

    Args:
        cap:      Caption text to process.
        max_emph: Maximum number of matches to wrap. Matches beyond this
                  count are left unchanged.

    Returns:
        The caption with the first `max_emph` color words rewritten as
        "((word))".
    """
    seen = 0

    def _wrap(match):
        nonlocal seen
        seen += 1
        word = match.group(0)
        return f"(({word}))" if seen <= max_emph else word

    return _color_pat.sub(_wrap, cap)
# =========================================================

In [None]:
def _replace_luminance(color_img, gray_img):
    """Return `color_img` with its LAB lightness channel replaced by that of `gray_img`.

    Both arguments are RGB PIL images. If their sizes differ, `color_img` is
    resized to `gray_img`'s size first so the channel assignment cannot fail
    on a shape mismatch.
    """
    if color_img.size != gray_img.size:
        color_img = color_img.resize(gray_img.size, Image.BILINEAR)
    color_lab = cv2.cvtColor(np.array(color_img), cv2.COLOR_RGB2LAB)
    gray_lab  = cv2.cvtColor(np.array(gray_img),  cv2.COLOR_RGB2LAB)
    color_lab[:, :, 0] = gray_lab[:, :, 0]            # swap only the L channel
    return Image.fromarray(cv2.cvtColor(color_lab, cv2.COLOR_LAB2RGB))


# Prompt pieces and sampler settings that do not change between images.
PREFIX  = "vibrant natural colors, realistic lighting, balanced white-balance. "
negative_prompt   = "wrong color, dull or oversaturated, monochrome, artifacts, color shift"
pipe_kwargs = dict(
    controlnet_conditioning_scale=[0.7, 0.5, 0.3],   # gray, canny, depth
    guidance_scale=7.5,
    num_inference_steps=72,
    # NOTE(review): the former `cfg_rescale=0.5` argument was dropped. It is not a parameter
    # of StableDiffusionControlNetPipeline.__call__, so it was either ignored or raised a
    # TypeError depending on the diffusers version.
)

out_imgs, out_img_names = [], []

for img_id, img_path, caption in zip(test_df["ID"],
                                     test_df["input_img_path"],
                                     test_df["caption"]):

    # 1) Pre-process the input image into the three control images.
    with Image.open(img_path) as opened:              # close the file handle promptly
        input_img = opened.convert("RGB")
    control_image  = [gray_image(input_img),
                      canny_edge(input_img),
                      depth_image(input_img)]

    # 2) Build the prompt; a missing (non-string) caption falls back to the prefix only.
    caption_text = caption if isinstance(caption, str) else ""
    full_prompt  = PREFIX + emphasize_colors(caption_text)   # e.g. "((red)) car ..."

    # 3) Branch on prompt length (the count includes the tokenizer's special tokens).
    token_count = len(pipe.tokenizer(full_prompt).input_ids)

    with torch.autocast("cuda", dtype=torch.float16):
        if token_count <= MAX_TOKENS:
            # ---- within the token limit: skip Compel and pass raw text ----
            # NOTE(review): on this path the "((color))" markers reach the pipeline as
            # literal text — confirm that is intended.
            prompt_kwargs = dict(prompt=full_prompt, negative_prompt=negative_prompt)
        else:
            # ---- over the token limit: let Compel build the embeddings ----
            with torch.no_grad():
                conditioning          = compel.build_conditioning_tensor(full_prompt)
                negative_conditioning = compel.build_conditioning_tensor(negative_prompt)
                conditioning, negative_conditioning = compel.pad_conditioning_tensors_to_same_length(
                    [conditioning, negative_conditioning]
                )
            prompt_kwargs = dict(prompt_embeds=conditioning,
                                 negative_prompt_embeds=negative_conditioning)

        output_img = pipe(image=control_image, **prompt_kwargs, **pipe_kwargs).images[0]

    # ---------- LAB post-processing (replace the lightness channel) ----------
    # Keep the generated chroma but restore the lightness of the grayscale input.
    output_img = _replace_luminance(output_img, control_image[0])
    # --------------------------------------------------------------------------

    out_imgs.append(output_img)
    out_img_names.append(img_id)

print("✅ Test 데이터셋에 대한 모든 이미지 생성 완료.")

## Submission

In [None]:
# Create the directory that will hold the inference results.
os.makedirs(CFG['SUB_DIR'], exist_ok=True)

In [None]:
# **Important** embedding-extraction model used to evaluate the generated images.
clip_model, _, clip_preprocess = open_clip.create_model_and_transforms("ViT-L-14", pretrained="openai") # the model name must match exactly

In [None]:
clip_model.to(device)
clip_model.eval()   # inference only
# For the evaluation submission, save each generated image and extract its
# embedding vector (feature) with the ViT-L-14 model.
feat_imgs = []
for output_img, img_id in tqdm(zip(out_imgs, out_img_names), total=len(out_imgs)):
    # f-string formatting also works when the ID column was parsed as a non-string type.
    path_out_img = os.path.join(CFG['SUB_DIR'], f"{img_id}.png")
    output_img.save(path_out_img)

    # Build the evaluation embedding on the same device as the model
    # (a hard-coded .cuda() would break the CPU fallback chosen by `device`).
    clip_input = clip_preprocess(output_img).unsqueeze(0).to(device)
    with torch.no_grad():
        feat_img = clip_model.encode_image(clip_input)
        feat_img = feat_img / feat_img.norm(dim=-1, keepdim=True)  # L2 normalization is required

    feat_imgs.append(feat_img.detach().cpu().numpy().reshape(-1))

In [None]:
# Stack the per-image feature vectors into a 2-D array: one row per image.
feat_imgs = np.array(feat_imgs)

# One "vec_<i>" column per embedding dimension, preceded by the image ID column.
vec_columns = list(map('vec_{}'.format, range(feat_imgs.shape[1])))
feat_submission = pd.DataFrame(feat_imgs, columns=vec_columns)
feat_submission.insert(0, 'ID', out_img_names)

In [None]:
# Write the embedding table next to the generated images, without the DataFrame index.
feat_submission.to_csv(CFG['SUB_DIR']+'/embed_submission.csv', index=False)

## 리더보드 제출을 위한 ZIP 파일 생성

In [None]:
# Path of the final submission archive (ZIP).
# The archive must be flat: it may not contain any directories (folders).
zip_path = './트리플컨트롤넷디퓨저변환.zip'

# Build the zip file.
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for file_name in os.listdir(CFG['SUB_DIR']):
        file_path = os.path.join(CFG['SUB_DIR'], file_name)

        # Skip anything that is not a regular file, and skip hidden files.
        if not os.path.isfile(file_path):
            continue
        if file_name.startswith('.'):
            continue

        # arcname drops the directory part so entries sit at the archive root.
        zipf.write(file_path, arcname=file_name)

print(f"✅ 압축 완료: {zip_path}")