# 瑜伽高低肩识别示例

## 环境初始化

In [1]:
! pip install sagemaker==2.16.1 --user

Collecting sagemaker==2.16.1
  Using cached sagemaker-2.16.1.tar.gz (306 kB)
Collecting smdebug-rulesconfig==0.1.5
  Using cached smdebug_rulesconfig-0.1.5-py2.py3-none-any.whl (6.2 kB)
Building wheels for collected packages: sagemaker
  Building wheel for sagemaker (setup.py) ... [?25ldone
[?25h  Created wheel for sagemaker: filename=sagemaker-2.16.1-py2.py3-none-any.whl size=435538 sha256=a91eaefe7c3a799d8c33e258c1d4f438ba302a4ed6fe1c63187107afef629fc7
  Stored in directory: /home/ec2-user/.cache/pip/wheels/e4/18/de/1f047858249ced343e3ebd5a93847e05c7a027363e252f0d87
Successfully built sagemaker
Installing collected packages: smdebug-rulesconfig, sagemaker
Successfully installed sagemaker-2.16.1 smdebug-rulesconfig-0.1.5


In [2]:
import json
import os
import time
import uuid

import boto3
import sagemaker

## 数据预处理
### 参数设置
下面参数设置将在后续使用

In [3]:
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "/home/sagemaker-user/dy-ml/data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"

flow_file_name = "dy-data-processing.flow"

iam_role = sagemaker.get_execution_role()

container_uri = "174368400705.dkr.ecr.us-west-2.amazonaws.com/sagemaker-data-wrangler-container:1.0.1"

# Processing Job Resources Configurations
# Data wrangler processing job only supports 1 instance.
instance_count = 1
instance_type = "ml.m5.4xlarge"

# Processing Job Path URI Information
output_prefix = f"export-{flow_name}/output"
output_path = f"s3://{bucket}/{output_prefix}"
output_name = "21a6c8d8-e540-4d1a-ad10-7803bc6e7c66.default"

processing_job_name = f"data-wrangler-flow-processing-{flow_id}"

processing_dir = "/opt/ml/processing"

# Modify the variable below to specify the content type to be used for writing each output
# Currently supported options are 'CSV' or 'PARQUET', and default to 'CSV'
output_content_type = "CSV"

# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None

### 推送数据处理流程文件到S3

In [4]:
# Load .flow file
with open(flow_file_name) as f:
    flow = json.load(f)

# Upload to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/{flow_name}.flow")

print(f"Data Wrangler Flow notebook uploaded to {flow_uri}")

Data Wrangler Flow notebook uploaded to s3://sagemaker-us-west-2-517141035927//home/sagemaker-user/dy-ml/data_wrangler_flows/flow-22-02-50-44-bcd068dd.flow


### 创建数据处理流程参数 


In [5]:
def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return {
        "InputName": "flow",
        "S3Input": {
            "LocalPath": f"{base_dir}/flow",
            "S3Uri": flow_s3_uri,
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }

def create_s3_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "S3Input": {
            "LocalPath": f"{base_dir}/{name}",
            "S3Uri": dataset_definition["s3ExecutionContext"]["s3Uri"],
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }

def create_redshift_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "RedshiftDatasetDefinition": {
                "ClusterId": dataset_definition["clusterIdentifier"],
                "Database": dataset_definition["database"],
                "DbUser": dataset_definition["dbUser"],
                "QueryString": dataset_definition["queryString"],
                "ClusterRoleArn": dataset_definition["unloadIamRole"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }

def create_athena_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "AthenaDatasetDefinition": {
                "Catalog": dataset_definition["catalogName"],
                "Database": dataset_definition["databaseName"],
                "QueryString": dataset_definition["queryString"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }

def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param flow: loaded data wrangler flow notebook
    :param flow_uri: S3 URI of the data wrangler flow notebook
    """
    processing_inputs = []
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)

    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]

            if source_type == "S3":
                s3_processing_input = create_s3_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(s3_processing_input)
            elif source_type == "Athena":
                athena_processing_input = create_athena_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(athena_processing_input)
            elif source_type == "Redshift":
                redshift_processing_input = create_redshift_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(redshift_processing_input)
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")
    return processing_inputs

def create_container_arguments(output_name, output_content_type):
    output_config = {
        output_name: {
            "content_type": output_content_type
        }
    }
    return [f"--output-config '{json.dumps(output_config)}'"]

# Create Processing Job Arguments
processing_job_arguments = {
    "AppSpecification": {
        "ContainerArguments": create_container_arguments(output_name, output_content_type),
        "ImageUri": container_uri,
    },
    "ProcessingInputs": create_processing_inputs(processing_dir, flow, flow_uri),
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": output_name,
                "S3Output": {
                    "S3Uri": output_path,
                    "LocalPath": os.path.join(processing_dir, "output"),
                    "S3UploadMode": "EndOfJob",
                }
            },
        ],
    },
    "ProcessingJobName": processing_job_name,
    "ProcessingResources": {
        "ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": 30,
        }
    },
    "RoleArn": iam_role,
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 86400,
    },
}

### 开始执行预处理任务

In [59]:
sagemaker_client = boto3.client("sagemaker", endpoint_url=sagemaker_endpoint_url)
create_response = sagemaker_client.create_processing_job(**processing_job_arguments)

status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)

while status["ProcessingJobStatus"] == "InProgress":
    status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)
    print(status["ProcessingJobStatus"])
    time.sleep(60)

print(status)

InProgress
InProgress
InProgress
InProgress
InProgress
Completed
{'ProcessingInputs': [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-517141035927//home/sagemaker-user/dy-ml/data_wrangler_flows/flow-12-13-32-50-b38e244f.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated'}}, {'InputName': 'high-low-shoulder-data.csv', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://aws-glue-517141035927-us-west-2/high-low-shoulder-data.csv', 'LocalPath': '/opt/ml/processing/high-low-shoulder-data.csv', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': '21a6c8d8-e540-4d1a-ad10-7803bc6e7c66.default', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-517141035927/export-flow-12-13-32-50-b38e244f/output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}, 'AppMa

### 查看预处理结果

In [60]:
s3_client = boto3.client("s3")
list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=output_prefix)

training_path = None

for content in list_response["Contents"]:
    if "_SUCCESS" not in content["Key"]:
        training_path = content["Key"]

print(training_path)

export-flow-12-13-32-50-b38e244f/output/data-wrangler-flow-processing-12-13-32-50-b38e244f/21a6c8d8-e540-4d1a-ad10-7803bc6e7c66/default/part-00000-5940954b-30c5-43fb-9588-ae766648b34e-c000.csv


## 模型训练
### 定义算法&超参数&训练数据

In [61]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
    "max_depth":"5",
    "objective": "binary:logistic",
    "num_round": "4",
    "eval_metric": "auc"
}
train_content_type = (
    "application/x-parquet" if output_content_type.upper() == "PARQUET"
    else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
    s3_data=f"s3://{bucket}/{training_path}",
    content_type=train_content_type,
)

### 通过评估器触发训练模型

In [115]:
estimator = sagemaker.estimator.Estimator(
    container,
    iam_role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})

2021-03-17 11:32:52 Starting - Starting the training job...
2021-03-17 11:32:54 Starting - Launching requested ML instances.........
2021-03-17 11:34:29 Starting - Preparing the instances for training...
2021-03-17 11:35:05 Downloading - Downloading input data...
2021-03-17 11:35:50 Training - Training image download completed. Training in progress..[34m[2021-03-17 11:35:52.339 ip-10-0-162-206.us-west-2.compute.internal:1 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sag

### 部署模型

In [111]:
from sagemaker.serializers import CSVSerializer
xgb_predictor = estimator.deploy(
    initial_instance_count = 1, 
    instance_type = 'ml.m4.xlarge',
    serializer=CSVSerializer())

---------------!

In [54]:
response = xgb_predictor.predict(input).decode('utf-8')
print(response)

### 推理

### 使用sagemaker endpoint推理

In [6]:
import pandas as pd 
test_data = pd.read_csv('./test.csv')
test_data=test_data.drop(test_data.columns[0],axis=1)##删除没有列名的数据

In [63]:
test_data

Unnamed: 0,0,1,2
0,0.000128,6.999593e-09,1
1,0.022318,1.336634e-03,1
2,0.062001,6.692099e-02,1
3,0.021154,2.386069e-02,1
4,0.009066,9.534239e-03,1
...,...,...,...
471,0.009085,3.998876e-03,0
472,0.016884,1.601127e-02,0
473,0.011053,1.626371e-02,0
474,0.017410,1.308095e-02,0


## 使用predictor 推理

推理单条记录

In [98]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
endpoint_name = "sagemaker-xgboost-2021-03-17-11-18-11-571" 
input = '0.009085,1.601127e-02'

xgb_predictor = sagemaker.Predictor(endpoint_name=endpoint_name,
                                   sagemaker_session=sess,
                                   serializer=CSVSerializer())
response = xgb_predictor.predict(input).decode('utf-8')
print(response)
if float(response)>0.652 :
    print("1")
else :
    print("0")

0.6214224696159363
0


### 推理训练数据集

In [128]:
y_true = []
y_pred = []
for i in range(0, len(test_data)):
    input = str(test_data.iloc[i]['0'])+","+str(test_data.iloc[i]['1'])
    response = xgb_predictor.predict(input).decode('utf-8')
    y_true.append(round(test_data.iloc[i]['2']))
    if float(response)>0.6 :
        response=1
    else :
        response=0
    y_pred.append(response)
#     print(test_data.iloc[i]['0'], test_data.iloc[i]['1'], test_data.iloc[i]['2'],response)
#     print(predictions)

In [130]:
# print(y_true)

In [135]:
# ! pip install sklearn

### 模型评估

In [134]:
import matplotlib
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, confusion_matrix, f1_score, precision_score, recall_score
#https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html
print(confusion_matrix(y_true, y_pred))

[[124  34]
 [108 210]]


* TP =124 ，预测正确 存在高低肩问题，而且实际有高低肩问题的数量
* FN =108 ，预测错误 不存在高低肩问题，而实际是有高低肩问题的数量
* FP = 34 ，预测错误 存在高低肩问题，而实际是不存在高低肩问题的数量
* TN = 201,预测正确 不存在高低肩问题，实际也是不存在高低肩问题的数量

#### 准确率
Accuracy = TP+TN/TP+TN+FP+FN 

In [143]:

Accuracy = (124+210)/(124+34+108+210)
print('Accuracy is ',Accuracy)

Accuracy is  0.7016806722689075


#### 精度
预测存在高低肩问题 ：在所有类别中，我们正确预测了多少
precision = TP/（TP+FP）
 

In [145]:
precision= 120/(120+34)
precision

In [140]:

precision = precision_score(y_true, y_pred)
print('precision:', precision)

precision: 0.860655737704918


#### 召回
在所有为正的类别中，我们正确预测了多少。
recall = tp/(tp+FN)

In [153]:
recall = recall_score(y_true, y_pred)
print('recall:', recall)

recall: 0.660377358490566


#### F1得分
F1 = 2 * (precision * recall) / (precision + recall)

In [156]:
f1_score(y_true, y_pred)

0.7473309608540925