In [20]:
# 环境初始化
import os
from dotenv import load_dotenv
load_dotenv()

# ComfyUI 服务地址
COMFYUI_URL = os.getenv("COMFYUI_URL")

print(f"Using ComfyUI URL: {COMFYUI_URL}")

Using ComfyUI URL: http://home.chrissong.top:3818


In [None]:
# This is an example that uses the websockets api to know when a prompt execution is done
# Once the prompt execution is done it downloads the images using the /history endpoint

import websocket  # NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
import asyncio
from io import BytesIO
# 加载工作流 api 然后执行

from utils.load_json import load_json

# WORKFLOW_API_FILE = "comfyui_wf_api/base_api.json"
WORKFLOW_API_FILE = "comfyui_wf_api/image2image.json"

workflow = load_json(WORKFLOW_API_FILE)

server_address = "home.chrissong.top:3818"
client_id = str(uuid.uuid4())


def queue_prompt(prompt, prompt_id):
    p = {"prompt": prompt, "client_id": client_id, "prompt_id": prompt_id}
    data = json.dumps(p).encode("utf-8")
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    urllib.request.urlopen(req).read()


def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen(
        "http://{}/view?{}".format(server_address, url_values)
    ) as response:
        return response.read()


def get_history(prompt_id):
    with urllib.request.urlopen(
        "http://{}/history/{}".format(server_address, prompt_id)
    ) as response:
        return json.loads(response.read())


def get_images(ws, prompt):
    prompt_id = str(uuid.uuid4())
    queue_prompt(prompt, prompt_id)
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message["type"] == "executing":
                data = message["data"]
                if data["node"] is None and data["prompt_id"] == prompt_id:
                    break  # Execution is done
        else:
            # If you want to be able to decode the binary stream for latent previews, here is how you can do it:
            # bytesIO = BytesIO(out[8:])
            # preview_image = Image.open(bytesIO) # This is your preview in PIL image format, store it in a global
            continue  # previews are binary data

    history = get_history(prompt_id)[prompt_id]
    for node_id in history["outputs"]:
        node_output = history["outputs"][node_id]
        images_output = []
        if "images" in node_output:
            for image in node_output["images"]:
                image_data = get_image(
                    image["filename"], image["subfolder"], image["type"]
                )
                images_output.append(image_data)
        output_images[node_id] = images_output

    return output_images


prompt = workflow
ws_url = f"ws://{server_address}/ws"
print(f"尝试连接WebSocket: {ws_url}")
ws = websocket.create_connection(ws_url)
# ws.connect(
#     ws_url,
#     header=[f"Origin: http://{server_address}"],
# )
images = get_images(ws, prompt)
ws.close()  # for in case this example is used in an environment where it will be repeatedly called, like in a Gradio app. otherwise, you'll randomly receive connection timeouts
# Commented out code to display the output images:

for node_id in images:
    for image_data in images[node_id]:
        from PIL import Image
        import io

        image = Image.open(io.BytesIO(image_data))
        image.show()
print("Done")