In [None]:
import os
import requests
import json
import boto3
from botocore.client import Config
import time
import base64
# ==============================================================================
# 1. 설정: 본인의 환경에 맞게 이 부분을 수정하세요.
# ==============================================================================

# RunPod Serverless API 정보
ENDPOINT_ID = ""    # 실제 serverless endpoint id로 변경해주세요
RUN_URL = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run"
RUNPOD_API_ENDPOINT = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run"
RUNPOD_API_KEY = '' # 실제 키로 교체하세요

# RunPod Network Volume S3 정보 (RunPod 대시보드에서 확인) 실제 세팅에 맞게 변경해주세요 
S3_ENDPOINT_URL = 'https://s3api-eu-ro-1.runpod.io/'  # 예: https://us-east-1.runpod.cloud
S3_ACCESS_KEY_ID = ''
S3_SECRET_ACCESS_KEY = ''
S3_BUCKET_NAME = '' 
S3_REGION = ''

# 업로드할 로컬 파일 경로
IMAGE_PATH = ""
AUDIO_PATH = ""

# S3에 업로드될 파일 이름 (경로 포함 가능)
S3_IMAGE_KEY = f"input/multitalk/{os.path.basename(IMAGE_PATH)}"
S3_AUDIO_KEY = f"input/multitalk/{os.path.basename(AUDIO_PATH)}"


# ==============================================================================
# 2. S3 클라이언트 설정 및 파일 업로드
# ==============================================================================

def upload_to_s3(file_path, bucket, object_name):
    """지정된 파일을 S3 호환 스토리지에 업로드합니다."""
    print(f"S3 클라이언트를 생성합니다... (Endpoint: {S3_ENDPOINT_URL})")
    s3_client = boto3.client(
        's3',
        endpoint_url=S3_ENDPOINT_URL,
        aws_access_key_id=S3_ACCESS_KEY_ID,
        aws_secret_access_key=S3_SECRET_ACCESS_KEY,
        region_name=S3_REGION,
        config=Config(signature_version='s3v4')
    )
    
    try:
        print(f"'{file_path}' 파일을 S3 버킷 '{bucket}'에 '{object_name}'으로 업로드 시작...")
        s3_client.upload_file(file_path, bucket, object_name)
        print(f"✅ 파일 업로드 성공: s3://{bucket}/{object_name}")
        return f"/runpod-volume/{object_name}"
    except Exception as e:
        print(f"❌ 파일 업로드 실패: {e}")
        return None

# 파일 존재 여부 확인
if not all(map(os.path.exists, [IMAGE_PATH, AUDIO_PATH])):
    raise FileNotFoundError("입력 파일 경로를 확인하세요. 파일이 존재하지 않습니다.")

# 각 파일을 S3에 업로드
image_s3_path = upload_to_s3(IMAGE_PATH, S3_BUCKET_NAME, S3_IMAGE_KEY)
audio_s3_path = upload_to_s3(AUDIO_PATH, S3_BUCKET_NAME, S3_AUDIO_KEY)

if not all([image_s3_path, audio_s3_path]):
    raise RuntimeError("S3 파일 업로드에 실패하여 작업을 중단합니다.")


# ==============================================================================
# 3. Serverless 작업 요청 (S3 경로 사용)
# ==============================================================================

# HTTP 요청 헤더
headers = {
    "Authorization": f"Bearer {RUNPOD_API_KEY}",
    "Content-Type": "application/json"
}

# API에 전송할 데이터 (Base64 대신 S3 경로 사용)
# 중요: 서버의 핸들러 코드가 'cond_image_s3_path'와 같은 키를 예상하도록 수정해야 합니다.
payload = {
    "input": {
        "prompt": "a man talking",
        "image_path": image_s3_path,
        "audio_paths": {
            "person1": audio_s3_path
        }
    }
}

# API에 POST 요청 보내기
print(f"\n🚀 RunPod Serverless 엔드포인트 [{RUNPOD_API_ENDPOINT}]에 작업을 요청합니다...")
try:
    response = requests.post(RUNPOD_API_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # 2xx가 아니면 오류 발생

    # 응답 확인
    print("✅ 요청 성공!")
    print(f"📄 상태 코드: {response.status_code}")
    
    response_data = response.json()
    print("\n[RunPod API 응답 내용]")
    print(json.dumps(response_data, indent=4))
    
    job_id = response_data.get('id')
    print(f"\n✨ 작업이 성공적으로 제출되었습니다. Job ID: {job_id}")
    print("결과는 /status 엔드포인트를 통해 확인할 수 있습니다.")

except requests.exceptions.HTTPError as errh:
    print(f"❌ HTTP 오류 발생: {errh}")
    print(f"응답 내용: {errh.response.text}")
except requests.exceptions.RequestException as err:
    print(f"❌ 요청 중 오류 발생: {err}")

In [None]:
job_output = None

STATUS_URL = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/status"
while True:
    print(f"⏱️ 작업 상태를 확인합니다... (Job ID: {job_id})")
    status_response = requests.get(f"{STATUS_URL}/{job_id}", headers=headers)
    status_response.raise_for_status()
    
    status_data = status_response.json()
    status = status_data.get('status')
    
    if status == 'COMPLETED':
        print("✅ 작업이 완료되었습니다!")
        job_output = status_data.get('output')
        break
    elif status == 'FAILED':
        print("❌ 작업이 실패했습니다.")
        job_output = status_data.get('error', '알 수 없는 오류')
        break
    elif status in ['IN_QUEUE', 'IN_PROGRESS']:
        print(f"🏃 작업이 진행 중입니다... (상태: {status})")
        time.sleep(5)  # 5초 대기 후 다시 확인
    else:
        print(f"❓ 알 수 없는 상태입니다: {status}")
        job_output = status_data
        break

# --- Part 3: 결과물 다운로드 및 디코딩 ---
if job_output and status == 'COMPLETED':
    # 핸들러의 반환값에 따라 'video_b64' 키를 적절히 수정해야 할 수 있습니다.
    video_b64 = job_output.get('video_base64')
    
    if video_b64:
        print("🎨 결과물을 디코딩하고 파일로 저장합니다...")
        try:
            decoded_video = base64.b64decode(video_b64)
            output_filename = f"./result_{job_id}.mp4" #change your path
            
            with open(output_filename, 'wb') as f:
                f.write(decoded_video)
                
            print(f"✨ 최종 결과물이 '{output_filename}' 파일로 저장되었습니다!")
        except Exception as e:
            print(f"❌ 결과물 디코딩 또는 저장 중 오류 발생: {e}")
    else:
        print("⚠️ 결과물(video_b64)이 반환되지 않았습니다. 핸들러의 반환값을 확인하세요.")
elif status == 'FAILED':
        print(f"실패 원인: {job_output}")