# Stable Diffusion 2.1: SageMaker Endpoint, Custom Inference Script, and Custom ECR Image
### Deploy Stable Diffusion Models with Full Control to a SageMaker Endpoint

1. [Introduction](#Introduction)  
2. [Development Environment and Permissions](#Development-Environment-and-Permissions)
3. [Custom inference script](#custom-inference-script-creation)
4. [Create model and deploy](#creation-of-the-hugging-face-model-and-deploy)
5. [Test endpoint](#testing-the-endpoint)
6. [Conclusion](#conclusion)

## Introduction

Welcome to this SageMaker endpoint example, in which we deploy the Stable Diffusion 2.1 base model. In this notebook we write a custom inference script. For building the custom ECR image, refer to the [extending-image-notebook](../01_extending_aws_dlc_images/extending-image.ipynb), which explains in detail how to extend AWS Deep Learning Container (DLC) images and use your custom image in the deployment. If you prefer to use an AWS DLC image as-is, include a `requirements.txt` file that lists the Python libraries needed to run the Stable Diffusion model.

# Development Environment and Permissions

## Installing Required Libraries



In [None]:
!pip install "sagemaker==2.116.0" "huggingface_hub==0.10.1" --upgrade --quiet

## Permissions
If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker.


In [None]:
import sagemaker
import boto3
import os
sess = sagemaker.Session()
# SageMaker session bucket -> used for uploading data, models, and logs
# SageMaker automatically creates this bucket if it does not exist
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

# Check if the "code" directory exists, and create it if it doesn't
code_dir = "code"
if not os.path.exists(code_dir):
    os.makedirs(code_dir)


## Retrieving Image URI and Model URI


In [None]:
from sagemaker import model_uris
from sagemaker.utils import name_from_base

model_id = 'model-txt2img-stabilityai-stable-diffusion-v2-1-base'
model_version = "*"  # "*" selects the latest available version of the JumpStart model

endpoint_name = name_from_base("stable-diffusion-v2-1-base")

# Retrieve the model URI, which includes the pre-trained model, parameters, and inference scripts.
# This URI encompasses all necessary dependencies and scripts for model loading and inference handling.
model_uri = model_uris.retrieve(
    model_id=model_id, model_version=model_version, model_scope="inference"
)
print(model_uri)

# Alternatively, you can use your own stored model's S3 URI.
s3_model_uri = 's3://sagemaker-us-west-2-499172972132/stable-diffusion-v2-1/model.tar.gz'
print(s3_model_uri)

aws_DLC_image = '763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-inference:1.10.2-transformers4.17.0-gpu-py38-cu113-ubuntu20.04'
custom_ecr_image = 'xxxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/pytorch-extending-deepspeed-stable-diffusion-v2-1-base'
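If you use your own stored model instead of the JumpStart URI, a quick sanity check on the S3 URI can catch typos before deployment. The helper below is a minimal sketch that assumes the model is packaged as a `.tar.gz` archive, as in `s3_model_uri` above; the name `parse_s3_model_uri` is hypothetical and not part of the SageMaker SDK.

```python
from typing import Tuple
from urllib.parse import urlparse


def parse_s3_model_uri(uri: str) -> Tuple[str, str]:
    """Split an S3 URI into (bucket, key) and check that it points at a .tar.gz archive.

    Hypothetical helper; assumes the model is packaged as model.tar.gz.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    key = parsed.path.lstrip("/")
    if not key.endswith(".tar.gz"):
        raise ValueError(f"expected a .tar.gz model archive, got key {key!r}")
    return parsed.netloc, key
```

For example, `parse_s3_model_uri(s3_model_uri)` returns the bucket name and the `stable-diffusion-v2-1/model.tar.gz` key, and raises `ValueError` for a malformed URI.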


## Utility functions

In [None]:
from PIL import Image
from io import BytesIO
from IPython.display import display
import base64
import math
import matplotlib.pyplot as plt

# helper decoder: turn a base64 string back into a PIL image
def decode_base64_image(image_string):
    base64_image = base64.b64decode(image_string)
    buffer = BytesIO(base64_image)
    return Image.open(buffer)

# display PIL images as a grid
def display_images(images, columns=3, width=100, height=100):
    # round up so a partially filled last row still fits, without adding an empty extra row
    rows = math.ceil(len(images) / columns)
    plt.figure(figsize=(width, height))
    for i, image in enumerate(images):
        plt.subplot(rows, columns, i + 1)
        plt.axis('off')
        plt.imshow(image)
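The decoder above has a natural client-side counterpart: encoding an input image as a base64 string for the `image` field of an `image2image` request. This sketch reads the file bytes directly instead of re-saving through PIL; it assumes the server decodes the bytes with PIL (as the inference script does, followed by `.convert("RGB")`), so JPEG or PNG files should both work. Both function names are hypothetical.

```python
import base64
from pathlib import Path
from typing import Union


def encode_image_bytes(data: bytes) -> str:
    """Base64-encode raw image bytes into a text string for the JSON 'image' field."""
    return base64.b64encode(data).decode("ascii")


def encode_image_file(path: Union[str, Path]) -> str:
    """Read an image file from disk and base64-encode its bytes without re-encoding the image."""
    return encode_image_bytes(Path(path).read_bytes())
```

For example, `encode_image_file("./sketch-mountains-input.jpeg")` yields a string that `decode_base64_image` turns back into the same image.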



# Custom Inference Script Creation

### Inference script extension
The inference script below extends the base script with three modes, selected by the `mode` field of the request: text-to-image (`text2image`), image-to-image (`image2image`), and text-to-vector (`text2vector`, which returns the CLIP text embedding of the prompt).
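Each mode expects a slightly different request body. The sketch below builds payloads using the field names that `predict_fn` reads (`inputs`, `mode`, `image`, plus optional generation parameters such as `num_images_per_prompt`, `guidance_scale`, `strength`, `seed`, `width`, `height`). The function `build_payload` is a hypothetical client-side helper, not part of any SDK; it only checks for an unknown mode or a missing image, and leaves parameter clamping to the server.

```python
from typing import Any, Dict, Optional

# Modes handled by predict_fn in code/inference.py
SUPPORTED_MODES = ("text2image", "image2image", "text2vector")


def build_payload(mode: str, prompt: str, image_b64: Optional[str] = None, **params: Any) -> Dict[str, Any]:
    """Assemble a request body using the field names that predict_fn reads."""
    if mode not in SUPPORTED_MODES:
        raise ValueError(f"unsupported mode {mode!r}; expected one of {SUPPORTED_MODES}")
    if mode == "image2image" and not image_b64:
        raise ValueError("image2image requires a base64-encoded 'image'")
    payload: Dict[str, Any] = {"inputs": prompt, "mode": mode}
    if image_b64 is not None:
        payload["image"] = image_b64
    # Optional generation parameters, e.g. num_images_per_prompt, guidance_scale, seed
    payload.update(params)
    return payload
```

A payload built this way can be passed straight to `predictor.predict(data=...)`.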

In [3]:
%%writefile code/inference.py
import base64
import torch
from io import BytesIO
import json
from PIL import Image
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler, StableDiffusionImg2ImgPipeline


def model_fn(model_dir):

    device = "cuda"
    image2image_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
        model_dir,
        torch_dtype=torch.float16,
    ).to(device)

    # Load stable diffusion and move it to the GPU
    pipe = StableDiffusionPipeline.from_pretrained(model_dir, torch_dtype=torch.float16)
    pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
    pipe = pipe.to(device)


    return { "text2image": pipe, "image2image": image2image_pipe }


def predict_fn(data, pipe):
    device = "cuda"
    # get prompt & parameters
    prompt = data.pop("inputs", data)
    # set valid hyperparameters for stable diffusion; steps are clamped to [1, 100]
    # (zero steps would skip denoising entirely)
    num_inference_steps = max(min(data.pop("num_inference_steps", 25), 100), 1)
    guidance_scale = data.pop("guidance_scale", 7.5)
    strength = data.pop("strength", 0.8)
    num_images_per_prompt = data.pop("num_images_per_prompt", 1)
    negative_prompt = data.pop("negative_prompt", None)

    width = max(min(data.pop("width", 512), 1024), 64)
    height = max(min(data.pop("height", 512), 1024), 64)
    width = (width // 8) * 8
    height = (height // 8) * 8

    # get mode (text2image, text2vector, image2image); None falls through to the error response
    mode = data.pop("mode", None)
    init_image = data.pop("image", None)


    seed = data.pop("seed", None)
    latents = None
    seeds = []

    generator = torch.Generator(device=device)
    if mode == 'text2image':
        if seed is not None:
            generator.manual_seed(seed)
            latents = torch.randn(
                (1, pipe[mode].unet.in_channels, height // 8, width // 8),
                generator = generator,
                device = device
            )
            #we set the amount of images to 1, otherwise we're generating x times the same image.
            num_images_per_prompt = 1
        else:
            for _ in range(num_images_per_prompt):
                # Get a new random seed, store it and use it as the generator state
                _seed = generator.seed()
                seeds.append(_seed)
                generator = generator.manual_seed(_seed)

                image_latents = torch.randn(
                    (1, pipe[mode].unet.in_channels, height // 8, width // 8),
                    generator = generator,
                    device = device
                )
                latents = image_latents if latents is None else torch.cat((latents, image_latents))

        # run generation with parameters
        with torch.autocast("cuda"):
            generated_images = pipe['text2image'](
                [prompt] * num_images_per_prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                # num_images_per_prompt=num_images_per_prompt,
                negative_prompt=[negative_prompt] * num_images_per_prompt if negative_prompt else None,
                latents = latents
            )["images"]

        # create response
        encoded_images = []
        for image in generated_images:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_images.append(base64.b64encode(buffered.getvalue()).decode())

        # create response
        return {"generated_images": encoded_images, "seeds": seeds or [seed]}

    if mode == 'image2image' and init_image:
        # use the requested seed, or draw a random one, and seed the generator with it
        # so that the seed returned in the response actually reproduces the result
        seed = seed if seed is not None else generator.seed()
        generator.manual_seed(seed)

        # decode the base64 input image and resize it to the requested dimensions
        init_image = base64.b64decode(init_image)
        buffer = BytesIO(init_image)
        init_image = Image.open(buffer).convert("RGB")
        init_image = init_image.resize((width, height))

        # run generation with parameters
        generated_images = pipe['image2image'](
            num_images_per_prompt=num_images_per_prompt,
            prompt=prompt,
            image=init_image,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            strength=strength,
            negative_prompt=negative_prompt,
            generator=generator,
        )["images"]

        # create response
        encoded_images = []
        for image in generated_images:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_images.append(base64.b64encode(buffered.getvalue()).decode())

        # create response
        return {"generated_images": encoded_images, "seeds": seeds or [seed]}

    if mode == 'text2vector':
        # tokenize the prompt, padding/truncating to the tokenizer's max length
        prompt_inputs = pipe['text2image'].tokenizer(
            prompt, return_tensors='pt',
            padding='max_length', truncation=True
        ).to("cuda")
        # create the prompt encoding without tracking gradients
        with torch.no_grad():
            prompt_embeds = pipe['text2image'].text_encoder(**prompt_inputs)
        # extract the pooled CLIP text embedding
        prompt_embeds = prompt_embeds['pooler_output']

        # convert to a nested Python list; the serving layer JSON-encodes the whole response,
        # so calling json.dumps here would return a JSON string nested inside the JSON body
        prompt_embeds = prompt_embeds.float().cpu().numpy().tolist()

        return {"generated_vector": prompt_embeds}

    return {"error": "specify mode (text2image, text2vector, or image2image)"}


Writing code/inference.py
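The range checks in `predict_fn` can be tested without a GPU by pulling them into a pure function. The sketch below mirrors the width/height clamping in the script and floors the step count at 1. Snapping to multiples of 8 matters because the latents are one eighth of the image size (the `height // 8, width // 8` shape in the `torch.randn` calls). The name `clamp_generation_params` is hypothetical.

```python
from typing import Tuple


def clamp_generation_params(num_inference_steps: int = 25, width: int = 512, height: int = 512) -> Tuple[int, int, int]:
    """Clamp steps and image size like predict_fn, then snap the size to multiples of 8."""
    # Steps are floored at 1: a run with zero denoising steps produces no useful image
    steps = max(min(num_inference_steps, 100), 1)
    width = max(min(width, 1024), 64)
    height = max(min(height, 1024), 64)
    # Latents have shape (height // 8, width // 8), so the image size must divide by 8
    return steps, (width // 8) * 8, (height // 8) * 8
```

For example, a request for 150 steps at 700x515 is served as 100 steps at 696x512.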


# Creation of the Hugging Face Model and deploy

If you don't have a custom ECR image, pass `aws_DLC_image` instead of `custom_ecr_image` as the `image_uri`. In that case, also uncomment and run the cell below so that it writes `code/requirements.txt`; SageMaker then installs the listed Python libraries in the container.

In [5]:
# %%writefile code/requirements.txt
# diffusers==0.11.1 
# transformers==4.25.1 
# scipy==1.9.3 
# accelerate==0.15.0




In [None]:
from sagemaker.huggingface.model import HuggingFaceModel


# create the Hugging Face Model class
huggingface_model = HuggingFaceModel(
    model_data=model_uri,         # path to your model (or use s3_model_uri)
    image_uri=custom_ecr_image,   # path to your private ECR image (or aws_DLC_image)
    entry_point="inference.py",   # custom inference script
    source_dir="./code/",         # directory with inference.py (and requirements.txt, if used)
    role=role,                    # IAM role with permissions to create an endpoint
)

# deploy the endpoint
predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.xlarge",
    endpoint_name=endpoint_name,  # name generated earlier with name_from_base
)
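By default `deploy()` blocks until the endpoint is ready. If you deploy with `wait=False`, or want to check an endpoint from another process, you can poll its status yourself. The sketch below assumes a boto3 SageMaker client (`boto3.client("sagemaker")`) and uses its `describe_endpoint` call; `wait_for_endpoint` is a hypothetical helper, and boto3 also ships a built-in `endpoint_in_service` waiter that serves the same purpose.

```python
import time
from typing import Any, Callable


def wait_for_endpoint(
    sm_client: Any,
    endpoint_name: str,
    poll_seconds: float = 30.0,
    timeout_seconds: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll describe_endpoint until the endpoint is InService; raise on Failed or timeout."""
    waited = 0.0
    while True:
        status = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
        if status == "InService":
            return status
        if status == "Failed":
            raise RuntimeError(f"endpoint {endpoint_name} failed to deploy")
        if waited >= timeout_seconds:
            raise TimeoutError(f"endpoint {endpoint_name} still {status} after {waited:.0f}s")
        # Still Creating/Updating: wait and check again
        sleep(poll_seconds)
        waited += poll_seconds
```

Usage, assuming AWS credentials are configured: `wait_for_endpoint(boto3.client("sagemaker"), predictor.endpoint_name)`.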


# Testing the Endpoint

Allow a few minutes for the endpoint to become live. Once it is ready, you can test it either through the `predictor` object or by invoking the endpoint with boto3.
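For the boto3 route, the sketch below sends a JSON payload through the SageMaker Runtime `invoke_endpoint` call and parses the JSON response. It assumes a client created with `boto3.client("sagemaker-runtime")`; the wrapper name `invoke_stable_diffusion` is hypothetical.

```python
import json
from typing import Any, Dict


def invoke_stable_diffusion(runtime_client: Any, endpoint_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send a JSON payload to the endpoint and return the decoded JSON response."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Accept="application/json",
        Body=json.dumps(payload),
    )
    # The response body is a streaming object: read it fully before parsing
    return json.loads(response["Body"].read())
```

Usage: `invoke_stable_diffusion(boto3.client("sagemaker-runtime"), predictor.endpoint_name, {"inputs": prompt, "mode": "text2image"})` returns the same dictionary as `predictor.predict`.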

### text2image

In [None]:
num_images_per_prompt = 3
prompt = "lamb on a scooter, yellow color, high quality, highly detailed, elegant, sharp focus"

# Perform prediction
response = predictor.predict(data={
  "inputs": prompt,
  "mode": "text2image",
  "num_images_per_prompt": num_images_per_prompt,
  }
)

decoded_images = [decode_base64_image(image) for image in response["generated_images"]]
display_images(decoded_images)
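The `seeds` field of a `text2image` response records the seed behind each image. When a request includes `seed`, the inference script builds the initial latents from that seed and generates a single image, so resending a seed with the same prompt and parameters should closely reproduce the corresponding image (small numeric differences from fp16 GPU kernels or a different batch size are possible). The helper `reproduction_payloads` below is a hypothetical sketch of that workflow.

```python
from typing import Any, Dict, List


def reproduction_payloads(prompt: str, response: Dict[str, Any], **params: Any) -> List[Dict[str, Any]]:
    """Build one seeded text2image request per seed returned by the endpoint.

    Pass the same generation parameters as the original request (steps,
    guidance_scale, width, height, negative_prompt) through **params.
    """
    payloads = []
    for seed in response.get("seeds", []):
        if seed is None:  # defensive: skip missing seeds
            continue
        payload = {"inputs": prompt, "mode": "text2image", "seed": seed}
        payload.update(params)
        payloads.append(payload)
    return payloads
```

For example, `predictor.predict(data=reproduction_payloads(prompt, response)[1])` regenerates the second image from the grid above.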

### image2image

In [None]:
init_image = Image.open("./sketch-mountains-input.jpeg")
buffered = BytesIO()
init_image.save(buffered, format="JPEG")
init_image = base64.b64encode(buffered.getvalue()).decode()

# run prediction
response = predictor.predict(data={
  "inputs": "A fantasy landscape, trending on artstation",
  "mode": "image2image",
  "num_images_per_prompt": 1,
  "image": init_image,
  }
)

# decode images
decoded_images = [decode_base64_image(image) for image in [init_image, response["generated_images"][0]]]

# visualize generation
display_images(decoded_images)
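In diffusers' `StableDiffusionImg2ImgPipeline`, `strength` controls how much noise is added to the input image and, as a consequence, how many of the `num_inference_steps` are actually run. The helper below is a rough sketch of that relationship, assuming the number of executed steps is about `int(num_inference_steps * strength)` (exact offset handling differs between diffusers versions). With the defaults in `predict_fn` (25 steps, strength 0.8), that comes to 20 steps, so a lower `strength` keeps the output closer to the input sketch and also makes the request faster.

```python
def img2img_denoising_steps(num_inference_steps: int, strength: float) -> int:
    """Approximate number of denoising steps img2img runs for a given strength."""
    if not 0.0 <= strength <= 1.0:
        raise ValueError("strength must be in [0, 1]")
    # strength sets how far into the noise schedule the input image is pushed;
    # only that fraction of the schedule is then denoised
    return min(int(num_inference_steps * strength), num_inference_steps)
```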

### Text2Vector

To obtain the CLIP text embedding of a prompt (for example, to store it for later search), call the endpoint with `mode` set to `text2vector`.

In [None]:
prompt = "lamb on a scooter, yellow color, high quality, highly detailed, elegant, sharp focus"

# Perform prediction
response = predictor.predict(data={
  "inputs": prompt,
  "mode": "text2vector"
  }
)

response["generated_vector"]
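Once stored, embeddings of different prompts are commonly compared with cosine similarity. Depending on whether the inference script JSON-encodes the embedding itself, `generated_vector` arrives either as a JSON string or as a nested list holding one embedding row, so the sketch below accepts both. `to_vector` and `cosine_similarity` are hypothetical helpers written in plain Python; in practice you might use NumPy or a vector database instead.

```python
import json
import math
from typing import List, Sequence, Union


def to_vector(generated_vector: Union[str, list]) -> List[float]:
    """Normalize a text2vector result (JSON string or nested list) into a flat list of floats."""
    value = json.loads(generated_vector) if isinstance(generated_vector, str) else generated_vector
    # The endpoint returns a batch of one embedding: take the first row if nested
    if value and isinstance(value[0], list):
        value = value[0]
    return [float(x) for x in value]


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        raise ValueError("cannot compare a zero vector")
    return dot / norm
```

For example, `cosine_similarity(to_vector(r1["generated_vector"]), to_vector(r2["generated_vector"]))` compares two `text2vector` responses `r1` and `r2`.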

# Conclusion

Remember to delete your endpoint so that you are not billed for idle GPU instances.

In [None]:
# predictor.delete_model()
predictor.delete_endpoint()
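`predictor.delete_endpoint()` removes the endpoint (and, by default, its endpoint configuration) but leaves the SageMaker model registered, which is why `predictor.delete_model()` appears above. If you prefer to clean up everything with boto3, the sketch below looks up the endpoint's configuration and models and then deletes all three resource types. It assumes a client from `boto3.client("sagemaker")` and must run instead of the cell above, because it needs the endpoint to still exist to look up its configuration. `delete_endpoint_resources` is a hypothetical helper.

```python
from typing import Any, List


def delete_endpoint_resources(sm_client: Any, endpoint_name: str) -> List[str]:
    """Delete an endpoint, its endpoint config, and the models it references."""
    endpoint = sm_client.describe_endpoint(EndpointName=endpoint_name)
    config_name = endpoint["EndpointConfigName"]
    config = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config["ProductionVariants"]]

    # Delete in dependency order: endpoint first, then its config, then the models
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return model_names
```

Usage: `delete_endpoint_resources(boto3.client("sagemaker"), predictor.endpoint_name)`.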