# Facefusion on Sagemaker

## build image

In [None]:
# Build an image that can do training and inference in SageMaker
# This is a Python 3 image that uses the nginx, gunicorn, flask stack
# for serving inferences in a stable way.

# Alternative base images (kept for reference):
# FROM pytorch/pytorch:2.1.0-cuda11.8-cudnn8-devel
# FROM 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.1.0-cpu-py310-ubuntu20.04-ec2
# FROM 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.1.0-gpu-py310-cu118-ubuntu20.04-ec2
# FROM 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.13.1-gpu-py39-cu117-ubuntu20.04-ec2
# ref from https://github.com/facefusion/facefusion-docker
FROM python:3.10
ARG DEBIAN_FRONTEND=noninteractive
ARG FACEFUSION_VERSION=2.3.0
ENV GRADIO_SERVER_NAME=0.0.0.0
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

WORKDIR /opt/program

# System packages: curl and ffmpeg, plus nginx which the SageMaker endpoint
# serving stack needs. `apt-get update` and `apt-get install` share one RUN so
# a cached package index is never combined with a newer install list, and the
# index is removed afterwards to keep the layer small.
RUN apt-get update \
    && apt-get install -y curl ffmpeg nginx \
    && rm -rf /var/lib/apt/lists/*

# Python components required by the SageMaker endpoint.
RUN pip install --no-cache-dir boto3 flask gunicorn
# RUN git clone https://github.com/facefusion/facefusion.git --branch ${FACEFUSION_VERSION} --single-branch .
# Copy the facefusion directory, which contains the Python and configuration
# files required by the SageMaker endpoint.
COPY facefusion /opt/program
RUN python install.py --torch cpu --onnxruntime default

In [None]:
# 在 Facefusion-Sagemaker-Studio-Lab 目录执行如下命令，如上docker file是已CPU举例的，可以修改使用GPU 可以参考gpu_Dockerfile
!./build_and_push.sh faces-swap-on-sagemaker

## local test

In [None]:
# After the image has been built, run the following command from the
# Facefusion-Sagemaker-Studio-Lab directory to start the Docker image locally.
./local_test/serve_local.sh facefusion-sagemaker

执行如下命令，测试本地 endpoint（参数根据实际情况替换）：
curl -X POST -H 'content-type:application/json' localhost:8080/invocations -d '{"encrypt_tel":"c64ac2a6b2d149a50fb2634c7b18514d","clue_createtime":"2021-7-20 10:58:33","clue_type":"student","city_code":"5"}'

In [None]:
# Open a new CLI terminal and run the following command for a local test.
curl -XPOST localhost:8080/invocations  -H 'content-type:application/json'  -d '{"input":"python run.py -s image1.jpg -t test.mp4 -o . --headless"}'

## create sagemaker model

In [3]:
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers

role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
# Use the session's public region attribute instead of the private `_region_name`.
region = sess.boto_region_name  # region name of the current SageMaker Studio environment
account_id = sess.account_id()  # account_id of the current SageMaker Studio environment
bucket = sess.default_bucket()
image="facefusion-sagemaker"
s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")

# ECR URI of the `latest` tag of the image in this account and region.
full_image_uri=f"{account_id}.dkr.ecr.{region}.amazonaws.com/{image}:latest"
print(full_image_uri)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
517141035927.dkr.ecr.us-west-2.amazonaws.com/facefusion-sagemaker:latest


In [6]:
!pip install sagemaker_ssh_helper

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting sagemaker_ssh_helper
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/4d/2c/667d2ca96dc1bd3cc41561baed6cfc3745f0d72d57f3aaf5dee01a4aa8ce/sagemaker_ssh_helper-2.1.0-py3-none-any.whl (70 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.0/71.0 kB[0m [31m192.8 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: sagemaker_ssh_helper
Successfully installed sagemaker_ssh_helper-2.1.0


In [7]:
import boto3
import re
import os
import json
import uuid
import boto3
import sagemaker
from time import gmtime, strftime
## for debug only
from sagemaker_ssh_helper.wrapper import SSHModelWrapper
# SageMaker client used by create_model(), create_endpoint_configuration() and create_endpoint().
sm_client = boto3.client(service_name='sagemaker')



def create_model():
    """Register the container image as a SageMaker model.

    Reads the notebook globals ``full_image_uri``, ``role`` and ``sm_client``.
    The model name is a fixed prefix followed by the current UTC timestamp.
    The raw API response is printed.

    Returns:
        str: the generated model name.
    """
    timestamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    generated_name = "facefusion-sagemaker-01" + timestamp
    container_spec = {"Image": full_image_uri}
    response = sm_client.create_model(
        ModelName=generated_name,
        ExecutionRoleArn=role,
        Containers=[container_spec],
    )
    print(response)
    return generated_name

In [8]:
# Register the model; the returned name is read by create_endpoint_configuration().
model_name=create_model()

{'ModelArn': 'arn:aws:sagemaker:us-west-2:517141035927:model/facefusion-sagemaker-012024-04-04-16-49-40', 'ResponseMetadata': {'RequestId': '6a0e0d24-1075-4b97-a3d1-e31d10910359', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6a0e0d24-1075-4b97-a3d1-e31d10910359', 'content-type': 'application/x-amz-json-1.1', 'content-length': '104', 'date': 'Thu, 04 Apr 2024 16:49:40 GMT'}, 'RetryAttempts': 0}}


## create endpoint configuration

In [26]:
# Variant name: fixed prefix, current UTC timestamp, "-variant" suffix.
variantName = "".join(
    ["facefusion-sagemaker", strftime("%Y-%m-%d-%H-%M-%S", gmtime()), "-variant"]
)


def create_endpoint_configuration():
    """Create an endpoint configuration with a single production variant.

    Reads the notebook globals ``model_name``, ``variantName`` and
    ``sm_client``. The configuration name is a fixed prefix followed by the
    current UTC timestamp. The raw API response is printed.

    Returns:
        str: the generated endpoint configuration name.
    """
    config_name = "facefusion-sagemaker-configuration" + strftime(
        "%Y-%m-%d-%H-%M-%S", gmtime()
    )
    variant = {
        "ModelName": model_name,
        "VariantName": variantName,
        "InitialInstanceCount": 1,
        "InitialVariantWeight": 1.0,
        "InstanceType": "ml.g5.xlarge",
    }
    response = sm_client.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[variant],
    )
    print(response)
    return config_name

In [28]:
# Create the endpoint configuration; create_endpoint() reads this global name.
endpointConfigName=create_endpoint_configuration()

TypeError: create_endpoint_config() only accepts keyword arguments.

In [23]:
# Show the endpoint configuration name that create_endpoint() will use.
print(endpointConfigName)

None


## create endpoint

In [20]:
# Endpoint name: fixed prefix followed by the current UTC timestamp.
endpointName = "facefusion-sagemaker-endpoint" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())


def create_endpoint():
    """Create the endpoint and block until it is in service.

    Reads the notebook globals ``endpointName``, ``endpointConfigName`` and
    ``sm_client``. Prints the endpoint ARN and its status right after
    creation, then waits on the ``endpoint_in_service`` waiter.
    """
    creation = sm_client.create_endpoint(
        EndpointName=endpointName,
        EndpointConfigName=endpointConfigName,
    )
    print("Endpoint Arn: " + creation["EndpointArn"])

    status = sm_client.describe_endpoint(EndpointName=endpointName)["EndpointStatus"]
    print("Endpoint Status: " + status)

    print("Waiting for {} endpoint to be in service".format(endpointName))
    sm_client.get_waiter("endpoint_in_service").wait(EndpointName=endpointName)

In [21]:
# Create the endpoint; this call returns only after the in-service waiter finishes.
create_endpoint()

ParamValidationError: Parameter validation failed:
Invalid type for parameter EndpointConfigName, value: None, type: <class 'NoneType'>, valid types: <class 'str'>

## Realtime inference with sagemaker endpoint

In [5]:
import json
runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
endpointName="facefusion-sagemaker-endpoint2024-04-03-23-49-44"
# Request sent by invoke_endpoint(): a "get_status" call for the given S3 object.
request = {"input":'s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4',"method":"get_status"}
# Example "submit" request (command-line style arguments):
#request = {"method":"submit","input":['-s','image1.jpg','-t','test.mp4','-o','/tmp/','-u','s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4','--headless'],}
def invoke_endpoint():
    """Send the module-level ``request`` to the real-time endpoint.

    Reads the notebook globals ``request``, ``endpointName`` and
    ``runtime_sm_client``. The JSON payload and the decoded response body are
    printed.

    Returns:
        str: the decoded response body, so the caller's
        ``response = invoke_endpoint()`` holds a value instead of ``None``.
    """
    content_type = "application/json"
    # Other example request bodies:
    #request_body = {"input":['-s', 'taotao.jpeg', '-t', 'lht.jpg', '-o', '.', '--headless']} ## the input is an S3 address
    #request_body = {"method":"submit","input":['-s','image1.jpg','-t','test.mp4','-o','/tmp/','-u','s3://sagemaker-us-west-2-687912291502/video/test_out.mp4','--headless'],}
    request_body = request
    payload = json.dumps(request_body)
    print(payload)
    response = runtime_sm_client.invoke_endpoint(
        EndpointName=endpointName,
        ContentType=content_type,
        Body=payload,
    )
    result = response['Body'].read().decode()
    print('返回：',result)
    return result

In [6]:
# Invoke the endpoint with the module-level `request`; invoke_endpoint() prints the response body.
response=invoke_endpoint()

{"input": "s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4", "method": "get_status"}
返回： {"status": "success"}


In [7]:
# Copy the S3 object referenced in the request (test_out2.mp4) into the current directory.
!aws s3 cp s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4 ./

download: s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4 to ./test_out2.mp4


## Async inference

In [None]:
import re
import os
import json
import uuid
import boto3
import sagemaker
from time import gmtime, strftime

from sagemaker import get_execution_role,session

# SageMaker client and region of the default boto3 session.
client = boto3.client('sagemaker')
aws_region = boto3.Session().region_name

# Default bucket of the SageMaker session.
sage_session = session.Session()
bucket = sage_session.default_bucket()

# Execution role of the current environment.
role = get_execution_role()

print(f'sagemaker sdk version: {sagemaker.__version__}\nrole:  {role}  \nbucket:  {bucket}')

In [None]:

boto3_session = boto3.session.Session()
current_region=boto3_session.region_name

# Look up the caller's account with a dedicated STS client, so that `client`
# only ever refers to the SageMaker client.
sts_client = boto3.client("sts")
account_id=sts_client.get_caller_identity()["Account"]

client = boto3.client('sagemaker')

# Use the Docker image built in the "build image" step. The URI is derived from
# the current account and region instead of a hard-coded account ID, so the
# notebook also works outside the author's account.
container = f'{account_id}.dkr.ecr.{current_region}.amazonaws.com/facefusion-sagemaker'

# Use the execution role resolved by get_execution_role() in the previous cell
# instead of a role ARN that exists only in one account.
role_arn = role
model_name = 'facefusion-' +  strftime("%Y-%m-%d-%H-%M-%S", gmtime())
image = container
create_model_response = client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role_arn,
        Containers=[{"Image": image}],
    )
    

# model_name = 'AIGC-Quick-Kit-' +  strftime("%Y-%m-%d-%H-%M-%S", gmtime())
# role = get_execution_role()
# model_data = f's3://{bucket}/stablediffusion/assets/model.tar.gz'
# primary_container = {
#     'Image': container,
#     'ModelDataUrl': model_data,
#     'Environment':{
#         's3_bucket': bucket,
#         'model_name':'runwayml/stable-diffusion-v1-5' #默认为runwayml/stable-diffusion-v1-5
#     }
# }

# create_model_response = client.create_model(
#     ModelName = model_name,
#     ExecutionRoleArn = role,
#     PrimaryContainer = primary_container,

In [None]:
# Names for this run: timestamped variant name, UUID-based configuration name.
_time_tag = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
_variant_name = 'facusion-' + _time_tag
endpoint_config_name = f'facefusion-{str(uuid.uuid4())}'

# Endpoint configuration with one production variant. AsyncInferenceConfig sets
# the S3 prefix under which asynchronous results are written.
response = client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    AsyncInferenceConfig=dict(
        OutputConfig=dict(
            S3OutputPath=f's3://{bucket}/stablediffusion/asyncinvoke/out/',
        ),
    ),
    ProductionVariants=[
        dict(
            VariantName=_variant_name,
            ModelName=model_name,
            InitialInstanceCount=1,
            InstanceType='ml.c5.large',
            InitialVariantWeight=1,
        ),
    ],
)

In [None]:
endpoint_name = f'facefusion-{str(uuid.uuid4())}'

# Pass the `endpoint_config_name` variable created in the previous cell. The
# original passed the literal string "endpoint_config_name", which is not the
# name of the configuration that was created.
response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

print(f'终端节点:{endpoint_name} 正在创建中，首次启动中会加载模型，请耐心等待, 请在控制台上查看状态')


In [None]:
import time
def predict_async(endpoint_name,payload):
    """Upload a request to S3, invoke the async endpoint, and wait for the result.

    Args:
        endpoint_name: name of the asynchronous SageMaker endpoint.
        payload: the request. A ``str`` or ``bytes`` value is treated as an
            already-serialized body and uploaded as-is; any other value is
            serialized with ``json.dumps``. This avoids double-encoding a
            payload that is already a JSON string.

    Returns:
        str: the ``OutputLocation`` reported by the endpoint, or ``''`` when
        the response has none.
    """
    # Canonical service name, matching the other runtime clients in this notebook.
    runtime_client = boto3.client('sagemaker-runtime')
    input_file=str(uuid.uuid4())+".json"
    input_key = f'stablediffusion/asyncinvoke/input/{input_file}'

    if isinstance(payload, (bytes, bytearray)):
        payload_data = bytes(payload)
    elif isinstance(payload, str):
        payload_data = payload.encode('utf-8')
    else:
        payload_data = json.dumps(payload).encode('utf-8')

    # Reads the notebook global `bucket` for the upload location.
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, input_key).put(Body=payload_data)
    input_location=f's3://{bucket}/{input_key}'
    print(f'input_location: {input_location}')

    response = runtime_client.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=input_location,
        # Same content type as the real-time invocation in this notebook.
        ContentType='application/json',
    )
    result =response.get("OutputLocation",'')
    wait_async_result(result)
    return result
    
def wait_async_result(output_location,timeout=60,poll_interval=5):
    """Poll S3 until the async result object exists or the timeout elapses.

    The original loop never advanced its time counter, so it could not time
    out. This version compares against a monotonic-clock deadline.

    Args:
        output_location: S3 URI of the expected result object.
        timeout: maximum number of seconds to keep polling.
        poll_interval: seconds to sleep between checks.

    Returns:
        bool: True if the result object appeared before the deadline, False
        if the wait timed out.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if s3_object_exists(output_location):
            print("have async result")
            # `draw_image` is not defined in this notebook; call it only when the
            # user has provided one, so a finished job does not end in a NameError.
            renderer = globals().get("draw_image")
            if callable(renderer):
                renderer(output_location)
            return True
        time.sleep(poll_interval)
    print(f"timed out after {timeout}s waiting for {output_location}")
    return False
def s3_object_exists(s3_path):
    """Return True if the object at an ``s3://bucket/key`` URI exists.

    The original called ``get_bucket_and_key``, which this notebook does not
    define, and its blanket ``except`` hid the resulting NameError, so it
    always returned False. The URI is now parsed with the standard library.

    Args:
        s3_path: S3 URI of the object, e.g. ``s3://bucket/prefix/file.out``.

    Returns:
        bool: True when ``head_object`` succeeds. False when the path is not a
        complete ``s3://bucket/key`` URI, or when S3 returns a client error
        (for example because the object does not exist yet).
    """
    from urllib.parse import urlparse

    parsed = urlparse(s3_path or "")
    bucket_name = parsed.netloc
    key = parsed.path.lstrip("/")
    if parsed.scheme != "s3" or not bucket_name or not key:
        print(f"invalid S3 path: {s3_path!r}")
        return False

    s3 = boto3.client('s3')
    try:
        s3.head_object(Bucket=bucket_name, Key=key)
    except s3.exceptions.ClientError:
        print("job is not completed, waiting...")
        return False
    return True

### test

In [None]:
endpoint_name="facefusion-971153c4-297d-41e6-b952-cf0e8b9e3956"
# Pass the request as a dict: predict_async() serializes it with json.dumps, so a
# pre-serialized JSON string would be encoded a second time.
payload={"input":"python run.py -s 1.jpg -t test.mp4 -o . --headless"}
predict_async(endpoint_name,payload)

## create all resources with cdk

In [None]:
from aws_cdk import core
from aws_cdk.aws_iam import Role, ManagedPolicy, ServicePrincipal
from aws_cdk.aws_sagemaker import CfnModel, CfnEndpointConfig, CfnEndpoint


class SageMakerStack(core.Stack):
    """CDK stack with helpers that add a SageMaker model, endpoint configuration and endpoint."""

    def __init__(
        self,
        scope: core.Construct,
        id_: str,
        env: core.Environment,
    ) -> None:
        """Create the stack and keep the environment for building image URIs.

        Args:
            scope: parent construct of the stack.
            id_: construct id of the stack.
            env: target environment; its ``account`` and ``region`` are used
                by ``create_model``.
        """
        super().__init__(scope=scope, id=id_, env=env)
        self.env = env

    def create_model(
        self,
        id_: str,
        model_name: str,
        image_name: str,
    ) -> CfnModel:
        """Add an execution role and a model backed by an ECR image.

        Args:
            id_: prefix for the construct ids and the role name.
            model_name: name of the SageMaker model.
            image_name: ECR repository name; the ``latest`` tag is used.

        Returns:
            The ``CfnModel`` construct.
        """
        role = Role(
            self,
            id=f"{id_}-SageMakerRole",
            role_name=f"{id_}-SageMakerRole",
            assumed_by=ServicePrincipal("sagemaker.amazonaws.com"),
            managed_policies=[
                ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")
            ],
        )
        container = CfnModel.ContainerDefinitionProperty(
            container_hostname="<container-hostname>",
            # Build the URI from the stack environment's account AND region; the
            # region used to be hard-coded to eu-west-1 regardless of `env`.
            image="{}.dkr.ecr.{}.amazonaws.com/{}:latest".format(
                self.env.account, self.env.region, image_name
            ),
        )
        return CfnModel(
            self,
            id=f"{id_}-SageMakerModel",
            model_name=model_name,
            execution_role_arn=role.role_arn,
            containers=[container],
        )

    def create_endpoint_configuration(
        self,
        id_: str,
        model_name: str,
        endpoint_configuration_name: str,
    ) -> CfnEndpointConfig:
        """Add a serverless endpoint configuration with one variant.

        Args:
            id_: prefix for the construct id.
            model_name: name of the model served by the variant.
            endpoint_configuration_name: name of the endpoint configuration.

        Returns:
            The ``CfnEndpointConfig`` construct.
        """
        return CfnEndpointConfig(
            self,
            id=f"{id_}-SageMakerEndpointConfiguration",
            endpoint_config_name=endpoint_configuration_name,
            production_variants=[
                CfnEndpointConfig.ProductionVariantProperty(
                    model_name=model_name,
                    initial_variant_weight=1.0,
                    variant_name="AllTraffic",
                    serverless_config=CfnEndpointConfig.ServerlessConfigProperty(
                        max_concurrency=1,
                        memory_size_in_mb=2048,
                    ),
                )
            ],
        )

    def create_endpoint(
        self,
        id_: str,
        endpoint_configuration_name: str,
        endpoint_name: str,
    ) -> CfnEndpoint:
        """Add an endpoint that uses the named endpoint configuration.

        Args:
            id_: prefix for the construct id.
            endpoint_configuration_name: name of the endpoint configuration to use.
            endpoint_name: name of the endpoint.

        Returns:
            The ``CfnEndpoint`` construct.
        """
        return CfnEndpoint(
            self,
            id=f"{id_}-SageMakerEndpoint",
            endpoint_config_name=endpoint_configuration_name,
            endpoint_name=endpoint_name,
        )

## deploy model with cdk

In [None]:
from aws_cdk import core
from stacks.sagemaker import SageMakerStack


class SimpleExampleApp(core.App):
    """CDK app that wires a model, endpoint configuration and endpoint into one stack."""

    def __init__(self) -> None:
        """Build the stack and chain the three resources with explicit dependencies."""
        super().__init__()

        # Placeholder values shared by more than one resource.
        model_name = "<model-name>"
        config_name = "app-endpoint-configuration"

        target_env = core.Environment(account="<account>", region="<region>")
        stack = SageMakerStack(scope=self, id_="app-sagemaker-stack", env=target_env)

        model = stack.create_model(
            id_="AppModel",
            model_name=model_name,
            image_name="<image-name>",
        )

        # The configuration refers to the model by name, so declare the dependency explicitly.
        endpoint_config = stack.create_endpoint_configuration(
            id_="AppEndpointConfiguration",
            model_name=model_name,
            endpoint_configuration_name=config_name,
        )
        endpoint_config.add_depends_on(model)

        # Likewise, the endpoint refers to the configuration by name.
        endpoint = stack.create_endpoint(
            id_="AppEndpoint",
            endpoint_configuration_name=config_name,
            endpoint_name="app-endpoint",
        )
        endpoint.add_depends_on(endpoint_config)

# Instantiate the app (which builds the stack in its constructor) and synthesize it.
simple_app = SimpleExampleApp()
simple_app.synth()