# VLM 部署 - SageMaker Endpoint

本notebook提供完整的视觉语言模型(VLM)在SageMaker上的生产级部署流程，包括：
- 模型准备和S3存储
- LMI容器配置
- 端点部署
- 自动扩缩容配置
- 推理调用示例

## 前置要求
- 在SageMaker Studio或Notebook Instance中运行
- 确保有足够的ml.g6e实例配额
- 准备好要部署的VLM模型

## 1. 环境准备

In [None]:
# 检查是否安装了依赖
!pip list | grep -E "(boto3|sagemaker|huggingface_hub|transformers)"

In [None]:
# 按需安装缺失的依赖，例如：
!pip install boto3 sagemaker -q

In [None]:
import os
import tarfile
import json
import base64
import boto3
import sagemaker
from sagemaker import Model
from datetime import datetime

# Resolve the SageMaker session, execution role, region and default bucket
# that the remaining cells rely on.
sess = sagemaker.Session()
role = sagemaker.get_execution_role()
# Read the region through the public `boto_region_name` property rather than
# the private `_region_name` attribute, which is an implementation detail.
region = sess.boto_region_name
bucket = sess.default_bucket()

print(f"Region: {region}, Role: {role}")
print(f"Default bucket: {bucket}")

## 2. 配置参数

根据你的需求修改以下配置：

In [None]:
# ---- Deployment settings ----
# S3 prefix of the model artifacts; replace with your own location.
MODEL_S3_PATH = "s3://your-bucket/models/qwen2-5-vl-7b/"
# Instance type used for the endpoint.
INSTANCE_TYPE = "ml.g6e.2xlarge"
# Number of instances the endpoint starts with.
INITIAL_INSTANCE_COUNT = 2
# Model name; also embedded in the endpoint and scaling-policy names.
MODEL_NAME = "qwen2-5-vl-7b"

# ---- Auto scaling settings ----
MIN_CAPACITY = 2    # minimum instance count
MAX_CAPACITY = 10   # maximum instance count
# Target invocations per instance per minute.
TARGET_INVOCATIONS_PER_INSTANCE = 60
# Target GPU utilization.
TARGET_GPU_UTILIZATION = 70.0
# Target GPU memory utilization.
TARGET_GPU_MEM_UTILIZATION = 85.0

# ---- LMI container settings ----
MAX_MODEL_LEN = 4096   # maximum sequence length
MAX_BATCH_SIZE = 32    # maximum batch size

# Echo the key settings so they are visible in the cell output.
print(
    "配置参数:",
    f"模型S3路径: {MODEL_S3_PATH}",
    f"实例类型: {INSTANCE_TYPE}",
    f"初始实例数: {INITIAL_INSTANCE_COUNT}",
    f"扩缩容范围: {MIN_CAPACITY}-{MAX_CAPACITY}",
    sep="\n",
)

## 3. 创建LMI配置

创建serving.properties配置文件并打包：

In [None]:
# 清理并创建配置目录
!rm -rf lmi_config
# (Re)create the directory that will hold the LMI configuration.
os.makedirs("lmi_config", exist_ok=True)

# Contents of serving.properties; the model location, maximum sequence length
# and maximum batch size come from the configuration cell.
serving_config = f"""engine=Python
option.model_id={MODEL_S3_PATH}
option.dtype=fp16
option.rolling_batch=vllm
option.tensor_parallel_degree=1
option.device_map=auto
option.max_model_len={MAX_MODEL_LEN}
option.max_rolling_batch_size={MAX_BATCH_SIZE}
option.use_v2_block_manager=true
option.enable_streaming=false
"""

# Write the properties file into the configuration directory.
with open("lmi_config/serving.properties", "w") as props_file:
    props_file.write(serving_config)

# Show what was written.
print("serving.properties内容:", serving_config, sep="\n")

# Bundle the directory into a gzip-compressed tarball, keeping the
# "lmi_config" folder as the top-level entry of the archive.
with tarfile.open("lmi_config.tar.gz", "w:gz") as archive:
    archive.add("lmi_config", arcname="lmi_config")

print("\n配置文件已打包为 lmi_config.tar.gz")

## 4. 部署到SageMaker

创建模型并部署到端点：

In [None]:
# LMI (djl-inference) container image for the current region; the version is
# pinned by the tag at the end of the URI.
image_uri = f"763104351884.dkr.ecr.{region}.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128"
print(f"使用LMI容器: {image_uri}")

# Upload the packaged configuration to the default bucket under a fixed prefix.
s3_code_prefix = "large-model-lmi/code"
code_artifact = sess.upload_data(
    path="lmi_config.tar.gz",
    bucket=bucket,
    key_prefix=s3_code_prefix,
)
print(f"配置包上传到: {code_artifact}")

# Create the SageMaker model object: container image + uploaded config + role.
model = Model(
    image_uri=image_uri,
    model_data=code_artifact,
    role=role,
)
print("SageMaker模型已创建")

In [None]:
# Build an endpoint name from the model name plus the current local timestamp.
deploy_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
endpoint_name = f"vlm-{MODEL_NAME}-{deploy_timestamp}"
print(f"端点名称: {endpoint_name}")

# Start the deployment. wait=False makes the call return without blocking
# until the endpoint is up; readiness is polled in a later cell.
print("开始部署端点...")
deploy_options = {
    "initial_instance_count": INITIAL_INSTANCE_COUNT,
    "instance_type": INSTANCE_TYPE,
    "endpoint_name": endpoint_name,
    "wait": False,
}
predictor = model.deploy(**deploy_options)

print(f"✅ 端点部署已启动: {endpoint_name}")
print("⏱️ 预计部署时间: 10-15分钟")

# Persist the endpoint name to a local file for later use.
with open("endpoint_name.save", "w") as name_file:
    name_file.write(endpoint_name)

print(f"\n📍 端点名称已保存到 endpoint_name.save")

## 5. 等待端点就绪

**重要**: 必须等待端点状态变为 `InService` 后才能配置扩缩容！

In [None]:
import time

# SageMaker client used for endpoint status queries.
sm_client = boto3.client("sagemaker", region_name=region)


def check_endpoint_status(endpoint_name):
    """Return the ``EndpointStatus`` reported by ``describe_endpoint``.

    Any exception raised while querying or reading the response is not
    propagated; it is returned as an ``"Error: ..."`` string instead.
    """
    try:
        status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    except Exception as err:
        status = f"Error: {err}"
    return status

# Poll the endpoint until it is InService, has failed, or the time budget
# is exhausted.
print(f"等待端点就绪: {endpoint_name}")
print("这可能需要10-15分钟...")

poll_interval_seconds = 30      # delay between status checks
deploy_timeout_seconds = 1800   # give up after 30 minutes

start_time = time.time()
while True:
    status = check_endpoint_status(endpoint_name)
    elapsed = int(time.time() - start_time)

    # "\r" rewrites the same output line on every poll.
    print(f"\r⏱️  {elapsed//60:02d}:{elapsed%60:02d} - 状态: {status}", end="", flush=True)

    if status == "InService":
        print(f"\n\n✅ 端点已就绪! 总耗时: {elapsed//60}分{elapsed%60}秒")
        print("现在可以配置扩缩容了")
        break

    if status in ("Failed", "OutOfService"):
        print(f"\n\n❌ 端点部署失败: {status}")
        # Try to surface the failure reason. A lookup error is reported
        # instead of being silently swallowed, but never aborts the cell.
        try:
            response = sm_client.describe_endpoint(EndpointName=endpoint_name)
        except Exception as e:
            print(f"无法获取失败原因: {e}")
        else:
            if 'FailureReason' in response:
                print(f"失败原因: {response['FailureReason']}")
        break

    # Check the timeout before sleeping so an expired budget does not cost
    # one more polling interval.
    if elapsed > deploy_timeout_seconds:
        print(f"\n\n⚠️  部署超时，请检查SageMaker控制台")
        break

    time.sleep(poll_interval_seconds)

## 6. 配置弹性扩缩容

**仅在端点状态为 InService 时执行此步骤！**

In [None]:
# Application Auto Scaling client for the endpoint's region.
autoscaling_client = boto3.client("application-autoscaling", region_name=region)

# Register the endpoint variant's instance count as a scalable target bounded
# by MIN_CAPACITY and MAX_CAPACITY. Failures are printed, not raised.
print("注册扩缩容目标...")
scalable_target = {
    "ServiceNamespace": 'sagemaker',
    "ResourceId": f'endpoint/{endpoint_name}/variant/AllTraffic',
    "ScalableDimension": 'sagemaker:variant:DesiredInstanceCount',
    "MinCapacity": MIN_CAPACITY,
    "MaxCapacity": MAX_CAPACITY,
}
try:
    autoscaling_client.register_scalable_target(**scalable_target)
    print(f"✅ 扩缩容目标已注册: {MIN_CAPACITY}-{MAX_CAPACITY}个实例")
except Exception as err:
    print(f"❌ 注册扩缩容目标失败: {err}")

In [None]:
# Create the target-tracking scaling policy for the endpoint variant.
# Failures are printed rather than raised.
print("创建扩缩容策略...")
try:
    # Option 1: scale on invocations per instance (the one in use).
    autoscaling_client.put_scaling_policy(
        PolicyName=f'vlm-scaling-policy-{MODEL_NAME}',
        ServiceNamespace='sagemaker',
        ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': TARGET_INVOCATIONS_PER_INSTANCE,
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
            },
            'ScaleOutCooldown': 180,  # 3 minutes: scale out quickly
            'ScaleInCooldown': 300    # 5 minutes: scale in cautiously
        }
    )

    # NOTE: the GPU examples below use the '/aws/sagemaker/Endpoints'
    # namespace. Endpoint instance metrics (GPUUtilization,
    # GPUMemoryUtilization, ...) are published there, not under
    # 'AWS/SageMaker', which carries the invocation metrics.

    # Option 2: scale on GPU utilization (example only, not enabled)
    # autoscaling_client.put_scaling_policy(
    #     PolicyName=f'vlm-gpu-scaling-{MODEL_NAME}',
    #     ServiceNamespace='sagemaker',
    #     ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
    #     ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    #     PolicyType='TargetTrackingScaling',
    #     TargetTrackingScalingPolicyConfiguration={
    #         'TargetValue': TARGET_GPU_UTILIZATION,
    #         'CustomizedMetricSpecification': {
    #             'MetricName': 'GPUUtilization',
    #             'Namespace': '/aws/sagemaker/Endpoints',
    #             'Dimensions': [
    #                 {'Name': 'EndpointName', 'Value': endpoint_name},
    #                 {'Name': 'VariantName', 'Value': 'AllTraffic'}
    #             ],
    #             'Statistic': 'Average'
    #         },
    #         'ScaleOutCooldown': 180,  # 3 minutes: scale out quickly
    #         'ScaleInCooldown': 300   # 5 minutes: scale in cautiously
    #     }
    # )

    # Option 3: scale on GPU memory utilization (example only, not enabled)
    # autoscaling_client.put_scaling_policy(
    #     PolicyName=f'vlm-gpu-memory-scaling-{MODEL_NAME}',
    #     ServiceNamespace='sagemaker',
    #     ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
    #     ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    #     PolicyType='TargetTrackingScaling',
    #     TargetTrackingScalingPolicyConfiguration={
    #         'TargetValue': TARGET_GPU_MEM_UTILIZATION,
    #         'CustomizedMetricSpecification': {
    #             'MetricName': 'GPUMemoryUtilization',
    #             'Namespace': '/aws/sagemaker/Endpoints',
    #             'Dimensions': [
    #                 {'Name': 'EndpointName', 'Value': endpoint_name},
    #                 {'Name': 'VariantName', 'Value': 'AllTraffic'}
    #             ],
    #             'Statistic': 'Average'
    #         },
    #         'ScaleOutCooldown': 120,  # scale out fast when GPU memory fills up
    #         'ScaleInCooldown': 600   # scale in conservatively to avoid flapping
    #     }
    # )

    print(f"✅ 扩缩容策略已创建")
    print(f"   监控指标: 每实例调用数")
    print(f"   目标阈值: {TARGET_INVOCATIONS_PER_INSTANCE}次/实例/分钟")
    print(f"   扩容冷却: 3分钟")
    print(f"   缩容冷却: 5分钟")
except Exception as e:
    print(f"❌ 创建扩缩容策略失败: {e}")

## 7. 推理调用示例

端点就绪后，测试推理调用：

In [None]:
# SageMaker Runtime client used to invoke the endpoint.
smr_client = boto3.client(
    "sagemaker-runtime",
    region_name=region,
)

def encode_image(image_path):
    """Read the file at ``image_path`` and return its bytes as a base64 string."""
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def call_vlm_endpoint(endpoint_name, text_prompt, image_path=None, max_tokens=512):
    """Send a chat-style request to the VLM endpoint and return the reply text.

    Args:
        endpoint_name: Name of the SageMaker endpoint to invoke.
        text_prompt: Text placed in the user message.
        image_path: Optional path to a local image. When the file exists it is
            attached to the user message as a base64 data URL; when it does
            not exist a warning is printed and only the text is sent.
        max_tokens: Value sent as ``max_tokens`` in the request body.

    Returns:
        The ``choices[0].message.content`` field of the JSON response, or a
        string starting with "调用失败" when invoking the endpoint or parsing
        its response raises an exception.
    """
    import mimetypes  # local import: only needed to label an attached image

    # The user message always starts with the text part.
    content = [{"type": "text", "text": text_prompt}]

    # Attach the image, if one was supplied and can be found.
    if image_path:
        if os.path.exists(image_path):
            # Label the data URL with the type guessed from the file extension
            # (e.g. .jpg -> image/jpeg); fall back to the previous hard-coded
            # image/png when the extension is unknown or not an image type.
            mime_type, _ = mimetypes.guess_type(image_path)
            if not mime_type or not mime_type.startswith("image/"):
                mime_type = "image/png"
            encoded_image = encode_image(image_path)
            image_url = f"data:{mime_type};base64,{encoded_image}"
            content.append({"type": "image_url", "image_url": {"url": image_url}})
        else:
            # Make the text-only fallback visible instead of silently
            # dropping the image.
            print(f"⚠️  图像文件不存在，将仅发送文本: {image_path}")

    # Request body: system + user messages and sampling settings.
    prompt = {
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": content}
        ],
        "temperature": 0.0,
        "max_tokens": max_tokens,
    }

    try:
        # Invoke the endpoint with the JSON-encoded request.
        response = smr_client.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json",
            Body=json.dumps(prompt),
        )

        # Decode the JSON response and extract the first choice's text.
        result = json.loads(response["Body"].read().decode("utf-8"))
        return result["choices"][0]["message"]["content"]

    except Exception as e:
        # Errors are reported through the return value, not raised.
        return f"调用失败: {e}"

print("推理调用函数已定义")

### 测试文本推理

In [None]:
# Text-only inference check: send one prompt and print the reply.
print("测试文本推理...")
text_prompt = "请介绍一下人工智能的发展历史"
print(f"\n问题: {text_prompt}")

result = call_vlm_endpoint(
    endpoint_name=endpoint_name,
    text_prompt=text_prompt,
)
print(f"\n回答: {result}")

### 测试视觉推理

In [None]:
# Vision inference check (requires a local image file).
image_path = "test_image.jpg"  # change to the path of your test image

if not os.path.exists(image_path):
    # Nothing to send: tell the user how to provide an image.
    print(f"测试图像不存在: {image_path}")
    print("请上传测试图像文件，或修改image_path变量")
else:
    print(f"测试视觉推理: {image_path}")
    vision_prompt = "请详细描述这张图片的内容"
    print(f"\n问题: {vision_prompt}")

    result = call_vlm_endpoint(
        endpoint_name,
        vision_prompt,
        image_path,
    )
    print(f"\n回答: {result}")

## 8. 部署总结

显示部署信息和后续步骤：

In [None]:
# Print the deployment summary: one list entry per output line, emitted in
# order and separated by newlines.
summary_lines = [
    "🎉 VLM生产级部署完成!",
    # Deployment details
    "\n📋 部署信息:",
    f"   端点名称: {endpoint_name}",
    f"   实例类型: {INSTANCE_TYPE}",
    f"   初始实例数: {INITIAL_INSTANCE_COUNT}",
    f"   扩缩容范围: {MIN_CAPACITY}-{MAX_CAPACITY}",
    f"   模型路径: {MODEL_S3_PATH}",
    # Configured features
    "\n🔧 已配置功能:",
    "   ✅ LMI容器部署",
    "   ✅ 自动扩缩容",
    "   ✅ 负载均衡",
    "   ✅ 推理调用",
    # Monitoring suggestions
    "\n📊 监控建议:",
    "   - 在CloudWatch中监控GPUUtilization指标",
    "   - 设置ModelLatency告警 (建议<3秒)",
    "   - 监控InvocationsPerInstance指标",
    # Cost optimization
    "\n💰 成本优化:",
    "   - 考虑购买SageMaker Savings Plans (最高64%折扣)",
    "   - 根据实际使用情况调整扩缩容参数",
    "   - 启用AWQ量化减少显存占用",
    # Console links
    "\n🔗 相关资源:",
    f"   - SageMaker控制台: https://console.aws.amazon.com/sagemaker/home?region={region}#/endpoints/{endpoint_name}",
    f"   - CloudWatch指标: https://console.aws.amazon.com/cloudwatch/home?region={region}#metricsV2:graph=~();search=SageMaker",
    # Reminders
    "\n⚠️  重要提醒:",
    "   - 不使用时请删除端点以避免产生费用",
    "   - 定期检查和优化扩缩容策略",
    "   - 建议在生产环境中启用VPC端点",
]
print(*summary_lines, sep="\n")

## 9. 清理资源 (可选)

如果需要删除端点以停止计费：

In [None]:
# 取消注释以下代码来删除端点
# 警告: 这将删除端点并停止服务

# print(f"删除端点: {endpoint_name}")
# try:
#     sm_client.delete_endpoint(EndpointName=endpoint_name)
#     print("✅ 端点删除请求已提交")
# except Exception as e:
#     print(f"❌ 删除端点失败: {e}")