# 1. 安装和加载使用Step Functions Data Science SDK必需的模块

In [None]:
import sys
# Upgrade the Step Functions Data Science SDK using the pip module of the
# interpreter that runs this kernel (sys.executable), so the package ends up
# in the environment the notebook imports from.
!{sys.executable} -m pip install --upgrade stepfunctions

In [120]:
import sagemaker
import time
import random
import uuid
import logging
import stepfunctions
import io

from stepfunctions import steps
from stepfunctions.steps import TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

# SageMaker session object kept for the rest of the notebook.
session = sagemaker.Session()
# Turn on the Step Functions SDK stream logger at INFO level.
stepfunctions.set_stream_logger(level=logging.INFO)

# S3 locations used by SageMaker in this notebook.
bucket = "sagemaker-cn-north-1-1234567890"
prefix = "demo"
# The same bucket expressed as an s3:// URI.
bucket_path = "s3://" + bucket

# 2. 创建一个Role附加到Step Functions

## 2.1 关联的策略如下所示，并将策略附加到创建的Role

In [None]:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTransformJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:StopTransformJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DeleteEndpoint",
                "sagemaker:UpdateEndpoint",
                "sagemaker:ListTags",
                "sqs:SendMessage",
                "sns:Publish",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:DescribeTasks",
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:BatchStopJobRun"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "arn:aws-cn:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws-cn:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws-cn:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws-cn:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws-cn:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        }
    ]
}

In [95]:
import sagemaker

# SageMaker execution role, as returned by sagemaker.get_execution_role()
sagemaker_execution_role = sagemaker.get_execution_role()

# ARN of the StepFunctionsWorkflowExecutionRole (the role the section 2.1
# policy is attached to); paste the ARN of the role created in your account.
workflow_execution_role = "arn:aws-cn:iam::1234567890:role/StepFunctionsWorkflowExecutionRole"

# 3. 读取样本数据和加载模型

In [None]:
# List the files in the current working directory.
!ls

In [None]:
# Change the working directory; the relative file names used in the cells
# below (e.g. sample_data.csv) are resolved against the current directory.
%cd /home/ec2-user/SageMaker/

In [None]:
# Print the current working directory.
!pwd

In [None]:
!pip install xgboost

In [None]:
!pip install Scikit-learn==0.20.0

In [96]:
import sklearn
# Print the scikit-learn version that the kernel actually imports.
print(sklearn.__version__)

0.20.0


In [97]:
import pandas as pd
import numpy as np
import s3fs

from xgboost import XGBClassifier

# joblib is imported from sklearn.externals, which is available in the
# scikit-learn 0.20.0 release pinned earlier in this notebook.
from sklearn.externals import joblib
# Alternative kept for reference: the standalone joblib package.
##import joblib

import warnings
# Suppress all Python warnings from this point on.
warnings.filterwarnings('ignore')

In [98]:
# Read sample_data.csv (relative to the current working directory) into a DataFrame.
data_sample = pd.read_csv('sample_data.csv')

In [99]:
# Display the loaded sample DataFrame.
data_sample

Unnamed: 0,cust_id,trade_amt_last6m,pur_amt_online_last6m,pur_cnt_online_last6m,trade_amt_last3m,redem_cnt_online_last6m,pur_amt_last6m_growth,pur_amt_last6m_main,pur_amt_last1y_bond,mv_ttl_bond_type_avg_last3m,...,zz_cnt,pur_amt_last1m_stock,ret_rate_last2y_bond,pur_amt_last1y_award,fund_cnt_stock_type,pur_cnt_last1y_award,redem_amt_last1y_growth,redem_cnt_last1y_growth,pur_cnt_last6m_alltrack,p_dt
0,1069553680,0.00,0.0,0,0.00,0,0.0,0.0,0.0,0.000000,...,0,0.0,0.000000,0.0,1,0,0.00,0,0,20201231
1,1069554929,1500.00,0.0,0,900.00,0,0.0,0.0,0.0,0.000000,...,4,0.0,0.000000,0.0,1,0,11215.68,1,0,20201231
2,1069555867,108483.67,7800.0,3,33341.17,5,5800.0,7800.0,0.0,0.000000,...,5,0.0,0.000000,52800.0,2,22,53884.83,3,0,20201231
3,1069561326,100.00,100.0,1,100.00,0,0.0,0.0,100.0,100.091416,...,0,0.0,0.001918,0.0,0,0,0.00,0,0,20201231
4,1069568341,0.00,0.0,0,0.00,0,0.0,0.0,0.0,47852.314580,...,0,0.0,0.004741,0.0,0,0,0.00,0,0,20201231
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1113893142,2800.00,0.0,0,1300.00,0,0.0,0.0,0.0,0.000000,...,2,0.0,0.000000,0.0,1,0,0.00,0,0,20201231
9996,1113894480,910.39,470.0,8,30.00,1,470.0,0.0,0.0,0.000000,...,2,0.0,0.000000,0.0,1,0,440.39,1,0,20201231
9997,1113895116,3750.00,0.0,0,1250.00,0,3750.0,0.0,0.0,0.000000,...,3,400.0,0.000000,0.0,1,0,0.00,0,0,20201231
9998,1113895561,1500.00,1500.0,3,0.00,0,1500.0,1500.0,0.0,0.000000,...,1,0.0,0.000000,1500.0,1,3,0.00,0,3,20201231


In [100]:
# Deserialize the model object stored in loss_warning_model.pkl
# (presumably an XGBoost classifier, given the variable name — confirm).
# NOTE: joblib.load unpickles the file; only load model files from a trusted source.
xgb_clf = joblib.load('loss_warning_model.pkl')

In [101]:
# Copy of the sample without the 'cust_id' column.
data_sample_new = data_sample.drop('cust_id', axis='columns')

In [102]:
# Feature frame: the sample without the 'cust_id' and 'p_dt' columns.
# Derived from `data_sample` rather than from `data_sample_new` itself, so
# re-running this cell gives the same result instead of raising KeyError
# because 'p_dt' was already dropped.
data_sample_new = data_sample.drop(['cust_id', 'p_dt'], axis=1)

In [103]:
# Column 1 of predict_proba for every row of the feature frame.
y_pred = xgb_clf.predict_proba(data_sample_new.values)[:, 1]

In [104]:
# Predictions from xgb_clf.predict for every row of the feature frame.
yy_pred = xgb_clf.predict(data_sample_new.values)

# 4. 准备工作 BYOC（执行一次）

## 4.1 打包容器&推送容器到ECR

In [None]:
import os

import boto3
import sagemaker
from sagemaker import get_execution_role

# Region configured for the default boto3 session.
region = boto3.Session().region_name
# S3 bucket and key prefix used for the model, code, input and output objects.
bucket = "sagemaker-cn-north-1-1234567890"
prefix = "demo"
# Execution role passed to SageMaker when the model is created.
role = sagemaker.get_execution_role()

In [None]:
%%writefile Dockerfile

# Base image: the sagemaker-scikit-learn 0.20.0-cpu-py3 image from the cn-north-1 registry.
FROM 450853457545.dkr.ecr.cn-north-1.amazonaws.com.cn/sagemaker-scikit-learn:0.20.0-cpu-py3
# Add xgboost on top of the base image.
RUN pip install xgboost

In [None]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
ecr_repository = 'sagemaker-batch-transform-demo'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
processing_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)
print(processing_repository_uri)
ecr = '{}.dkr.ecr.{}.{}'.format(account_id, region, uri_suffix)
print(ecr)
!aws ecr create-repository --repository-name $ecr_repository

In [None]:
# Switch to the Docker build directory; `docker build .` below uses it as the build context.
# NOTE(review): the `%%writefile Dockerfile` cell writes into whatever directory
# was current when it ran — confirm the Dockerfile is present in this directory.
%cd ~/SageMaker/docker

In [None]:
# if it said no basic auth for pull base image, use below cli (cn-north-1)
!aws ecr get-login-password --region $region | docker login --username AWS --password-stdin 
450853457545.dkr.ecr.cn-north-1.amazonaws.com.cn

In [None]:
!sudo aws ecr get-login-password --region $region | docker login --username AWS --password-stdin $ecr
# Create ECR repository and push docker image
!docker build -t $ecr_repository .

!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

## 4.2 创建模型

In [None]:
# Model packaging: local model file, SageMaker model name, and the S3 URI
# where the model archive will be stored.
model = "loss_warning_model.pkl"
model_name = "demo-model-1"
model_uri = "s3://{}/{}/model/model.tar.gz".format(bucket, prefix)

# Show the archive destination.
model_uri

In [None]:
# 上传到S3
!tar -czf model.tar.gz $model
!aws s3 cp model.tar.gz $model_uri

## 4.3 推理脚本（打包到SageMaker模型）

In [None]:
%%writefile transform_script.py

import pandas as pd
import numpy as np
import os
from xgboost import XGBClassifier
from sklearn.externals import joblib

def model_fn(model_dir):
    """Load and return the model stored as loss_warning_model.pkl in ``model_dir``.

    Presumably called by the SageMaker scikit-learn serving container with the
    directory where the model archive was unpacked — confirm against the
    container documentation.
    """
    model_path = os.path.join(model_dir, 'loss_warning_model.pkl')
    return joblib.load(model_path)

def predict_fn(input_data, model):
    """Return the ``predict_proba`` output of ``model`` from column 1 onward.

    The ``1:`` slice (instead of the index ``1``) keeps the result
    two-dimensional, i.e. one row per input record.
    """
    probabilities = model.predict_proba(input_data)
    return probabilities[:, 1:]

### 4.3.1 打包代码并上传到S3

In [None]:
# Inference script file and the S3 URI where its source archive will be stored.
transform_script = "transform_script.py"
code_uri = "s3://{}/{}/code/sourcedir.tar.gz".format(bucket, prefix)
# Show the archive destination.
code_uri

In [None]:
!tar -czf sourcedir.tar.gz $transform_script
!aws s3 cp sourcedir.tar.gz $code_uri

## 4.4 创建SageMaker模型

In [81]:
import boto3

# Low-level SageMaker client used by the create_model call below.
client = boto3.client('sagemaker')

In [82]:
# Register the SageMaker model: the custom image pushed to ECR in section 4.1,
# the model archive uploaded in section 4.2 and the inference code from 4.3.
model_response = client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        # `image_uri` is never defined in this notebook; the reference of the
        # image that was built and pushed is held in `processing_repository_uri`.
        'Image': processing_repository_uri,
        'ImageConfig': {
            'RepositoryAccessMode': 'Platform'
        },
        'Mode': 'SingleModel',
        'ModelDataUrl': model_uri,
        'Environment': {
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            # Script file name and the S3 archive that contains it.
            'SAGEMAKER_PROGRAM': transform_script,
            'SAGEMAKER_REGION': region,
            'SAGEMAKER_SUBMIT_DIRECTORY': code_uri
        },
    },
    ExecutionRoleArn=role,
    EnableNetworkIsolation=False
)

# 5. 每次批量转换需要执行的代码（可执行多次）

## 5.1 使用SageMaker Batch Transform

### 5.1.1 输出结果不整合输入id

In [105]:
from time import gmtime, strftime

# Input file for the batch transform; it has no header row.
data_file = "sample_data_noheader.csv"

# Job name suffixed with the current UTC timestamp.
batch_job_name = "Batch-Transform-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# S3 URIs of the input object and of the output prefix for this job.
# This cell only builds the URIs; it does not upload anything.
input_location = "s3://{}/{}/input/{}".format(bucket, prefix, data_file)
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, batch_job_name)

In [106]:
from sagemaker.transformer import Transformer

# Transformer configuration: model name, instance count and instance type,
# CSV output assembled line by line under the per-job output prefix.
tfm_transformer = Transformer(
    model_name="demo-model-1",
    instance_count=1,
    instance_type="ml.m4.xlarge",
    strategy=None,
    accept="text/csv",
    assemble_with="Line",
    output_path="s3://{}/{}/output/{}".format(bucket, prefix, batch_job_name),
    max_concurrent_transforms=None,
    max_payload=None,
    tags=None,
    env=None,
    base_transform_job_name=batch_job_name,
    sagemaker_session=None,
)

# 6. 创建Step Functions Step

## 6.1 创建Glue Step，指定glue job name

In [107]:
# Glue state of the workflow; it starts the Glue job named in JobName.
etl_step = steps.GlueStartJobRunStep(
    "Extract, Transform, Load",
    parameters={"JobName": "glue-demo-job-0223"},
)

## 6.2 创建Batch Transform Step，指定job name和 model_name

In [None]:
# Batch transform state built from the transformer defined above.
# input_filter selects fields 1..72 of each CSV record as model input;
# join_source/output_filter join the prediction to the input record and
# keep only its first and last fields.
# NOTE(review): job_name is a fixed literal — confirm that repeated
# executions of the workflow do not collide on the transform job name.
transform_step = steps.TransformStep(
    "BatchPredictionStep",
    tfm_transformer,
    job_name="Batch-Transform-anytime",
    model_name="demo-model-1",
    data="s3://{}/{}/input/{}".format(bucket, prefix, data_file),
    content_type="text/csv",
    data_type="S3Prefix",
    split_type="Line",
    input_filter="$[1:72]",
    join_source="Input",
    output_filter="$[0,-1]",
)

## 6.3 串联上述步骤

In [109]:
from stepfunctions.steps.fields import Field

In [None]:
# Point the Glue state at the aws-cn partition ARN of the
# glue:startJobRun.sync integration; print the fields before and after.
attrs = etl_step.fields
print(attrs)
attrs[Field.Resource.value] = "arn:aws-cn:states:::glue:startJobRun.sync"
etl_step.fields = attrs
print(attrs)

In [None]:
# Point the transform state at the aws-cn partition ARN of the
# sagemaker:createTransformJob.sync integration, then print the fields.
attrs = transform_step.fields
attrs[Field.Resource.value] = "arn:aws-cn:states:::sagemaker:createTransformJob.sync"
transform_step.fields = attrs
print(attrs)

In [110]:
# Run the Glue state first, then the batch transform state.
workflow_definition = steps.Chain([etl_step, transform_step])

# 7. 运行上述workflow，指定workflow name

In [114]:
# Workflow object: name, chained definition, and the Step Functions execution role.
workflow = Workflow(name="MyInferenceRoutine01",
                    definition=workflow_definition,
                    role=workflow_execution_role)

In [None]:
# Render the workflow definition as a graph.
workflow.render_graph()

In [None]:
# Create the workflow in AWS Step Functions.
workflow.create()

In [None]:
# Start an execution of the workflow; the returned handle is used by the cells below.
execution = workflow.execute()

In [118]:
# Render the progress of the execution started above.
execution.render_progress()

In [None]:
# List the events of the execution (html=False: not rendered as HTML).
execution.list_events(html=False)

In [None]:
# List the executions of this workflow, rendered as HTML.
workflow.list_executions(html=True)

In [None]:
# List the workflows via the Workflow class, rendered as HTML.
Workflow.list_workflows(html=True)