# Async Image-to-Video Pipeline with Bedrock
This notebook provisions complete infrastructure for an async pipeline:
- API Gateway for job submission and status checking
- Step Functions for orchestration
- Lambda functions for prompt improvement, image generation, and video generation
- S3 for artifact storage

In [None]:
import io
import json
import time
import zipfile

import boto3

# Deployment target and the naming prefix shared by every resource below.
REGION = 'us-east-1'
PROJECT_NAME = 'bedrock-async-pipeline'

# One regional client per AWS service this notebook provisions.
s3, iam, lambda_client, sfn, apigateway = (
    boto3.client(service_name, region_name=REGION)
    for service_name in ('s3', 'iam', 'lambda', 'stepfunctions', 'apigatewayv2')
)

# The account ID makes the bucket name unique and is used when building ARNs.
caller_identity = boto3.client('sts').get_caller_identity()
ACCOUNT_ID = caller_identity['Account']

print(f"Account: {ACCOUNT_ID}, Region: {REGION}")

## 1. Create S3 Bucket for Artifacts

In [None]:
BUCKET_NAME = f"{PROJECT_NAME}-artifacts-{ACCOUNT_ID}"

# us-east-1 is S3's default location and must NOT be sent as a LocationConstraint;
# any other region has to be named explicitly or CreateBucket rejects the request.
create_bucket_args = {'Bucket': BUCKET_NAME}
if REGION != 'us-east-1':
    create_bucket_args['CreateBucketConfiguration'] = {'LocationConstraint': REGION}

try:
    s3.create_bucket(**create_bucket_args)
    print(f"✓ Created S3 bucket: {BUCKET_NAME}")
except s3.exceptions.BucketAlreadyOwnedByYou:
    # Re-running the notebook is fine: the bucket from the previous run is reused.
    print(f"✓ S3 bucket already exists: {BUCKET_NAME}")

## 2. Create IAM Roles

In [None]:
def _ensure_role(role_name, trust_policy, managed_policy_arns):
    """Create ``role_name`` if it is missing and attach its managed policies.

    Policies are attached on every run (not only right after creation) so a role
    left half-configured by an interrupted earlier run is repaired.

    Returns:
        tuple: ``(role_arn, created)`` where ``created`` is True only when the
        role was created by this call.
    """
    try:
        response = iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy))
        role_arn, created = response['Role']['Arn'], True
    except iam.exceptions.EntityAlreadyExistsException:
        role_arn, created = iam.get_role(RoleName=role_name)['Role']['Arn'], False

    for policy_arn in managed_policy_arns:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return role_arn, created


# Lambda Role
lambda_role_name = f"{PROJECT_NAME}-lambda-role"
lambda_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]
}

# NOTE(review): the Bedrock/S3 *FullAccess* policies are far broader than this
# pipeline needs; consider scoping them down before using this outside a sandbox.
LAMBDA_ROLE_ARN, lambda_role_created = _ensure_role(
    lambda_role_name,
    lambda_trust_policy,
    [
        'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',
        'arn:aws:iam::aws:policy/AmazonBedrockFullAccess',
        'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    ],
)

# The API Lambdas (start-execution / check-status) share this role and call
# Step Functions, which none of the managed policies above allow. Grant exactly
# the two actions they use, scoped to this project's workflow. The name must
# match the state machine created later in this notebook.
workflow_name = f"{PROJECT_NAME}-workflow"
sfn_access_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "states:StartExecution",
            "Resource": f"arn:aws:states:{REGION}:{ACCOUNT_ID}:stateMachine:{workflow_name}",
        },
        {
            "Effect": "Allow",
            "Action": "states:DescribeExecution",
            "Resource": f"arn:aws:states:{REGION}:{ACCOUNT_ID}:execution:{workflow_name}:*",
        },
    ],
}
iam.put_role_policy(
    RoleName=lambda_role_name,
    PolicyName=f"{PROJECT_NAME}-sfn-access",
    PolicyDocument=json.dumps(sfn_access_policy),
)
print("✓ Created Lambda role" if lambda_role_created else "✓ Lambda role exists")

# Step Functions Role
sfn_role_name = f"{PROJECT_NAME}-sfn-role"
sfn_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Principal": {"Service": "states.amazonaws.com"}, "Action": "sts:AssumeRole"}]
}

SFN_ROLE_ARN, sfn_role_created = _ensure_role(
    sfn_role_name,
    sfn_trust_policy,
    ['arn:aws:iam::aws:policy/AWSLambda_FullAccess'],
)
print("✓ Created Step Functions role" if sfn_role_created else "✓ Step Functions role exists")

# Freshly created roles take a few seconds to become assumable by Lambda and
# Step Functions; pause once so the following cells do not fail on first run.
if lambda_role_created or sfn_role_created:
    time.sleep(10)

## 3. Create Lambda Functions

In [None]:
def _zip_source(code, handler):
    """Package handler source as the in-memory zip archive Lambda expects.

    ``code`` that is already a zip archive is returned untouched, so callers may
    pass either raw source or a pre-built deployment package.
    """
    if isinstance(code, str):
        code = code.encode('utf-8')
    if zipfile.is_zipfile(io.BytesIO(code)):
        return code

    # "index.lambda_handler" -> the module file Lambda will import is "index.py".
    module_name = handler.rsplit('.', 1)[0]
    entry = zipfile.ZipInfo(f"{module_name}.py")
    entry.compress_type = zipfile.ZIP_DEFLATED
    # writestr() would otherwise mark the file 0600, which the Lambda runtime
    # user cannot read; make it world-readable.
    entry.external_attr = 0o644 << 16

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as archive:
        archive.writestr(entry, code)
    return buffer.getvalue()


def create_lambda(name, code, handler, env_vars=None):
    """Create the Lambda ``{PROJECT_NAME}-{name}``, or update its code if it exists.

    Args:
        name: Suffix appended to ``PROJECT_NAME`` to form the function name.
        code: Handler source (bytes or str), or an already-built zip archive.
        handler: Lambda handler string such as ``"index.lambda_handler"``; the
            part before the last dot decides the file name inside the archive.
        env_vars: Optional environment variables, applied on creation only.

    Returns:
        str: The function ARN.
    """
    function_name = f"{PROJECT_NAME}-{name}"
    # The Lambda API only accepts zip archives, never raw source text.
    package = _zip_source(code, handler)
    try:
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.12',
            Role=LAMBDA_ROLE_ARN,
            Handler=handler,
            Code={'ZipFile': package},
            Timeout=300,
            MemorySize=512,
            Environment={'Variables': env_vars or {}}
        )
        print(f"✓ Created {function_name}")
        return response['FunctionArn']
    except lambda_client.exceptions.ResourceConflictException:
        # Function already exists from an earlier run: push the new code only.
        lambda_client.update_function_code(FunctionName=function_name, ZipFile=package)
        response = lambda_client.get_function(FunctionName=function_name)
        print(f"✓ Updated {function_name}")
        return response['Configuration']['FunctionArn']

In [None]:
# Lambda 1: Improve Prompt
# Asks Nova Lite (Converse API) to rewrite the user's prompt and passes both the
# original and the improved prompt on to the next state, together with job_id.
improve_prompt_code = b'''
import json
import boto3

bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')

def lambda_handler(event, context):
    prompt = event['prompt']
    response = bedrock.converse(
        modelId='us.amazon.nova-lite-v1:0',
        # Converse requires "content" to be a list of content blocks; a bare
        # string is rejected by parameter validation before the call is made.
        messages=[{
            "role": "user",
            "content": [{"text": f"Improve this image generation prompt: {prompt}"}]
        }],
        inferenceConfig={"maxTokens": 200, "temperature": 0.7}
    )
    improved = response['output']['message']['content'][0]['text']
    return {'prompt': prompt, 'improved_prompt': improved, 'job_id': event['job_id']}
'''

IMPROVE_PROMPT_ARN = create_lambda('improve-prompt', improve_prompt_code, 'index.lambda_handler')

In [None]:
# Lambda 2: Generate Image
generate_image_code = b'''
import json, boto3, base64, os

bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')
s3 = boto3.client('s3')

def lambda_handler(event, context):
    prompt = event['improved_prompt']
    job_id = event['job_id']
    bucket = os.environ['BUCKET_NAME']
    
    body = {
        "taskType": "TEXT_IMAGE",
        "textToImageParams": {"text": prompt},
        "imageGenerationConfig": {"width": 1280, "height": 720, "numberOfImages": 1}
    }
    
    response = bedrock.invoke_model(modelId='amazon.nova-canvas-v1:0', body=json.dumps(body))
    result = json.loads(response['body'].read())
    image_b64 = result['images'][0]
    
    s3.put_object(Bucket=bucket, Key=f"{job_id}/image.png", Body=base64.b64decode(image_b64))
    
    return {'job_id': job_id, 'image_s3_key': f"{job_id}/image.png", 'image_b64': image_b64}
'''

GENERATE_IMAGE_ARN = create_lambda('generate-image', generate_image_code, 'index.lambda_handler', {'BUCKET_NAME': BUCKET_NAME})

In [None]:
# Lambda 3: Generate Video
generate_video_code = b'''
import json, boto3, base64, os

bedrock = boto3.client('bedrock-runtime', region_name='us-west-2')
s3 = boto3.client('s3')

def lambda_handler(event, context):
    image_b64 = event['image_b64']
    job_id = event['job_id']
    bucket = os.environ['BUCKET_NAME']
    
    body = {
        "taskType": "TEXT_VIDEO",
        "textToVideoParams": {
            "text": "Animate with smooth camera movement",
            "images": [{"format": "png", "source": {"bytes": image_b64}}]
        },
        "videoGenerationConfig": {"durationSeconds": 6, "fps": 24, "dimension": "1280x720", "seed": 42}
    }
    
    response = bedrock.invoke_model(modelId='amazon.nova-reel-v1:0', body=json.dumps(body))
    result = json.loads(response['body'].read())
    video_b64 = result['video']
    
    s3.put_object(Bucket=bucket, Key=f"{job_id}/video.mp4", Body=base64.b64decode(video_b64))
    
    return {'job_id': job_id, 'video_s3_key': f"{job_id}/video.mp4", 'status': 'COMPLETED'}
'''

GENERATE_VIDEO_ARN = create_lambda('generate-video', generate_video_code, 'index.lambda_handler', {'BUCKET_NAME': BUCKET_NAME})

## 4. Create Step Functions State Machine

In [None]:
state_machine_name = f"{PROJECT_NAME}-workflow"

# Three Lambda tasks run in sequence; each task's output is the next one's input.
definition = {
    "Comment": "Async image-to-video pipeline",
    "StartAt": "ImprovePrompt",
    "States": {
        "ImprovePrompt": {"Type": "Task", "Resource": IMPROVE_PROMPT_ARN, "Next": "GenerateImage"},
        "GenerateImage": {"Type": "Task", "Resource": GENERATE_IMAGE_ARN, "Next": "GenerateVideo"},
        "GenerateVideo": {"Type": "Task", "Resource": GENERATE_VIDEO_ARN, "End": True}
    }
}

try:
    response = sfn.create_state_machine(
        name=state_machine_name,
        definition=json.dumps(definition),
        roleArn=SFN_ROLE_ARN,
        type='STANDARD'
    )
    STATE_MACHINE_ARN = response['stateMachineArn']
    print(f"✓ Created State Machine")
except sfn.exceptions.StateMachineAlreadyExists:
    # list_state_machines is paginated; walk every page so the lookup also works
    # in accounts with more state machines than fit in a single response.
    STATE_MACHINE_ARN = next(
        (
            machine['stateMachineArn']
            for page in sfn.get_paginator('list_state_machines').paginate()
            for machine in page['stateMachines']
            if machine['name'] == state_machine_name
        ),
        None,
    )
    if STATE_MACHINE_ARN is None:
        raise RuntimeError(f"State machine {state_machine_name} reported as existing but was not found in {REGION}")
    # Refresh the role as well as the definition so a re-run fully converges.
    sfn.update_state_machine(
        stateMachineArn=STATE_MACHINE_ARN,
        definition=json.dumps(definition),
        roleArn=SFN_ROLE_ARN,
    )
    print(f"✓ Updated State Machine")

print(f"State Machine ARN: {STATE_MACHINE_ARN}")

## 5. Create API Lambda Functions

In [None]:
# Start Execution Lambda
# Handles POST /generate: validates the JSON body, starts one state machine
# execution per request and returns the job_id plus the execution ARN.
# The state machine ARN is baked into the source via the f-string, which is why
# literal braces in the Lambda code are doubled.
start_execution_code = f'''
import json, boto3, uuid, base64
sfn = boto3.client('stepfunctions')

def _response(status_code, payload):
    return {{'statusCode': status_code, 'body': json.dumps(payload)}}

def lambda_handler(event, context):
    raw_body = event.get('body')
    try:
        # API Gateway may deliver the body base64-encoded.
        if raw_body and event.get('isBase64Encoded'):
            raw_body = base64.b64decode(raw_body).decode('utf-8')
        body = json.loads(raw_body or '{{}}')
    except ValueError:
        return _response(400, {{'error': 'body must be valid JSON'}})

    prompt = body.get('prompt', '') if isinstance(body, dict) else ''
    if not isinstance(prompt, str) or not prompt.strip():
        return _response(400, {{'error': 'prompt required'}})

    job_id = str(uuid.uuid4())
    response = sfn.start_execution(
        stateMachineArn='{STATE_MACHINE_ARN}',
        name=job_id,
        input=json.dumps({{'prompt': prompt, 'job_id': job_id}})
    )
    return _response(200, {{'job_id': job_id, 'execution_arn': response['executionArn']}})
'''.encode()

START_EXECUTION_ARN = create_lambda('start-execution', start_execution_code, 'index.lambda_handler')

In [None]:
# Check Status Lambda
# Handles GET /status?execution_arn=...: reports the execution status and, once
# the run has succeeded, the S3 keys of the generated artifacts.
check_status_code = b'''
import json, boto3
sfn = boto3.client('stepfunctions')

def _response(status_code, payload):
    return {'statusCode': status_code, 'body': json.dumps(payload)}

def lambda_handler(event, context):
    # The key is missing (or null) when the request carries no query string.
    params = event.get('queryStringParameters') or {}
    execution_arn = params.get('execution_arn')
    if not execution_arn:
        return _response(400, {'error': 'execution_arn required'})

    try:
        response = sfn.describe_execution(executionArn=execution_arn)
    except (sfn.exceptions.ExecutionDoesNotExist, sfn.exceptions.InvalidArn):
        return _response(404, {'error': 'execution not found'})

    result = {'status': response['status'], 'start_date': response['startDate'].isoformat()}

    if response['status'] == 'SUCCEEDED':
        output = json.loads(response.get('output') or '{}')
        result['artifacts'] = {'image': output.get('image_s3_key'), 'video': output.get('video_s3_key')}
    elif response['status'] in ('FAILED', 'TIMED_OUT', 'ABORTED'):
        # Surface the failure reason so callers do not need console access.
        result['error'] = response.get('error')
        result['cause'] = response.get('cause')

    return _response(200, result)
'''

CHECK_STATUS_ARN = create_lambda('check-status', check_status_code, 'index.lambda_handler')

## 6. Create API Gateway

In [None]:
api_name = f"{PROJECT_NAME}-api"


def _find_api_id(name):
    """Return the ID of the first HTTP API called ``name``, or None if there is none."""
    page_args = {}
    while True:
        page = apigateway.get_apis(**page_args)
        for item in page.get('Items', []):
            if item['Name'] == name:
                return item['ApiId']
        if not page.get('NextToken'):
            return None
        page_args['NextToken'] = page['NextToken']


def _allow_api_invoke(function_arn, statement_id, api_id):
    """Let API Gateway (this API only) invoke the given Lambda function."""
    try:
        lambda_client.add_permission(
            FunctionName=function_arn.split(':')[-1],
            StatementId=statement_id,
            Action='lambda:InvokeFunction',
            Principal='apigateway.amazonaws.com',
            SourceArn=f"arn:aws:execute-api:{REGION}:{ACCOUNT_ID}:{api_id}/*/*"
        )
    except lambda_client.exceptions.ResourceConflictException:
        # The statement ID embeds the API ID, so a conflict can only mean this
        # exact grant already exists from an earlier run.
        pass


def _ensure_route(api_id, route_key, function_arn, existing_route_keys):
    """Create a Lambda proxy integration and route unless the route already exists."""
    if route_key in existing_route_keys:
        return
    integration = apigateway.create_integration(
        ApiId=api_id, IntegrationType='AWS_PROXY', IntegrationUri=function_arn, PayloadFormatVersion='2.0'
    )
    apigateway.create_route(ApiId=api_id, RouteKey=route_key, Target=f"integrations/{integration['IntegrationId']}")


# Create API (reuse the one from an earlier run instead of piling up duplicates).
# No Target= here: that is "quick create", which would also add a catch-all
# $default route and $default stage pointing at the start-execution Lambda.
api_id = _find_api_id(api_name)
if api_id is None:
    api_id = apigateway.create_api(Name=api_name, ProtocolType='HTTP')['ApiId']

# Lambda permissions
_allow_api_invoke(START_EXECUTION_ARN, f"apigateway-invoke-start-{api_id}", api_id)
_allow_api_invoke(CHECK_STATUS_ARN, f"apigateway-invoke-status-{api_id}", api_id)

# Integrations + routes
existing_route_keys = {route['RouteKey'] for route in apigateway.get_routes(ApiId=api_id).get('Items', [])}
_ensure_route(api_id, 'POST /generate', START_EXECUTION_ARN, existing_route_keys)
_ensure_route(api_id, 'GET /status', CHECK_STATUS_ARN, existing_route_keys)

# Stage
try:
    apigateway.create_stage(ApiId=api_id, StageName='prod', AutoDeploy=True)
except apigateway.exceptions.ConflictException:
    pass  # stage already exists; AutoDeploy picks up the route changes

API_ENDPOINT = f"https://{api_id}.execute-api.{REGION}.amazonaws.com/prod"
print(f"✓ API Gateway created: {API_ENDPOINT}")

## 7. Test the Pipeline

In [None]:
import requests

# Start a job
# A timeout keeps the cell from hanging forever, and raise_for_status() turns an
# HTTP error into a clear exception instead of a KeyError on the lines below.
response = requests.post(
    f"{API_ENDPOINT}/generate",
    json={"prompt": "A serene mountain landscape at sunset"},
    timeout=30,
)
response.raise_for_status()
job_data = response.json()
print(f"Job started: {job_data}")

execution_arn = job_data['execution_arn']
job_id = job_data['job_id']

In [None]:
# Check status
# The pipeline takes minutes, so a single probe right after submission would
# almost always report RUNNING. Poll until the execution reaches a terminal
# state, bounded so the cell always finishes.
POLL_INTERVAL_SECONDS = 15
MAX_POLLS = 40  # roughly 10 minutes in total

for _ in range(MAX_POLLS):
    response = requests.get(f"{API_ENDPOINT}/status", params={"execution_arn": execution_arn}, timeout=30)
    response.raise_for_status()
    status = response.json()
    if status.get('status') != 'RUNNING':
        break
    time.sleep(POLL_INTERVAL_SECONDS)

print(json.dumps(status, indent=2))

## Summary

Infrastructure created:
- S3 bucket for artifacts
- 2 IAM roles (Lambda execution role, Step Functions role)
- 5 Lambda functions (improve prompt, generate image, generate video, start execution, check status)
- Step Functions state machine for orchestration
- API Gateway HTTP API with 2 routes (`POST /generate`, `GET /status`)

Usage:
```bash
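# Start job
curl -X POST "{API_ENDPOINT}/generate" -H 'Content-Type: application/json' -d '{"prompt":"your text"}'

# Check status (quote the URL and let curl URL-encode the ARN)
curl -G "{API_ENDPOINT}/status" --data-urlencode "execution_arn=<arn>"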
```