# Implementing an End-to-End Amazon Personalize Model Deployment Pipeline with the AWS Step Functions Data Science SDK

1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Task States](#Task-States)
4. [Wait States](#Wait-States)
5. [Choice States](#Choice-States)
6. [Workflow](#Workflow)
7. [Generate Recommendations](#Generate-Recommendations)



## Introduction

This notebook shows how to use the AWS Step Functions Data Science SDK to create and manage an Amazon Personalize workflow. The Step Functions Data Science SDK is an open-source library that lets data scientists easily create and run machine learning workflows with AWS Step Functions. For more information, see the following:
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

In this notebook, we use the SDK to create the Personalize resources step by step, chain them together into workflows, and then run those workflows in AWS Step Functions.

For more information about Amazon Personalize, see the following:

* [Amazon Personalize](https://aws.amazon.com/personalize/)


## Setup

### Import the required modules from the SDK

In [None]:
#import sys
#!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import boto3
import json
import numpy as np
import pandas as pd
import time

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')


import stepfunctions
import logging

from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

stepfunctions.set_stream_logger(level=logging.INFO)

workflow_execution_role = "<Workflow execution role ARN>" # paste the StepFunctionsWorkflowExecutionRole ARN (created in the IAM role setup steps below)

### Set the S3 location and file name

In [None]:
bucket = "<Bucket Name>"       # replace with the name of your S3 bucket
filename = "<File Name>"  # replace with a name that you want to save the dataset under

### Set up IAM roles

#### Create an execution role for Step Functions

You need an execution role so that you can create and run workflows in Step Functions.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/)
2. Choose **Roles**, and then choose **Create role**.
3. Under **Choose the service that will use this role**, choose **Step Functions**
4. Choose **Next** until you can enter a **Role name**
5. Enter a name such as `StepFunctionsWorkflowExecutionRole`, and then choose **Create role**


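Attach a policy to the role you created. The policy attached in the following steps grants full access to Amazon Personalize plus the other permissions the workflows need. As a best practice, grant access only to the resources you actually need.

1. On the **Permissions** tab, choose **Add inline policy**
2. On the **JSON** tab, enter the following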

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "personalize:*"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "states:StartExecution",
                "states:DescribeExecution",
                "states:StopExecution"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": "*"
        }
    ]
}
```

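3. Choose **Review policy** and give the policy a name, such as `StepFunctionsWorkflowExecutionPolicy`
4. Choose **Create policy**. You are redirected to the role's details page.
5. Copy the **Role ARN** at the top of the **Summary** section and paste it as the value of `workflow_execution_role` in the setup cell above.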
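If you prefer to script this step, you can also generate the policy in Python. The sketch below is an illustration and not part of the original walkthrough. It builds a policy document of the same shape as a dict and serializes it with `json.dumps`, which never emits trailing commas. The action list is an assumption. It mirrors the statements above, including `states:StartExecution`, `states:DescribeExecution`, and `states:StopExecution`, which Step Functions needs for the nested `startExecution.sync:2` calls in the main workflow.

```python
import json

# Actions granted to the Step Functions execution role, grouped by service.
# Assumption: this mirrors the console policy above; the "states" group is what
# the nested startExecution.sync:2 calls in the main workflow need.
EXECUTION_ROLE_ACTIONS = {
    "personalize": ["personalize:*"],
    "lambda": ["lambda:InvokeFunction"],
    "iam": ["iam:PassRole"],
    "states": ["states:StartExecution", "states:DescribeExecution", "states:StopExecution"],
    "events": ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
}


def build_execution_policy(resource="*"):
    """Return an IAM policy document with one Allow statement per service group."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": actions, "Resource": resource}
            for actions in EXECUTION_ROLE_ACTIONS.values()
        ],
    }


# json.dumps always produces valid JSON (no trailing commas).
policy_json = json.dumps(build_execution_policy(), indent=4)
print(policy_json)
```

You could then attach the policy inline with boto3, for example `boto3.client("iam").put_role_policy(RoleName="StepFunctionsWorkflowExecutionRole", PolicyName="StepFunctionsWorkflowExecutionPolicy", PolicyDocument=policy_json)`. This assumes you used the role and policy names suggested in the console steps.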



In [None]:
# Optional helper step; it is not included in any of the workflows defined below.
lambda_state_role = LambdaStep(
    state_id="create bucket and role",
    parameters={  
        "FunctionName": "stepfunction_create_personalize_role", #replace with the name of the function you created
        "Payload": {  
           "bucket": bucket
        }
    },
    result_path='$'
 
)

lambda_state_role.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_role.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateRoleTaskFailed")
))

#### Attach a policy to the S3 bucket

In [None]:
s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
                
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role


#### Create the Personalize role


In [None]:
iam = boto3.client("iam")

role_name = "<Role Name>" # name of the new IAM role that Amazon Personalize will assume


assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)



policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]


## Data Preparation

### Download, prepare, and upload the training data

In [None]:
!pwd

In [None]:
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip
data = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])
pd.set_option('display.max_rows', 5)
data



In [None]:
data = data[data['RATING'] > 2]                # keep only movies rated above 2 (i.e. 3, 4, or 5)
data2 = data[['USER_ID', 'ITEM_ID', 'TIMESTAMP']] 
data2.to_csv(filename, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object(filename).upload_file(filename)
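The pandas cell above keeps ratings greater than 2 and writes the `USER_ID`, `ITEM_ID`, and `TIMESTAMP` columns with a header row. For illustration, here is an equivalent sketch that uses only the standard library, which may be handy where pandas is unavailable. The function name and the sample rows, written in the tab-separated `u.data` format, are made up for this example.

```python
import csv
import io


def prepare_interactions(u_data_lines, min_rating_exclusive=2):
    """Turn MovieLens u.data rows (USER_ID, ITEM_ID, RATING, TIMESTAMP, tab-separated)
    into interactions CSV text, keeping only ratings above the threshold."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["USER_ID", "ITEM_ID", "TIMESTAMP"])  # header row, as in the pandas version
    for line in u_data_lines:
        user_id, item_id, rating, timestamp = line.split("\t")
        if int(rating) > min_rating_exclusive:
            writer.writerow([user_id, item_id, timestamp.strip()])
    return out.getvalue()


sample_rows = ["196\t242\t3\t881250949\n", "186\t302\t3\t891717742\n", "22\t377\t1\t878887116\n"]
print(prepare_interactions(sample_rows))
```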

## Task States

### Lambda Task states

A `Task` state in Step Functions represents a single unit of work performed by a workflow. Tasks can invoke Lambda functions and orchestrate other AWS services. See [AWS Service Integrations](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html) in the *AWS Step Functions Developer Guide*.

The cells below create a series of [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) states. Each one is configured to [Retry](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html#error-handling-retrying-after-an-error) if its Lambda function fails, and to move to a `Fail` state (through a `Catch`) if the retry also fails.
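Every `LambdaStep` in this notebook uses the same `Retry` settings: `interval_seconds=5`, `max_attempts=1`, and `backoff_rate=4.0`. To illustrate what those parameters mean, the hypothetical helper below computes the wait before each retry. The first retry waits `IntervalSeconds`, and each later retry multiplies the previous wait by `BackoffRate`. It ignores optional retrier fields such as `MaxDelaySeconds` and `JitterStrategy`.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Return the wait (in seconds) before each retry of a failed Task state.

    max_attempts is the number of retries after the first failed attempt;
    the n-th retry waits interval_seconds * backoff_rate ** (n - 1).
    """
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


# Settings used by the LambdaSteps in this notebook: one retry, 5 seconds after the failure.
print(retry_delays(5, 1, 4.0))   # [5.0]
# With max_attempts=3 the waits would grow to 5, 20, and 80 seconds.
print(retry_delays(5, 3, 4.0))   # [5.0, 20.0, 80.0]
```

If the single retry also fails, the `Catch` on each step routes the execution to that step's `Fail` state.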

#### Create the Lambda functions

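The Lambda Task states in these workflows use Lambda functions **(Python 3.x)**. The functions create Personalize resources (the schema, dataset group, dataset, dataset import job, solution, solution version, and campaign), check their status, and return their ARNs. Create the following functions in the [Lambda console](https://console.aws.amazon.com/lambda/).

1. `stepfunction-create-schema`
2. `stepfunctioncreatedatagroup`
3. `stepfunctioncreatedataset`
4. `stepfunction-createdatasetimportjob`
5. `stepfunction_select-recipe_create-solution`
6. `stepfunction_create_solution_version`
7. `stepfunction_getsolution_metric_create_campaign`
8. `stepfunction_waitforDatasetGroup`
9. `stepfunction_waitfordatasetimportjob`
10. `stepfunction_waitforSolutionVersion`
11. `stepfunction_waitforCampaign`
12. `stepfunction_getRecommendations`

Copy and paste the code for each function from the `./Lambda/` folder in the repository. If you use different function names, update the `FunctionName` values in the cells below.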
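The actual function code lives in the repository's `./Lambda/` folder and is not reproduced here. The handler below is a hypothetical sketch of the contract the workflow expects from a status-check function such as `stepfunction_waitforCampaign`. The function receives the `Payload` defined in its `LambdaStep`. It must return a `status` field, which the Choice state reads, plus the ARN that the next status check reads. The handler assumes that shape and uses the boto3 `describe_campaign` call. The optional `personalize_client` parameter exists only so the sketch can be tested without AWS.

```python
def lambda_handler(event, context, personalize_client=None):
    """Hypothetical status-check handler for a Personalize campaign.

    event is the Payload from the LambdaStep, e.g. {"campaign_arn": "..."}.
    Returns the fields the workflow reads as $.Payload.status and
    $.Payload.campaign_arn.
    """
    if personalize_client is None:
        import boto3  # imported lazily so the sketch can be tested without AWS
        personalize_client = boto3.client("personalize")

    campaign_arn = event["campaign_arn"]
    response = personalize_client.describe_campaign(campaignArn=campaign_arn)
    return {
        "status": response["campaign"]["status"],  # e.g. CREATE IN_PROGRESS or ACTIVE
        "campaign_arn": campaign_arn,              # passed along for the next check
    }
```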


#### Create the schema

In [None]:
lambda_state_schema = LambdaStep(
    state_id="create schema",
    parameters={  
        "FunctionName": "stepfunction-create-schema", #replace with the name of the function you created
        "Payload": {  
           "input": "personalize-stepfunction-schema263"
        }
    },
    result_path='$'    
)

lambda_state_schema.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_schema.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateSchemaTaskFailed")
))

#### Create the dataset group

In [None]:
lambda_state_datasetgroup = LambdaStep(
    state_id="create dataset Group",
    parameters={  
        "FunctionName": "stepfunctioncreatedatagroup", #replace with the name of the function you created
        "Payload": {  
           "input": "personalize-stepfunction-dataset-group", 
           "schemaArn.$": '$.Payload.schemaArn'
        }
    },

    result_path='$'
)



lambda_state_datasetgroup.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))


lambda_state_datasetgroup.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateDataSetGroupTaskFailed")
))

#### Create the dataset

In [None]:
lambda_state_createdataset = LambdaStep(
    state_id="create dataset",
    parameters={  
        "FunctionName": "stepfunctioncreatedataset", #replace with the name of the function you created
        # This is the first step of the dataset import workflow, whose input is
        # {"schemaArn": ..., "datasetGroupArn": ...}, so the paths read from the input root.
        "Payload": {  
           "schemaArn.$": '$.schemaArn',
           "datasetGroupArn.$": '$.datasetGroupArn'
        }
    },
    result_path = '$'
)

lambda_state_createdataset.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_createdataset.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateDataSetTaskFailed")
))

#### Create the dataset import job

In [None]:
lambda_state_datasetimportjob = LambdaStep(
    state_id="create dataset import job",
    parameters={  
        "FunctionName": "stepfunction-createdatasetimportjob", #replace with the name of the function you created
        "Payload": {  
           "datasetimportjob": "stepfunction-createdatasetimportjob",
           "dataset_arn.$": '$.Payload.dataset_arn',
           "datasetGroupArn.$": '$.Payload.datasetGroupArn',
           "bucket_name": bucket,
           "file_name": filename,
           "role_arn": role_arn
            
        }
    },

    result_path = '$'
)

lambda_state_datasetimportjob.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetimportjob.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetImportJobTaskFailed")
))

#### Select a recipe and create the solution

In [None]:
lambda_state_select_receipe_create_solution = LambdaStep(
    state_id="select recipe and create solution",
    parameters={  
        "FunctionName": "stepfunction_select-recipe_create-solution", #replace with the name of the function you created
        "Payload": {  
            # the recipe and solution workflow's input is {"datasetGroupArn": ...}
            "dataset_group_arn.$": '$.datasetGroupArn'
        }
    },
    result_path = '$'
)

lambda_state_select_receipe_create_solution.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_select_receipe_create_solution.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("SelectRecipeCreateSolutionTaskFailed")
))

#### Create the solution version

In [None]:
lambda_create_solution_version = LambdaStep(
    state_id="create solution version",
    parameters={  
        "FunctionName": "stepfunction_create_solution_version", 
        "Payload": {  
           "solution_arn.$": '$.Payload.solution_arn'           
        }
    },
    result_path = '$'
)

lambda_create_solution_version.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_create_solution_version.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateSolutionVersionTaskFailed")
))

#### Create the campaign

In [None]:
lambda_create_campaign = LambdaStep(
    state_id="create campaign",
    parameters={  
        "FunctionName": "stepfunction_getsolution_metric_create_campaign", 
        "Payload": {  
            # the campaign workflow's input is {"solution_version_arn": ...}
            "solution_version_arn.$": '$.solution_version_arn'
        }
    },
    result_path = '$'
)

lambda_create_campaign.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_create_campaign.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateCampaignTaskFailed")
))

## Wait States

A `Wait` state in Step Functions pauses the workflow for a specified amount of time before moving to the next state. See [Wait](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Wait) in the AWS Step Functions Data Science SDK documentation.
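Each workflow in this notebook pairs a Wait state with a status-check Lambda step and a Choice state. The workflow creates the resource, waits, checks its status, and loops back to the wait while the status is still `CREATE PENDING` or `CREATE IN_PROGRESS`. The sketch below expresses that loop in plain Python to make the control flow explicit. `check_status` stands in for the status-check Lambda function. The `max_polls` cap is an addition of this sketch; the state machines themselves loop until the execution succeeds, fails, or times out.

```python
import time

# Statuses that mean "not ready yet" in the Choice states of this notebook.
PENDING_STATUSES = {"CREATE PENDING", "CREATE IN_PROGRESS"}


def poll_until_active(check_status, wait_seconds, sleep=time.sleep, max_polls=100):
    """Plain-Python equivalent of the Wait -> status check -> Choice loop.

    check_status: callable returning a Personalize status string
                  (stands in for a status-check Lambda function).
    sleep:        injectable so the loop can be tested without real waiting.
    """
    for _ in range(max_polls):
        sleep(wait_seconds)                  # Wait state
        status = check_status()              # status-check LambdaStep
        if status == "ACTIVE":               # Choice rule -> Succeed state
            return status
        if status not in PENDING_STATUSES:   # Choice default -> Fail state
            raise RuntimeError(f"resource ended in status {status!r}")
    raise TimeoutError(f"resource not ACTIVE after {max_polls} checks")
```

For example, `poll_until_active(lambda: personalize.describe_campaign(campaignArn=campaign_arn)["campaign"]["status"], 30)` would poll a campaign directly from the notebook. Here `campaign_arn` is a placeholder for your campaign's ARN.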

#### Wait for the schema to be ready

In [None]:
wait_state_schema = Wait(
    state_id="Wait for create schema - 5 secs",
    seconds=5
)

#### Wait for the dataset group to be ready

In [None]:
wait_state_datasetgroup = Wait(
    state_id="Wait for datasetgroup - 30 secs",
    seconds=30
)

#### Wait for the dataset to be ready

In [None]:
wait_state_dataset = Wait(
    state_id="wait for dataset - 30 secs",
    seconds=30
)

#### Wait for the dataset import job to become ACTIVE

In [None]:
wait_state_datasetimportjob = Wait(
    state_id="Wait for datasetimportjob - 30 secs",
    seconds=30
)

#### Wait for recipe selection and solution creation

In [None]:
wait_state_receipe = Wait(
    state_id="Wait for recipe - 30 secs",
    seconds=30
)

#### Wait for the solution version to become ACTIVE

In [None]:
wait_state_solutionversion = Wait(
    state_id="Wait for solution version - 60 secs",
    seconds=60
)

#### Wait for the campaign to become ACTIVE

In [None]:
wait_state_campaign = Wait(
    state_id="Wait for Campaign - 30 secs",
    seconds=30
)



### Check each resource's status and act accordingly

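If a Lambda task still fails after its retry, or a status check reports that a resource failed, the execution moves to a `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.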
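Together, the `Retry` and `Catch` attached to each `LambdaStep` determine when an execution reaches a Fail state. The hypothetical simulation below shows that routing and ignores the delay between attempts. The task gets one initial attempt plus `max_attempts` retries. Only if all of them fail does the Catch send the execution to the named Fail state. Any Python exception stands in for `States.TaskFailed` here.

```python
def run_with_retry_and_catch(task, max_attempts, fail_state):
    """Simulate a Task state with one Retry and one Catch (delays omitted).

    Returns ("succeeded", result) as soon as an attempt succeeds; otherwise
    returns (fail_state, error message), mimicking the Catch -> Fail routing.
    """
    last_error = None
    for _ in range(1 + max_attempts):  # the initial attempt plus the retries
        try:
            return ("succeeded", task())
        except Exception as exc:  # stands in for States.TaskFailed
            last_error = exc
    return (fail_state, str(last_error))
```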

### Check the dataset group status

In [None]:
lambda_state_datasetgroupstatus = LambdaStep(
    state_id="check dataset Group status",
    parameters={  
        "FunctionName": "stepfunction_waitforDatasetGroup", #replace with the name of the function you created
        "Payload": {  
           "input.$": '$.Payload.datasetGroupArn',
           "schemaArn.$": '$.Payload.schemaArn'
        }
    },
    result_path = '$'
)

lambda_state_datasetgroupstatus.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetgroupstatus.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetGroupStatusTaskFailed")
))

### Check the dataset import job status

In [None]:
lambda_state_datasetimportjob_status = LambdaStep(
    state_id="check dataset import job status",
    parameters={  
        "FunctionName": "stepfunction_waitfordatasetimportjob", #replace with the name of the function you created
        "Payload": {  
           "dataset_import_job_arn.$": '$.Payload.dataset_import_job_arn',
           "datasetGroupArn.$": '$.Payload.datasetGroupArn'
        }
    },
    result_path = '$'
)

lambda_state_datasetimportjob_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetimportjob_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetImportJobStatusTaskFailed")
))

### Check the solution version status

In [None]:

solutionversion_succeed_state = Succeed(
    state_id="The Solution Version ready?"
)

In [None]:
lambda_state_solutionversion_status = LambdaStep(
    state_id="check solution version status",
    parameters={  
        "FunctionName": "stepfunction_waitforSolutionVersion", #replace with the name of the function you created
        "Payload": {  
           "solution_version_arn.$": '$.Payload.solution_version_arn'           
        }
    },
    result_path = '$'
)

lambda_state_solutionversion_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_solutionversion_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("SolutionVersionStatusTaskFailed")
))

### Check the campaign status

In [None]:
lambda_state_campaign_status = LambdaStep(
    state_id="check campaign status",
    parameters={  
        "FunctionName": "stepfunction_waitforCampaign", #replace with the name of the function you created
        "Payload": {  
           "campaign_arn.$": '$.Payload.campaign_arn'           
        }
    },
    result_path = '$'
)

lambda_state_campaign_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_campaign_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CampaignStatusTaskFailed")
))

## Choice States

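Now create the Choice states and attach branches to them. Each Choice state reads the `status` field returned by a status-check Lambda function. `ACTIVE` leads to a Succeed state. `CREATE PENDING` or `CREATE IN_PROGRESS` loops back to the corresponding Wait state. Any other status falls through to the default Fail state. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).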
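To show how these rules are evaluated, the sketch below builds an Amazon States Language (ASL) Choice state of the same shape as a Python dict. It then picks the next state the way Step Functions does: the first matching rule wins, and otherwise `Default` applies. The `$.Payload.status` path and the state names in the usage lines are illustrative. The SDK may render the variable path in bracket notation. The evaluator skips JSONPath resolution and takes the status value directly.

```python
def status_choice_state(status_path, wait_state, succeed_state, fail_state):
    """ASL for a Choice state that routes on a Personalize resource status."""
    return {
        "Type": "Choice",
        "Choices": [
            {"Variable": status_path, "StringEquals": "ACTIVE", "Next": succeed_state},
            {"Variable": status_path, "StringEquals": "CREATE PENDING", "Next": wait_state},
            {"Variable": status_path, "StringEquals": "CREATE IN_PROGRESS", "Next": wait_state},
        ],
        "Default": fail_state,
    }


def next_state(choice_state, status):
    """Return the next state name: the first matching rule wins, else Default.

    Simplification: `status` is the value the Variable path already resolved to.
    """
    for rule in choice_state["Choices"]:
        if rule["StringEquals"] == status:
            return rule["Next"]
    return choice_state["Default"]


campaign_choice = status_choice_state(
    "$.Payload.status", "Wait for Campaign - 30 secs",
    "CampaignCreatedSuccessfully", "CreateCampaignFailed",
)
print(next_state(campaign_choice, "CREATE IN_PROGRESS"))  # Wait for Campaign - 30 secs
print(next_state(campaign_choice, "CREATE FAILED"))       # CreateCampaignFailed
```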

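The Choice states below complete the create, wait, check, and decide loops. In the Workflow section, the Lambda, Wait, Choice, and Succeed states are chained into sequential paths, and each resulting workflow is then defined and visualized.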

In [None]:
create_campaign_choice_state = Choice(
    state_id="Is the Campaign ready?"
)

In [None]:
create_campaign_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=Succeed("CampaignCreatedSuccessfully")     
)
create_campaign_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_campaign
)
create_campaign_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_campaign
)

create_campaign_choice_state.default_choice(next_step=Fail("CreateCampaignFailed"))


In [None]:
solutionversion_choice_state = Choice(
    state_id="Is the Solution Version ready?"
)

In [None]:
solutionversion_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=solutionversion_succeed_state   
)
solutionversion_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_solutionversion
)
solutionversion_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_solutionversion
)

solutionversion_choice_state.default_choice(next_step=Fail("create_solution_version_failed"))


In [None]:
datasetimportjob_succeed_state = Succeed(
    state_id="DatasetImportJobCompleted"
)

In [None]:
datasetimportjob_choice_state = Choice(
    state_id="Is the DataSet Import Job ready?"
)

In [None]:
datasetimportjob_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=datasetimportjob_succeed_state   
)
datasetimportjob_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_datasetimportjob
)
datasetimportjob_choice_state.add_choice(
    ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_datasetimportjob
)


datasetimportjob_choice_state.default_choice(next_step=Fail("dataset_import_job_failed"))


In [None]:
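# Note: this Choice state is not added to any workflow below; the dataset workflow ends with the status check.
datasetgroupstatus_choice_state = Choice(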
    state_id="Is the DataSetGroup ready?"
)

## Workflow

### Define the workflows

In the following cells, you chain the steps defined above into workflows, and then create, visualize, and run them.

Steps correspond to states in AWS Step Functions. For more information, see [States](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html) in the *AWS Step Functions Developer Guide*. For more information about the AWS Step Functions Data Science SDK API, see https://aws-step-functions-data-science-sdk.readthedocs.io.
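Conceptually, chaining states with `Chain` links each state's `Next` field to the following state, and the workflow definition starts at the first state. The sketch below builds a simplified ASL document from a list of `(name, type)` pairs to show that structure. It is not the SDK's implementation. It also omits fields such as `Resource`, `Parameters`, `Retry`, and `Catch`.

```python
# Choice states branch instead of using Next; Succeed and Fail states are terminal.
NO_NEXT_TYPES = {"Choice", "Succeed", "Fail"}


def chain_to_asl(steps):
    """Build a simplified ASL definition from (state_name, state_type) pairs in order."""
    states = {}
    for i, (name, state_type) in enumerate(steps):
        state = {"Type": state_type}
        is_last = i == len(steps) - 1
        if not is_last:
            if state_type in NO_NEXT_TYPES:
                raise ValueError(f"{state_type} state {name!r} cannot have a Next")
            state["Next"] = steps[i + 1][0]  # link to the following state
        elif state_type not in NO_NEXT_TYPES:
            state["End"] = True              # last Task/Wait state ends the workflow
        states[name] = state
    return {"StartAt": steps[0][0], "States": states}


dataset_asl = chain_to_asl([
    ("create schema", "Task"),
    ("Wait for create schema - 5 secs", "Wait"),
    ("check dataset Group status", "Task"),
])
print(dataset_asl)
```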




### Dataset workflow

In [None]:
Dataset_workflow_definition=Chain([lambda_state_schema,
                                   wait_state_schema,
                                   lambda_state_datasetgroup,
                                   wait_state_datasetgroup,
                                   lambda_state_datasetgroupstatus
                                  ])

In [None]:
Dataset_workflow = Workflow(
    name="Dataset-workflow",
    definition=Dataset_workflow_definition,
    role=workflow_execution_role
)

In [None]:
Dataset_workflow.render_graph()

In [None]:
DatasetWorkflowArn = Dataset_workflow.create()

### Dataset import workflow

In [None]:
DatasetImport_workflow_definition=Chain([lambda_state_createdataset,
                                   wait_state_dataset,
                                   lambda_state_datasetimportjob,
                                   wait_state_datasetimportjob,
                                   lambda_state_datasetimportjob_status,
                                   datasetimportjob_choice_state
                                  ])

In [None]:
DatasetImport_workflow = Workflow(
    name="DatasetImport-workflow",
    definition=DatasetImport_workflow_definition,
    role=workflow_execution_role
)

In [None]:
DatasetImport_workflow.render_graph()

In [None]:
DatasetImportflowArn = DatasetImport_workflow.create()

### Recipe and solution workflow

In [None]:
Create_receipe_sol_workflow_definition=Chain([lambda_state_select_receipe_create_solution,
                                   wait_state_receipe,
                                   lambda_create_solution_version,
                                   wait_state_solutionversion,
                                   lambda_state_solutionversion_status,
                                   solutionversion_choice_state
                                  ])

In [None]:
Create_receipe_sol_workflow = Workflow(
    name="Create_receipe_sol-workflow",
    definition=Create_receipe_sol_workflow_definition,
    role=workflow_execution_role
)

In [None]:
Create_receipe_sol_workflow.render_graph()

In [None]:
CreateReceipeArn = Create_receipe_sol_workflow.create()

### Campaign workflow

In [None]:
# The polling loop uses wait_state_campaign; wait_state_datasetimportjob belongs to the dataset import workflow.
Create_Campaign_workflow_definition=Chain([lambda_create_campaign,
                                   wait_state_campaign,
                                   lambda_state_campaign_status,
                                   create_campaign_choice_state
                                  ])

In [None]:
Campaign_workflow = Workflow(
    name="Campaign-workflow",
    definition=Create_Campaign_workflow_definition,
    role=workflow_execution_role
)

In [None]:
Campaign_workflow.render_graph()

In [None]:
CreateCampaignArn = Campaign_workflow.create()

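### Main workflow

The main workflow runs the four workflows above in sequence. Each `Task` below uses the `arn:aws:states:::states:startExecution.sync:2` service integration. This integration starts a child workflow, waits for it to complete, and returns the child's output as JSON under `Output`. The next step can therefore read values such as `$.Output.Payload.datasetGroupArn`.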

In [None]:
call_dataset_workflow_state = Task(
    state_id="DataSetWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input": "true",
        "StateMachineArn": DatasetWorkflowArn
    }
)

In [None]:
call_datasetImport_workflow_state = Task(
    state_id="DataSetImportWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input": {
            "schemaArn.$": "$.Output.Payload.schemaArn",
            "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"
        },
        "StateMachineArn": DatasetImportflowArn
    }
)

In [None]:
call_receipe_solution_workflow_state = Task(
    state_id="ReceipeSolutionWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input": {
            "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"
        },
        "StateMachineArn": CreateReceipeArn
    }
)

In [None]:
call_campaign_solution_workflow_state = Task(
    state_id="CampaignWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input": {
            "solution_version_arn.$": "$.Output.Payload.solution_version_arn"
        },
        "StateMachineArn": CreateCampaignArn
    }
)

In [None]:
Main_workflow_definition=Chain([call_dataset_workflow_state,
                                call_datasetImport_workflow_state,
                                call_receipe_solution_workflow_state,
                                call_campaign_solution_workflow_state
                               ])

In [None]:
Main_workflow = Workflow(
    name="Main-workflow",
    definition=Main_workflow_definition,
    role=workflow_execution_role
)

In [None]:
Main_workflow.render_graph()

In [None]:
Main_workflow.create()

In [None]:
Main_workflow_execution = Main_workflow.execute()

Main_workflow_execution = Workflow(
    name="Campaign_Workflow",
    definition=path1,
    role=workflow_execution_role
)


In [None]:
#Main_workflow_execution.render_graph()

### 创建并执行工作流

在接下来的单元格中，我们将使用 [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create)（创建）在 AWS Step Functions 中创建分支工作流，并使用 [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute)（执行）执行该工作流。


In [None]:
#personalize_workflow.create()

In [None]:
#personalize_workflow_execution = happy_workflow.execute()

### Review the workflow progress

Use [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress) to review the workflow progress.

Review the execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events), which lists all events in the workflow execution.

In [None]:
Main_workflow_execution.render_progress()

In [None]:
Main_workflow_execution.list_events(html=True)
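For a quick text summary instead of the HTML table, you can post-process the events yourself. The helpers below are hypothetical. They assume that `list_events()`, called without `html=True`, returns the raw event dicts from the Step Functions `GetExecutionHistory` API. In that format, every `...StateEntered` event carries the state name in `stateEnteredEventDetails`.

```python
def entered_states(events):
    """Names of the states entered during an execution, in event order."""
    return [
        event["stateEnteredEventDetails"]["name"]
        for event in events
        if event["type"].endswith("StateEntered")
    ]


def failure_events(events):
    """Types of all failure events (for example TaskFailed or ExecutionFailed)."""
    return [event["type"] for event in events if event["type"].endswith("Failed")]
```

Usage might look like `entered_states(Main_workflow_execution.list_events())`.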

## Generate Recommendations

### Now that the campaign is active, generate recommendations from it

#### Select a user and an item

In [None]:
# u.item has no header row, so header=None keeps the first movie from being used as column names
items = pd.read_csv('./ml-100k/u.item', sep='|', usecols=[0, 1], header=None, encoding='latin-1')
items.columns = ['ITEM_ID', 'TITLE']

# pick a random interaction from the filtered training data
user_id, item_id, rating, timestamp = data.sample().values[0]

user_id = int(user_id)
item_id = int(item_id)

item_title = items.loc[items['ITEM_ID'] == item_id].values[0][-1]
print("USER: {}".format(user_id))
print("ITEM: {}".format(item_title))
print("ITEM ID: {}".format(item_id))


In [None]:
wait_recommendations = Wait(
    state_id="Wait for recommendations - 10 secs",
    seconds=10
)

#### Lambda task

In [None]:
lambda_state_get_recommendations = LambdaStep(
    state_id="get recommendations",
    parameters={  
        "FunctionName": "stepfunction_getRecommendations", 
        "Payload": {  
           "campaign_arn": '<Campaign ARN>',  # replace with your campaign ARN, e.g. arn:aws:personalize:<region>:<account-id>:campaign/stepfunction-campaign
           "user_id": user_id,  
           "item_id": item_id             
        }
    },
    result_path = '$'
)

lambda_state_get_recommendations.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_get_recommendations.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("GetRecommendationTaskFailed")
))

#### Create the Succeed state

In [None]:
workflow_complete = Succeed("WorkflowComplete")

In [None]:
recommendation_path = Chain([
    lambda_state_get_recommendations,
    wait_recommendations,
    workflow_complete
])

### Define, create, render, and execute the recommendation workflow

In the following cells, we use [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) to create the workflow in AWS Step Functions and [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute) to run it.

In [None]:
recommendation_workflow = Workflow(
    name="Recommendation_Workflow4",
    definition=recommendation_path,
    role=workflow_execution_role
)



In [None]:
recommendation_workflow.render_graph()

In [None]:
recommendation_workflow.create()

In [None]:
recommendation_workflow_execution = recommendation_workflow.execute()

### Review the progress

Use [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress) to review the workflow progress.

Review the execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events), which lists all events in the workflow execution.

In [None]:
recommendation_workflow_execution.render_progress()

In [None]:
recommendation_workflow_execution.list_events(html=True)


### Get the recommendations

In [None]:
item_list = recommendation_workflow_execution.get_output()['Payload']['item_list']

print("Recommendations:")
for item in item_list:
    # np.int was removed in NumPy 1.24; the built-in int does the same conversion
    item_title = items.loc[items['ITEM_ID'] == int(item['itemId'])].values[0][-1]
    print(item_title)
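The loop above looks up each title in the `items` DataFrame. The hypothetical helpers below are a stdlib-only sketch of the same mapping. They read `u.item` rows (pipe-separated, no header) into a dict and translate an item list of `{'itemId': ...}` entries into titles. IDs missing from the catalog are labeled instead of raising an `IndexError`. The sample rows are shortened to their first fields.

```python
def load_titles(u_item_lines):
    """Map ITEM_ID -> title from MovieLens u.item rows ('|'-separated, no header row)."""
    titles = {}
    for line in u_item_lines:
        fields = line.rstrip("\n").split("|")
        if len(fields) >= 2:  # skip blank or malformed lines
            titles[int(fields[0])] = fields[1]
    return titles


def recommended_titles(item_list, titles):
    """Translate [{'itemId': '...'}, ...] entries into titles, flagging unknown IDs."""
    return [
        titles.get(int(item["itemId"]), "<unknown item {}>".format(item["itemId"]))
        for item in item_list
    ]


sample_u_item = ["1|Toy Story (1995)|01-Jan-1995||\n", "2|GoldenEye (1995)|01-Jan-1995||\n"]
titles = load_titles(sample_u_item)
print(recommended_titles([{"itemId": "2"}, {"itemId": "1"}], titles))
```

With the real file, you could build the map inside `with open('./ml-100k/u.item', encoding='latin-1') as f:` and then call `recommended_titles(item_list, titles)`. The same helper should also accept the `itemList` returned by calling `personalize_runtime.get_recommendations(campaignArn=..., userId=str(user_id))` directly.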


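## Clean up Amazon Personalize resources

Make sure to clean up the Amazon Personalize resources and state machines created in this notebook. Sign in to the Amazon Personalize console and delete the campaign, solution, datasets, dataset group, and schema, in that order. Amazon Personalize does not let you delete a resource that another resource still depends on (for example, a solution that a campaign still uses). Recipes are built into Amazon Personalize and don't need to be deleted.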
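If you prefer to script the cleanup, the sketch below deletes resources in dependency order (campaigns, solutions, datasets, dataset groups, schemas) using the boto3 Personalize delete calls. The function name and the `arns_by_type` keys are hypothetical. The ARNs are placeholders that you would collect from the console or from the workflow outputs. These deletions are asynchronous, so the default `wait` does nothing. In practice you would pass a function that waits until the previous group of deletions has finished, for example by polling the corresponding `describe_*` call.

```python
def delete_personalize_resources(client, arns_by_type, wait=lambda: None):
    """Delete Amazon Personalize resources in dependency order.

    client:       a boto3 "personalize" client (or any object with the same methods)
    arns_by_type: dict with optional keys "campaigns", "solutions", "datasets",
                  "dataset_groups", "schemas", each mapping to a list of ARNs
    wait:         called after each group, because deletions are asynchronous
    """
    # (client method, ARN keyword argument, key in arns_by_type), dependents first
    order = [
        ("delete_campaign", "campaignArn", "campaigns"),
        ("delete_solution", "solutionArn", "solutions"),
        ("delete_dataset", "datasetArn", "datasets"),
        ("delete_dataset_group", "datasetGroupArn", "dataset_groups"),
        ("delete_schema", "schemaArn", "schemas"),
    ]
    for method_name, arn_keyword, key in order:
        for arn in arns_by_type.get(key, []):
            getattr(client, method_name)(**{arn_keyword: arn})
        wait()
```

Usage might look like `delete_personalize_resources(personalize, {"campaigns": ["<campaign ARN>"], "solutions": ["<solution ARN>"]})`, where `personalize` is the boto3 client created at the top of the notebook.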

## Clean up state machine resources

In [None]:
Campaign_workflow.delete()

recommendation_workflow.delete()

Main_workflow.delete()

Create_receipe_sol_workflow.delete()

DatasetImport_workflow.delete()

Dataset_workflow.delete()
