In [None]:
import boto3
import tarfile
from sagemaker import session, get_execution_role, image_uris
from sagemaker.inputs import CreateModelInput, TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.model import Model

## Set up the environment

In [None]:
# Name of the model package group (plain string constant).
model_package_group_name = "MedicalLogisticalRegressionPackageGroup"

# AWS region taken from the ambient boto3 session.
region = boto3.Session().region_name

# SageMaker session shared by the processors, estimator and model below,
# together with its default bucket.
sagemaker_session = session.Session()
default_bucket = sagemaker_session.default_bucket()

# Execution role handed to every SageMaker job defined in this notebook.
role = get_execution_role()

## Constants

In [None]:
# Bucket and object key that together form the S3 URI of the input dataset.
BUCKET = "sagemaker-medical-logistical-regression-data-storage"
DATA_KEY = "data.xlsx"

# Target column name; the literal is Russian for "complications: yes/no".
TARGET_COLUMN = "осложнения есть/нет"

## Define a Preprocessing Step

In [None]:
# scikit-learn framework version shared by the processors, the estimator
# and the inference image lookup.
framework_version = "0.23-1"

# Full S3 URI of the input dataset, assembled from the constants cell.
data_location = f"s3://{BUCKET}/{DATA_KEY}"

# --- Pipeline parameters ---------------------------------------------------
# Compute resources for the processing jobs.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.medium")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Defaults to the root of the session's default bucket.
processing_outputs_path = ParameterString(name="ProcessingOutputsPath", default_value=f"s3://{default_bucket}")

# Approval status passed to the model registration step.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# Location of the raw data and the name of the label column.
input_data = ParameterString(name="InputData", default_value=data_location)
target_column = ParameterString(name="TargetColumn", default_value=TARGET_COLUMN)

In [None]:
# Processor that runs the preprocessing script; instance type and count are
# pipeline parameters.
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

In [None]:
# Preprocessing step: runs ./scripts/preprocessing.py on the input data and
# declares four named outputs (X_train, X_test, y_train, y_test).
# NOTE(review): the script receives the constant TARGET_COLUMN, not the
# `target_column` pipeline parameter, so overriding that parameter at
# execution time has no effect on this step.
# NOTE(review): the output sources live under /opt/ml/train/ rather than
# /opt/ml/processing/ - confirm preprocessing.py writes to these paths.
step_process = ProcessingStep(
    name="PreprocessingStep",
    processor=sklearn_processor,
    code="./scripts/preprocessing.py",
    job_arguments=["--target-column", TARGET_COLUMN],
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/train/{split}")
        for split in ("X_train", "X_test", "y_train", "y_test")
    ],
)

## Define a Training Step

In [None]:
def make_tarfile(files, output_filename="sourcedir.tar.gz"):
    """Bundle the given paths into a gzip-compressed tar archive.

    Args:
        files: Iterable of file or directory paths to add to the archive.
            Each path is stored under the name it is given here.
        output_filename: Path of the archive to create. Defaults to
            "sourcedir.tar.gz" in the current working directory.

    Returns:
        The string "file processed".

    Raises:
        OSError: If a listed path cannot be read or the archive cannot be
            written.
    """
    # The context manager closes the archive handle even when tar.add() fails
    # part-way through, so the file descriptor is never leaked.
    with tarfile.open(output_filename, "w:gz") as tar:
        for name in files:
            tar.add(name)
    return "file processed"


In [None]:
# Compute resources for the training job.
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.t3.medium")

# Declared without a default value. A default of
# s3://<default_bucket>/artefacts/models/medical_logistic_regression was
# sketched here previously but is disabled.
training_output_path = ParameterString(name="TrainingOutputPath")

# S3 prefix under the session's default bucket.
model_path = f"s3://{default_bucket}/trained_model"

In [None]:
# Estimator for the training job. The training code (train.py) is taken from
# a pre-built sourcedir.tar.gz in the default bucket; artifacts go to the
# TrainingOutputPath pipeline parameter.
sklearn_estimator = SKLearn(
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version=framework_version,
    instance_type=training_instance_type,
    entry_point="train.py",
    source_dir=f"s3://{default_bucket}/tests/wines/train/sourcedir.tar.gz",
    output_path=training_output_path,
    hyperparameters={"regularisation_parameter": 2},
)

In [None]:
# Training step fed by the preprocessing outputs.
# The preprocessing step declares the outputs X_train, X_test, y_train and
# y_test - there is no output called "train" - so each training channel is
# wired to an output that actually exists.
# NOTE(review): train.py must read its data from the "X_train" and "y_train"
# channels (presumably exposed as SM_CHANNEL_X_TRAIN / SM_CHANNEL_Y_TRAIN -
# confirm against the training script).
step_train = TrainingStep(
    name="TrainStep",
    estimator=sklearn_estimator,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("X_train", "y_train")
    },
)

## Define an Evaluation Step

In [None]:
# S3 destination for the evaluation report; declared without a default value.
evaluation_output_path = ParameterString(
    name="EvaluationOutputPath",
)

In [None]:
# Processor for the evaluation script; uses the same framework version and
# instance parameters as the preprocessing processor.
sklearn_processor_evaluate = SKLearnProcessor(
    sagemaker_session=sagemaker_session,
    role=role,
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

In [None]:
# Property file pointing at evaluation.json inside the "evaluation" output of
# the evaluation step; the accuracy condition reads its metric from here.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [None]:
# Evaluation step: mounts the trained model artifact, runs evaluation.py and
# publishes the report directory as the "evaluation" output.
# NOTE(review): no test data (X_test / y_test) is mounted here - confirm how
# evaluation.py obtains the data it scores the model on.
step_eval = ProcessingStep(
    name="eval-wines",
    processor=sklearn_processor_evaluate,
    inputs=[
        ProcessingInput(
            input_name="model",
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/output/evaluation",
            destination=evaluation_output_path,
        ),
    ],
    code="evaluation.py",  # or an S3 URI pointing at the evaluation script
    # Registers evaluation.json so the condition step can read metrics from it.
    property_files=[evaluation_report],
    # Depend on the real training step name ("TrainStep"); the previous value
    # 'train-wines' did not match any step in this pipeline.
    depends_on=[step_train.name],
)


## Define a RegisterModel Step to Create a Model Package

In [None]:
# Instance type used for the registered model's inference and transform
# instances, for the model creation input and for the image lookup.
register_model_inference_instance_type = ParameterString(
    name="RegisterModelInferenceInstanceType",
    default_value="ml.m5.large",
)

# Both declared without default values.
register_model_metrics = ParameterString(name="RegisterModelMetrics")
register_model_package_group_name = ParameterString(name="RegisterModelPackageGroupName")

# NOTE(review): the two names below are not referenced by any other visible
# cell of this notebook.
report_metrics_complete_path = f"s3://{default_bucket}/tests/wines/evaluacion/evaluation.json"
model_package_name = "package-model-wines-pipeline"

In [None]:
# Model statistics attached to the registered model package; the JSON report
# location comes from the RegisterModelMetrics pipeline parameter.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=register_model_metrics,
    ),
)

In [None]:
# Registers the trained model artifact as a model package. Explicitly ordered
# after the evaluation step ("eval-wines").
step_register = RegisterModel(
    name="register-wines",
    estimator=sklearn_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=register_model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[register_model_inference_instance_type],
    transform_instances=[register_model_inference_instance_type],
    depends_on=["eval-wines"],
)

## Create Model

In [None]:
# Look up the scikit-learn container image for inference.
# image_uris.retrieve() runs while the notebook builds the pipeline, not at
# pipeline execution time, so it needs a concrete instance type string.
# Passing the pipeline parameter object itself is rejected by current
# SageMaker SDK releases; its default value is used instead.
image_uri = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version=framework_version,
    py_version="py3",
    instance_type=register_model_inference_instance_type.default_value,
)

In [None]:
# Deployable model built from the training step's artifact. The environment
# variables name train.py as the program and the sourcedir.tar.gz archive in
# the default bucket as the submit directory.
model = Model(
    role=role,
    sagemaker_session=sagemaker_session,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    env={
        "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
        "SAGEMAKER_PROGRAM": "train.py",
        "SAGEMAKER_REGION": region,
        "SAGEMAKER_SUBMIT_DIRECTORY": f"s3://{default_bucket}/tests/wines/train/sourcedir.tar.gz",
    },
)

In [None]:
# Input for the model creation step; the instance type is the same pipeline
# parameter used for registration.
inputs = CreateModelInput(instance_type=register_model_inference_instance_type)

In [None]:
# Creates the SageMaker model; explicitly ordered after the registration step
# ("register-wines").
step_create_model = CreateModelStep(
    name="create-medical-logistic-regression-model",
    model=model,
    inputs=inputs,
    depends_on=["register-wines"],
)


## Define a Condition Step to Verify Model Accuracy

In [None]:
# Quality gate: the F1 value read from the evaluation report must be >= 0.6.
# NOTE: despite the "lte" in the variable name, this is a
# greater-than-or-equal comparison.
cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="classification_metrics.f1.value",
    ),
    right=0.6,
)


In [None]:
# Register and create the model only when the quality gate passes; nothing
# runs on the else branch.
step_cond = ConditionStep(
    name="cond-wines",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model],
    else_steps=[],
)


## Create a pipeline

In [None]:
# Assemble the pipeline. Every pipeline parameter that a step references must
# be declared here, otherwise the generated definition points at an undeclared
# parameter. `input_data` (used by the preprocessing step) and
# `model_approval_status` (used by the registration step) were missing.
pipeline_name = "ml-wines-train"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        processing_outputs_path,
        input_data,
        training_instance_count,
        training_instance_type,
        training_output_path,
        evaluation_output_path,
        register_model_inference_instance_type,
        register_model_package_group_name,
        register_model_metrics,
        model_approval_status,
    ],
    # Registration and model creation run inside the condition step's if-branch.
    steps=[step_process, step_train, step_eval, step_cond],
)
