# 使用 SageMaker Pipelines 创建自动模型Fine-Tuning工作流程

<a name='1'></a>
## Set up Kernel and Required Dependencies

First, check that the correct kernel is chosen.

<img src="img/kernel_set_up.png" width="300"/>

You can click on that to see and check the details of the image, kernel, and instance type.

<img src="img/w3_kernel_and_instance_type.png" width="600"/>

# NOTE:  THIS NOTEBOOK WILL TAKE ABOUT 30 MINUTES TO COMPLETE.

# PLEASE BE PATIENT.

# SageMaker Pipelines

亚马逊 SageMaker Pipelines 支持以下内容：

* **Pipelines** - 编排 SageMaker 作业和资源创建的步骤和条件的定向无环图。
* **处理作业步骤** - 在 SageMaker 上运行数据处理工作负载（例如特征工程、数据验证、模型评估和模型解释）的简化托管体验。
* **Training Job Steps** - 一个迭代过程，通过展示训练数据集中的示例，教导模型做出预测。
* **条件步骤** - 提供管道中分支的条件执行。
* **注册模型** - 在模型注册表中创建模型包资源，该资源可用于在 Amazon SageMaker 中创建可部署的模型。
* **参数化执行** - 允许管道执行因提供的参数而异。
* **Transform Job Steps** - 一种批量变换，用于预处理数据集，以消除干扰数据集训练或推理的噪音或偏见，从大型数据集中获取推断，并在不需要永久端点时运行推理。

# 设计Pipeline

在处理步骤中，我们使用 HuggingFace/ 中的 “transformer” 库执行特征工程来标记我们的对话输入

在训练步骤中，我们对模型进行了微调，以便在 “diagsum” 数据集中有效地总结对话。

在评估步骤中，我们将经过微调的模型和测试数据集作为输入，并生成一个包含基于 ROUGE 指标的评估指标的 JSON 文件进行汇总。

在条件步骤中，如果模型的指标（由我们的评估步骤确定）超过某个值，我们将决定是否注册此模型。 

In [3]:
import psutil

notebook_memory = psutil.virtual_memory()
print(notebook_memory)

if notebook_memory.total < 32 * 1000 * 1000 * 1000:
    print('*******************************************')    
    print('YOU ARE NOT USING THE CORRECT INSTANCE TYPE')
    print('PLEASE CHANGE INSTANCE TYPE TO  m5.2xlarge ')
    print('*******************************************')
else:
    correct_instance_type=True

svmem(total=33229983744, available=16399966208, percent=50.6, used=16420958208, free=3103346688, active=17451048960, inactive=11602255872, buffers=1634304, cached=13704044544, shared=1429504, slab=844406784)


In [4]:
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
region = boto3.Session().region_name

import botocore.config

config = botocore.config.Config(
    user_agent_extra='dsoaws/2.0'
)

sm = boto3.Session().client(service_name="sagemaker", 
                            region_name=region,
                            config=config)
s3 = boto3.Session().client(service_name="s3", 
                            region_name=region,
                            config=config)

In [5]:
%store -r role

# 设置 S3 源位置

In [6]:
%store -r raw_input_data_s3_uri

In [7]:
try:
    raw_input_data_s3_uri
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN THE PREVIOUS NOTEBOOK ")
    print("You did not have the required datasets.       ")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [8]:
print(raw_input_data_s3_uri)

s3://sagemaker-us-east-1-941797585610/data-summarization/


In [9]:
if not raw_input_data_s3_uri:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN THE PREVIOUS NOTEBOOK ")
    print("You did not have the required datasets.       ")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


# 将管道作为 “实验” 进行跟踪

In [10]:
import time

In [12]:
# 初始设置正在运行的管道执行和已完成的管道执行的数量为0
running_executions = 0
completed_executions = 0

try:
    # 使用 'list_pipeline_executions' 方法获取指定管道的所有执行信息
    # 'PipelineName' 参数指定了管道的名称
    # 'SortOrder' 参数指定了排序顺序，这里设为 "Descending"，即按降序排序
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    # 检查响应中是否包含 'PipelineExecutionSummaries' 键
    # 如果包含，说明获取到了管道执行信息
    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        # 如果管道执行信息的数量大于0，进一步处理第一个执行信息
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            # 检查执行信息中是否包含 'PipelineExecutionStatus' 键
            # 如果包含，根据状态更新正在运行和已完成的执行数量
            if "PipelineExecutionStatus" in execution:
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    completed_executions = completed_executions + 1

            # 打印出正在运行和已完成的执行数量
            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    # 如果响应中不包含 'PipelineExecutionSummaries' 键，打印出提示信息
    else:
        print("[OK] Please continue.")
# 如果尝试获取执行信息时发生异常，忽略异常
except:
    pass

# 如果没有正在运行的执行，创建一个新的管道名称，然后打印出这个名称
if running_executions == 0:
    timestamp = int(time.time())
    pipeline_name = "dialogue-summary-pipeline-{}".format(timestamp)
    print("Created Pipeline Name: " + pipeline_name)

exception
Created Pipeline Name: dialogue-summary-pipeline-1694260461


In [13]:
print(pipeline_name)

dialogue-summary-pipeline-1694260461


In [14]:
%store pipeline_name

Stored 'pipeline_name' (str)


Amazon SageMaker Experiments 是 Amazon SageMaker 的一个功能，它允许你组织、跟踪和比较你的机器学习 (ML) 训练实验。

每个 SageMaker Experiment 是一系列的训练作业，这些作业可能有共享的目标（例如，优化相同的模型架构），但有不同的超参数或输入数据。你可以创建一个新的 Experiment，添加训练作业（被称为 "trial"）到这个 Experiment，然后比较这些 trial 的结果来找出最佳的模型配置。

每个 trial 都包含一个或多个 trial components，这些组件可以代表数据预处理步骤、训练步骤或模型评估步骤。你可以在 trial 之间共享 trial components，这样就可以跟踪和比较不同步骤的效果。

SageMaker Experiments 还提供了一个 SDK，你可以用这个 SDK 在 Python 脚本中创建和管理 Experiments 和 trials。你也可以使用 SageMaker Studio，这是一个基于 Web 的 IDE，它提供了一个可视化界面来查看和比较 Experiments 和 trials 的结果。

总的来说，SageMaker Experiments 是一个强大的工具，它可以帮助你更有效地进行机器学习模型的迭代和优化。

In [15]:
# 使用 IPython 的 %store 命令从 notebook 的全局环境中恢复 'pipeline_experiment_name' 变量
%store -r pipeline_experiment_name

# 从 'smexperiments.experiment' 包中导入 'Experiment' 类
# 'Experiment' 类是用于在 SageMaker Experiments 中创建和管理实验的类
from smexperiments.experiment import Experiment

# 使用 'Experiment.create' 方法创建一个新的实验
# 'experiment_name' 参数指定了实验的名称，这里使用之前定义的 'pipeline_name' 变量
# 'description' 参数指定了实验的描述
# 'sagemaker_boto_client' 参数指定了用于与 AWS 服务通信的 Boto3 客户端
pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="Dialogue Summarization Pipeline Experiment",
    sagemaker_boto_client=sm,
)

# 获取创建的实验的名称，并保存到 'pipeline_experiment_name' 变量中
pipeline_experiment_name = pipeline_experiment.experiment_name

# 打印出创建的实验的名称
print("Created Pipeline Experiment Name: {}".format(pipeline_experiment_name))

no stored variable or alias pipeline_experiment_name
Created Pipeline Experiment Name: dialogue-summary-pipeline-1694260461


In [16]:
print(pipeline_experiment_name)

dialogue-summary-pipeline-1694260461


In [17]:
%store pipeline_experiment_name

Stored 'pipeline_experiment_name' (str)


# 创建 `Trial`

In [18]:
# 从 smexperiments.trial 包中导入 Trial 类
# Trial 类用于在 Amazon SageMaker Experiments 中创建和管理试验
# 试验是一种实验的组成部分，通常用于跟踪不同的模型训练作业或模型版本
from smexperiments.trial import Trial

In [19]:
# 使用 IPython 的 %store 命令从 notebook 的全局环境中恢复 'pipeline_trial_name' 变量
%store -r pipeline_trial_name

# 获取当前的 Unix 时间戳
timestamp = int(time.time())

# 使用 'Trial.create' 方法创建一个新的试验
# 'trial_name' 参数指定了试验的名称，这里使用 Unix 时间戳创建一个唯一的名称
# 'experiment_name' 参数指定了试验所属的实验的名称，这里使用之前创建的实验的名称
# 'sagemaker_boto_client' 参数指定了用于与 AWS 服务通信的 Boto3 客户端
pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), 
    experiment_name=pipeline_experiment_name, 
    sagemaker_boto_client=sm
)

# 获取创建的试验的名称，并保存到 'pipeline_trial_name' 变量中
pipeline_trial_name = pipeline_trial.trial_name

# 打印出创建的试验的名称
print("Created Trial Name: {}".format(pipeline_trial_name))

no stored variable or alias pipeline_trial_name
Created Trial Name: trial-1694260842


In [20]:
print(pipeline_trial_name)

trial-1694260842


In [21]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


# 定义参数以参数化管道执行

我们定义工作流程参数，通过这些参数可以对管道进行参数化，并更改在流水线执行和计划中注入和使用的值，而无需修改流水线定义。

支持的参数类型包括：

* `parameterString`-代表一个 `str` Python 类型
* `parameterInteger`-表示 “int” Python 类型
* `parameterFloat`-表示 “float” Python 类型

这些参数支持提供默认值，该值可以在管道执行时被覆盖。指定的默认值应为该参数类型的实例。

In [23]:
# 从 sagemaker.workflow.parameters 模块导入以下类

# ParameterString: 用于定义一个字符串参数
from sagemaker.workflow.parameters import ParameterString

# ParameterInteger: 用于定义一个整数参数
from sagemaker.workflow.parameters import ParameterInteger

# ParameterFloat: 用于定义一个浮点数参数
from sagemaker.workflow.parameters import ParameterFloat

# Feature Engineering步骤

In [24]:
%store -r raw_input_data_s3_uri

In [25]:
print(raw_input_data_s3_uri)

s3://sagemaker-us-east-1-941797585610/data-summarization/


In [26]:
!aws s3 ls $raw_input_data_s3_uri

2023-09-05 03:06:01    6544107 dialogsum-1.csv
2023-09-05 03:06:01    6572423 dialogsum-2.csv


# 设置 Pipeline 参数
这些参数由整个pipeline使用。

In [27]:
%store -r model_checkpoint

In [28]:
try:
    model_checkpoint
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] Please run the notebooks in the PREPARE section before you continue.")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

In [29]:
print(model_checkpoint)

google/flan-t5-base


In [32]:
# 定义了一个 ParameterString 类型的参数，名为 ModelCheckpoint，并且默认值为 model_checkpoint
# 这一参数可以在 SageMaker 工作流中被使用，以便在运行工作流时进行动态配置
model_checkpoint = ParameterString(
    name="ModelCheckpoint",
    default_value=model_checkpoint,
)

TypeError: The default value specified does not match the Parameter Python type.

# 设置 Processing 参数

In [36]:
# 定义一个名为 "InputData" 的字符串参数，它的默认值是 raw_input_data_s3_uri
# 这个参数可能被用来指定输入数据的 S3 URI
input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)

# 定义一个名为 "ProcessingInstanceCount" 的整数参数，它的默认值是 1
# 这个参数可能被用来指定处理步骤的实例数量
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

# 定义一个名为 "ProcessingInstanceType" 的字符串参数，它的默认值是 "ml.c5.2xlarge"
# 这个参数可能被用来指定处理步骤的实例类型
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge",
)

# 定义一个名为 "TrainSplitPercentage" 的浮点数参数，它的默认值是 0.90
# 这个参数可能被用来指定训练数据集的划分比例
train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

# 定义一个名为 "ValidationSplitPercentage" 的浮点数参数，它的默认值是 0.05
# 这个参数可能被用来指定验证数据集的划分比例
validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

# 定义一个名为 "TestSplitPercentage" 的浮点数参数，它的默认值是 0.05
# 这个参数可能被用来指定测试数据集的划分比例
test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

我们创建了一个 `sklearnProcessor` 处理器的实例，然后在 `ProcessingStep` 中使用它。

In [37]:
# 从 sagemaker.sklearn.processing 模块导入 SKLearnProcessor 类 
from sagemaker.sklearn.processing import SKLearnProcessor

# 创建一个 SKLearnProcessor 对象
processor = SKLearnProcessor(
    framework_version="0.23-1",  # 使用的 Scikit-learn 版本
    role=role,  # AWS IAM 角色，用于授权 Amazon SageMaker 访问 AWS 资源
    instance_type=processing_instance_type,  # 处理任务的实例类型
    instance_count=processing_instance_count,  # 处理任务的实例数量
    env={"AWS_DEFAULT_REGION": region},  # 环境变量，设定 AWS 默认区域
    max_runtime_in_seconds=432000,  # 处理任务的最大运行时间（秒）
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### _Ignore any `WARNING` ^^ above ^^._

### 设置 Pipeline 步骤缓存
使用 [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) 格式将管道步骤缓存一段时间。  

有关 SageMaker Pipeline 步骤缓存的更多详细信息在这里：https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html

In [38]:
# 在这段代码中，CacheConfig 配置了 SageMaker 工作流步骤的缓存设置
# 如果启用缓存 (enable_caching=True)，那么工作流步骤的输出会被缓存起来
# 如果后续的工作流运行与之前的运行在输入和参数上完全相同，那么可以直接使用缓存的输出结果，而不用再次运行该步骤

# 从 sagemaker.workflow.steps 模块导入 CacheConfig 类
from sagemaker.workflow.steps import CacheConfig

# 创建一个 CacheConfig 对象
cache_config = CacheConfig(
    enable_caching=True,  # 启用缓存
    # 缓存的过期时间是用 ISO 8601 格式的时间间隔字符串表示的，这里的 "PT1H" 表示缓存的过期时间是1小时。
    expire_after="PT1H"  # 缓存的过期时间，这里是1小时后
)

最后，我们使用处理器实例构造一个 `ProcessingStep`，以及输入和输出通道以及将在管道调用管道执行时执行的代码。对于那些熟悉现有 Python SDK 的人来说，这与处理器实例的 `运行` 方法非常相似。

请注意作为步骤本身的输入数据传递给 `ProcessingStep` 的 `input_data` 参数。处理器实例在运行时将使用这些输入数据。

另外，请注意处理作业的输出配置中指定的 `"train"`、`"validation"` 和 `"test"` 命名通道。这样的步骤 `Properties`可以在后续步骤中使用，并在执行时解析为其运行时值。特别是，我们将在定义训练步骤时提及这种用法。

In [39]:
# 在这段代码中，ProcessingStep 配置了一个数据预处理步骤的执行环境，包括输入、输出、执行的脚本、处理器、参数和缓存配置等
# 最后，这段代码打印了处理步骤的信息，以供查看和确认

# 从 sagemaker.processing 模块导入 ProcessingInput 和 ProcessingOutput 类
from sagemaker.processing import ProcessingInput, ProcessingOutput

# 从 sagemaker.workflow.steps 模块导入 ProcessingStep 类
from sagemaker.workflow.steps import ProcessingStep

# 定义处理步骤的输入
processing_inputs = [
    ProcessingInput(
        input_name="raw-input-data",  # 输入的名称
        source=input_data,  # 输入的 S3 URI
        destination="/opt/ml/processing/input/data/",  # 在处理容器内的目标目录
        s3_data_distribution_type="ShardedByS3Key",  # 数据分发类型
    )
]

# 定义处理步骤的输出
processing_outputs = [
    ProcessingOutput(
        output_name="train",  # 输出的名称
        s3_upload_mode="EndOfJob",  # S3 上传模式
        source="/opt/ml/processing/output/data/train",  # 在处理容器内的源目录
    ),
    ProcessingOutput(
        output_name="validation",  # 输出的名称
        s3_upload_mode="EndOfJob",  # S3 上传模式
        source="/opt/ml/processing/output/data/validation",  # 在处理容器内的源目录
    ),
    ProcessingOutput(
        output_name="test",  # 输出的名称
        s3_upload_mode="EndOfJob",  # S3 上传模式
        source="/opt/ml/processing/output/data/test",  # 在处理容器内的源目录
    ),
]

# 定义处理步骤
processing_step = ProcessingStep(
    name="Processing",  # 处理步骤的名称
    code="preprocess.py",  # 执行的脚本
    processor=processor,  # 使用的 processor
    inputs=processing_inputs,  # 输入
    outputs=processing_outputs,  # 输出
    job_arguments=[  # 工作参数
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--model-checkpoint",
        str(model_checkpoint.default_value),
    ],
    cache_config=cache_config  # 缓存配置
)

# 打印处理步骤的信息
print(processing_step)

ProcessingStep(name='Processing', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


# Train 步骤

# 设置 Training 超参数

In [40]:
# 创建一个字符串参数对象，名为 "TrainInstanceType"，默认值为 "ml.c5.9xlarge"。
# 这个参数表示用于训练任务的 EC2 实例类型。
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.9xlarge")

# 创建一个整数参数对象，名为 "TrainInstanceCount"，默认值为 1。
# 这个参数表示用于训练任务的 EC2 实例数量。
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

In [41]:
# 创建一个整数参数，名为 "Epochs"，默认值为1。
# 这个参数表示模型训练时的迭代次数，也就是模型看过所有训练集数据的次数。
epochs = ParameterInteger(name="Epochs", default_value=1)

# 创建一个浮点数参数，名为 "LearningRate"，默认值为0.00001。
# 这个参数表示模型训练时的学习率，也就是模型在每次迭代时调整参数的速度。
learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)

# 创建一个浮点数参数，名为 "WeightDecay"，默认值为0.01。
# 这个参数表示模型训练时的权重衰减，用于防止模型过拟合。
weight_decay = ParameterFloat(name="WeightDecay", default_value=0.01)

# 创建一个整数参数，名为 "TrainBatchSize"，默认值为4。
# 这个参数表示在训练过程中一次性输入模型的数据数量。
train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=4)

# 创建一个整数参数，名为 "ValidationBatchSize"，默认值为4。
# 这个参数表示在验证过程中一次性输入模型的数据数量。
validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=4)

# 创建一个整数参数，名为 "TestBatchSize"，默认值为4。
# 这个参数表示在测试过程中一次性输入模型的数据数量。
test_batch_size = ParameterInteger(name="TestBatchSize", default_value=4)

# 创建一个整数参数，名为 "TrainVolumeSize"，默认值为1024。
# 这个参数表示训练任务使用的 EBS 卷的大小（单位：GB）。
train_volume_size = ParameterInteger(name="TrainVolumeSize", default_value=1024)

# 创建一个字符串参数，名为 "InputMode"，默认值为 "FastFile"。
# 这个参数表示输入模式，"FastFile" 表示使用 SageMaker FastFile 模式，这种模式可以更快地从 S3 下载数据。
input_mode = ParameterString(name="InputMode", default_value="FastFile")

# 创建一个浮点数参数，名为 "TrainSamplePercentage"，默认值为0.01。
# 这个参数表示用于训练的数据的比例。
train_sample_percentage = ParameterFloat(name="TrainSamplePercentage", default_value=0.01)

### 设置指标跟踪模型性能

In [42]:
# 定义 metrics_definitions 的列表，该列表包含两个字典，每个字典定义了一个要从训练日志中提取的指标
# 这些指标可以用来监控训练过程的进展

# 创建一个列表，列表中的每个元素都是一个字典，这些字典定义了要从训练日志中提取的指标。
metrics_definitions = [
    # 第一个字典定义了一个名为 "train:loss" 的指标，这个指标的值将从训练日志中通过正则表达式 "'train_loss': ([0-9\\.]+)" 提取。
    # 正则表达式 "'train_loss': ([0-9\\.]+)" 将匹配类似于 "'train_loss': 0.1234" 这样的文本，并提取出 0.1234 这样的数字作为 "train:loss" 指标的值。
    {"Name": "train:loss", "Regex": "'train_loss': ([0-9\\.]+)"},
    
    # 第二个字典定义了一个名为 "validation:loss" 的指标，这个指标的值将从训练日志中通过正则表达式 "'eval_loss': ([0-9\\.]+)" 提取。
    # 正则表达式 "'eval_loss': ([0-9\\.]+)" 将匹配类似于 "'eval_loss': 0.1234" 这样的文本，并提取出 0.1234 这样的数字作为 "validation:loss" 指标的值。
    {"Name": "validation:loss", "Regex": "'eval_loss': ([0-9\\.]+)"},
]

### 创建 Estimator

我们配置Estimator和输入数据集。典型的训练脚本从输入通道加载数据，使用超参数配置训练，训练模型，并将模型保存到 `“model_dir”`，以便日后可以托管使用。

我们还指定了保存模型的路径。

请注意，也可以使用传递的 `train_instance_type` 参数并将其传递到管道中的其他地方。在这种情况下，`train_instance_type` 被传递到Estimator中。

In [43]:
# 导入 sagemaker.pytorch 模块下的 PyTorch 类。这个类用于在 SageMaker 中配置和运行 PyTorch 训练任务。
from sagemaker.pytorch import PyTorch

# 导入 uuid 模块，用于生成唯一的标识符。
import uuid

# 生成一个唯一的前缀，用于在 S3 中存储训练过程中的模型检查点。
checkpoint_s3_prefix = "checkpoints/{}".format(str(uuid.uuid4()))

# 构造 S3 URI，用于指定模型检查点存储的位置。
checkpoint_s3_uri = "s3://{}/{}/".format(bucket, checkpoint_s3_prefix)

# 创建一个 PyTorch estimator。这个 estimator 包含了训练任务的所有配置信息。
estimator = PyTorch(
    # "train.py" 是训练脚本的文件名，它包含模型训练的代码。
    entry_point="train.py",
    
    # "src" 是训练脚本所在的目录。
    source_dir="src",
    
    # role 是 AWS IAM 角色，SageMaker 将使用这个角色来访问训练数据和模型输出。
    role=role,
    
    # 实例数量和实例类型是训练任务的硬件配置。
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    
    # volume_size 是训练实例的磁盘大小（单位：GB）。
    volume_size=train_volume_size,
    
    # py_version 和 framework_version 分别指定了训练任务中 Python 和 PyTorch 的版本。
    py_version="py39",
    framework_version="1.13",
    
    # hyperparameters 字典包含了训练任务的超参数。
    hyperparameters={
        "epochs": epochs,
        "learning_rate": learning_rate,
        "weight_decay": weight_decay,        
        "train_batch_size": train_batch_size,
        "validation_batch_size": validation_batch_size,
        "test_batch_size": test_batch_size,
        "model_checkpoint": model_checkpoint,
        "train_sample_percentage": train_sample_percentage,
    },
    
    # input_mode 指定了数据输入模式，这里是 "FastFile"。
    input_mode=input_mode,
    
    # metric_definitions 列表定义了要从训练日志中提取的指标。
    metric_definitions=metrics_definitions,
)

### 设置 Training 步骤

最后，我们使用estimator实例构造一个 `TrainingStep` 以及之前的 `Properties` 的 `ProcessingStep`，在 `TrainingStep` 输入中用作输入，以及将在管道调用管道执行的执行代码。对于那些熟悉现有 Python SDK 的人来说，这与我们使用estimator实例构造一个的 `fit` 方法非常相似。

特别是，我们将 `"train"`、`"validation"` 和 `"test"` 输出通道的 `S3Uri` 传递给 `TrainingStep`。工作流步骤的 `properties` 属性与描述调用的相应响应的对象模型相匹配。这些属性可以作为占位符值引用，并在运行时解析或填充。例如，`ProcessingStep` `properties`属性与 [describeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 响应对象的对象模型相匹配。

In [44]:
# 导入 sagemaker.inputs 模块下的 TrainingInput 类。这个类用于在 SageMaker 中配置训练输入。
from sagemaker.inputs import TrainingInput

# 导入 sagemaker.workflow.steps 模块下的 TrainingStep 类。这个类用于在 SageMaker Pipelines 中定义训练步骤。
from sagemaker.workflow.steps import TrainingStep

# 通过这个 training_step 对象，我们可以将训练步骤添加到 SageMaker Pipeline 中。
# 在运行 Pipeline 时，会按照步骤定义的顺序执行每个步骤，每个步骤的输出会被用作后续步骤的输入。

# 创建一个训练步骤。
training_step = TrainingStep(
    # "Train" 是这个训练步骤的名称。
    name="Train",
    
    # estimator 是前面创建的 PyTorch estimator，包含了训练任务的所有配置信息。
    estimator=estimator,
    
    # inputs 是一个字典，定义了训练、验证和测试数据的位置。这些位置是由前面的处理步骤生成的。
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
        ),
        "test": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    },
    
    # cache_config 是一个缓存配置对象，用于设置训练步骤的缓存策略。
    cache_config=cache_config,
)

# 打印训练步骤对象。
print(training_step)

TrainingStep(name='Train', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)




# 评估步骤

首先，我们开发了一个评估脚本，该脚本将在执行模型评估的处理步骤中指定。

评估脚本 `evaluate_model_metrics.py` 将经过训练的模型和测试数据集作为输入，并生成一个包含评估指标的 JSON 文件。

管道执行后，我们将检查生成的 `评估.json` 进行分析。

评估脚本：

* 部署模型
* 读取测试数据
* 根据测试数据进行大量预测
* 生成评估报告
* 将评估报告保存到评估目录中

接下来，我们创建一个 `sklearnProcessor` 的实例，然后在 `ProcessingStep` 中使用它。

请注意传递给处理器的 `processing_instance_type` 参数。

In [45]:
# 导入 sagemaker.sklearn.processing 模块下的 SKLearnProcessor 类。这个类用于在 SageMaker 中配置和运行 Scikit-learn 代码。
from sagemaker.sklearn.processing import SKLearnProcessor

# 通过这个 evaluation_processor 对象，我们可以调用其 run 方法来执行 Scikit-learn 代码
# 或者将其添加到 ProcessingStep 中，作为 SageMaker Pipeline 的一部分进行执行

# 创建一个 SKLearnProcessor 对象。
evaluation_processor = SKLearnProcessor(
    # 指定 Scikit-learn 的版本。
    framework_version="0.23-1",
    
    # role 是 AWS IAM 角色，SageMaker 将使用这个角色来访问数据和存储处理结果。
    role=role,
    
    # instance_type 和 instance_count 是处理任务的硬件配置。
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    
    # env 是一个字典，包含了传递给处理任务的环境变量。
    env={"AWS_DEFAULT_REGION": region},
    
    # max_runtime_in_seconds 是处理任务的最长运行时间（单位：秒）。
    max_runtime_in_seconds=432000,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### _Ignore any `WARNING` ^^ above ^^._

我们使用processor实例来构造 `ProcessingStep`，以及输入和输出通道以及将在管道调用管道执行时执行的代码。对于那些熟悉现有 Python SDK 的人来说，这与处理器实例的 `run` 方法非常相似。

`TrainingStep` 和 `ProcessingStep` 的 `properties` 属性分别与 [describeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 和 [describeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 响应对象的对象模型相匹配。

In [46]:
# 导入 sagemaker.workflow.properties 模块下的 PropertyFile 类。这个类用于在 SageMaker Pipeline 中定义属性文件。
from sagemaker.workflow.properties import PropertyFile

# evaluation_report 对象可以被添加到 ProcessingStep 中，作为其 property_files 参数的一部分。
# 在运行 Pipeline 时，ProcessingStep 会将指定路径的文件解析为一组属性，并将其作为输出提供给后续步骤
# evaluation_report 对象表示一个名为 "EvaluationReport" 的属性文件，这个文件由某个步骤生成并存储在 "evaluation.json" 文件中。
# 这个文件中可能包含了模型的各种评估指标，如精度、召回率等。
# 在 Pipeline 运行时，后续步骤可以通过 "metrics" 这个名称来引用这些指标。

# 创建一个 PropertyFile 对象。
evaluation_report = PropertyFile(
    # "EvaluationReport" 是这个属性文件的名称。
    name="EvaluationReport",
    
    # "metrics" 是输出的名称，这个名称将用于在后续步骤中引用这个输出。
    output_name="metrics",
    
    # "evaluation.json" 是属性文件在处理容器中的路径。
    path="evaluation.json",
)

In [48]:
# 创建一个处理步骤对象。
evaluation_step = ProcessingStep(
    # "EvaluateModel" 是这个处理步骤的名称。
    name="EvaluateModel",
    
    # processor 是前面创建的 SKLearnProcessor 对象，包含了处理任务的所有配置信息。
    processor=evaluation_processor,
    
    # "evaluate_model_metrics.py" 是要运行的 Python 脚本的路径。这个脚本应该包含模型评估的代码。
    code="evaluate_model_metrics.py",
    
    # inputs 是一个列表，定义了处理任务的输入数据的位置和在容器中的目标路径。
    inputs=[
        ProcessingInput(
            # source 是输入数据的 S3 路径。这里使用了前面的训练步骤的输出模型作为输入。
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            
            # destination 是在容器中的目标路径，输入数据将被下载到这个路径。
            destination="/opt/ml/processing/input/model"
        ),
        ProcessingInput(
            # source 是输入数据的 S3 路径。这里使用了前面的处理步骤的测试数据作为输入。
            source=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            
            # destination 是在容器中的目标路径，输入数据将被下载到这个路径。
            destination="/opt/ml/processing/input/data"
        ),
    ],
    
    # outputs 是一个列表，定义了处理任务的输出数据的来源和存储位置。
    outputs=[
        ProcessingOutput(
            # source 是在容器中的来源路径，处理结果将从这个路径收集并上传到 S3。
            source="/opt/ml/processing/output/metrics/",
            
            # output_name 是输出的名称，这个名称将用于在后续步骤中引用这个输出。
            output_name="metrics",
            
            # s3_upload_mode 是数据上传到 S3 的模式。"EndOfJob" 表示在处理任务结束时上传数据。
            s3_upload_mode="EndOfJob"
        ),
    ],
    
    # property_files 是一个列表，定义了处理任务输出的属性文件。这些文件中的内容将被解析为一组属性，并可以在后续步骤中引用。
    property_files=[evaluation_report],
)

In [50]:
# 导入 SageMaker 库中的 MetricsSource 和 ModelMetrics 类
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# 使用 ModelMetrics 类创建一个新的 model_metrics 对象
model_metrics = ModelMetrics(
    # 指定模型统计信息来源
    model_statistics=MetricsSource(
        # 使用 format 方法构建 s3_uri，该 URI 指向评估步骤输出的结果文件的位置
        # 其中，evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        # 是从处理步骤的输出配置中获取第一个输出的 S3Uri 的值
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        # 指定内容类型为 JSON
        content_type="application/json",
    )
)

# 打印 model_metrics 对象
print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7f6488a56bd0>


# 注册模型步骤

![](img/pipeline-5.png)

我们使用用于训练步骤的估计器实例来构造 `RegisterModel` 的实例。在管道中执行 `registerModel` 的结果是一个模型包。模型包是一种可重复使用的模型工件抽象，它打包了推理所需的所有要素。它主要由定义要使用的推理图像的推理规范以及可选的模型权重位置组成。

模型包组是模型包的集合。您可以针对特定的机器学习业务问题创建模型包组，并且可以继续向其中添加版本/模型包。通常，我们希望客户为 SageMaker 工作流管道创建 ModelPackageGroup，这样他们就可以在每次运行工作流管道时继续向该组添加版本/模型包。

对于那些熟悉现有 Python SDK 的人来说，`registerModel` 的构造与estimator实例的 `register` 方法非常相似。

特别是，我们从 `TrainingStep`、`step_train` 属性中传入 `s3modelArtifacts`。`TrainingStep` `properties`属性与 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 响应对象的对象模型相匹配。

值得注意的是，我们提供了一个特定的模型包组名称，稍后将在模型注册表和 CI/CD 工作中使用该名称。

In [51]:
# 导入 ParameterString 类，创建一个名为 "ModelApprovalStatus" 的参数，
# 默认值为 "PendingManualApproval"。这将用于设定模型批准的状态。
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# 导入 ParameterString 类，创建一个名为 "DeployInstanceType" 的参数，
# 默认值为 "ml.m5.4xlarge"。这将用于设定部署模型时使用的实例类型。
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.4xlarge")

# 导入 ParameterInteger 类，创建一个名为 "DeployInstanceCount" 的参数，
# 默认值为 1。这将用于设定部署模型时使用的实例数量。
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [52]:
# 导入 Python 的 time 模块
import time

# 使用 time 模块的 time 函数获取当前的 Unix 时间戳（以秒为单位），并将其转换为整数
timestamp = int(time.time())

# 使用 f-string（格式化字符串字面值）创建一个名为 "Summarization-{timestamp}" 的模型包组名，
# 其中 {timestamp} 会被上一步获得的时间戳替换。
model_package_group_name = f"Summarization-{timestamp}"

# 打印 model_package_group_name
print(model_package_group_name)

Summarization-1694913823


In [54]:
# 使用 sagemaker.image_uris.retrieve 函数获取适用于推理的 PyTorch 框架的 Docker 镜像 URI
inference_image_uri = sagemaker.image_uris.retrieve(
    # 指定机器学习框架为 "pytorch"
    framework="pytorch",
    # 指定 AWS 区域
    region=region,
    # 指定框架版本为 "1.13"
    version="1.13",
    # 指定实例类型，值从之前定义的 deploy_instance_type 参数获取
    instance_type=deploy_instance_type,
    # 指定镜像作用范围为 "inference"
    image_scope="inference",
)

# 打印 inference_image_uri
print(inference_image_uri)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py39


763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.13-cpu-py39


In [60]:
# 创建一个 RegisterModel 步骤，该步骤将在 SageMaker 工作流中执行，用于将训练好的模型注册到 SageMaker 模型注册表中，以便后续进行部署或其他操作。

# 导入 SageMaker 工作流的 RegisterModel 类
from sagemaker.workflow.step_collections import RegisterModel

# 创建一个 RegisterModel 实例，用于在 SageMaker 模型注册表中注册模型
register_step = RegisterModel(
    # 为模型指定一个名称 "Summarization"
    name="Summarization",
    # 指定一个估算器对象，通常包含了训练作业的配置
    estimator=estimator,
    # 指定用于推理的 Docker 镜像 URI，这里我们需要明确指定，因为默认情况下会使用训练镜像
    image_uri=inference_image_uri,
    # 指定模型数据的 S3 URI，这里从训练步骤的属性中获取
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    # 指定模型可以接受的输入数据类型，这里为 "application/jsonlines"
    content_types=["application/jsonlines"],
    # 指定模型可以返回的响应类型，这里为 "application/jsonlines"
    response_types=["application/jsonlines"],
    # 指定可用于推理的实例类型，这里从之前定义的 deploy_instance_type 参数获取
    inference_instances=[deploy_instance_type],
    # 指定可用于转换作业的实例类型，这里从之前定义的 deploy_instance_type 参数获取
    transform_instances=[deploy_instance_type],
    # 指定模型包组的名称，这里从之前生成的 model_package_group_name 变量获取
    model_package_group_name=model_package_group_name,
    # 指定模型的批准状态，这里从之前定义的 model_approval_status 参数获取
    approval_status=model_approval_status,
    # 指定模型的指标，这里从之前创建的 model_metrics 对象获取
    model_metrics=model_metrics,
)

# 为部署步骤创建模型

In [55]:
# 导入 SageMaker 的 Model 类
from sagemaker.model import Model

# 使用 f-string（格式化字符串字面值）创建一个名为 "model-{timestamp}" 的模型名，
# 其中 {timestamp} 会被上一步获得的时间戳替换。
model_name = "model-{}".format(timestamp)

# 使用 Model 类创建一个新的 model 对象
model = Model(
    # 指定模型的名称
    name=model_name,
    # 指定用于推理的 Docker 镜像 URI
    image_uri=inference_image_uri,
    # 指定模型数据的 S3 URI，这里从训练步骤的属性中获取
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    # 指定 SageMaker 会话，这通常包含了 SageMaker 运行的配置信息
    sagemaker_session=sess,
    # 指定 IAM 角色，SageMaker 会使用这个角色来执行训练和部署任务
    role=role,
)

In [56]:
# 导入 SageMaker 的 CreateModelInput 类
from sagemaker.inputs import CreateModelInput

# 使用 CreateModelInput 类创建一个新的 create_inputs 对象
create_inputs = CreateModelInput(
    # 指定实例类型，这通常是预定于模型部署的实例类型，
    # 这里从之前定义的 deploy_instance_type 参数获取
    instance_type=deploy_instance_type,
)

In [58]:
# 导入 SageMaker 工作流的 CreateModelStep 类
from sagemaker.workflow.steps import CreateModelStep

# 创建一个 CreateModelStep 实例，用于在 SageMaker 工作流中创建模型
create_step = CreateModelStep(
    # 为创建模型的步骤指定一个名称 "CreateModel"
    name="CreateModel",
    # 指定要创建的模型，这里从之前创建的 model 对象获取
    model=model,
    # 指定创建模型时的输入参数，这里从之前创建的 create_inputs 对象获取
    inputs=create_inputs,
)

# 条件部署步骤
![](img/pipeline-6.png)

最后，我们只想在模型的指标（由我们的评估步骤确定）超过给定阈值时才注册此模型。`ConditionStep` 允许管道支持基于步骤属性的条件在管道 DAG 中进行条件执行。

下面，我们执行以下操作：
* 根据评估步骤输出中找到的评估指标定义一个条件
* 在 `ConditionStep` 中使用条件列表中的条件
* 将 `registerModel` 步骤集合传递到 `ConditionStep` 的 `if_steps` 中

In [61]:
# 导入 SageMaker 工作流的条件和条件步骤类
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.parameters import ParameterFloat

# 创建一个名为 "MinRouge1Value" 的浮点参数，其默认值为 0.005
min_rouge_value = ParameterFloat(name="MinRouge1Value", default_value=0.005)

# 创建一个条件对象，该条件检查 evaluation_step 的 "metrics.eval_rouge1.value" 是否大于或等于 min_rouge_value
min_rouge_condition = ConditionGreaterThanOrEqualTo(
    # 从 evaluation_step 的 evaluation_report 属性文件中获取 "metrics.eval_rouge1.value"
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.eval_rouge1.value",
    ),
    # 指定右边的值，这里是 min_rouge_value 参数
    right=min_rouge_value,
)

# 创建一个条件步骤，如果条件满足，则执行 register_step 和 create_step，否则，结束流程
min_rouge_condition_step = ConditionStep(
    # 为条件步骤指定一个名称 "EvaluationCondition"
    name="EvaluationCondition",
    # 指定条件，这里是上面创建的 min_rouge_condition
    conditions=[min_rouge_condition],
    # 如果条件满足，则执行的步骤，这里是 register_step 和 create_step
    if_steps=[register_step, create_step],
    # 如果条件不满足，则执行的步骤，这里为空，表示结束流程
    else_steps=[],
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# 定义由参数、步骤和条件组成的Pipeline

让我们把它全部绑定到一个工作流程Pipeline中，这样我们就可以执行它，甚至可以安排它。

管道需要 `name`, `parameters` 和 `steps`。在 `(account, region)` 对中，名称必须是唯一的，因此我们在名称上加上时间戳。


注意：

* 定义中使用的所有参数都必须存在。
* 传递到管道中的步骤不必按执行顺序排列。SageMaker Workflow 服务将在执行完成的步骤中解析 _data 依赖关系_ DAG。
* 对于工作流步骤列表或单个条件步骤 if/else 列表，步骤必须是唯一的。

# 将Pipeline提交给 SageMaker 进行执行 

让我们向工作流服务提交我们的Pipeline定义。工作流服务将使用传入的角色来创建步骤中定义的所有作业。

# 创建Pipeline

### _Ignore any `WARNING` below._

In [63]:
# 从sagemaker.workflow.pipeline导入Pipeline模块
from sagemaker.workflow.pipeline import Pipeline

# 初始化现有管道的数量为0
existing_pipelines = 0

# 调用sm.list_pipelines()方法来列出所有的管道，按管道名前缀排序，并且是降序排序
existing_pipelines_response = sm.list_pipelines(
    PipelineNamePrefix=pipeline_name,
    SortOrder="Descending",
)

# 检查返回的响应中是否包含"PipelineSummaries"这个关键字
if "PipelineSummaries" in existing_pipelines_response.keys():
    # 如果"PipelineSummaries"的长度大于0，说明已经有现有的管道
    if len(existing_pipelines_response["PipelineSummaries"]) > 0:
        existing_pipelines = existing_pipelines + 1
        # 打印出已经创建的管道的数量和名称
        print("[INFO] You already have created {} pipeline with name {}.".format(existing_pipelines, pipeline_name))
    else:
        pass

# 如果没有已经存在的管道，就创建一个新的管道
if existing_pipelines == 0:  
    # 使用Pipeline类创建一个新的管道，配置其名称、参数和步骤等信息
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_count,
            processing_instance_type,
            train_split_percentage,
            validation_split_percentage,
            test_split_percentage,
            train_instance_type,
            train_instance_count,
            epochs,
            learning_rate,
            weight_decay,
            train_sample_percentage,
            train_batch_size,
            validation_batch_size,
            test_batch_size,
            train_volume_size,
            input_mode,
            min_rouge_value,
            model_approval_status,
            deploy_instance_type,
            deploy_instance_count,
            model_checkpoint.to_string(),
        ],
        steps=[processing_step, training_step, evaluation_step, min_rouge_condition_step],
        sagemaker_session=sess,
    )

    # 使用upsert方法将新创建的管道更新或插入到SageMaker的管道列表中
    pipeline.upsert(role_arn=role)["PipelineArn"]
    # 打印出已经创建的管道的名称
    print("Created pipeline with name {}".format(pipeline_name))
else:
    # 如果已经存在一个同名的管道，就打印出提示信息
    print(
        "****************************************************************************************************************"
    )
    print(
        "You have already create a pipeline with the name {}. This is OK. Please continue to the next cell.".format(
            pipeline_name
        )
    )
    print(
        "****************************************************************************************************************"
    )

[INFO] You already have created 1 pipeline with name dialogue-summary-pipeline-1694260461.
****************************************************************************************************************
You have already create a pipeline with the name dialogue-summary-pipeline-1694260461. This is OK. Please continue to the next cell.
****************************************************************************************************************


### _Ignore any `WARNING` ^^ above ^^._

# 启动 Pipeline

### _Ignore any `WARNING` below._

In [64]:
# 初始化正在运行的执行数量和已完成的执行数量均为0
running_executions = 0
completed_executions = 0

# 如果存在管道
if existing_pipelines > 0:
    # 调用sm.list_pipeline_executions()方法列出所有管道执行，按执行名前缀排序，并且是降序排序
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    # 检查返回的响应中是否包含"PipelineExecutionSummaries"这个关键字
    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        # 如果"PipelineExecutionSummaries"的长度大于0，说明已经有现有的管道执行
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            # 检查执行状态
            if "PipelineExecutionStatus" in execution:
                # 如果执行状态为"Executing"，则该执行正在运行
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    # 否则，该执行已完成
                    completed_executions = completed_executions + 1

            # 打印出当前正在运行的执行数量和已完成的执行数量
            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    else:
        pass
else:
    pass

# 如果没有正在运行的执行，就启动一个新的执行
if running_executions == 0:  
    # 使用start()方法启动管道执行
    execution = pipeline.start()
    running_executions = running_executions + 1
    # 打印出已经启动的管道的名称和执行的ARN
    print("Started pipeline {}.  Ignore any warnings above.".format(pipeline_name))
    print(execution.arn)
else:
    # 如果已经存在一个正在运行的执行，就打印出提示信息
    print(
        "********************************************************************************************************************"
    )
    print(
        "You have already launched {} pipeline execution(s).  This is OK.  Please continue to see the next cell.".format(
            running_executions
        )
    )
    print(
        "********************************************************************************************************************"
    )

arn:aws:sagemaker:us-east-1:941797585610:pipeline/dialogue-summary-pipeline-1694260461/execution/jf2hsti28rlc


### _Ignore any `WARNING` ^^ above ^^._

# 等待Pipeline完成

### _This next cell takes about 40 mins.  Please be patient._

In [65]:
# 使用 %%time 魔法命令计算代码块的运行时间
%%time

# 导入所需的模块
import time
from pprint import pprint

# 调用sm.list_pipeline_executions()方法列出所有管道执行，并获取第一个执行的状态
executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
# 打印出执行的状态
print(pipeline_execution_status)

# 当执行的状态是"Executing"（正在执行）时，继续检查执行状态
while pipeline_execution_status == "Executing":
    try:
        # 再次调用sm.list_pipeline_executions()方法获取执行的状态
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    except Exception as e:
        # 如果发生异常，打印"Please wait..."并等待30秒后再次检查
        print("Please wait...")
        time.sleep(30)

# 打印出执行的响应
pprint(executions_response)

UsageError: Line magic function `%%time` not found.


### _Wait for the Pipeline ^^ Above ^^ to Complete_

# 列出完成后的Pipeline执行步骤和状态

In [None]:
# 从执行响应中获取管道执行的状态和ARN
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]

# 打印出管道执行的状态和ARN
print("Pipeline execution status {}".format(pipeline_execution_status))
print("Pipeline execution arn {}".format(pipeline_execution_arn))

In [None]:
# 导入pprint库，用于更优美的打印输出
from pprint import pprint

# 调用sm.list_pipeline_execution_steps()方法，列出指定管道执行的所有步骤
steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

# 使用pprint打印出所有步骤的详细信息
pprint(steps)

# 列出管道生成的所有Artifacts

In [None]:
# 初始化处理任务的名称和训练任务的名称为None
processing_job_name = None
training_job_name = None

In [None]:
# 导入需要的模块
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

# 使用sagemaker的会话创建LineageTableVisualizer对象，用于显示管道执行的各个步骤与其输出的关系
viz = LineageTableVisualizer(sagemaker.session.Session())

# 逆序遍历管道执行的所有步骤
for execution_step in reversed(steps["PipelineExecutionSteps"]):
    # 打印出每个步骤的详细信息
    print(execution_step)
    # 检查步骤名称是否为"Processing"
    if execution_step["StepName"] == "Processing":
        # 如果是"Processing"步骤，获取处理任务的ARN，并解析出处理任务的名称
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        # 打印出处理任务的名称
        print(processing_job_name)
        # 显示处理任务的血统表格
        display(viz.show(processing_job_name=processing_job_name))
    # 检查步骤名称是否为"Train"
    elif execution_step["StepName"] == "Train":
        # 如果是"Train"步骤，获取训练任务的ARN，并解析出训练任务的名称
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        # 打印出训练任务的名称
        print(training_job_name)
        # 显示训练任务的血统表格
        display(viz.show(training_job_name=training_job_name))
    else:
        # 如果是其他步骤，直接显示该步骤的血统表格
        display(viz.show(pipeline_execution_step=execution_step))
        # 等待5秒，然后处理下一个步骤
        time.sleep(5)

## Add Execution Run as Trial to Experiments

In [None]:
# '-aws-processing-job' 是 ProcessingJob 默认分配的名称
# 格式化字符串，将处理任务的名称与'-aws-processing-job'拼接起来
processing_job_tc = "{}-aws-processing-job".format(processing_job_name)
# 打印出拼接后的处理任务名称
print(processing_job_tc)

In [None]:
# 调用 sm.associate_trial_component 方法将处理任务与试验关联起来
response = sm.associate_trial_component(TrialComponentName=processing_job_tc, TrialName=pipeline_trial_name)

# 这段脚本的主要功能是关联处理任务和试验。在Amazon SageMaker中，一个试验可以包含一个或多个试验组件（例如处理任务、训练任务等）。
# 通过调用sm.associate_trial_component方法，可以将处理任务（作为试验组件）与指定的试验关联起来。
# 这样，当查看试验的详细信息时，就可以看到该试验包含哪些试验组件，以及每个组件的详细信息和结果。

In [None]:
# '-aws-training-job' 是 TrainingJob 默认分配的名称
# 格式化字符串，将训练任务的名称与'-aws-training-job'拼接起来
training_job_tc = "{}-aws-training-job".format(training_job_name)
# 打印出拼接后的训练任务名称
print(training_job_tc)

# 这段脚本的主要功能是创建一个新的字符串training_job_tc，它是由训练任务的名称和字符串"-aws-training-job"拼接而成。
# 在Amazon SageMaker中，训练任务的名称通常以"-aws-training-job"为后缀。
# 这个新的字符串training_job_tc可能会被用作进一步查询训练任务的详细信息或结果的标识符

In [None]:
# 调用 sm.associate_trial_component 方法将训练任务与试验关联起来
response = sm.associate_trial_component(TrialComponentName=training_job_tc, TrialName=pipeline_trial_name)

# 这段脚本的主要功能是关联训练任务和试验。
# 在Amazon SageMaker中，一个试验可以包含一个或多个试验组件（例如处理任务、训练任务等）。
# 通过调用sm.associate_trial_component方法，您可以将训练任务（作为试验组件）与指定的试验关联起来。
# 这样，当您查看试验的详细信息时，就可以看到该试验包含哪些试验组件，以及每个组件的详细信息和结果。

# Release Resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>