# SageMaker Example

## 2. Build the container

demo codes are in `app/`
build and push the docker with following commands:

In [1]:
!bash build_and_push_sglang.sh

set -e

# This script shows how to build the Docker image and push it to ECR to be ready for use
# by SageMaker.

# The argument to this script is the region name. 
# 尝试使用 IMDSv2 获取 token
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    56  100    56    0     0  45197      0 --:--:-- --:--:-- --:--:-- 56000

# Get the current region and write it to the backend .env file
region=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -s http://169.254.169.254/latest/meta-data/placement/region)
# region=$(aws configure get region)
suffix="com"

if [[ $region =~ ^cn ]]; then
    suffix="com.cn"
fi

# Get the account number associated with the current IAM credentials
account=$(aws sts  get-caller-identity --query Account --output text)

SGL_VERSION=latest
inference_image=sagema

## 3. Deploy on SageMaker

define the model and deploy on SageMaker


### 3.1 Init SageMaker session

In [1]:
# !pip install boto3 sagemaker transformers
import re
import json
import os,dotenv
import boto3
import sagemaker
from sagemaker import Model


dotenv.load_dotenv()
print(os.environ)

boto_sess = boto3.Session(
    region_name='us-east-1'
)

sess = sagemaker.session.Session(boto_session=boto_sess)
# role = sagemaker.get_execution_role()
role = os.environ.get('role')



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ubuntu/.config/sagemaker/config.yaml
environ({'USER': 'ubuntu', 'SSH_CLIENT': '52.94.133.139 4026 22', 'XDG_SESSION_TYPE': 'tty', 'SHLVL': '2', 'HOME': '/home/ubuntu', 'SSL_CERT_FILE': '/usr/lib/ssl/cert.pem', 'DBUS_SESSION_BUS_ADDRESS': 'unix:path=/run/user/1000/bus', 'LOGNAME': 'ubuntu', '_': '/home/ubuntu/workspace/llm_model_hub/miniconda3/envs/py311/bin/python', 'XDG_SESSION_CLASS': 'user', 'XDG_SESSION_ID': '34414', 'VSCODE_CLI_REQUIRE_TOKEN': '3ae02bac-b0cf-4129-8cc1-92a260fb9929', 'PATH': '/home/ubuntu/workspace/llm_model_hub/miniconda3/envs/py311/bin:/home/ubuntu/.vscode-server/cli/servers/Stable-e54c774e0add60467559eb0d1e229c6452cf8447/server/bin/remote-cli:/home/ubuntu/.local/bin:/home/ubuntu/workspace/llm_model_hub/miniconda3/envs/py311/bin:/home/ubuntu/workspace/llm_model_hub/miniconda3/condabin:/home/ubuntu/.

### 3.2 Prepare model file

#### Option 2: deploy vllm by model_id

In [3]:
!tar czvf model.tar.gz model_tar/

model_tar/
model_tar/env
model_tar/s5cmd


In [3]:


s3_code_prefix = f"sagemaker_endpoint/sglang/"
bucket = sess.default_bucket() 
code_artifact = sess.upload_data("model.tar.gz", bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {code_artifact}")

S3 Code or Model tar ball uploaded to --- > s3://sagemaker-us-east-1-434444145045/sagemaker_endpoint/sglang//model.tar.gz


### 3.3 Deploy model

In [None]:
# CONTAINER='434444145045.dkr.ecr.us-east-1.amazonaws.com/sagemaker_endpoint/vllm:v0.7.2'

# env={
#     "HF_MODEL_ID": model_name,
#     "DTYPE": dtype,
#     "LIMIT_MM_PER_PROMPT":extra_params.get('limit_mm_per_prompt',''),
#     "S3_MODEL_PATH":model_path,
#     "VLLM_ALLOW_LONG_MAX_MODEL_LEN":"1",
#     "HF_TOKEN":os.environ.get('HUGGING_FACE_HUB_TOKEN'),
#     "MAX_MODEL_LEN":extra_params.get('max_model_len', "12288"), 
#     "ENABLE_PREFIX_CACHING": "1" if extra_params.get('enable_prefix_caching') else "0",
#     "TENSOR_PARALLEL_SIZE": extra_params.get('tensor_parallel_size',str(get_auto_tensor_parallel_size(instance_type))),
#     "MAX_NUM_SEQS": extra_params.get('max_num_seqs','256'),
#     "ENFORCE_EAGER": "1" if extra_params.get('enforce_eager') else "0",

#         }

# model = Model(
#     name=sagemaker.utils.name_from_base("sagemaker-vllm")+"_model",
#     model_data=code_artifact,
#     image_uri=CONTAINER,
#     role=role,
#     sagemaker_session=sess,
#     env=env,
    
# )



# # 部署模型到endpoint
# endpoint_name = sagemaker.utils.name_from_base("sagemaker-vllm")+"_endpoint"
# print(f"endpoint_name: {endpoint_name}")
# predictor = model.deploy(
#     initial_instance_count=1,
#     instance_type='ml.g5.2xlarge',
#     endpoint_type = EndpointType.INFERENCE_COMPONENT_BASED,
#     endpoint_name=endpoint_name,
# )

### test deployment from s3

In [None]:
from sagemaker.enums import EndpointType
from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements
from sagemaker import Predictor
from sagemaker import Model

resources = ResourceRequirements(
    requests = {
        # "num_cpus": 4,  # Number of CPU cores required:
        "num_accelerators": 1, # Number of accelerators required
        "memory": 1024*4,  # Minimum memory required in Mb (required)
        "copies": 1,
    },
    limits = {},
)

CONTAINER='434444145045.dkr.ecr.us-east-1.amazonaws.com/sagemaker_endpoint/sglang:v0.4.3.post2-cu124'
model_path = "s3://sagemaker-us-east-1-434444145045/Qwen2-5-3B-Instruct/032650faedac452e86f95f3f3b004342/finetuned_model/"
model_id = 'Qwen/Qwen2-1.5B-Instruct'
env={
    "HF_MODEL_ID": model_id,
    "S3_MODEL_PATH":model_path,
}

model_name = sagemaker.utils.name_from_base("sagemaker-sglang")+"-model"

model = Model(
    name=model_name,
    model_data=code_artifact,
    image_uri=CONTAINER,
    role=role,
    sagemaker_session=sess,
    resources = resources,
    env=env,
    predictor_cls = Predictor,
)

# Create the model in SageMaker
# model.create()

# 部署模型到endpoint
endpoint_name = sagemaker.utils.name_from_base("sagemaker-sglang")+"-endpoint"
print(f"endpoint_name: {endpoint_name}")
predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.g5.2xlarge',
    endpoint_name=endpoint_name,
    resources = resources,
    endpoint_type = EndpointType.INFERENCE_COMPONENT_BASED,
    model_name=model_name, 
)

endpoint_name: sagemaker-sglang-2025-03-01-09-03-13-505-endpoint


-

In [None]:
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorchModel

sm_client = boto3.client(service_name="sagemaker")
model_name = sagemaker.utils.name_from_base("sagemaker-sglang")+"-model"
endpoint_name = sagemaker.utils.name_from_base("sagemaker-sglang")+"-endpoint"
component_name = sagemaker.utils.name_from_base("sagemaker-sglang")+"-component"
env={
    # "HF_MODEL_ID": model_id,
    "S3_MODEL_PATH":model_path,
}
container_config = {
    'Image': CONTAINER,
    'ModelDataUrl': code_artifact,
    'Environment': env
}

response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=container_config
)

print(f"Model created: {response['ModelArn']}")

sm_client.create_inference_component(
    InferenceComponentName=component_name,
    EndpointName=endpoint_name,
    VariantName="AllTraffic",
    Specification={
        "ModelName": model_name,
        "ComputeResourceRequirements": {
		    "NumberOfAcceleratorDevicesRequired": 1, 
			#"NumberOfCpuCoresRequired": 2, 
			"MinMemoryRequiredInMb": 1024*8
	    }
    },
    RuntimeConfig={"CopyCount": 1},
)

## 4. Test

you can invoke your model with SageMaker SDK

### 4.1 Message api non-stream mode

In [27]:
runtime = boto3.client('runtime.sagemaker',region_name='us-east-1')
endpoint_name = "sagemaker-sglang-2025-02-24-06-45-27-924-endpoint"
payload = {
    "messages": [
    {
        "role": "user",
        "content": "who are you"
    }
    ],
    "model":"qwen",
    "max_tokens": 1024,
    "stream": False
}
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

print(json.loads(response['Body'].read())["choices"][0]["message"]["content"])

ValidationError: An error occurred (ValidationError) when calling the InvokeEndpoint operation: Inference Component Name header is required for endpoints to which you plan to deploy inference components. Please include Inference Component Name header or consider using SageMaker models.

### 4.2 Message api stream mode

In [None]:
payload = {
    "messages": [
    {
        "role": "user",
        "content": "Write a quick sort in python"
    }
    ],
    "model":"custome",
    "max_tokens": 4096,
    "stream": True
}

response = runtime.invoke_endpoint_with_response_stream(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

buffer = ""
for t in response['Body']:
    buffer += t["PayloadPart"]["Bytes"].decode()
    last_idx = 0
    for match in re.finditer(r'^data:\s*(.+?)(\n\n)', buffer):
        try:
            data = json.loads(match.group(1).strip())
            last_idx = match.span()[1]
            print(data["choices"][0]["delta"]["content"], end="")
        except (json.JSONDecodeError, KeyError, IndexError) as e:
            pass
    buffer = buffer[last_idx:]

<think>
嗯，用户让我写一个Python的快速排序。好的，首先我得回忆一下快速排序的基本原理。快速排序是通过分治法来排序的，基本思想是选一个基准元素，把数组分成比基准小的和比基准大的两个子数组，然后递归地对这两个子数组进行排序。对吧？

那首先我得想一下怎么选择基准元素。通常的做法是选第一个元素，比如数组中的第一个元素作为基准。或者可以用中间元素，比如len(arr)//2这样的，不过可能不太好。或者用随机选一个基准，这样分割的时候可能分布更均匀。比如可能用户可能希望用第一个或者中间的元素作为基准，这样代码更简单。

那基准的选择方法可能影响排序的效率，但对于普通情况来说，可能第一个元素或者中间的元素就足够了。比如，假设数组是 [5, 2, 9, 1, 5]，选第一个5作为基准的话，分割的时候会把小于等于5的放在左边，大于的放在右边。然后递归处理左边和右边的子数组。这样应该可行。

接下来，实现步骤大概是这样的：首先分割数组，然后递归地处理左右子数组。分割的过程需要一个左指针和一个右指针，初始时左指针在0，右指针在最后一个元素的位置。然后交换元素直到左指针不小于右指针的时候，基准就放到了正确的位置。

那具体的步骤可能是这样的：

1. 如果数组长度小于等于1，返回，因为已经有序。
2. 选择基准元素，比如第一个元素。
3. 初始化左和右的指针，左从0，右从最后一个元素。
4. 交换左指针和右指针移动，直到左指针达到右指针的位置。
5. 将基准元素放到正确的位置。
6. 递归地对左右子数组排序。

现在具体写代码的话，可能需要一个函数，接收数组作为参数。然后处理分割和递归。

比如，初始化pivot_index = 0，然后从right指针往左移动，直到left_index < right_index的时候交换。然后把pivot移到pivot_index的位置。然后递归处理left和right子数组。

那现在写代码的结构大概是这样的：

def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot_index = 0
    i = 0
    j = len(arr) - 1
    while i <= j:
        while arr[i] < arr[pivot_

### 4.3 Completion api non-stream mode

In [19]:
# from transformers import AutoTokenizer
# tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/deepseek-coder-6.7b-instruct", trust_remote_code=True)
# messages=[
#     { 'role': 'user', 'content': "write a quick sort algorithm in python."}
# ]
# prompt = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)

# payload = {
#     "model": "deepseek-ai/deepseek-coder-1.3b-instruct",
#     "prompt": prompt,
#     "max_tokens": 1024,
#     "stream": False
# }
endpoint_name = "Llama-3-2-3B-Instruct-2025-02-23-13-01-30-887-sglang-endpoint"
payload = {
    "messages": [
    {
        "role": "user",
        "content": "Write a quick sort in python"
    }
    ],
    "model":"custome",
    "max_tokens": 1000,
    "stream": False
}

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

In [20]:

print(json.loads(response['Body'].read()))

{'id': 'd06be2c9d2964d24adec6eaa20dbd1a2', 'object': 'chat.completion', 'created': 1740368138, 'model': 'custome', 'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': "<think>\nOkay, I need to write a quick sort algorithm in Python. Let me think about how quick sort works. I remember that quick sort is a divide-and-conquer algorithm. It usually works by choosing a 'pivot' element from the array and partitioning the other elements into two sub-arrays, according to whether they are less than or greater than the pivot. Then, recursively applying the same process to the sub-arrays.\n\nSo, the first step is to select a pivot. Common choices could be the first element, the last element, the median of the first, middle, and last elements, or the median of a subset. Let's pick the median of the first, last and middle elements as the pivot. This can help reduce the worst-case scenario.\n\nOnce the pivot is chosen, we partition the array. The partition step involves rearranging 

In [22]:
1000/15.2

65.78947368421053

In [15]:
endpoint_name = "Llama-3-2-3B-Instruct-2025-02-24-02-47-43-275-vllm-endpoint"

payload = {
    "messages": [
    {
        "role": "user",
        "content": "Write a quick sort in python"
    }
    ],
    "model":"custome",
    "max_tokens": 1000,
    "stream": False
}

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

In [16]:
print(json.loads(response['Body'].read()))

{'id': 'chatcmpl-e8a5a693408340bb967859a3718baeca', 'object': 'chat.completion', 'created': 1740368084, 'model': '/tmp/model_file/', 'choices': [{'index': 0, 'message': {'role': 'assistant', 'reasoning_content': None, 'content': "<think>\nOkay, so I need to write a quick sort algorithm in Python. Let's start by recalling what quick sort is. It's a divide-and-conquer algorithm that works by selecting a 'pivot' element from the array and partitioning the other elements into two sub-arrays, according to whether they are less than or greater than the pivot. Then, recursively applying this process to the sub-arrays.\n\nI should first choose a pivot, but the choice of pivot can affect performance.通常的做法是用中间元素或者随机元素作为pivot.为了简单起见，这里用第一项或者最后一项作为pivot。比如，可能选择stack[len(stack)-1]或stack[0]。或者用户可能需要自定义的分割方式？\n\n先写一个基础的实现。比如，假设有一个排序好的函数来帮助处理重复元素的话，但通常是O(n)的时间复杂度，所以应该处理重复的情况。但这里可能先不考虑重复，之后考虑是否能优化。\n\n首先定义quick_sort函数。参数是数组list，要等于至少两个元素的情况。\n\n那步骤大概是这样的：\n\n1. 如果数组长度小于等于1，直接返回，因为已经排序好了。\n2. 选择一个pivot。

In [18]:
1000/15.9

62.893081761006286

### 4.4 Completion api stream mode

In [1]:
import re
import json
import boto3
from botocore.exceptions import ClientError
import time
max_retries = 8
retry_delay = 10

# 更改成model hub部署的endpoint名称和region name
endpoint_name = "Qwen2-5-VL-3B-Instruct-2025-03-07-15-34-sglang-endpoint"
inference_component_name = "Qwen2-5-VL-3B-Instruct-2025-03-07-15-34-sglang-component"
region_name = 'us-east-1'
runtime = boto3.client('runtime.sagemaker',region_name=region_name)
payload = {
    "messages": [
    {
        "role": "user",
        "content": "who are you"
    }
    ],
    "max_tokens": 1024,
    "stream": False,
    "model":"any"
}

print("# 指数回退重试 --------------")
def invoke_with_exponential_backoff(endpoint_name, payload,retry_delay):
    for attempt in range(max_retries):
        try:
            response = runtime.invoke_endpoint(
                EndpointName=endpoint_name,
                InferenceComponentName=inference_component_name,
                ContentType='application/json',
                Body=json.dumps(payload)
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ValidationError' and 'Inference Component has no capacity' in e.response['Error']['Message']:
                if attempt < max_retries - 1:
                    print(f"Inference Component has no capacity, retrying in {retry_delay} seconds (attempt {attempt+1}/{max_retries})")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                else:
                    print("Maximum retries reached, unable to process request")
            else:
                raise
            
response = invoke_with_exponential_backoff(endpoint_name, payload,retry_delay)
print(json.loads(response['Body'].read())["choices"][0]["message"]["content"])


print("# 非流式 --------------")
# 非流式
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    InferenceComponentName=inference_component_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

print(json.loads(response['Body'].read())["choices"][0]["message"]["content"])


payload = {
    "messages": [
    {
        "role": "user",
        "content": "Write a quick sort in python"
    }
    ],
    "max_tokens": 1024,
    "stream": True,
    "model":"any"
}


# 流式
print("# 流式 --------------")
response = runtime.invoke_endpoint_with_response_stream(
    EndpointName=endpoint_name,
    InferenceComponentName=inference_component_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

buffer = ""
for t in response['Body']:
    buffer += t["PayloadPart"]["Bytes"].decode()
    last_idx = 0
    for match in re.finditer(r'^data:\s*(.+?)(\n\n)', buffer):
        try:
            data = json.loads(match.group(1).strip())
            last_idx = match.span()[1]
            print(data["choices"][0]["delta"]["content"], end="",flush=True)
        except (json.JSONDecodeError, KeyError, IndexError) as e:
            pass
    buffer = buffer[last_idx:]

# 指数回退重试 --------------
Inference Component has no capacity, retrying in 10 seconds (attempt 1/8)
Inference Component has no capacity, retrying in 20 seconds (attempt 2/8)
Inference Component has no capacity, retrying in 40 seconds (attempt 3/8)
Inference Component has no capacity, retrying in 80 seconds (attempt 4/8)
Inference Component has no capacity, retrying in 160 seconds (attempt 5/8)


KeyboardInterrupt: 