# **Qwen-image for Generating & Editing Images**
- The T4 takes about 5 minutes to edit a single upload of a 480×848 image, and about 7 minutes to edit two uploaded 480×848 images or outpaint a 480×848 image to 960x848 with the default settings. You can generate an image by either not uploading any image or enabling `generate_image`.
- You can find models here: (1) https://huggingface.co/Comfy-Org (2) https://huggingface.co/QuantStack/Qwen-Image-Edit-GGUF/tree/main (3) https://huggingface.co/QuantStack/Qwen-Image-Edit-2509-GGUF/tree/main (4) https://huggingface.co/lightx2v/Qwen-Image-Lightning/tree/main
- Enabling the speedup LoRA will internally set the steps to 4 and the CFG to 1.
- **To use a LoRA, put its Hugging Face or Civitai download link in the `qwen_lora_download_url` textbox, select the `download_loRA` checkbox, and, if using Civitai, enter your Civitai token in `token_if_civitai_url` before running the `Prepare Environment` cell. Remember to describe the main subject of the image and include the LoRA's trigger words in the prompt. You can watch this video to learn how to get and use LoRAs from Hugging Face and Civitai, and how to create your Civitai token: https://youtu.be/49NkIV_QpBM**
- Github project page: https://github.com/QwenLM/Qwen-Image
- Notebook source: https://github.com/Isi-dev/Google-Colab_Notebooks
- Premium notebooks I highly recommend: https://isinse.gumroad.com/
- Google Colab Youtube Playlist: https://www.youtube.com/playlist?list=PLdi1sS5pbSYeA470Sb1wARR4OieCBIqMv
- Even $1 helps support my work: https://buymeacoffee.com/isiomo



In [None]:

# qwen_model_download_url: https://huggingface.co/city96/Qwen-Image-gguf/resolve/main/qwen-image-Q4_K_M.gguf

# qwen_edit_model_download_url: https://huggingface.co/QuantStack/Qwen-Image-Edit-GGUF/resolve/main/Qwen_Image_Edit-Q4_K_M.gguf




















# @markdown # 💥1. Prepare Environment
# Bootstrap: pin torch/torchvision, clone the ComfyUI fork plus the two custom
# node packs this notebook imports from, then install the GGUF node's
# requirements. The %cd lines matter: later code imports relative to
# /content/ComfyUI and the upload helper expects that working directory.
# !pip install --quiet torch torchvision --index-url https://download.pytorch.org/whl/cu124
# !pip install --upgrade --quiet torch torchvision --index-url https://download.pytorch.org/whl/cu118
!pip install torch==2.8.0 torchvision==0.23.0
%cd /content

from IPython.display import clear_output
# ComfyUI core, pinned to a specific branch of the fork.
!git clone --branch ComfyUI_v0.3.51 https://github.com/Isi-dev/ComfyUI
clear_output()
%cd /content/ComfyUI/custom_nodes
# Custom nodes: the Painting node and the GGUF UNet loader (forQwen branch).
!git clone https://github.com/Isi-dev/ComfyUI_Img2PaintingAssistant
!git clone --branch forQwen https://github.com/Isi-dev/ComfyUI_GGUF.git
clear_output()
# !git clone https://github.com/Isi-dev/ComfyUI_KJNodes.git
# clear_output()
%cd /content/ComfyUI/custom_nodes/ComfyUI_GGUF
# Install the Python requirements declared by the GGUF node pack.
!pip install -r requirements.txt
clear_output()
# %cd /content/ComfyUI/custom_nodes/ComfyUI_KJNodes
# !pip install -r requirements.txt
# clear_output()
# Finish in the ComfyUI root so relative imports and uploads resolve there.
%cd /content/ComfyUI

import subprocess
import sys

def install_pip_packages():
    """Install the extra pip packages this notebook needs, one at a time.

    Every package gets its own quiet ``pip install`` call so that one failure
    does not stop the remaining packages from being attempted.

    Returns:
        list[str]: The requirement strings that failed to install (empty when
        every package installed).
    """
    packages = [
        'torchsde',
        'av',
        'diffusers',
        # 'transformers',
        'xformers==0.0.32.post1',
        'accelerate',
        'triton==3.4',
        'sageattention',
        # 'omegaconf',
        # 'tqdm',
        # 'librosa',
        'einops',
        'spandrel'
    ]

    failed = []
    for package in packages:
        try:
            # Run pip install silently (using -q)
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', '-q', package],
                check=True,
                capture_output=True
            )
        except subprocess.CalledProcessError as e:
            failed.append(package)
            print(f"✗ Error installing {package}: {e.stderr.decode().strip() or 'Unknown error'}")
        else:
            print(f"✓ {package} installed")
    return failed


def install_apt_packages():
    """Install the required apt packages (currently only ``aria2``).

    Returns:
        bool: True when ``apt-get install`` exited successfully, else False.
    """
    packages = ['aria2']

    try:
        # Run apt install silently (using -qq)
        subprocess.run(
            ['apt-get', '-y', 'install', '-qq'] + packages,
            check=True,
            capture_output=True
        )
    except subprocess.CalledProcessError as e:
        print(f"✗ Error installing apt packages: {e.stderr.decode().strip() or 'Unknown error'}")
        return False

    print("✓ apt packages installed")
    return True


print("Installing pip packages...")
failed_pip_packages = install_pip_packages()
clear_output()  # Clear the pip installation output

print("Installing apt packages...")
apt_packages_ok = install_apt_packages()
clear_output()  # Clear the apt installation output

# The summary is derived from what the installers actually reported. The
# per-package messages were just cleared, so failed packages are named here.
print("Installation completed with status:")
if failed_pip_packages:
    print("- Some pip packages had issues: " + ", ".join(failed_pip_packages))
else:
    print("- All pip packages installed successfully")
print("- apt packages installed successfully" if apt_packages_ok else "- apt packages had issues")

import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

import torch
import numpy as np
from PIL import Image
import gc
import torch
from PIL import Image
import random
import imageio
from google.colab import files
from IPython.display import display, HTML, Image as IPImage
sys.path.insert(0, '/content/ComfyUI')

from nodes import (
    DualCLIPLoader,
    CLIPLoader,
    CLIPTextEncode,
    VAEEncode,
    VAEDecode,
    VAELoader,
    KSampler,
    KSamplerAdvanced,
    ConditioningZeroOut,
    InpaintModelConditioning,
    ImageScaleBy,
    ImageScale,
    LoraLoaderModelOnly,
    LoadImage,
    SaveImage
)

from custom_nodes.ComfyUI_GGUF.nodes import UnetLoaderGGUF

from comfy_extras.nodes_qwen import TextEncodeQwenImageEdit

from comfy_extras.nodes_edit_model import ReferenceLatent

from comfy_extras.nodes_hunyuan import EmptyHunyuanLatentVideo

from comfy_extras.nodes_model_advanced import ModelSamplingSD3

from comfy_extras.nodes_flux import (
    FluxGuidance,
    FluxKontextImageScale
)

from comfy_extras.nodes_images import (
    ImageCrop,
    ImageStitch

)

from custom_nodes.ComfyUI_Img2PaintingAssistant.image_to_painting_node import Painting

from comfy_extras.nodes_sd3 import EmptySD3LatentImage

from comfy_extras.nodes_post_processing import ImageScaleToTotalPixels

from comfy_extras.nodes_mask import SolidMask

def image_width_height(image):
    """Return ``(width, height)`` read from an image's shape.

    A rank-4 input is read as ``(_, height, width, _)`` and a rank-3 input as
    ``(height, width, _)``.

    Raises:
        ValueError: If the input is neither rank 3 nor rank 4.
    """
    if image.ndim not in (3, 4):
        raise ValueError(f"Unsupported image shape: {image.shape}")
    # In both supported layouts height and width sit just before the last axis.
    height, width = image.shape[-3], image.shape[-2]
    return width, height

def compute_value_from_size(width: int, height: int) -> float:
    """Return one quarter of the pixel area ``width * height`` as a float."""
    area = width * height
    return float(area / 4)

def fillMask(width, height, mask, box=(0, 0), color=0):
    """Paste ``mask`` onto a new single-channel ("L") canvas.

    The canvas is ``width`` x ``height`` and filled with ``color``; ``mask``
    is pasted at ``box`` using itself as the paste mask.

    Returns:
        The new PIL image.
    """
    canvas = Image.new("L", (width, height), color)
    canvas.paste(mask, box=box, mask=mask)
    return canvas

def emptyImage(width, height, batch_size=1, color=0):
    """Build a solid-colour image tensor of shape (batch_size, height, width, 3).

    ``color`` is read as a packed 0xRRGGBB integer; each 8-bit channel is
    scaled to the 0..1 range.
    """
    plane_shape = [batch_size, height, width, 1]
    # Bit offsets of the red, green and blue bytes inside the packed colour.
    channels = [
        torch.full(plane_shape, ((color >> offset) & 0xFF) / 0xFF)
        for offset in (16, 8, 0)
    ]
    return torch.cat(channels, dim=-1)

def pil2tensor(image):
    """Convert an image (anything ``np.array`` accepts) to a float tensor.

    Values are divided by 255 and a leading axis of size 1 is prepended.
    """
    pixels = np.array(image).astype(np.float32)
    scaled = torch.from_numpy(pixels / 255.0)
    return scaled.unsqueeze(0)

def tensor2pil(image):
    """Convert a tensor to a PIL image.

    The tensor is moved to the CPU, squeezed, scaled by 255, clipped to
    0..255 and cast to uint8 before being handed to ``Image.fromarray``.
    """
    array = image.cpu().numpy().squeeze()
    as_bytes = np.clip(255. * array, 0, 255).astype(np.uint8)
    return Image.fromarray(as_bytes)

def lanczos(samples, width, height):
    """Resize every item of ``samples`` to ``width`` x ``height`` with PIL Lanczos.

    Each item has its first axis moved last before conversion to a PIL image
    and moved back afterwards. The stacked result is returned on the same
    device and with the same dtype as ``samples``.
    """
    resized = []
    for sample in samples:
        channels_last = sample.movedim(0, -1).cpu().numpy()
        pil_image = Image.fromarray(np.clip(255. * channels_last, 0, 255).astype(np.uint8))
        pil_image = pil_image.resize((width, height), resample=Image.Resampling.LANCZOS)
        restored = torch.from_numpy(np.array(pil_image).astype(np.float32) / 255.0)
        resized.append(restored.movedim(-1, 0))
    stacked = torch.stack(resized)
    return stacked.to(samples.device, samples.dtype)

def bislerp(samples, width, height):
    """Resize ``samples`` with separable spherical-linear interpolation.

    The original body only defined the inner ``slerp`` helper and then fell
    off the end, so the function returned ``None``. This version completes the
    two interpolation passes (width first, then height).

    Args:
        samples: 4-D tensor indexed as (batch, channels, height, width).
        width: Target width.
        height: Target height.

    Returns:
        Tensor of shape (batch, channels, height, width) in the input dtype.
    """
    def slerp(b1, b2, r):
        '''slerps batches b1, b2 according to ratio r, batches should be flat e.g. NxC'''

        c = b1.shape[-1]

        #norms
        b1_norms = torch.norm(b1, dim=-1, keepdim=True)
        b2_norms = torch.norm(b2, dim=-1, keepdim=True)

        #normalize
        b1_normalized = b1 / b1_norms
        b2_normalized = b2 / b2_norms

        #zero when norms are zero
        b1_normalized[b1_norms.expand(-1,c) == 0.0] = 0.0
        b2_normalized[b2_norms.expand(-1,c) == 0.0] = 0.0

        #slerp
        dot = (b1_normalized*b2_normalized).sum(1)
        omega = torch.acos(dot)
        so = torch.sin(omega)

        #technically not mathematically correct, but more pleasing?
        res = (torch.sin((1.0-r.squeeze(1))*omega)/so).unsqueeze(1)*b1_normalized + (torch.sin(r.squeeze(1)*omega)/so).unsqueeze(1) * b2_normalized
        res *= (b1_norms * (1.0-r) + b2_norms * r).expand(-1,c)

        #edge cases for same or polar opposites
        res[dot > 1 - 1e-5] = b1[dot > 1 - 1e-5]
        res[dot < 1e-5 - 1] = (b1 * (1.0-r) + b2 * r)[dot < 1e-5 - 1]
        return res

    def generate_bilinear_data(length_old, length_new, device):
        '''returns blend ratios plus the two source indices for each target position'''
        coords_1 = torch.arange(length_old, dtype=torch.float32, device=device).reshape((1, 1, 1, -1))
        coords_1 = torch.nn.functional.interpolate(coords_1, size=(1, length_new), mode="bilinear")
        ratios = coords_1 - coords_1.floor()
        coords_1 = coords_1.to(torch.int64)

        # Second neighbour is the next index, clamped at the last element.
        coords_2 = torch.arange(length_old, dtype=torch.float32, device=device).reshape((1, 1, 1, -1)) + 1
        coords_2[:, :, :, -1] -= 1
        coords_2 = torch.nn.functional.interpolate(coords_2, size=(1, length_new), mode="bilinear")
        coords_2 = coords_2.to(torch.int64)
        return ratios, coords_1, coords_2

    orig_dtype = samples.dtype
    samples = samples.float()
    n, c, h, w = samples.shape
    h_new, w_new = (height, width)

    # pass 1: interpolate along the width axis
    ratios, coords_1, coords_2 = generate_bilinear_data(w, w_new, samples.device)
    coords_1 = coords_1.expand((n, c, h, -1))
    coords_2 = coords_2.expand((n, c, h, -1))
    ratios = ratios.expand((n, 1, h, -1))

    pass_1 = samples.gather(-1, coords_1).movedim(1, -1).reshape((-1, c))
    pass_2 = samples.gather(-1, coords_2).movedim(1, -1).reshape((-1, c))
    ratios = ratios.movedim(1, -1).reshape((-1, 1))

    result = slerp(pass_1, pass_2, ratios)
    result = result.reshape(n, h, w_new, c).movedim(-1, 1)

    # pass 2: interpolate along the height axis
    ratios, coords_1, coords_2 = generate_bilinear_data(h, h_new, samples.device)
    coords_1 = coords_1.reshape((1, 1, -1, 1)).expand((n, c, -1, w_new))
    coords_2 = coords_2.reshape((1, 1, -1, 1)).expand((n, c, -1, w_new))
    ratios = ratios.reshape((1, 1, -1, 1)).expand((n, 1, -1, w_new))

    pass_1 = result.gather(-2, coords_1).movedim(1, -1).reshape((-1, c))
    pass_2 = result.gather(-2, coords_2).movedim(1, -1).reshape((-1, c))
    ratios = ratios.movedim(1, -1).reshape((-1, 1))

    result = slerp(pass_1, pass_2, ratios)
    result = result.reshape(n, h_new, w_new, c).movedim(-1, 1)
    return result.to(orig_dtype)

def common_upscale(samples, width, height, upscale_method, crop):
    """Resize ``samples`` to ``height`` x ``width`` with the chosen method.

    Inputs with more than four dimensions are folded into a 4-D batch for the
    resize and unfolded afterwards. ``crop == "center"`` first trims the input
    symmetrically to the target aspect ratio. ``upscale_method`` may be
    "bislerp", "lanczos", or any mode accepted by
    ``torch.nn.functional.interpolate``.
    """
    original_shape = tuple(samples.shape)
    if len(original_shape) > 4:
        # Fold the extra dimension(s) into the batch axis.
        samples = samples.reshape(samples.shape[0], samples.shape[1], -1, samples.shape[-2], samples.shape[-1])
        samples = samples.movedim(2, 1)
        samples = samples.reshape(-1, original_shape[1], original_shape[-2], original_shape[-1])

    source = samples
    if crop == "center":
        src_w = samples.shape[-1]
        src_h = samples.shape[-2]
        src_aspect = src_w / src_h
        dst_aspect = width / height
        trim_x = 0
        trim_y = 0
        if src_aspect > dst_aspect:
            trim_x = round((src_w - src_w * (dst_aspect / src_aspect)) / 2)
        elif src_aspect < dst_aspect:
            trim_y = round((src_h - src_h * (src_aspect / dst_aspect)) / 2)
        source = samples.narrow(-2, trim_y, src_h - trim_y * 2).narrow(-1, trim_x, src_w - trim_x * 2)

    if upscale_method == "bislerp":
        resized = bislerp(source, width, height)
    elif upscale_method == "lanczos":
        resized = lanczos(source, width, height)
    else:
        resized = torch.nn.functional.interpolate(source, size=(height, width), mode=upscale_method)

    if len(original_shape) == 4:
        return resized

    # Undo the folding performed above.
    resized = resized.reshape((original_shape[0], -1, original_shape[1]) + (height, width))
    return resized.movedim(2, 1).reshape(original_shape[:-2] + (height, width))

def make(image_1, direction="left-right", pixels=0, image_2=None, mask_1=None, mask_2=None):
    """Concatenate two images along one axis and build masks on the joined canvas.

    Args:
        image_1: First image tensor, unpacked below as (_, height, width, _).
        direction: 'left-right' concatenates along axis 2, anything else
            along axis 1; 'top-bottom' is the other value the code tests for.
        pixels: When > 0, ``image_2`` (and ``mask_2``) are rescaled first.
        image_2: Optional second image; a black image the size of ``image_1``
            is used when omitted.
        mask_1: Optional mask for ``image_1``; defaults to all zeros.
        mask_2: Mask for ``image_2``; required whenever ``image_2`` is given.

    Returns:
        Tuple ``(image, mask, context_mask, img2_w, img2_h, x, y)`` where
        ``x``/``y`` are the offsets at which ``mask_2`` was pasted.

    Raises:
        ValueError: If ``image_2`` is provided without ``mask_2``.
    """
    if image_2 is None:
      # No second image: use a black placeholder with an all-ones mask.
      image_2 = emptyImage(image_1.shape[2], image_1.shape[1])
      mask_2 = torch.full((1, image_1.shape[1], image_1.shape[2]), 1, dtype=torch.float32, device="cpu")

    elif image_2 is not None and mask_2 is None:
      raise ValueError("mask_2 is required when image_2 is provided")
    if pixels > 0:
      _, img2_h, img2_w, _ = image_2.shape
      h = pixels if direction == 'left-right' else int(img2_h * (pixels / img2_w))
      w = pixels if direction == 'top-bottom' else int(img2_w * (pixels / img2_h))

      # common_upscale is called channels-first, so move the last axis to position 1 and back.
      image_2 = image_2.movedim(-1, 1)
      image_2 = common_upscale(image_2, w, h, 'bicubic', 'disabled')
      image_2 = image_2.movedim(1, -1)

      # Resize the mask to the resized image via a PIL round trip.
      orig_image_2 = tensor2pil(image_2)
      orig_mask_2 = tensor2pil(mask_2).convert('L')
      orig_mask_2 = orig_mask_2.resize(orig_image_2.size)
      mask_2 = pil2tensor(orig_mask_2)

    _, img1_h, img1_w, _ = image_1.shape
    _, img2_h, img2_w, _ = image_2.shape

    image, mask, context_mask = None, None, None

    # resize
    # NOTE(review): image_1 is only rescaled when BOTH its height and width
    # differ from image_2's; if just one differs, torch.cat below may fail —
    # confirm this is intended.
    if img1_h != img2_h and img1_w != img2_w:
      width, height = img2_w, img2_h
      if direction == 'left-right' and img1_h != img2_h:
        scale_factor = img2_h / img1_h
        width = round(img1_w * scale_factor)
      elif direction == 'top-bottom' and img1_w != img2_w:
        scale_factor = img2_w / img1_w
        height = round(img1_h * scale_factor)

      image_1 = image_1.movedim(-1, 1)
      image_1 = common_upscale(image_1, width, height, 'bicubic', 'disabled')
      image_1 = image_1.movedim(1, -1)

    if mask_1 is None:
      mask_1 = torch.full((1, image_1.shape[1], image_1.shape[2]), 0, dtype=torch.float32, device="cpu")

    orig_image_1 = tensor2pil(image_1)
    orig_mask_1 = tensor2pil(mask_1).convert('L')

    if orig_mask_1.size != orig_image_1.size:
      orig_mask_1 = orig_mask_1.resize(orig_image_1.size)

    img1_w, img1_h = orig_image_1.size
    image_1 = pil2tensor(orig_image_1)
    image = torch.cat((image_1, image_2), dim=2) if direction == 'left-right' else torch.cat((image_1, image_2),
                                                                                             dim=1)

    # mask_1 is pasted at the origin of a canvas sized like the joined image.
    context_mask = fillMask(image.shape[2], image.shape[1], orig_mask_1)
    context_mask = pil2tensor(context_mask)

    # mask_2 is pasted at the offset where image_2 starts.
    orig_mask_2 = tensor2pil(mask_2).convert('L')
    x = img1_w if direction == 'left-right' else 0
    y = img1_h if direction == 'top-bottom' else 0
    mask = fillMask(image.shape[2], image.shape[1], orig_mask_2, (x, y))
    mask = pil2tensor(mask)

    return (image, mask, context_mask, img2_w, img2_h, x, y)




from pathlib import Path




def download_with_aria2c(link, folder="/content/ComfyUI/models/loras"):
    """Download ``link`` into ``folder`` with aria2c and return the filename.

    The filename is the last path segment of the URL with any ``?query`` or
    ``#fragment`` removed. Without that, a link such as
    ``.../model.safetensors?download=true`` would be saved under a name that
    fails the later extension check. All user-supplied values are
    shell-quoted, because the command is run through the notebook shell.

    Args:
        link: Direct download URL.
        folder: Destination directory (created if missing).

    Returns:
        str: The filename passed to aria2c's ``-o`` option.
    """
    import os
    import shlex

    filename = link.split("/")[-1].split("?")[0].split("#")[0]
    command = (
        "aria2c --console-log-level=error -c -x 16 -s 16 -k 1M "
        f"{shlex.quote(link)} -d {shlex.quote(folder)} -o {shlex.quote(filename)}"
    )

    print("Executing download command:")
    print(command)

    os.makedirs(folder, exist_ok=True)
    get_ipython().system(command)

    return filename



def download_civitai_model(civitai_link, civitai_token, folder="/content/ComfyUI/models/loras"):
    """Download a model from Civitai with wget into ``folder``.

    The id is taken from the text after ``/models/`` in ``civitai_link`` (up
    to the first ``?``) and the download URL is rebuilt from it. The file is
    saved under a timestamped ``model_<timestamp>.safetensors`` name.

    wget is invoked with an argument list rather than a shell string, so
    characters in the link or token cannot be interpreted by a shell.

    Args:
        civitai_link: Civitai URL containing ``/models/<id>``.
        civitai_token: API token appended to the download URL when truthy.
        folder: Destination directory (created if missing).

    Returns:
        str: The generated filename. It is returned even if the download
        failed; a failure is only reported with a printed message.

    Raises:
        ValueError: If ``civitai_link`` has no ``/models/`` segment.
    """
    import os
    import subprocess
    import time

    os.makedirs(folder, exist_ok=True)

    try:
        model_id = civitai_link.split("/models/")[1].split("?")[0]
    except IndexError:
        raise ValueError("Invalid Civitai URL format. Please use a link like: https://civitai.com/api/download/models/1523247?...") from None

    civitai_url = f"https://civitai.com/api/download/models/{model_id}?type=Model&format=SafeTensor"
    if civitai_token:
        civitai_url += f"&token={civitai_token}"

    timestamp = time.strftime("%Y%m%d_%H%M%S")
    filename = f"model_{timestamp}.safetensors"
    full_path = os.path.join(folder, filename)

    print("Downloading from Civitai...")
    try:
        # check=False: success is judged from the resulting file below.
        subprocess.run(
            ["wget", "--max-redirect=10", "--show-progress", civitai_url, "-O", full_path],
            check=False,
        )
    except OSError as e:
        # e.g. wget missing; fall through to the file check, which reports failure.
        print(f"Could not run wget: {e}")

    if os.path.exists(full_path) and os.path.getsize(full_path) > 0:
        print(f"LoRA downloaded successfully: {full_path}")
    else:
        print(f"❌ LoRA download failed or file is empty: {full_path}")

    return filename


def download_lora(link, folder="/content/ComfyUI/models/loras", civitai_token=None):
    """
    Download a LoRA/model file, choosing the downloader from the URL.

    Links containing "civitai.com" (case-insensitive) go through the Civitai
    downloader and require a token; every other link is fetched with aria2c.

    Args:
        link: The download URL (Civitai or any other direct link)
        folder: Destination folder for the download
        civitai_token: Token for Civitai downloads (mandatory for Civitai links)

    Returns:
        The filename of the downloaded model

    Raises:
        ValueError: If the link is a Civitai link and no token was supplied
    """
    is_civitai = "civitai.com" in link.lower()
    if not is_civitai:
        return download_with_aria2c(link, folder)
    if not civitai_token:
        raise ValueError("Civitai token is required for Civitai downloads")
    return download_civitai_model(link, civitai_token, folder)

# Colab form fields: download URLs for the diffusion model (GGUF), the text
# encoder, the VAE and the speed-up LoRA. They are downloaded further below.
qwen_model_download_url = "https://huggingface.co/QuantStack/Qwen-Image-Edit-GGUF/resolve/main/Qwen_Image_Edit-Q4_K_M.gguf"# @param {"type":"string"}
qwen_text_encoder_download_url = "https://huggingface.co/Comfy-Org/Qwen-Image_ComfyUI/resolve/main/split_files/text_encoders/qwen_2.5_vl_7b_fp8_scaled.safetensors"# @param {"type":"string"}
qwen_vae_download_url = "https://huggingface.co/Comfy-Org/Qwen-Image_ComfyUI/resolve/main/split_files/vae/qwen_image_vae.safetensors"# @param {"type":"string"}
qwen_speedup_lora_download_url = "https://huggingface.co/lightx2v/Qwen-Image-Lightning/resolve/main/Qwen-Image-Lightning-4steps-V1.0-bf16.safetensors"# @param {"type":"string"}

# Optional user LoRA: only downloaded when the checkbox is ticked. A Civitai
# link without a token makes download_lora raise ValueError.
download_loRA = False # @param {type:"boolean"}
qwen_lora_download_url = ""# @param {"type":"string"}
token_if_civitai_url = ""# @param {"type":"string"}



# `lora` stays None unless a file with an accepted extension was downloaded;
# edit_input only applies the LoRA when it is not None.
lora = None
if download_loRA:
    lora = download_lora(qwen_lora_download_url, civitai_token=token_if_civitai_url)
# Validate loRA file extension
valid_extensions = {'.safetensors', '.ckpt', '.pt', '.pth', '.sft'}
if lora:
    if not any(lora.lower().endswith(ext) for ext in valid_extensions):
        print(f"❌ Invalid LoRA format: {lora}")
        lora = None
    else:
        clear_output()
        print("loRA downloaded succesfully!")

def model_download(url: str, dest_dir: str, filename: "str | None" = None, silent: bool = True) -> "str | bool":
    """
    Colab-optimized download with aria2c

    Args:
        url: Download URL
        dest_dir: Target directory (will be created if needed)
        filename: Optional output filename (defaults to the last URL path
            segment with any query string removed)
        silent: If True, suppresses aria2c output and prints a one-line
            progress message instead

    Returns:
        The filename (str) the file was saved under on success, or ``False``
        if the download failed. Callers pass the returned filename on to the
        model loaders, so the success value is a name and not ``True``.
    """
    try:
        # Create destination directory
        Path(dest_dir).mkdir(parents=True, exist_ok=True)

        # Set filename if not specified
        if filename is None:
            filename = url.split('/')[-1].split('?')[0]  # Remove URL parameters

        # Build command
        cmd = [
            'aria2c',
            '--console-log-level=error',
            '-c', '-x', '16', '-s', '16', '-k', '1M',
            '-d', dest_dir,
            '-o', filename,
            url
        ]

        # Add silent flags if requested
        if silent:
            cmd.extend(['--summary-interval=0', '--quiet'])
            print(f"Downloading {filename}...", end=' ', flush=True)

        # Run download; a non-zero exit raises CalledProcessError (handled below)
        subprocess.run(cmd, check=True, capture_output=True, text=True)

        if silent:
            print("Done!")
        else:
            print(f"Downloaded {filename} to {dest_dir}")
        return filename

    except subprocess.CalledProcessError as e:
        error = e.stderr.strip() or "Unknown error"
        print(f"\nError downloading {filename}: {error}")
        return False
    except Exception as e:
        # Covers setup problems such as an uncreatable directory or aria2c missing.
        print(f"\nError: {str(e)}")
        return False


# Fetch the four model files. Each variable holds the filename returned by
# model_download (or False if that download failed) and is later passed to
# the corresponding loader inside edit_input. The `flux_*` names do not match
# the Qwen files configured above, but edit_input refers to them by these names.
flux_model = model_download(qwen_model_download_url, "/content/ComfyUI/models/unet")
flux_vae = model_download(qwen_vae_download_url, "/content/ComfyUI/models/vae")
flux_t5xxl = model_download(qwen_text_encoder_download_url, "/content/ComfyUI/models/clip")
flux_1_turbo = model_download(qwen_speedup_lora_download_url, "/content/ComfyUI/models/loras")

# Shared Painting node instance used by adjust_color_properties.
paint = Painting()

def clear_memory():
    """Run garbage collection and release cached CUDA memory.

    The previous version also looped over ``globals()`` and executed
    ``del obj`` for tensor-like values. That statement only unbinds the loop
    variable and never removed anything from the module namespace, so the
    loop has been dropped.
    """
    # Collect first so tensors that just became unreachable are freed before
    # the CUDA caching allocator is asked to release its blocks.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()


def save_as_image(image, filename_prefix, output_dir="/content/ComfyUI/output"):
    """Save a single image tensor as ``<output_dir>/<filename_prefix>.png``.

    The tensor is scaled by 255 and clipped to 0..255 before the uint8 cast,
    matching ``tensor2pil``. Without the clip, values outside 0..1 wrap
    around and show up as speckled pixels.

    Args:
        image: Image tensor; assumed to hold values nominally in 0..1 in a
            layout ``Image.fromarray`` accepts — TODO confirm with callers.
        filename_prefix: File name without extension.
        output_dir: Destination directory (created if missing).

    Returns:
        str: Path of the written PNG file.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.png"

    frame = np.clip(image.cpu().numpy() * 255, 0, 255).astype(np.uint8)

    Image.fromarray(frame).save(output_path)

    return output_path


def save_as_mp4(images, filename_prefix, fps, output_dir="/content/ComfyUI/output"):
    """Write ``images`` as frames of ``<output_dir>/<filename_prefix>.mp4``.

    Each frame is converted to a uint8 array with a trailing channel axis of
    size 1, 3 or 4 before being appended with imageio. Errors while preparing
    a frame or writing the video are printed and re-raised.

    Returns:
        str: Path of the written video file.
    """
    def _to_frame(img):
        # Normalise one frame to a uint8 array with the channel axis last.
        if isinstance(img, torch.Tensor):
            img = img.cpu().numpy()

        # Frames whose maximum is <= 1.0 are treated as 0..1 and rescaled.
        img = (img * 255).astype(np.uint8) if img.max() <= 1.0 else img.astype(np.uint8)

        if img.ndim == 4:  # Batch dimension? (N, C, H, W)
            img = img[0]  # Take first image in batch

        if img.ndim == 3:
            if img.shape[0] in (1, 3, 4):  # CHW format
                img = np.transpose(img, (1, 2, 0))
            elif img.shape[2] > 4:  # Too many channels
                img = img[:, :, :3]
        elif img.ndim == 2:
            img = np.expand_dims(img, axis=-1)

        # Final validation
        if img.ndim != 3 or img.shape[2] not in (1, 3, 4):
            raise ValueError(f"Invalid frame shape after processing: {img.shape}")
        return img

    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{filename_prefix}.mp4"

    frames = []
    for index, image in enumerate(images):
        try:
            frames.append(_to_frame(image))
        except Exception as e:
            print(f"Error processing frame {index}: {str(e)}")
            raise

    try:
        with imageio.get_writer(output_path, fps=fps) as writer:
            for frame in frames:
                writer.append_data(frame)
    except Exception as e:
        print(f"Error writing video: {str(e)}")
        raise

    return output_path

import cv2
import shutil
from IPython.display import Video
import datetime


def upload_file():
    """Prompt for an upload and move the files into ComfyUI's input folder.

    Uploaded files are expected at ``/content/ComfyUI/<name>`` and are moved
    to ``/content/ComfyUI/input/<name>``.

    Returns:
        The destination path of the first uploaded file, or None when nothing
        was uploaded.
    """
    os.makedirs('/content/ComfyUI/input', exist_ok=True)
    uploaded = files.upload()

    saved_paths = []
    for name in uploaded:
        destination = f'/content/ComfyUI/input/{name}'
        shutil.move(f'/content/ComfyUI/{name}', destination)
        saved_paths.append(destination)
        print(f"File saved to: {destination}")

    if not saved_paths:
        return None
    return saved_paths[0]

def display_video(video_path):
    """Embed the file at ``video_path`` in the notebook as an inline player.

    The file is read fully, base64-encoded into a data URL and shown inside
    an HTML ``<video>`` element. The MIME type is chosen from the file
    extension and defaults to ``video/mp4``.
    """
    from IPython.display import HTML
    from base64 import b64encode

    # Context manager so the handle is closed; the original left it open.
    with open(video_path, 'rb') as video_file:
        video_data = video_file.read()

    # Determine MIME type based on file extension
    if video_path.lower().endswith('.mp4'):
        mime_type = "video/mp4"
    elif video_path.lower().endswith('.webm'):
        mime_type = "video/webm"
    elif video_path.lower().endswith('.webp'):
        mime_type = "image/webp"
    else:
        mime_type = "video/mp4"  # default

    data_url = f"data:{mime_type};base64," + b64encode(video_data).decode()

    display(HTML(f"""
    <video width=512 controls autoplay loop>
        <source src="{data_url}" type="{mime_type}">
    </video>
    """))

# Path of the most recently generated image; updated by edit_input so a later
# run can feed the previous output back in.
outputImagePath = None

def edit_input(
    image_path: str = None,
    image_path2: str = None,
    positive_prompt: str = "",
    negative_prompt: str = "",
    guidance: float = 2.5,
    generate_image: bool = False,
    change_resolution: bool = False,
    width: int = 832,
    height: int = 480,
    change_output_resolution: bool = False,
    width_out: int = 1024,
    height_out: int = 1024,
    seed: int = 0,
    steps: int = 20,
    cfg: float = 1.0,
    sampler_name: str = "euler",
    scheduler: str = "simple",
    denoise: float = 1.0,
    use_turbo_lora: bool = False,
    use_lora: bool = False,
    LoRA_Strength: float = 1.0,
    overwrite: bool = False
):
    """Generate a new image or edit uploaded image(s), then save and display it.

    Modes:
        * ``generate_image`` is True or ``image_path`` is None: text-to-image
          from an empty ``width`` x ``height`` latent.
        * otherwise: ``image_path`` is loaded (optionally resized to
          ``width`` x ``height``), optionally stitched with ``image_path2`` on
          its right, and used both as the reference for the prompt
          conditioning and as the starting latent.

    Args:
        image_path: Main input image, or None to generate from text.
        image_path2: Optional second image, resized to match the first.
        positive_prompt: Prompt text.
        negative_prompt: Negative prompt text.
        guidance: Shift value passed to the model-sampling patch.
        generate_image: Force text-to-image even if an image was uploaded.
        change_resolution: Resize the main image to ``width`` x ``height``.
        width, height: Resize target / empty-latent size.
        change_output_resolution: Sample from an empty
            ``width_out`` x ``height_out`` latent instead.
        width_out, height_out: Output latent size for the option above.
        seed: Noise seed.
        steps: Sampling steps (forced to 4 with the speed-up LoRA).
        cfg: CFG scale (forced to 1 with the speed-up LoRA).
        sampler_name: Sampler name forwarded to the sampler node.
        scheduler: Scheduler name forwarded to the sampler node.
        denoise: Accepted for interface compatibility; not forwarded to the
            sampler by this function.
        use_turbo_lora: Apply the speed-up LoRA.
        use_lora: Apply the user LoRA, if one was downloaded.
        LoRA_Strength: Strength for the user LoRA.
        overwrite: When False a timestamp is appended to the output name.

    Side effects:
        Sets the module-level ``outputImagePath`` and displays the image.

    Raises:
        Re-raises any exception from model loading, sampling, decoding or
        saving after printing it.
    """
    global outputImagePath

    with torch.inference_mode():

        clip_loader = CLIPLoader()
        unet_loader = UnetLoaderGGUF()
        model_sampling = ModelSamplingSD3()
        text_encode_qwen_image_edit = TextEncodeQwenImageEdit()
        vae_loader = VAELoader()
        vae_encode = VAEEncode()
        vae_decode = VAEDecode()
        ksampler = KSamplerAdvanced()
        load_lora = LoraLoaderModelOnly()
        load_turbo_lora = LoraLoaderModelOnly()
        load_image = LoadImage()
        load_image2 = LoadImage()
        positive_prompt_encode = CLIPTextEncode()
        negative_prompt_encode = CLIPTextEncode()
        empty_latent_image = EmptySD3LatentImage()
        image_stitch = ImageStitch()
        image_scaler1 = ImageScale()
        image_scaler = ImageScale()

        print("Loading Text_Encoder...")
        clip = clip_loader.load_clip(flux_t5xxl, type="qwen_image", device="default")[0]

        print("Loading VAE...")
        vae = vae_loader.load_vae(flux_vae)[0]

        # The conditioning lives in its own variables so the prompt strings
        # are never overwritten. Reusing `positive_prompt` for the
        # conditioning is what previously caused the negative conditioning
        # to be built from the positive one.
        if generate_image or image_path is None:
            if not generate_image:
                print("You did not upload the main image, so an image will be generated instead.")
            positive = positive_prompt_encode.encode(clip, positive_prompt)[0]
            negative = negative_prompt_encode.encode(clip, negative_prompt)[0]
            latent = empty_latent_image.generate(width, height, 1)[0]
        else:
            image = load_image.load_image(image_path)[0]
            if change_resolution:
                image = image_scaler1.upscale(
                    image,
                    "lanczos",
                    width,
                    height,
                    "disabled"
                )[0]

            if image_path2 is not None:
                image2 = load_image2.load_image(image_path2)[0]
                width_int, height_int = image_width_height(image)
                width_int2, height_int2 = image_width_height(image2)
                if width_int != width_int2 or height_int != height_int2:
                    # Match the second image to the first before stitching.
                    image2 = image_scaler.upscale(
                        image2,
                        "lanczos",
                        width_int,
                        height_int,
                        "disabled"
                    )[0]

                image = image_stitch.stitch(
                    image,
                    "right",
                    True,
                    0,
                    "white",
                    image2,
                )[0]

            positive = text_encode_qwen_image_edit.encode(clip, positive_prompt, vae=vae, image=image)[0]
            negative = text_encode_qwen_image_edit.encode(clip, negative_prompt, vae=vae, image=image)[0]

            latent = vae_encode.encode(vae, image)[0]

        # The text encoder is no longer needed; free it before loading the UNet.
        del clip
        torch.cuda.empty_cache()
        gc.collect()

        try:
            print("Loading Unet Model...")
            model = unet_loader.load_unet(flux_model)[0]

            # Third argument is the patch multiplier — TODO confirm the
            # intended value against the ModelSamplingSD3 node.
            model = model_sampling.patch(model, guidance, 1)[0]

            usedcfg = cfg
            usedSteps = steps

            if use_turbo_lora:
                print("Loading speedup Lora...")
                model = load_turbo_lora.load_lora_model_only(model, flux_1_turbo, 1)[0]
                # The speed-up LoRA runs with fixed 4 steps and CFG 1.
                usedSteps = 4
                usedcfg = 1

            if use_lora and lora is not None:
                print("Loading Lora...")
                model = load_lora.load_lora_model_only(model, lora, LoRA_Strength)[0]

            if change_output_resolution:
                # Replaces the starting latent with an empty one of the requested size.
                latent = empty_latent_image.generate(width_out, height_out, 1)[0]

            print("Generating image...")
            image_out_latent = ksampler.sample(
                model=model,
                add_noise="enable",
                noise_seed=seed,
                steps=usedSteps,
                cfg=usedcfg,
                sampler_name=sampler_name,
                scheduler=scheduler,
                positive=positive,
                negative=negative,
                latent_image=latent,
                start_at_step=0,
                end_at_step=1000,
                return_with_leftover_noise="disable"
            )[0]

            del model
            torch.cuda.empty_cache()
            gc.collect()

            print("Decoding latents...")
            decoded = vae_decode.decode(vae, image_out_latent)[0]

            del vae
            torch.cuda.empty_cache()
            gc.collect()

            base_name = "ComfyUI"
            if not overwrite:
                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                base_name += f"_{timestamp}"

            outputImagePath = save_as_image(decoded[0], base_name)
            display(IPImage(filename=outputImagePath))

        except Exception as e:
            print(f"Error during Image Editing/saving: {str(e)}")
            raise
        finally:
            clear_memory()

def adjust_color_properties(
    image_path: str = None,
    brightness: float = 1,
    hue: float = 0,
    saturation: float = 1.1,
    lightness: float = 1.4,
    overwrite: bool = False
):
    """Run an image through the Painting node, then save and display it.

    The original body called ``load_image.load_image(...)``, but ``load_image``
    exists only as a local inside ``edit_input``, so every call raised
    NameError. A ``LoadImage`` node is now created here.

    Args:
        image_path: Path of the image to adjust.
        brightness, hue, saturation, lightness: Forwarded to
            ``paint.process``; their exact meaning is defined by the Painting
            node — TODO confirm ranges against that node.
        overwrite: When False a timestamp is appended to the output name.
    """
    image = LoadImage().load_image(image_path)[0]
    # Only the second value returned by the node is used.
    _, painted = paint.process(image, 26, 1, 7, brightness, hue, saturation, lightness, 1, correct_black_Img=False)

    base_name = "ComfyUI"
    if not overwrite:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name += f"_{timestamp}"

    imageToPaint = save_as_image(painted[0], base_name)
    display(IPImage(filename=imageToPaint))

# Wipe the accumulated setup output so only the final status line remains.
clear_output()

print("✅ Environment Setup Complete!")



In [None]:
# @markdown # Upload Image 1 (optional)

# upload_file() returns None when nothing was uploaded, so the preview is
# skipped in that case instead of failing on `None.lower()`.
file_uploaded = upload_file()
display_upload = False # @param {type:"boolean"}
if display_upload and file_uploaded is not None:
    if file_uploaded.lower().endswith(('.png', '.jpg', '.jpeg')):
        display(IPImage(filename=file_uploaded))
    else:
        print("Image format cannnot be displayed.")


In [None]:
# @markdown # Upload Image 2 (optional)

# upload_file() returns None when nothing was uploaded, so the preview is
# skipped in that case instead of failing on `None.lower()`.
file_uploaded2 = upload_file()
display_upload = False # @param {type:"boolean"}
if display_upload and file_uploaded2 is not None:
    if file_uploaded2.lower().endswith(('.png', '.jpg', '.jpeg')):
        display(IPImage(filename=file_uploaded2))
    else:
        print("Image format cannnot be displayed.")


In [None]:
# @markdown # 💥2. Generate/Edit Image

# Colab form fields. `shift` is passed to edit_input as `guidance`.
positive_prompt="A man carrying a beautiful young lady in his arms" # @param {"type":"string"}
negative_prompt="jpeg compression" # @param {"type":"string"}
shift=3 # @param {"type":"slider","min":0.0,"max":100.0,"step":0.1}
# @markdown ---
# @markdown ### Image Settings
generate_image=False # @param {type:"boolean"}
do_not_use_uploaded_image2=False # @param {type:"boolean"}
change_resolution=False # @param {type:"boolean"}
new_width = 480 # @param {"type":"number"}
new_height = 848 # @param {"type":"number"}
change_output_resolution=False # @param {type:"boolean"}
width_out = 720 # @param {"type":"number"}
height_out = 1280 # @param {"type":"number"}
edit_output_image=False # @param {type:"boolean"}
overwrite_previous_output=False # @param {type:"boolean"}
# @markdown ---
# @markdown ### Sampler Settings
seed=0 # @param {"type":"integer"}
steps = 20 # @param {"type":"slider","min":0,"max":100,"step":1}
cfg = 2.5 # @param {"type":"slider","min":0,"max":20,"step":0.5}
sampler_name="euler" # @param ["uni_pc", "uni_pc_bh2", "ddim","euler", "euler_cfg_pp", "euler_ancestral", "euler_ancestral_cfg_pp", "heun", "heunpp2","dpm_2", "dpm_2_ancestral","lms", "dpm_fast", "dpm_adaptive", "dpmpp_2s_ancestral", "dpmpp_2s_ancestral_cfg_pp", "dpmpp_sde", "dpmpp_sde_gpu","dpmpp_2m", "dpmpp_2m_cfg_pp", "dpmpp_2m_sde", "dpmpp_2m_sde_gpu", "dpmpp_3m_sde", "dpmpp_3m_sde_gpu", "ddpm", "lcm","ipndm", "ipndm_v", "deis", "res_multistep", "res_multistep_cfg_pp", "res_multistep_ancestral", "res_multistep_ancestral_cfg_pp","gradient_estimation", "er_sde", "seeds_2", "seeds_3"]
scheduler="simple" # @param ["simple","normal","karras","exponential","sgm_uniform","ddim_uniform","beta","linear_quadratic","kl_optimal"]
denoise=1 # @param {"type":"slider","min":0.0,"max":1.0,"step":0.01}
# @markdown ---
# @markdown ### LoRA Settings
use_speedup_lora=True # @param {type:"boolean"}
use_lora=False # @param {type:"boolean"}
LoRA_Strength=1 # @param {"type":"slider","min":-1,"max":1,"step":0.01}

# The upload cells are optional, so look the paths up defensively: either
# variable may not exist if its cell was never run.
file_uploaded11  = globals().get("file_uploaded", None)
file_uploaded22 = globals().get("file_uploaded2", None)

# Feed the previous result back in as the only input image when requested.
if edit_output_image and outputImagePath is not None:
    file_uploaded11 = outputImagePath
    file_uploaded22 = None

import random
# A seed of 0 means "pick a random seed"; the chosen value is printed below.
seed = seed if seed != 0 else random.randint(0, 2**32 - 1)
print(f"Using seed: {seed}")

if do_not_use_uploaded_image2 and file_uploaded22 is not None:
    file_uploaded22 = None

edit_input(
    image_path = file_uploaded11,
    image_path2 = file_uploaded22,
    positive_prompt = positive_prompt,
    negative_prompt = negative_prompt,
    guidance = shift,
    generate_image = generate_image,
    change_resolution=change_resolution,
    width=new_width,
    height=new_height,
    change_output_resolution=change_output_resolution,
    width_out=width_out,
    height_out=height_out,
    seed = seed,
    steps = steps,
    cfg = cfg,
    sampler_name = sampler_name,
    scheduler = scheduler,
    denoise = denoise,
    use_turbo_lora = use_speedup_lora,
    use_lora = use_lora,
    LoRA_Strength = LoRA_Strength,
    overwrite = overwrite_previous_output
)

# Final cleanup after the run (edit_input also cleans up in its `finally`).
clear_memory()