# Heart Disease Prediction Demo

- This sample is the PAIFlow version of the PAI Studio [heart disease prediction case](https://help.aliyun.com/document_detail/34929.html?spm=a2c4g.11186623.6.769.5b7e340fwAhTsW). It runs the workflow from that case on the PAIFlow Pipeline service.

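- This sample differs slightly from the PAI Studio version: PAIFlow does not currently provide a confusion matrix component, so the results are evaluated with the binary classification evaluator.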


## Initialize the runtime environment

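- An Alibaba Cloud access key pair is required to initialize the Session:
    - https://yuque.antfin-inc.com/pai-user/manual/fqcsry
    - Call *set_default_pai_session* (required), passing the AK, a default oss_bucket (optional), and a default odps_client (optional), to initialize the default session.
- The algorithm modules currently rely mainly on XFlow, and the input and result datasets are in most cases stored in MaxCompute, so MaxCompute (ODPS) access is required.
- In this case the PAI algorithm service needs to access the user's OSS, so provide the corresponding OSS bucket, endpoint, path (where the PMML model is stored), and rolearn (the credential that lets the PAI service access the user's OSS).
    - For the OSS rolearn, see https://help.aliyun.com/document_detail/106225.html?spm=a2c6h.12873639.0.0.82bd6a8a6K624y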
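
Hard-coding the access key pair in a notebook makes it easy to leak. The following is a minimal sketch, assuming the credentials are exported as environment variables. The variable names `PAI_ACCESS_KEY_ID`, `PAI_ACCESS_KEY_SECRET`, and `PAI_REGION_ID` and the helper `load_pai_config` are hypothetical and not part of the PAI SDK. The returned dict has the same keys as the `config` dict in the user config cell.

```python
import os


def load_pai_config(env=None):
    """Build the session config dict from environment variables.

    The variable names are illustrative assumptions, not SDK conventions.
    """
    env = os.environ if env is None else env
    mapping = {
        "access_key_id": "PAI_ACCESS_KEY_ID",
        "access_key_secret": "PAI_ACCESS_KEY_SECRET",
        "region_id": "PAI_REGION_ID",
    }
    # Fail early with a clear message if any variable is unset or empty.
    missing = [var for var in mapping.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {key: env[var] for key, var in mapping.items()}
```

The result could then stand in for the hard-coded `config` dict when it is unpacked into `set_default_pai_session`.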


In [None]:
import random
import time

import pai
print(pai.__file__)

import yaml

from pai.session import set_default_pai_session
from pai.common import ProviderAlibabaPAI
from pai.pipeline import Pipeline
from pai.pipeline.template import PipelineTemplate
from pai.utils import gen_temp_table
from pai.pipeline.types import PipelineParameter, PipelineArtifact, ArtifactMetadata
from pai.pipeline.step import PipelineStep

from odps import ODPS, DataFrame
import oss2

In [None]:

##################################### User Config field ##############################
config = {
    "access_key_id": "your_access_key",
    "access_key_secret": "your_access_key_secret",
    "region_id": "your_region",
    }

default_project = "default_odps_project_name"


oss_endpoint = "your_oss_endpoint"
# oss_path = "/paiflow/model_transfer2oss_test/"
oss_path = "/model_store_path_in_oss/"
oss_bucket_name = "your_oss_bucket_name"
# OSS Rolearn
# Public cloud users, see: https://help.aliyun.com/document_detail/106225.html?spm=a2c6h.12873639.0.0.82bd6a8a6K624y
# Alibaba Group internal users, see: https://yuque.antfin-inc.com/pai-user/manual/fqcsry
# oss_rolearn = "acs:ram::{{your_account_id}}:role/aliyunodpspaidefaultrole"
oss_rolearn = "your_oss_rolearn"


xflow_execution = {
    "odpsInfoFile": "/share/base/odpsInfo.ini",
    "endpoint": "ODPS_Endpoint",
    "logViewHost": "logview_host_config",
    "odpsProject": default_project,
}

################################### User Config field ##############################

In [None]:
odps_client = ODPS(access_id=config["access_key_id"], secret_access_key=config["access_key_secret"], project=default_project, endpoint=xflow_execution["endpoint"])
oss_auth = oss2.Auth(access_key_id=config["access_key_id"], access_key_secret=config["access_key_secret"])
oss_bucket = oss2.Bucket(auth=oss_auth, endpoint=oss_endpoint, bucket_name=oss_bucket_name)

session = set_default_pai_session(odps_client=odps_client, oss_bucket=oss_bucket, **config)


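## Get the available PipelineTemplates

A PipelineTemplate represents the *definition* of a Pipeline or Component. It can be a template saved on the server, or a component definition constructed or assembled locally. The former carries a PipelineId generated by the server; the latter can be saved to the server through the save interface, which then yields the corresponding PipelineId.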

### The PipelineTemplate class provides:

- list_templates(identifier, provider, version, fuzzy):
    - Search for or list the pipeline templates saved in PAIFlow.

- get(pipeline_id)/get_by_identifier(identifier, provider, version):
    - Get one specific Pipeline template.


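### A PipelineTemplate object mainly supports:

- load():
    - Tries to fetch the template's implementation (calls the DescribePipeline API to obtain the detailed Implementation) and parses it into the corresponding Component or Pipeline object.

- run(job_name, arguments):
    - Runs the task directly with the given arguments.

- inputs/outputs:
    - Properties that expose the template's inputs/outputs information.

- save(identifier, version):
    - Saves the current Manifest under the given identifier and version.

- as_step(inputs, name):
    - Creates a PipelineStep that can be used to assemble a Pipeline.
    - Note: the PAIFlow backend currently supports only *saved* templates for this.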



In [None]:
templates, count = PipelineTemplate.list_templates(identifier="xflow", provider=ProviderAlibabaPAI, fuzzy=True)

In [None]:
templates[0]


In [None]:
templates[0].inputs

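## Run the tasks step by step

- Submit each task synchronously. Once its result is returned, pass the result dataset to the next task.

### Dataset
The dataset is the public-read MaxCompute table provided by PAI: heart_disease_prediction.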

In [None]:
# Project that hosts the public-read table:
# inside Alibaba Group it is pai_inner_project;
# outside the group (public cloud) it is pai_online_project.

source_table_project = "pai_inner_project"

dataset_table = "odps://{}/tables/heart_disease_prediction".format(source_table_project)

odps_table = odps_client.get_table("heart_disease_prediction", project=source_table_project)
df = DataFrame(odps_table)
df


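### Data preprocessing
- Convert the string columns to numeric types.
- In the arguments dict passed to run, each key is the name of an input of the corresponding Manifest, and each value is the data to supply for that input.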
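
To make the string-to-number mapping concrete, here is a minimal pure-Python sketch of the same CASE logic applied to one row held as a dict. The helper `encode_row` and the sample row are hypothetical; the actual transformation in this notebook runs as MaxCompute SQL.

```python
# Each entry: column -> (explicit mapping, default for any other value),
# mirroring "case col when ... then ... else default end" in the SQL.
CASE_RULES = {
    "sex": ({"male": 1}, 0),
    "cp": ({"angina": 0, "notang": 1}, 2),
    "fbs": ({"true": 1}, 0),
    "restecg": ({"norm": 0, "abn": 1}, 2),
    "exang": ({"true": 1}, 0),
    "slop": ({"up": 0, "flat": 1}, 2),
    "thal": ({"norm": 0, "fix": 1}, 2),
}
# Columns that the SQL selects unchanged.
PASSTHROUGH = ("age", "trestbps", "chol", "thalach", "oldpeak", "ca")


def encode_row(row):
    """Return a new dict with string columns replaced by numeric codes."""
    encoded = {col: row[col] for col in PASSTHROUGH}
    for col, (mapping, default) in CASE_RULES.items():
        encoded[col] = mapping.get(row[col], default)
    # The label column: status 'sick' becomes 1, anything else 0.
    encoded["ifhealth"] = 1 if row["status"] == "sick" else 0
    return encoded


# Made-up sample row, for illustration only.
sample = {
    "age": 63, "sex": "male", "cp": "angina", "trestbps": 145, "chol": 233,
    "fbs": "true", "restecg": "hyp", "thalach": 150, "exang": "fal",
    "oldpeak": 2.3, "slop": "down", "ca": 0, "thal": "fix", "status": "buff",
}
print(encode_row(sample))
```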

In [None]:
sql = "select age, (case sex when 'male' then 1 else 0 end) as sex, (case cp when " \
        "'angina' then 0  when 'notang' then 1 else 2 end) as cp, trestbps, chol, (case" \
        " fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm' then 0 " \
        " when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when 'true' then" \
        " 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0  when 'flat' then " \
        "1 else 2 end) as slop, ca, (case thal when 'norm' then 0  when 'fix' then 1" \
        " else 2 end) as thal, (case status  when 'sick' then 1 else 0 end) as" \
        " ifHealth from ${t1};"

# Extract and transform dataset using max_compute sql.
sql_job = PipelineTemplate.get_by_identifier(
    identifier="sql-xflow-maxCompute", provider=ProviderAlibabaPAI, version="v1").run(
    job_name="sql-job",
    arguments={
        "execution": xflow_execution,
        "inputArtifact1": dataset_table,
        "sql": sql,
        "outputTable": gen_temp_table(),
    })

# Outputs may not be ready immediately after the workflow finishes, so wait briefly.
time.sleep(10)
output_table_artifact = sql_job.get_outputs()[0]
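
The fixed `time.sleep` works around outputs that are not yet available when the run returns. A minimal sketch of a polling alternative follows. It assumes that an unready result shows up as `None` or as a list shorter than expected, which is a guess rather than documented SDK behaviour; the helper `wait_for_outputs` is hypothetical.

```python
import time


def wait_for_outputs(fetch, expected=1, timeout=120.0, interval=2.0, sleep=time.sleep):
    """Call fetch() until it returns at least `expected` items or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        outputs = fetch()
        if outputs is not None and len(outputs) >= expected:
            return outputs
        if time.monotonic() >= deadline:
            raise TimeoutError("outputs not ready after %.0f seconds" % timeout)
        sleep(interval)
```

Under that assumption, `time.sleep(10)` followed by `sql_job.get_outputs()[0]` could be written as `wait_for_outputs(sql_job.get_outputs)[0]`.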



In [None]:
sql_output_table = odps_client.get_table(output_table_artifact.value.table, output_table_artifact.value.project)
DataFrame(sql_output_table)

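### The heart disease prediction workflow consists of the following

- type_transform_job converts data types, casting the listed columns to double
- normalize_job normalizes the data
- split_job splits the data into a training dataset and a validation dataset
- lr_job trains on the training dataset and produces an OfflineModel
- transform_job runs batch prediction with the OfflineModel on the validation dataset
- evaluate_job evaluates model accuracy from the prediction results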
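
As a rough picture of what the normalize and split steps do to the data, the sketch below scales each column to the range [0, 1] and then splits the rows 80/20. It assumes min-max scaling and a random split; the PAI components' exact behaviour may differ, and the helpers `min_max_normalize` and `split_rows` are hypothetical.

```python
import random


def min_max_normalize(rows, columns):
    """Scale each listed column to [0, 1]; constant columns become 0.0."""
    bounds = {c: (min(r[c] for r in rows), max(r[c] for r in rows)) for c in columns}
    scaled = []
    for row in rows:
        new_row = dict(row)
        for col, (lo, hi) in bounds.items():
            new_row[col] = 0.0 if hi == lo else (row[col] - lo) / (hi - lo)
        scaled.append(new_row)
    return scaled


def split_rows(rows, fraction=0.8, seed=0):
    """Shuffle a copy of the rows and split it into (train, validation)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * fraction)
    return shuffled[:cut], shuffled[cut:]


# Made-up rows, for illustration only.
rows = [{"age": 30 + 2 * i, "chol": 200} for i in range(10)]
train, validation = split_rows(min_max_normalize(rows, ["age", "chol"]))
print(len(train), len(validation))
```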


In [None]:

from pai.xflow.classifier import LogisticRegression
# Transform value to Double
type_transform_job = PipelineTemplate.get_by_identifier(
    identifier="type-transform-xflow-maxCompute",
    provider=ProviderAlibabaPAI, version="v1").run(
    job_name="type-transform-job",
    arguments={
        "execution": xflow_execution,
        "inputArtifact": output_table_artifact,
        "cols_to_double": 'sex,cp,fbs,restecg,exang,slop,thal,ifhealth,age,trestbps,chol,thalach,oldpeak,ca',
        "outputTable": gen_temp_table(),
    }
)

time.sleep(20)
type_transform_result = type_transform_job.get_outputs()[0]

# Normalize Feature
normalize_job = PipelineTemplate.get_by_identifier(identifier="normalize-xflow-maxCompute",
                                            provider=ProviderAlibabaPAI, version="v1").run(
    job_name="normalize-job",
    arguments={
        "execution": xflow_execution,
        "inputArtifact": type_transform_result,
        "selectedColNames": 'sex,cp,fbs,restecg,exang,slop,thal,ifhealth,age,trestbps,'
                            'chol,thalach,oldpeak,ca',
        "lifecycle": 1,
        "outputTableName": gen_temp_table(),
        "outputParaTableName": gen_temp_table(),
    }
)

# Outputs may not be ready immediately after the run succeeds, so wait briefly.
time.sleep(20)
normalized_dataset = normalize_job.get_outputs()[0]

split_job = PipelineTemplate.get_by_identifier(
    identifier="split-xflow-maxCompute",
    provider=ProviderAlibabaPAI, version="v1").run(
    job_name="split-job",
    arguments={
        "inputArtifact": normalized_dataset,
        "execution": xflow_execution,
        "fraction": 0.8,
        "output1TableName": gen_temp_table(),
        "output2TableName": gen_temp_table(),
    }
)

time.sleep(20)
split_output_1, split_output_2 = split_job.get_outputs()


lr_job = LogisticRegression(
    regularized_type="l2", xflow_execution=xflow_execution,
    pmml_gen=True, pmml_oss_bucket=oss_bucket_name,
    pmml_oss_path=oss_path, pmml_oss_endpoint=oss_endpoint,
    pmml_oss_rolearn=oss_rolearn,
).fit(split_output_1,
        wait=True,
        feature_cols='sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca',
        label_col="ifhealth",
        good_value=1,
        model_name="test_health_prediction")

time.sleep(20)
offlinemodel_artifact, pmml_output = lr_job.get_outputs()
transform_job = PipelineTemplate.get_by_identifier(
    identifier="prediction-xflow-maxCompute",
    provider=ProviderAlibabaPAI, version="v1").run(
    job_name="prediction-job",
    arguments={
        "inputModelArtifact": offlinemodel_artifact,
        "inputDataSetArtifact": split_output_2,
        "execution": xflow_execution,
        "outputTableName": gen_temp_table(),
        "featureColNames": 'sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca',
        "appendColNames": "ifhealth",
    }
)

time.sleep(20)
transform_result = transform_job.get_outputs()[0]

evaluate_job = PipelineTemplate.get_by_identifier(
    identifier="evaluate-xflow-maxCompute",
    provider=ProviderAlibabaPAI, version="v1",
    ).run(
    job_name="evaluate-job",
    arguments={
        "execution": xflow_execution,
        "inputArtifact": transform_result,
        "outputDetailTableName": gen_temp_table(),
        "outputELDetailTableName": gen_temp_table(),
        "outputMetricTableName": gen_temp_table(),
        "scoreColName": "prediction_score",
        "labelColName": "ifhealth",
        "coreNum": 2,
        "memSizePerCore": 512,
    }
)

time.sleep(20)
evaluate_result = evaluate_job.get_outputs()[2]

### View the training evaluation data

In [None]:
evaluate_table = odps_client.get_table(evaluate_result.value.table, evaluate_result.value.project)
DataFrame(evaluate_table)
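
Because PAIFlow has no confusion matrix component, the counts can be derived locally from labels and predicted scores. The following is a minimal sketch in plain Python, assuming the scores are for the positive class, a threshold of 0.5, and a positive label of 1. The helpers `confusion_matrix` and `precision_recall` and the sample values are hypothetical; in practice the label and score columns would be read from the prediction output table.

```python
def confusion_matrix(labels, scores, threshold=0.5, positive=1):
    """Count TP/FP/TN/FN, treating score >= threshold as a positive prediction."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for label, score in zip(labels, scores):
        predicted = score >= threshold
        actual = label == positive
        if predicted and actual:
            counts["tp"] += 1
        elif predicted:
            counts["fp"] += 1
        elif actual:
            counts["fn"] += 1
        else:
            counts["tn"] += 1
    return counts


def precision_recall(counts):
    """Return (precision, recall); a zero denominator yields 0.0."""
    predicted_pos = counts["tp"] + counts["fp"]
    actual_pos = counts["tp"] + counts["fn"]
    precision = counts["tp"] / predicted_pos if predicted_pos else 0.0
    recall = counts["tp"] / actual_pos if actual_pos else 0.0
    return precision, recall


# Made-up labels and scores, for illustration only.
counts = confusion_matrix([1, 0, 1, 1, 0], [0.9, 0.2, 0.4, 0.7, 0.6])
print(counts, precision_recall(counts))
```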

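The case above runs step by step: the links between node tasks are maintained in the Python code itself, which makes a complex Pipeline hard to share and reuse and hard to optimize at execution time.
PAIFlow can instead chain the nodes together and build them into a single composite Pipeline.

## Build a composite Pipeline


- Use the Pipeline implementations provided by PAI (including split, normalize, logisticregression, prediction, evaluate, and others) to build a composite Pipeline definition.
- Build the composite Pipeline inside a function: mistakes made during construction do not pollute the notebook kernel's global environment (unless global variables are written to). To change how the Pipeline is built, for example to rename or remove parameters, edit the function definition and re-run it to obtain a new Pipeline definition.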
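
Conceptually, wiring one step's output into another step's input defines a dependency graph, and the steps can then run in topological order. The sketch below illustrates that idea with a plain dict keyed by the step names used in this notebook. It is not how the PAI SDK is implemented, and `execution_order` is a hypothetical helper.

```python
def execution_order(dependencies):
    """Return step names so that every step comes after its upstream steps.

    `dependencies` maps each step name to the names of the steps it reads from.
    """
    order, state = [], {}

    def visit(step):
        if state.get(step) == "done":
            return
        if state.get(step) == "visiting":
            raise ValueError("cycle detected at step %r" % step)
        state[step] = "visiting"
        for upstream in dependencies.get(step, ()):
            visit(upstream)
        state[step] = "done"
        order.append(step)

    for step in dependencies:
        visit(step)
    return order


heart_disease_steps = {
    "evaluate-1": ["offlinemodel-pred"],
    "offlinemodel-pred": ["logisticregression-1", "split-1"],
    "logisticregression-1": ["split-1"],
    "split-1": ["normalize-1"],
    "normalize-1": ["type-transform-1"],
    "type-transform-1": ["sql-1"],
    "sql-1": [],
}
print(execution_order(heart_disease_steps))
```

Starting from the final step is enough to reach every upstream step, since each one is visited through its consumer's dependency list.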


In [None]:

from pai.pipeline.types import ArtifactDataType, ArtifactLocationType
def create_heart_disease_pred_pl():
    pmml_oss_bucket = PipelineParameter("pmml_oss_bucket")
    pmml_oss_rolearn = PipelineParameter("pmml_oss_rolearn")
    pmml_oss_path = PipelineParameter("pmml_oss_path")
    pmml_oss_endpoint = PipelineParameter("pmml_oss_endpoint")
    xflow_execution = PipelineParameter("xflow_execution", dict)
    dataset_input = PipelineArtifact("dataset-table",
                                        metadata=ArtifactMetadata(
                                            data_type=ArtifactDataType.DataSet,
                                            location_type=ArtifactLocationType.MaxComputeTable,
                                        ),
                                        required=True)

    sql = "select age, (case sex when 'male' then 1 else 0 end) as sex,(case cp when" \
            " 'angina' then 0  when 'notang' then 1 else 2 end) as cp, trestbps, chol," \
            " (case fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm'" \
            " then 0  when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when" \
            " 'true' then 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0  " \
            "when 'flat' then 1 else 2 end) as slop, ca, (case thal when 'norm' then 0 " \
            " when 'fix' then 1 else 2 end) as thal, (case status when 'sick' then 1 else " \
            "0 end) as ifHealth from ${t1};"
    sql_step = PipelineStep("sql-xflow-maxCompute", name="sql-1",
                            provider=ProviderAlibabaPAI,
                            version="v1",
                            inputs={
                                "inputArtifact1": dataset_input,
                                "execution": xflow_execution,
                                "sql": sql,
                                "outputTable": gen_temp_table(),
                            })

    type_transform_step = PipelineStep(
        "type-transform-xflow-maxCompute",
        name="type-transform-1",
        provider=ProviderAlibabaPAI, version="v1",
        inputs={
            "execution": xflow_execution,
            "inputArtifact": sql_step.outputs["outputArtifact"],
            "cols_to_double": 'sex,cp,fbs,restecg,exang,slop,thal,ifhealth,age,trestbps,'
                                'chol,thalach,oldpeak,ca',
            "outputTable": gen_temp_table(),

        })

    normalize_step = PipelineStep(
        "normalize-xflow-maxCompute",
        name="normalize-1",
        provider=ProviderAlibabaPAI,
        version="v1", inputs={
            "execution": xflow_execution,
            "inputArtifact": type_transform_step.outputs["outputArtifact"],
            "selectedColNames": 'sex,cp,fbs,restecg,exang,slop,thal,ifhealth,age,trestbps,'
                                'chol,thalach,oldpeak,ca',
            "lifecycle": 1,
            "outputTableName": gen_temp_table(),
            "outputParaTableName": gen_temp_table(),

        })

    split_step = PipelineStep(
        identifier="split-xflow-maxCompute",
        name='split-1',
        provider=ProviderAlibabaPAI, version="v1", inputs={
            "inputArtifact": normalize_step.outputs["outputArtifact"],
            "execution": xflow_execution,
            "fraction": 0.8,
            "output1TableName": gen_temp_table(),
            "output2TableName": gen_temp_table(),

        }
    )

    model_name = 'test_health_prediction_by_pipeline_%s' % (random.randint(0, 999999))

    lr_step = PipelineStep(
        identifier="logisticregression-binary-xflow-maxCompute",
        name="logisticregression-1",
        provider=ProviderAlibabaPAI, version="v1", inputs={
            "inputArtifact": split_step.outputs["outputArtifact1"],
            "execution": xflow_execution,
            "generatePmml": True,
            "endpoint": pmml_oss_endpoint,
            "bucket": pmml_oss_bucket,
            "path": pmml_oss_path,
            "rolearn": pmml_oss_rolearn,
            # "regulizedType": "l2",
            "modelName": model_name,
            "goodValue": 1,
            "featureColNames": "sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca",
            "labelColName": "ifhealth",
        }
    )

    offline_model_pred_step = PipelineStep(
        identifier="prediction-xflow-maxCompute",
        name="offlinemodel-pred",
        provider=ProviderAlibabaPAI, version="v1", inputs={
            "inputModelArtifact": lr_step.outputs["outputArtifact"],
            "inputDataSetArtifact": split_step.outputs["outputArtifact2"],
            "execution": xflow_execution,
            "outputTableName": gen_temp_table(),
            "featureColNames": 'sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca',
            "appendColNames": "ifhealth",
        }
    )

    evaluate_step = PipelineStep(
        identifier="evaluate-xflow-maxCompute",
        name="evaluate-1",
        provider=ProviderAlibabaPAI, version="v1",
        inputs={
            "execution": xflow_execution,
            "inputArtifact": offline_model_pred_step.outputs["outputArtifact"],
            "outputDetailTableName": gen_temp_table(),
            "outputELDetailTableName": gen_temp_table(),
            "outputMetricTableName": gen_temp_table(),
            "scoreColName": "prediction_score",
            "labelColName": "ifhealth",
            "coreNum": 2,
            "memSizePerCore": 512,
        }
    )

    p = Pipeline(
        steps=[evaluate_step, offline_model_pred_step],
        outputs={"pmmlModel": lr_step.outputs["outputArtifact"],
                    "evaluateResult": evaluate_step.outputs["outputMetricsArtifact"]
                    }
    )
    return p


## Create the Pipeline
- Use `pipeline.dot()` to view the Pipeline's execution topology graph (requires graphviz to be installed).
- Use p.to_dict() to obtain the corresponding Pipeline Manifest definition.

In [None]:
p = create_heart_disease_pred_pl()
print(yaml.dump(p.to_dict()))

In [None]:
p.dot()

## Run the Pipeline
- Specify the input arguments and run the Pipeline.

In [None]:
run_instance = p.run(job_name="heart-disease-pipeline-job", arguments={
    "dataset-table": dataset_table,
    "xflow_execution": xflow_execution,
    "pmml_oss_endpoint": oss_endpoint,
    "pmml_oss_bucket": oss_bucket_name,
    "pmml_oss_path": oss_path,
    "pmml_oss_rolearn": oss_rolearn,
})


In [None]:
run_instance.get_outputs()

- Push the Pipeline to the server and save it for reuse.
- A saved Pipeline can be reused as a node in other Pipelines.

In [None]:
p.save(identifier="heart-disease-prediction-pl", version="v%s"%int(time.time()))

In [None]:
p

In [None]:
# Fetch the saved template by id and load its implementation as a Pipeline object.
loaded_pipeline = PipelineTemplate.get(p.pipeline_id).load()

In [None]:
loaded_pipeline.dot()

In [None]:
saved_pipeline = PipelineTemplate.get(p.pipeline_id)
saved_pipeline.run(job_name="saved-pl-heart-disease-job", arguments={
    "dataset-table": dataset_table,
    "xflow_execution": xflow_execution,
    "pmml_oss_endpoint": oss_endpoint,
    "pmml_oss_bucket": oss_bucket_name,
    "pmml_oss_path": oss_path,
    "pmml_oss_rolearn": oss_rolearn,
})