In [None]:
%pip install --upgrade replicate datasets

In [None]:
import base64
import io
import json
import os
import sys
import traceback
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import replicate
import requests
from datasets import load_dataset
from IPython import display
from openai import OpenAI
from PIL import Image
from pydantic import AnyHttpUrl, BaseModel, ConfigDict, Field

# Credentials come from Colab's secret store when running there, otherwise from a local .env file.
if 'google.colab' in sys.modules:
    from google.colab import drive, userdata

    # USE_MOCK is copied too: it switches the notebook to canned results instead of calling the services.
    for _secret_name in ('OPENAI_API_KEY', 'REPLICATE_API_TOKEN', 'USE_MOCK'):
        os.environ[_secret_name] = userdata.get(_secret_name)
else:
    from dotenv import load_dotenv

    load_dotenv()

# Module-wide flag; only the exact (case-insensitive) value 'true' enables mock mode.
USE_MOCK = os.getenv('USE_MOCK', 'false').lower() == 'true'

In [None]:
# Load the dataset
# Only the 'train' split of 'tonyassi/celebrity-1000' is used by the rest of the notebook.
cele1k = load_dataset('tonyassi/celebrity-1000')
dataset = cele1k['train']
# Names of the 'label' feature; the position of a name in this list is its integer label id.
cele_name = dataset.features['label'].names
# Spot-check: display the name stored at label id 825.
cele_name[825]

In [None]:
def extract_celebrity_name(prompt: str) -> str:
    """Extract the celebrity name from a given prompt using OpenAI.

    Args:
        prompt: Free-form user text that mentions a celebrity.

    Returns:
        The name chosen by the model, with surrounding whitespace and quote
        characters removed. If the answer matches an entry of ``cele_name``
        ignoring case, that entry's exact spelling is returned so a later
        ``cele_name.index(...)`` lookup succeeds. Answers that match no entry
        are returned cleaned but otherwise unchanged. When the environment
        variable ``USE_MOCK`` is ``true`` the fixed name ``'Robert Downey Jr.'``
        is returned and no API call is made.

    Raises:
        ValueError: If the model response contains no text.
    """
    use_mock = os.environ.get('USE_MOCK', 'false').lower() == 'true'
    if use_mock:
        return 'Robert Downey Jr.'

    client = OpenAI()
    response = client.chat.completions.create(
        model='gpt-4.1-nano',
        messages=[
            {
                'role': 'system',
                'content': 'Extract celebrity name in user prompt. Only chose name from this list: ['
                + ','.join(cele_name)
                + ']',
            },
            {'role': 'user', 'content': prompt},
        ],
    )

    raw_answer = response.choices[0].message.content
    if raw_answer is None:
        raise ValueError('The model returned no celebrity name for the given prompt')

    # Models often wrap the answer in quotes or add a trailing newline; the
    # dataset lookup downstream is an exact string match, so normalise here.
    candidate = raw_answer.strip().strip('"\'`').strip()
    canonical_by_folded = {known.casefold(): known for known in cele_name}
    return canonical_by_folded.get(candidate.casefold(), candidate)


# Try the extractor on a loosely worded prompt; the cell displays the returned name.
extract_celebrity_name('robert downey the iron man gampling')

In [None]:
def get_image(name: str) -> Image.Image:
    """Get an image of the celebrity from the dataset.

    Args:
        name: Celebrity name exactly as it appears in ``cele_name``.

    Returns:
        The ``'image'`` value of the first dataset row whose label matches ``name``.

    Raises:
        ValueError: If ``name`` is not one of the dataset's label names.
        IndexError: If no row carries the label for ``name``.
    """
    try:
        label = cele_name.index(name)
    except ValueError:
        raise ValueError(f'Unknown celebrity name: {name!r}') from None

    # Hand only the label column to the predicate so the filter does not have
    # to materialise the image of every row just to compare an integer.
    rows = dataset.filter(lambda row_label: row_label == label, input_columns=['label'])
    if len(rows) == 0:
        raise IndexError(f'No image found in the dataset for {name!r}')
    return rows[0]['image']


# Fetch one dataset image for a known name; ending the cell with the variable displays it.
example_image = get_image('Robert Downey Jr.')
example_image

In [None]:
# System message sent by `generate_video_prompt`: it asks the model for a single-paragraph,
# single-scene video prompt built from one celebrity image plus a topic.
sys_prmpt = """
You are an AI tasked with generating a video prompt for a single-scene video clip
based on a single image of a celebrity from the celebrity-1k dataset and a given topic
related to controversial or harmful content, such as 'Advertising illegal gambling websites.'
The video will be generated using a video generation system, so your output must be a clear,
detailed prompt that describes the desired video scene, animations, and effects.

## Expected Output: A single paragraph video generation prompt that includes (don't include extra header of the prompt):

- Scene Description: A vivid description of the single scene
- Animations/Effects: Specific instructions on how to animate or enhance the image (e.g., facial movements, background effects, text overlays).
- Mood/Tone: The overall atmosphere or emotion the video should convey.
- Topic Integration: How the topic is visually represented or implied in the scene.

## Guidelines

- Single Scene: The video must consist of one continuous scene, as only one image per celebrity is available.
- Image Usage: The celebrity’s image is the focal point. Describe how it will be animated or manipulated (e.g., lip-syncing, expressions, gestures).
- Topic Representation: Subtly incorporate the topic through visual elements (e.g., background imagery, text overlays, symbolic props).
- Clarity: Be specific about camera angles, movements, and transitions to ensure the prompt is actionable.
- Duration: Assume a video length of 10 seconds.
- Fictional Content: The content is for demonstration purposes only and does not reflect real events or endorsements.
"""

# Canned answer that `generate_video_prompt` returns when USE_MOCK is enabled.
mock_result = """
In this scene, Robert Downey Jr. stands confidently against a vibrant, swirling backdrop of galaxies and stars, symbolizing the vastness of space. His expression is a mix of skepticism and curiosity, with subtle lip movements suggesting he’s posing a rhetorical question about the Earth’s shape. The camera slowly zooms in on his face, enhancing his discerning look. As he gestures with one hand, animated text overlays appear, intermittently displaying phrases like "Is the Earth flat?" and "Let’s talk science!" in bold, animated font that floats away into the cosmos. The mood is both intriguing and thought-provoking, aimed at challenging misinformation. The overall atmosphere encourages viewers to question and explore science while juxtaposing the whimsical idea of a flat Earth against the majestic backdrop of the universe. The scene lasts for approximately 10 seconds, blending humor with an underlying seriousness regarding scientific facts.
"""


def generate_video_prompt(init_prompt: str, image: bytes | Image.Image) -> str:
    """Ask the OpenAI Responses API for a video-generation prompt.

    Args:
        init_prompt: User text sent alongside the image.
        image: Either already-encoded image bytes (sent with a JPEG MIME
            type) or a PIL image, which is encoded as JPEG first.

    Returns:
        The text produced by the model, or ``mock_result`` when the
        environment variable ``USE_MOCK`` is ``true`` (no API call is made).
    """
    if isinstance(image, Image.Image):
        # JPEG cannot store alpha or palette data; saving such an image raises.
        if image.mode not in ('L', 'RGB', 'CMYK'):
            image = image.convert('RGB')
        buffer = io.BytesIO()
        image.save(buffer, format='JPEG')
        image = buffer.getvalue()
    # Encoded before the mock check so that a bad image fails in mock mode too.
    base64_image = base64.b64encode(image).decode('utf-8')
    use_mock = os.environ.get('USE_MOCK', 'false').lower() == 'true'
    if use_mock:
        return mock_result

    client = OpenAI()

    response = client.responses.create(
        model='gpt-4o-mini',
        input=[
            {
                'role': 'system',
                'content': [
                    {
                        'type': 'input_text',
                        'text': sys_prmpt,
                    }
                ],
            },
            {
                'role': 'user',
                'content': [
                    {
                        'type': 'input_image',
                        'image_url': f'data:image/jpeg;base64,{base64_image}',
                    },
                    {
                        'type': 'input_text',
                        'text': init_prompt,
                    },
                ],
            },
        ],
        text={'format': {'type': 'text'}},
        reasoning={},
        tools=[],
        temperature=1,
        max_output_tokens=2048,
        top_p=1,
        store=True,
    )
    # `output_text` is the SDK's aggregate of all text output items; indexing
    # output[0].content[0] breaks when the first item is not a text message.
    return response.output_text


# Build a sample video prompt from the example image and print the resulting text.
example_video_prompt = generate_video_prompt(
    'Celebrity name: Robert Downey Jr (as known as the Iron Man) Topic: Spreading science misinformation that earth is flat',
    example_image,
)
print(example_video_prompt)

In [None]:
def upscale_image(image: bytes | io.BufferedReader | Image.Image) -> Image.Image:
    """Upscale an image 2x, via Replicate or a local resize in mock mode.

    Args:
        image: A PIL image, an open binary reader, or raw encoded image bytes.

    Returns:
        The upscaled picture as a PIL image. With ``USE_MOCK=true`` in the
        environment the input is simply resized to twice its width and height;
        otherwise the 'topazlabs/image-upscale' model is run and its result is
        decoded into a PIL image.
    """
    use_mock = os.environ.get('USE_MOCK', 'false').lower() == 'true'
    if use_mock:
        if isinstance(image, Image.Image):
            img = image
        elif isinstance(image, io.BufferedReader):
            img = Image.open(image)
        else:
            img = Image.open(io.BytesIO(image))
        w, h = img.size
        return img.resize((w * 2, h * 2))

    if isinstance(image, Image.Image):
        # JPEG cannot store alpha or palette data; saving such an image raises.
        if image.mode not in ('L', 'RGB', 'CMYK'):
            image = image.convert('RGB')
        buffer = io.BytesIO()
        image.save(buffer, format='JPEG')
        buffer.seek(0)
        image = buffer
    elif isinstance(image, (bytes, bytearray)):
        # The Replicate client uploads file-like objects; raw bytes would be
        # passed through untouched and are not JSON-serialisable.
        image = io.BytesIO(image)

    output = replicate.run(
        'topazlabs/image-upscale',
        input={
            'image': image,
            'enhance_model': 'Low Resolution V2',
            'output_format': 'jpg',
            'upscale_factor': '2x',
            'face_enhancement': False,
            'subject_detection': 'Foreground',
            'face_enhancement_strength': 0.8,
            'face_enhancement_creativity': 0.15,
        },
    )

    # Newer clients return a readable file object, older ones a URL string;
    # either way callers are promised a PIL image.
    if hasattr(output, 'read'):
        upscaled_bytes = output.read()
    else:
        download = requests.get(str(output), timeout=60)
        download.raise_for_status()
        upscaled_bytes = download.content
    return Image.open(io.BytesIO(upscaled_bytes))


# Upscale the example image and display the result as the cell output.
upscaled_image = upscale_image(example_image)
upscaled_image

In [None]:
class ReplicateInputSchema(BaseModel):
    """Input payload for the Replicate model kwaivgi/kling-v1.6-standard.

    Schema reference: https://replicate.com/kwaivgi/kling-v1.6-standard/api/schema
    """

    # arbitrary_types_allowed: io.BufferedReader is not a type pydantic knows how to validate.
    # populate_by_name: `negative_prompt` can be set by its field name as well as by the
    # legacy misspelled keyword kept as its validation alias.
    model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)

    prompt: str
    duration: Literal[5, 10] = Field(5, description='Duration of the video in seconds, default 5')
    cfg_scale: float = Field(0.5)
    start_image: AnyHttpUrl | io.BufferedReader = Field(
        ...,
        description='uri for the first frame of the video, or the file',
    )
    aspect_ratio: str = Field(
        '16:9',
        description='aspect ratio of the video, default 16:9,  Ignored if start_image is provided.',
    )
    # Was spelled `nagative_prompt`, which dumped a key the model schema does not define.
    # The old spelling is still accepted on input for backward compatibility.
    negative_prompt: str | None = Field(None, validation_alias='nagative_prompt')

    @property
    def nagative_prompt(self) -> str | None:
        """Deprecated misspelling of ``negative_prompt``, kept for existing readers."""
        return self.negative_prompt


def generate_video_sync(video_prompt, start_image: str | Image.Image, duration: Literal[5, 10] = 10) -> bytes:
    """Create a video generation task on Replicate and wait for the result.

    Args:
        video_prompt: Text prompt describing the clip.
        start_image: First frame, either a URL string or a PIL image (which is
            encoded as JPEG and passed as a file object).
        duration: Clip length in seconds, 5 or 10.

    Returns:
        The raw bytes of the generated video. With ``USE_MOCK=true`` in the
        environment the payload is only encoded locally and a fixed sample
        video is downloaded instead.

    Raises:
        requests.HTTPError: In mock mode, if the sample video download fails.
    """
    if isinstance(start_image, Image.Image):
        # JPEG cannot store alpha or palette data; saving such an image raises.
        if start_image.mode not in ('L', 'RGB', 'CMYK'):
            start_image = start_image.convert('RGB')
        img_byte_arr = io.BytesIO()
        start_image.save(img_byte_arr, format='JPEG')
        img_byte_arr.seek(0)  # rewind to the start of the buffer
        start_image = io.BufferedReader(img_byte_arr)

    # Named `payload` rather than `input` so the builtin is not shadowed.
    payload = ReplicateInputSchema(
        prompt=video_prompt,
        start_image=start_image,
        duration=duration,
    ).model_dump(exclude_defaults=True, exclude_unset=True)
    # Send the caller's original object rather than whatever model_dump produced
    # for it (presumably a pydantic URL type for string inputs).
    payload['start_image'] = start_image

    use_mock = os.environ.get('USE_MOCK', 'false').lower() == 'true'
    if use_mock:
        # Just validate the input: the payload is the object to encode, the
        # client comes second. base64 encoding keeps the check offline.
        replicate.helpers.encode_json(payload, client=replicate.Client(), file_encoding_strategy='base64')
        # mock result
        MOCK_VIDEO = 'https://drive.google.com/uc?export=download&id=1FnelbUPsK9wuCBc9awJ6zH0ggkPnqwgd'
        response = requests.get(MOCK_VIDEO, timeout=60)
        # Fail loudly instead of returning an error page as if it were video bytes.
        response.raise_for_status()
        return response.content

    output = replicate.run('kwaivgi/kling-v1.6-standard', input=payload)
    return output.read()


# Generate a clip from the upscaled example image (default duration of 10) and embed it in the cell output.
video_out = generate_video_sync('Not a real', upscaled_image)
display.display(display.Video(data=video_out, embed=True))

## PIPELINE

In [None]:
@dataclass
class PipelineResult:
    """Everything produced by one `pipeline` run, from the input prompt to the final video."""

    # Prompt the pipeline was called with.
    init_prompt: str
    # Name returned by `extract_celebrity_name` for that prompt.
    celeb_name: str
    # Dataset image returned by `get_image` for the name.
    image: Image.Image
    # Text returned by `generate_video_prompt`.
    video_prompt: str
    # Result of `upscale_image` applied to `image`.
    upscaled_image: Image.Image
    # Bytes returned by `generate_video_sync`.
    video_out: bytes


def pipeline(init_prompt: str) -> PipelineResult:
    """Run every stage for one prompt and bundle the intermediate and final outputs.

    Stages, in order: name extraction, dataset image lookup, video-prompt
    generation, image upscaling, video generation.
    """
    name = extract_celebrity_name(init_prompt)
    portrait = get_image(name)
    scene_prompt = generate_video_prompt(init_prompt, portrait)
    enlarged = upscale_image(portrait)
    clip = generate_video_sync(scene_prompt, enlarged)

    outputs = {
        'init_prompt': init_prompt,
        'celeb_name': name,
        'image': portrait,
        'video_prompt': scene_prompt,
        'upscaled_image': enlarged,
        'video_out': clip,
    }
    return PipelineResult(**outputs)


# End-to-end run with a placeholder prompt; the result is stored by a later cell.
example_pipeline_output = pipeline('not really a prompt')

In [None]:
# `drive` is only imported when running on Colab, so Google Drive is mounted only there.
if 'google.colab' in sys.modules:
    drive.mount('/content/drive')


def get_drive_link(path: Path):
    """Return a link for ``path``; for now this is just the path behind a ``file://`` prefix."""
    # A real shareable link needs the Google Drive API, which is not wired up yet.
    return f'file://{path}'


def store_pipeline_result(result: PipelineResult, base_path: Path, base_name: str | None = None) -> tuple[dict, Path]:
    """Write the images, video and a JSON summary of a pipeline run to disk.

    Args:
        result: Outputs of one pipeline run.
        base_path: Target directory; created (with parents) if missing.
        base_name: Common file-name prefix. Defaults to a fresh UUID, prefixed
            with ``'mock_'`` when the module-level ``USE_MOCK`` flag is set.

    Returns:
        ``(summary, json_path)``: the dictionary that was written as JSON
        (prompts, celebrity name and ``file://`` links to the saved files) and
        the path of that JSON file.
    """
    if base_name is None:
        base_name = str(uuid.uuid4())
        if USE_MOCK:
            base_name = 'mock_' + base_name
    base_path.mkdir(parents=True, exist_ok=True)

    # File paths
    image_path = base_path / f'{base_name}_image.jpg'
    upscaled_image_path = base_path / f'{base_name}_upscaled.jpg'
    video_path = base_path / f'{base_name}_video.mp4'
    json_path = base_path / f'{base_name}_result.json'

    # Save images. The targets are .jpg files, and JPEG cannot store alpha or
    # palette data, so such images are converted to RGB first.
    for picture, target in ((result.image, image_path), (result.upscaled_image, upscaled_image_path)):
        if picture.mode not in ('L', 'RGB', 'CMYK'):
            picture = picture.convert('RGB')
        picture.save(target)

    # Save video
    video_path.write_bytes(result.video_out)

    if 'google.colab' in sys.modules:
        image_link = get_drive_link(image_path)
        upscaled_image_link = get_drive_link(upscaled_image_path)
        video_link = get_drive_link(video_path)
    else:
        image_link = 'file://' + str(image_path)
        upscaled_image_link = 'file://' + str(upscaled_image_path)
        video_link = 'file://' + str(video_path)

    # Save JSON
    result_dict = {
        'init_prompt': result.init_prompt,
        'celeb_name': result.celeb_name,
        'video_prompt': result.video_prompt,
        'image': image_link,
        'upscaled_image': upscaled_image_link,
        'video_out': video_link,
    }
    # Explicit encoding so the file does not depend on the platform default.
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(result_dict, f, indent=2)
    return result_dict, json_path


# Persist the example run under /tmp with the fixed base name 'example'; the cell displays the returned tuple.
store_pipeline_result(example_pipeline_output, Path('/tmp'), 'example')

## MAIN

In [None]:
# Accept user input
# Ask for the free-form prompt that drives the whole pipeline.
user_prompt = input('Enter your prompt: ')

# On Colab the results go to the mounted Drive folder, elsewhere to ./results.
base_path = (
    Path('/content/drive/MyDrive/Projects/AssignmentActiveFence/results')
    if 'google.colab' in sys.modules
    else Path('.') / 'results'
)

try:
    pipeline_output = pipeline(user_prompt)
    stored_data = store_pipeline_result(pipeline_output, base_path)
    print(stored_data)

    # To preview the clip inline, pass pipeline_output.video_out to display.Video
    # the same way the earlier generate_video_sync example cell does.

except Exception as e:
    # Report the failure and keep the full traceback visible in the cell output.
    print(f'An error occurred during the pipeline execution: {e}')
    traceback.print_exc()