In [28]:
import requests

# 서버의 ComfyUI API URL 설정
url = "http://127.0.0.1:8189/prompt"  # 현재 사용 중인 엔드포인트

# GET 요청을 보내어 엔드포인트 테스트
try:
    response = requests.get(url)
    print(f"Status Code: {response.status_code}")
    print(f"Response: {response.text}")
except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")

Status Code: 200
Response: {"exec_info": {"queue_remaining": 0}}


In [32]:
import requests
import json

# 서버의 ComfyUI API URL 설정
server_address = "http://127.0.0.1:8189"
url = f"{server_address}/prompt"

# 워크플로우 JSON 로드
with open("workflow_api.json") as f:
    workflow = json.load(f)

# CLIPTextEncode 클래스를 찾아 프롬프트 수정
for key, node in workflow.items():
    if isinstance(node, dict) and node.get("class_type") == "CLIPTextEncode":
        node["inputs"]["text"] = "A futuristic cityscape with neon lights"  # 새로운 프롬프트 설정

# API 요청 데이터 구성
data = {
    "prompt": workflow
}

# API 요청 보내기
response = requests.post(url, json=data)
if response.status_code == 200:
    print("Prompt queued successfully!")
else:
    print(f"Error: {response.status_code}", response.text)

Prompt queued successfully!


In [11]:
import os
from dotenv import load_dotenv
from lumaai import LumaAI

# .env 파일 로드
load_dotenv()

# 환경 변수 가져오기
api_key = os.getenv("LUMAAI_API_KEY")

# LumaAI 클라이언트 설정
client = LumaAI(auth_token=api_key)

In [14]:
import requests
import time
from lumaai import LumaAI

client = LumaAI()

generation = client.generations.create(
  prompt="A teddy bear in sunglasses playing electric guitar and dancing",
)
completed = False
while not completed:
  generation = client.generations.get(id=generation.id)
  if generation.state == "completed":
    completed = True
  elif generation.state == "failed":
    raise RuntimeError(f"Generation failed: {generation.failure_reason}")
  print("Dreaming")
  time.sleep(3)

video_url = generation.assets.video

# download the video
response = requests.get(video_url, stream=True)
with open(f'{generation.id}.mp4', 'wb') as file:
    file.write(response.content)
print(f"File downloaded as {generation.id}.mp4")

Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
Dreaming
File downloaded as d13be421-4a7c-49d4-8351-e342cc287cd8.mp4


In [None]:
import requests
import json
import base64
import time
import os
from dotenv import load_dotenv
from lumaai import LumaAI
from cloudinary import CloudinaryImage, config as cloudinary_config
from cloudinary.uploader import upload as cloudinary_upload

# .env 파일 로드
load_dotenv()

# 환경 변수에서 API 키 가져오기
api_key = os.getenv("LUMAAI_API_KEY")

# LumaAI 클라이언트 설정
client = LumaAI(auth_token=api_key)

# 서버의 ComfyUI API URL 설정
server_address = "http://127.0.0.1:8189"
comfy_url = f"{server_address}/prompt"

# Cloudinary 설정
cloudinary_config(
    cloud_name='dantiosiz',
    api_key='344612546963595',
    api_secret='z6uJ4nMCrznoy9YYXPvRYytQDgg'
)

# ComfyUI 워크플로우 JSON 로드
with open("workflow_api.json") as f:
    workflow = json.load(f)

# CLIPTextEncode 클래스를 찾아 프롬프트 수정
for key, node in workflow.items():
    if isinstance(node, dict) and node.get("class_type") == "CLIPTextEncode":
        node["inputs"]["text"] = "A teddy bear in sunglasses playing electric guitar and dancing"

# ComfyUI API 요청 보내기
response = requests.post(comfy_url, json={"prompt": workflow})
if response.status_code == 200:
    print("Prompt queued successfully! Checking every 10 seconds for up to 1 minute to allow image generation...")

    # 최대 1분 동안 10초 간격으로 상태 확인
    image_data = None
    for _ in range(6):  # 최대 6번 (10초 * 6 = 60초)
        time.sleep(10)
        
        # ComfyUI에서 이미지 데이터 가져오기
        image_data_response = requests.get(f"{server_address}/result/{response.json().get('task_id')}")  # 'result' 경로와 'task_id'는 ComfyUI API 사양에 맞게 수정 필요
        image_data = image_data_response.json().get('image_data') if image_data_response.status_code == 200 else None

        if image_data:
            print("Image generation completed!")
            break  # 이미지가 생성되면 즉시 루프 종료
        else:
            print("Waiting for image generation...")

    # 이미지 생성이 완료되었는지 확인 후 작업 진행
    if image_data:
        image_path = "generated_image.png"
        
        # base64 데이터를 로컬에 이미지로 저장
        with open(image_path, "wb") as img_file:
            img_file.write(base64.b64decode(image_data))
        print(f"Image saved locally at: {image_path}")

        # Cloudinary에 이미지 업로드
        try:
            upload_result = cloudinary_upload(image_path, public_id='test_image', overwrite=True)
            cloudinary_url = CloudinaryImage('test_image').build_url(fetch_format='auto', quality='auto')
            print(f"Image URL on Cloudinary: {cloudinary_url}")

            # Luma API에 이미지 URL 전달하여 비디오 생성
            generation = client.generations.create(
                prompt="The person in the scene should have minimal movement, with gentle, subtle motions like breathing or slight head turns",
                keyframes={
                    "frame0": {
                        "type": "image",
                        "url": cloudinary_url
                    }
                }
            )
            completed = False
            while not completed:
                generation = client.generations.get(id=generation.id)
                if generation.state == "completed":
                    completed = True
                    video_url = generation.assets.video

                    # 비디오 다운로드
                    video_response = requests.get(video_url, stream=True)
                    video_path = f"{generation.id}.mp4"
                    with open(video_path, 'wb') as video_file:
                        for chunk in video_response.iter_content(chunk_size=1024):
                            if chunk:
                                video_file.write(chunk)
                    print(f"Video downloaded locally as {video_path}")
                elif generation.state == "failed":
                    raise RuntimeError(f"Generation failed: {generation.failure_reason}")
                print("Generating animation...")
        except Exception as e:
            print("Failed to upload image to Cloudinary or generate video:", e)
    else:
        print("Error: Image was not generated within the 1-minute limit.")
else:
    print(f"Error: {response.status_code}", response.text)

Prompt queued successfully! Checking every 10 seconds for up to 1 minute to allow image generation...
Waiting for image generation...
Waiting for image generation...
Waiting for image generation...
Waiting for image generation...
Waiting for image generation...


In [33]:
import requests
import json
import os
from dotenv import load_dotenv
from lumaai import LumaAI
from cloudinary import CloudinaryImage, config as cloudinary_config
from cloudinary.uploader import upload as cloudinary_upload

# .env 파일 로드
load_dotenv()

# 환경 변수에서 API 키 가져오기
api_key = os.getenv("LUMAAI_API_KEY")

# LumaAI 클라이언트 설정
client = LumaAI(auth_token=api_key)

# Cloudinary 설정
cloudinary_config(
    cloud_name='dantiosiz',
    api_key='344612546963595',
    api_secret='z6uJ4nMCrznoy9YYXPvRYytQDgg'
)

# 로컬 이미지 파일 경로 설정
local_image_path = "ComfyUI_00042_.png"  # 테스트할 로컬 이미지 파일 경로

# Cloudinary에 이미지 업로드
try:
    # 로컬 이미지 업로드
    upload_result = cloudinary_upload(local_image_path, public_id='test_image', overwrite=True)
    cloudinary_url = CloudinaryImage('test_image').build_url(fetch_format='auto', quality='auto')
    print(f"Image URL on Cloudinary: {cloudinary_url}")

    # Luma API에 이미지 URL 전달하여 비디오 생성
    generation = client.generations.create(
        prompt="The person in the scene should have minimal movement, with gentle, subtle motions like breathing or slight head turns",
        keyframes={
            "frame0": {
                "type": "image",
                "url": cloudinary_url
            }
        }
    )
    completed = False
    while not completed:
        generation = client.generations.get(id=generation.id)
        if generation.state == "completed":
            completed = True
            video_url = generation.assets.video

            # 비디오 다운로드
            video_response = requests.get(video_url, stream=True)
            video_path = f"{generation.id}.mp4"
            with open(video_path, 'wb') as video_file:
                for chunk in video_response.iter_content(chunk_size=1024):
                    if chunk:
                        video_file.write(chunk)
            print(f"Video downloaded locally as {video_path}")
        elif generation.state == "failed":
            raise RuntimeError(f"Generation failed: {generation.failure_reason}")
        print("Generating animation...")
except Exception as e:
    print("Failed to upload image to Cloudinary or generate video:", e)

Image URL on Cloudinary: http://res.cloudinary.com/dantiosiz/image/upload/f_auto,q_auto/test_image
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating animation...
Generating an

In [None]:
import requests
import json
import base64
import time
import os
from dotenv import load_dotenv
from lumaai import LumaAI
from cloudinary import CloudinaryImage, config as cloudinary_config
from cloudinary.uploader import upload as cloudinary_upload
import paramiko
from scp import SCPClient

# .env 파일 로드
load_dotenv()

# 환경 변수에서 API 키 가져오기
api_key = os.getenv("LUMAAI_API_KEY")

# LumaAI 클라이언트 설정
client = LumaAI(auth_token=api_key)

# Cloudinary 설정
cloudinary_config(
    cloud_name='dantiosiz',
    api_key='344612546963595',
    api_secret='z6uJ4nMCrznoy9YYXPvRYytQDgg'
)

# SSH 정보 설정
ssh_server_ip = "192.168.1.100"  # Vast AI 서버 IP
ssh_username = "root"
ssh_key_path = "/Users/travis/.ssh/id_direct_rsa"
remote_directory = "/workspace/ComfyUI/output"  # ComfyUI 출력 디렉토리
local_directory = "./images"  # 로컬 저장 디렉토리
os.makedirs(local_directory, exist_ok=True)

def get_latest_file_path(remote_directory):
    """서버 디렉토리에서 가장 최근 파일 경로 반환"""
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ssh_server_ip, username=ssh_username, key_filename=ssh_key_path)
        
        # 가장 최근 파일 경로 확인
        stdin, stdout, stderr = ssh.exec_command(f"ls -t {remote_directory} | head -n 1")
        latest_file = stdout.read().decode().strip()
        if latest_file:
            return f"{remote_directory}/{latest_file}"
        else:
            raise FileNotFoundError("No files found in the specified directory.")
    except Exception as e:
        print(f"Error finding latest file: {e}")
    finally:
        ssh.close()


def download_image_via_ssh(remote_path, local_path):
    """SSH를 통해 Vast AI 서버에서 파일 다운로드"""
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ssh_server_ip, username=ssh_username, key_filename=ssh_key_path)

        with SCPClient(ssh.get_transport()) as scp:
            scp.get(remote_path, local_path)
        print(f"Image downloaded via SSH to: {local_path}")
    except Exception as e:
        print(f"Error downloading image via SSH: {e}")
    finally:
        ssh.close()


# ComfyUI 워크플로우 JSON 로드
with open("workflow_api.json") as f:
    workflow = json.load(f)

# CLIPTextEncode 클래스를 찾아 프롬프트 수정
for key, node in workflow.items():
    if isinstance(node, dict) and node.get("class_type") == "CLIPTextEncode":
        node["inputs"]["text"] = "A teddy bear in sunglasses playing electric guitar and dancing"

# 서버의 ComfyUI API URL 설정
server_address = "http://127.0.0.1:8189"
comfy_url = f"{server_address}/prompt"

# ComfyUI API 요청 보내기
response = requests.post(comfy_url, json={"prompt": workflow})
if response.status_code == 200:
    print("Prompt queued successfully! Checking every 10 seconds for up to 1 minute to allow image generation...")

    # 최대 1분 동안 10초 간격으로 상태 확인
    task_id = response.json().get('task_id')  # ComfyUI에서 반환한 task_id 사용
    for _ in range(6):  # 최대 6번 (10초 * 6 = 60초)
        time.sleep(10)

        # ComfyUI에서 상태 확인
        status_response = requests.get(f"{server_address}/status/{task_id}")  # API 경로는 ComfyUI 사양에 맞게 수정
        if status_response.status_code == 200 and status_response.json().get('status') == "completed":
            print("Image generation completed!")
            break
        else:
            print("Waiting for image generation...")
    else:
        print("Error: Image was not generated within the 1-minute limit.")
        exit(1)

    # 서버에서 최신 이미지 파일 경로 가져오기
    remote_image_path = get_latest_file_path(remote_directory)
    file_name = os.path.basename(remote_image_path)
    local_image_path = os.path.join(local_directory, file_name)

    # SSH로 서버에서 로컬로 이미지 다운로드
    download_image_via_ssh(remote_image_path, local_image_path)

    # Cloudinary에 이미지 업로드
    try:
        upload_result = cloudinary_upload(local_image_path, public_id='test_image', overwrite=True)
        cloudinary_url = CloudinaryImage('test_image').build_url(fetch_format='auto', quality='auto')
        print(f"Image URL on Cloudinary: {cloudinary_url}")

        # Luma API에 이미지 URL 전달하여 비디오 생성
        generation = client.generations.create(
            prompt="The person in the scene should have minimal movement, with gentle, subtle motions like breathing or slight head turns",
            keyframes={
                "frame0": {
                    "type": "image",
                    "url": cloudinary_url
                }
            }
        )
        completed = False
        while not completed:
            generation = client.generations.get(id=generation.id)
            if generation.state == "completed":
                completed = True
                video_url = generation.assets.video

                # 비디오 다운로드
                video_response = requests.get(video_url, stream=True)
                video_path = f"{generation.id}.mp4"
                with open(video_path, 'wb') as video_file:
                    for chunk in video_response.iter_content(chunk_size=1024):
                        if chunk:
                            video_file.write(chunk)
                print(f"Video downloaded locally as {video_path}")
            elif generation.state == "failed":
                raise RuntimeError(f"Generation failed: {generation.failure_reason}")
            print("Generating animation...")
    except Exception as e:
        print("Failed to upload image to Cloudinary or generate video:", e)
else:
    print(f"Error: {response.status_code}", response.text)