# Imagen 3 자동 생성 → 임시저장 파이프라인

알림(댓글) 이벤트를 받아, 이미지 요청일 경우 Imagen 3로 이미지를 생성하고, 임시 저장소(GCS 또는 로컬)에 업로드한 뒤(선택) 앱의 임시저장 API에 등록하는 데모 파이프라인입니다.

- 생성 모델: Google Vertex AI Imagen 3 (`imagen-3.0-fast-generate-001` 또는 `imagen-3.0-generate-001`)
- 분류/프롬프트: 간단한 규칙 기반 예시 포함(원하면 Gemini로 대체 가능)
- 저장: GCS(권장) 또는 로컬 저장
- 등록: 앱 임시저장 API가 있을 경우 HTTP POST 연동(옵션)

비용이 발생할 수 있으므로 테스트 전 프로젝트/한도를 확인하세요.


## 준비 사항

1) Python 패키지 (로컬/런타임에 설치)
- `google-cloud-aiplatform` (Vertex AI SDK)
- `google-cloud-storage` (GCS 사용 시)
- `pillow`, `requests`, `python-dotenv`(옵션)

2) 환경 변수 (.env 권장)
- `GOOGLE_CLOUD_PROJECT` : GCP 프로젝트 ID
- `VERTEX_LOCATION` : Vertex 리전 (예: `us-central1`)
- `USE_GCS` : `true` 또는 `false` (기본 `false`)
- `GCS_BUCKET` : GCS 버킷명 (USE_GCS=true일 때 필수)
- `GCS_SIGN_URL_EXPIRES` : 서명 URL 만료(초, 기본 86400=1일)
- `STORAGE_PREFIX` : 업로드 경로 prefix (기본 `temp-media`)
- `APP_BASE_URL` : (옵션) 임시저장 등록 API 베이스 URL (예: `https://your.app`)
- `APP_API_TOKEN` : (옵션) API 인증 토큰(Bearer)

3) 권한
- Vertex AI, GCS 접근을 위해 서비스 계정 자격증명(ADC) 구성 또는 로컬 `gcloud auth application-default login` 후 사용

4) 모델 이름
- 빠른 생성: `imagen-3.0-fast-generate-001`
- 고품질: `imagen-3.0-generate-001`


In [ ]:
# 선택: 런타임에서 필요 패키지 설치
# !pip install google-cloud-aiplatform google-cloud-storage pillow requests python-dotenv

import os
import io
import json
import uuid
from dataclasses import dataclass
from typing import Optional, List, Dict, Any

try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

# Vertex AI SDK
try:
    import vertexai
    from vertexai.preview.vision_models import ImageGenerationModel
except Exception as e:
    raise RuntimeError("google-cloud-aiplatform 패키지가 필요합니다: pip install google-cloud-aiplatform") from e

# 이미지 표시용
try:
    from PIL import Image
except Exception:
    Image = None

# 선택: 앱 콜백용
try:
    import requests
except Exception:
    requests = None

PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
LOCATION = os.getenv('VERTEX_LOCATION', 'us-central1')
USE_GCS = os.getenv('USE_GCS', 'false').lower() == 'true'
GCS_BUCKET = os.getenv('GCS_BUCKET')
GCS_SIGN_URL_EXPIRES = int(os.getenv('GCS_SIGN_URL_EXPIRES', '86400'))  # seconds
STORAGE_PREFIX = os.getenv('STORAGE_PREFIX', 'temp-media')

APP_BASE_URL = os.getenv('APP_BASE_URL')
APP_API_TOKEN = os.getenv('APP_API_TOKEN')

assert PROJECT_ID, '환경변수 GOOGLE_CLOUD_PROJECT 가 필요합니다'

vertexai.init(project=PROJECT_ID, location=LOCATION)
print(f'Vertex 초기화 완료: project={PROJECT_ID}, location={LOCATION}')


## 1) 댓글 분석 및 프롬프트 설계 (간단 규칙 기반)

- 실제 서비스에서는 Gemini 2.5 Flash로 분류·가드레일·프롬프트 리라이팅을 권장합니다.
- 여기서는 키워드 기반 간단 판정과 프롬프트 정제를 넣었습니다.


In [ ]:
IMAGE_KEYWORDS = [
    '사진', '이미지', '그림', '사진좀', '이미지좀', '그려줘',
    'image', 'picture', 'photo', 'render', 'generate', 'make an image',
]

def is_image_request(text: str) -> bool:
    t = (text or '').lower()
    if any(k in t for k in [k.lower() for k in IMAGE_KEYWORDS]):
        return True
    # 한국어 어미/표현 추가 탐지
    return any(s in text for s in ['만들어줘', '그려줘', '렌더링'])

def design_prompt_from_comment(text: str) -> str:
    # 매우 단순한 프롬프트 정제: 금지어/민감정보 필터링은 실제 서비스에서 강화 필요
    base = text.strip()
    # 한국어 그대로도 Imagen가 처리 가능하나, 명확도를 위해 약간의 템플릿 추가
    return f"Create an image: {base}. High quality, detailed, natural lighting."

# 테스트
print(is_image_request('고양이 사진 만들어줘'))
print(design_prompt_from_comment('스케이트보드 타는 코기 사진 만들어줘'))


## 2) Imagen 3로 이미지 생성

- `imagen-3.0-fast-generate-001` (빠른 생성) 또는 `imagen-3.0-generate-001` (고품질)
- 반환 객체의 구조는 SDK 버전에 따라 조금 달라질 수 있어, 바이트 추출을 견고하게 처리했습니다.


In [ ]:
from datetime import timedelta

def _extract_image_bytes_list(images_obj) -> List[bytes]:
    # SDK 응답 호환 처리
    candidates = None
    if isinstance(images_obj, list):
        candidates = images_obj
    elif hasattr(images_obj, 'images'):
        candidates = images_obj.images
    else:
        candidates = [images_obj]

    out = []
    for img in candidates:
        data = None
        for attr in ('image_bytes', '_image_bytes', 'bytes'):
            if hasattr(img, attr) and getattr(img, attr):
                data = getattr(img, attr)
                break
        if data is None and hasattr(img, 'as_bytes'):
            try:
                data = img.as_bytes()
            except Exception:
                pass
        if data is None and hasattr(img, 'save'):
            # save -> BytesIO
            bio = io.BytesIO()
            try:
                img.save(bio, format='PNG')
                data = bio.getvalue()
            except Exception:
                pass
        if data is None:
            raise ValueError('이미지 바이트 추출 실패: SDK 호환성 이슈')
        out.append(data)
    return out

def generate_with_imagen(prompt: str, *,
                             model_name: str = 'imagen-3.0-fast-generate-001',
                             number_of_images: int = 1,
                             output_mime_type: str = 'image/png',
                             aspect_ratio: Optional[str] = None,
                             seed: Optional[int] = None) -> List[bytes]:
    model = ImageGenerationModel.from_pretrained(model_name)
    kwargs = {
        'prompt': prompt,
        'number_of_images': number_of_images,
        'output_mime_type': output_mime_type,
    }
    if aspect_ratio:
        kwargs['aspect_ratio'] = aspect_ratio
    if seed is not None:
        kwargs['seed'] = seed

    images = model.generate_images(**kwargs)
    return _extract_image_bytes_list(images)

# 간단 테스트(네트워크/권한 필요)
# bytes_list = generate_with_imagen('a cute corgi riding a skateboard, photoreal, 4k', number_of_images=1)
# print(len(bytes_list[0]))


## 3) 저장소 백엔드 (GCS / 로컬)

- 서비스 연동 전 테스트는 로컬 저장으로 충분합니다. 실제 운영은 GCS 권장.
- GCS는 서명 URL을 반환하여 클라이언트 노출이 가능합니다.


In [ ]:
class StorageResult:
    def __init__(self, key: str, url: str):
        self.key = key
        self.url = url
    def to_dict(self):
        return {'key': self.key, 'url': self.url}

class StorageBackend:
    def save_bytes(self, user_id: str, key: str, data: bytes, content_type: str = 'image/png') -> StorageResult:
        raise NotImplementedError

class LocalBackend(StorageBackend):
    def __init__(self, base_dir: str = './tmp/generated'):
        self.base_dir = base_dir
        os.makedirs(self.base_dir, exist_ok=True)
    def save_bytes(self, user_id: str, key: str, data: bytes, content_type: str = 'image/png') -> StorageResult:
        user_dir = os.path.join(self.base_dir, str(user_id))
        os.makedirs(user_dir, exist_ok=True)
        filename = f'{key}.png'
        path = os.path.join(user_dir, filename)
        with open(path, 'wb') as f:
            f.write(data)
        # 로컬 파일 URL (앱에서 서빙 경로 매핑 필요)
        return StorageResult(key=f'{STORAGE_PREFIX}/{user_id}/{filename}', url=f'file://{os.path.abspath(path)}')

class GCSBackend(StorageBackend):
    def __init__(self, bucket: str, sign_ttl_seconds: int = 86400):
        try:
            from google.cloud import storage
        except Exception as e:
            raise RuntimeError('google-cloud-storage 패키지가 필요합니다: pip install google-cloud-storage') from e
        self._storage = storage
        self.client = storage.Client(project=PROJECT_ID)
        self.bucket = self.client.bucket(bucket)
        self.ttl = sign_ttl_seconds
    def save_bytes(self, user_id: str, key: str, data: bytes, content_type: str = 'image/png') -> StorageResult:
        blob_path = f'{STORAGE_PREFIX}/{user_id}/{key}.png'
        blob = self.bucket.blob(blob_path)
        blob.upload_from_string(data, content_type=content_type)
        url = blob.generate_signed_url(expiration=self.ttl, method='GET')
        return StorageResult(key=blob_path, url=url)

def get_storage_backend() -> StorageBackend:
        if USE_GCS:
            assert GCS_BUCKET, 'USE_GCS=true면 GCS_BUCKET 설정이 필요합니다'
            return GCSBackend(bucket=GCS_BUCKET, sign_ttl_seconds=GCS_SIGN_URL_EXPIRES)
        return LocalBackend()

storage_backend = get_storage_backend()
print('Storage backend:', type(storage_backend).__name__)


## 4) 앱 임시저장 API 등록(옵션)

- 앱 백엔드에 `POST /api/temp-media`와 같은 엔드포인트가 있다고 가정하고 예시를 제공합니다.
- 없다면 이 단계는 건너뛰고, 저장 결과를 DB에 직접 기록하도록 서버 측에서 구현하세요.


In [ ]:
def register_temp_media_to_app(user_id: str, source_comment_id: str, stored: StorageResult) -> Optional[Dict[str, Any]]:
    if not APP_BASE_URL or not requests:
        print('[skip] APP_BASE_URL 또는 requests 미설정 -> 앱 등록 생략')
        return None
    url = APP_BASE_URL.rstrip('/') + '/api/temp-media'
    headers = {'Content-Type': 'application/json'}
    if APP_API_TOKEN:
        headers['Authorization'] = f'Bearer {APP_API_TOKEN}'
    payload = {
        'userId': str(user_id),
        'sourceCommentId': str(source_comment_id),
        'key': stored.key,
        'url': stored.url,
        'status': 'ready'
    }
    try:
        resp = requests.post(url, headers=headers, data=json.dumps(payload), timeout=15)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print('[warn] 앱 임시저장 등록 실패:', e)
        return None


## 5) 이벤트 핸들러

- 입력: `{ commentId, userId, text }`
- 처리: 이미지 요청 여부 판단 → 프롬프트 → 생성 → 저장 → (옵션) 앱 등록
- 출력: 저장 결과 리스트


In [ ]:
def handle_comment_event(event: Dict[str, Any], *,
                          model_name: str = 'imagen-3.0-fast-generate-001',
                          number_of_images: int = 1,
                          aspect_ratio: Optional[str] = None,
                          seed: Optional[int] = None) -> List[Dict[str, Any]]:
    comment_id = str(event.get('commentId') or event.get('comment_id') or '')
    user_id = str(event.get('userId') or event.get('user_id') or '')
    text = str(event.get('text') or '')
    assert comment_id and user_id, 'event에는 commentId, userId가 필요합니다'

    if not is_image_request(text):
        print('[skip] 이미지 요청 아님')
        return []
    prompt = design_prompt_from_comment(text)
    print('[plan] prompt =>', prompt)

    bytes_list = generate_with_imagen(prompt, model_name=model_name, number_of_images=number_of_images, aspect_ratio=aspect_ratio, seed=seed)
    results = []
    for idx, b in enumerate(bytes_list):
        key = f"{comment_id}-{idx}-{uuid.uuid4().hex[:8]}"
        stored = storage_backend.save_bytes(user_id=user_id, key=key, data=b, content_type='image/png')
        app_rec = register_temp_media_to_app(user_id, comment_id, stored)
        results.append({
            'key': stored.key,
            'url': stored.url,
            'appRecord': app_rec,
        })
    return results


## 6) 데모 실행

- 아래 셀에서 샘플 이벤트로 동작을 확인할 수 있습니다.
- 로컬 저장소의 경우 생성 이미지가 `./tmp/generated/<userId>/`에 저장됩니다.


In [ ]:
sample_event = {
    'commentId': 'cmt-12345',
    'userId': 'user-42',
    'text': '스케이트보드 타는 코기 사진 만들어줘',
}

try:
    results = handle_comment_event(sample_event, model_name='imagen-3.0-fast-generate-001', number_of_images=1)
    print('results:', results)
    if results and Image is not None and results[0]['url'].startswith('file://'):
        # 로컬 저장 시 미리보기
        local_path = results[0]['url'].replace('file://', '')
        with open(local_path, 'rb') as f:
            display(Image.open(io.BytesIO(f.read())))
except Exception as e:
    print('[error] 데모 실행 중 오류:', e)


## 운영 적용 가이드

- 서버: 댓글 생성 Webhook/이벤트 수신 → 큐에 작업 enqueue → 워커에서 본 로직 실행
- 가드레일: 안전성/저작권/NSFW/정책 필터 추가 권장
- 중복 방지: commentId+userId 기반 idempotency key로 재시도 대비
- 만료: 임시저장 객체에 TTL/만료 정책 적용 (예: GCS Lifecycle)
- 관측성: 프롬프트/seed/결과 URL/실패 사유를 로그/메트릭으로 수집

필요하면 서버 코드(Next.js/Express/Spring)용 라우트/워커 스캐폴드도 추가해 드릴게요.
