In [None]:
!pip install "transformers==4.34.0" "datasets[s3]==2.13.0" "sagemaker>=2.190.0" "gradio==3.50.2" --upgrade --quiet

In [2]:
import os
import shutil
import tarfile
from distutils.dir_util import copy_tree

import boto3
import sagemaker
from sagemaker.huggingface import HuggingFace
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.s3 import S3Uploader

In [None]:
# Bucket SageMaker uses for uploading data, models and logs.
# Leave it as None to fall back to the session's default bucket
# (SageMaker creates that bucket automatically if it does not exist yet).
sagemaker_session_bucket = None

sess = sagemaker.Session()
if sess is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sess.default_bucket()

# Execution role: ask SageMaker first; if that raises ValueError,
# look the role up by its name through IAM instead.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role = iam.get_role(RoleName="sagemaker_execution_role")["Role"]["Arn"]

# Re-create the session so it is bound to the bucket resolved above.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

# Summarise the resolved configuration.
print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

In [None]:
# S3 URI of the compressed model archive (model.tar.gz). It can be either a fine-tuned
# model or a manually downloaded binary file of the model.
# NOTE(review): the bucket and job prefix below look account-specific — point this at your own artifact.
model_uri = 's3://sagemaker-eu-north-1-564976835481/huggingface-pytorch-training-2023-11-28-07-30-02-735/output/model.tar.gz'

In [None]:
# Split the S3 URI into the pieces needed to download the archive.
url_parts = model_uri.split("/")  # => ['s3:', '', 'sagemakerbucketname', 'data', ...
# Bucket name; the download cell further down reads this variable.
bucket = url_parts[2]
# S3 object keys always use "/" as separator, so join explicitly instead of using
# os.path.join, which would insert backslashes on Windows.
key = "/".join(url_parts[3:])
# Local file name for the download: the last path component of the URI.
filename = url_parts[-1]

In [None]:
# S3 handles used to download and upload files:
# the boto3 resource plus the client object it exposes via `meta`.
s3 = boto3.resource(service_name="s3")
client = s3.meta.client

In [None]:
# download model from s3
# The bucket name is the third "/"-separated component of the s3:// URI. It is derived
# directly from model_uri so this cell does not depend on a separately defined bucket variable.
s3.Bucket(model_uri.split("/")[2]).download_file(key, filename)

In [None]:
# extract compressed model.tar.gz
os.makedirs('model_extracted', exist_ok=True)
# The context manager closes the archive handle even if extraction fails part-way.
# NOTE: extractall() trusts the member paths stored in the archive; only run this on
# archives you produced yourself.
with tarfile.open('model.tar.gz') as archive:
    archive.extractall('model_extracted')

In [None]:
# Copy the local code/ folder (inference.py and its requirements file) into the extracted
# model directory so it is packaged together with the model weights.
# shutil.copytree replaces distutils' copy_tree, which is deprecated and removed in
# Python 3.12; dirs_exist_ok=True keeps the "merge into an existing folder" behaviour.
shutil.copytree("code/", os.path.join('model_extracted', 'code'), dirs_exist_ok=True)

In [None]:
# Rename the downloaded tar.gz, since the repackaged model + code archive is written
# under the same name. os.replace is portable (no shell needed), overwrites an existing
# backup like `mv` does, and raises if the source archive is missing.
os.replace("model.tar.gz", "model.tar.gz_backup")

In [None]:
# helper to create the model.tar.gz
def compress(tar_dir=None, output_file="model.tar.gz"):
    """Pack every top-level entry of ``tar_dir`` into a gzip-compressed tarball.

    Entries are stored under their bare names (no leading directory component), and
    directories are added recursively. Each top-level entry name is printed as it is added.

    Args:
        tar_dir: Directory whose contents are archived. Required despite the ``None`` default.
        output_file: Path of the archive to write; a relative path is resolved against the
            current working directory.

    Raises:
        TypeError: If ``tar_dir`` is None.
    """
    if tar_dir is None:
        raise TypeError("compress() requires tar_dir: the directory to archive")

    # Work with absolute paths instead of os.chdir()-ing into tar_dir: a failure while
    # writing the archive can then never leave the kernel in a different working directory.
    source_dir = os.path.abspath(tar_dir)
    archive_path = os.path.abspath(output_file)

    with tarfile.open(archive_path, "w:gz") as tar:
        for item in os.listdir(source_dir):
            print(item)
            # arcname strips the directory prefix so entries sit at the archive root.
            tar.add(os.path.join(source_dir, item), arcname=item)

# Repackage the extracted model together with the copied code folder.
compress(tar_dir='model_extracted')

In [None]:
# Upload the repackaged model.tar.gz to the session's default bucket
# and keep the URI returned by the uploader for the model definition.
s3_model_uri = S3Uploader.upload(
    local_path="model.tar.gz",
    desired_s3_uri=f"s3://{sess.default_bucket()}/diffusion_dreambooth_fine_tuned",
)

In [None]:
# Define the Hugging Face model: the uploaded artifact, the IAM role, and the framework versions.
huggingface_model = HuggingFaceModel(
    role=role,                    # IAM role with permissions to create an endpoint
    model_data=s3_model_uri,      # S3 path to the packaged model and inference script
    py_version="py310",           # Python version used
    pytorch_version="2.0",        # PyTorch version used
    transformers_version="4.28",  # transformers version used
)

In [None]:
# Deploy the endpoint and keep the returned predictor for invoking it below.
predictor = huggingface_model.deploy(
    instance_type="ml.g4dn.xlarge",
    initial_instance_count=1,
)

## Test Deployment

In [None]:
from PIL import Image
from io import BytesIO
from IPython.display import display
import base64
import matplotlib.pyplot as plt

In [None]:
# helper decoder
def decode_base64_image(image_string):
    """Decode a base64-encoded image payload and open it as a PIL image.

    Args:
        image_string: Base64 text (or bytes) holding the encoded image file.

    Returns:
        The object returned by ``Image.open`` on the decoded bytes.
    """
    raw_bytes = base64.b64decode(image_string)
    return Image.open(BytesIO(raw_bytes))

# display PIL images as grid
def display_images(images=None,columns=3, width=100, height=100):
    """Draw ``images`` on a matplotlib grid that is ``columns`` wide.

    Args:
        images: Sequence of images accepted by ``plt.imshow``; ``None`` is treated as empty.
        columns: Number of grid columns.
        width: Figure width, passed to ``plt.figure(figsize=...)``.
        height: Figure height, passed to ``plt.figure(figsize=...)``.
    """
    if images is None:
        images = []
    plt.figure(figsize=(width, height))
    # Ceiling division gives exactly enough rows for all images. The former
    # int(n / columns + 1) reserved an extra empty row whenever n was a multiple of columns.
    rows = (len(images) + columns - 1) // columns
    for i, image in enumerate(images):
        plt.subplot(rows, columns, i + 1)
        plt.axis('off')
        plt.imshow(image)

In [None]:
# Generation settings
prompt = "a photo of sks cat"
num_images_per_prompt = 3

# Send the prompt to the endpoint
response = predictor.predict(
    data=dict(inputs=prompt, num_images_per_prompt=num_images_per_prompt)
)

# Turn every returned base64 string into an image
decoded_images = list(map(decode_base64_image, response["generated_images"]))

# Show the generated images as a grid
display_images(decoded_images)