# SageMaker Pipelines

## Notebook Overview

   No scope

## Def Pipeline

### Get sys parameters

In [30]:
import boto3
import sagemaker
import os

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"RiskModelPackageGroupName"

In [31]:
#

In [32]:
sagemaker_session

<sagemaker.session.Session at 0x7fc886f9dd10>

In [33]:
!mkdir -p data

In [34]:
!pwd

/root/demo-kueski-p-23edvki0afej/sagemaker-demo-kueski-p-23edvki0afej-modelbuild


## Upload or / Download the data files to the Amazon S3 bucket

In [35]:
# dataset_credit_risk.csv
# No scope -> manual for this PoC

In [36]:
# 

In [37]:
local_path = "data/dataset_credit_risk.csv"


base_uri = f"s3://{default_bucket}/risk"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-2-690391346049/risk/dataset_credit_risk.csv


## Define Parameters to Parametrize Pipeline Execution

- input and bacht data are the same for convenience
- Train, test and validation data are the same for convenience

In [38]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

batch_data = ParameterString(
    name="BatchData",
    default_value=input_data_uri,
)


In [39]:
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-2-690391346049/risk/dataset_credit_risk.csv')

## Define a Processing Step for Feature Engineering

In [40]:
!mkdir -p risk

### Create preprocessing.py 

In [41]:
%%writefile risk/preprocessing.py

from pyspark.sql.functions import dense_rank,col,avg,to_date,round as round_,current_date,to_date as to_date_,months_between,udf
from pyspark.sql.window import Window
from pyspark.ml.feature import (
    #OneHotEncoder,
    #StringIndexer,
    VectorAssembler,
    #VectorIndexer,
)


from pyspark.sql import SparkSession
from pyspark.sql.types import ShortType

# Import dataframe into MySQL
#import sqlalchemy

def csv_line(data):
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r





def main():


    base_dir = "/opt/ml/processing"
    
    
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()


    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")
    udf_flag_own_car = udf(lambda x: 0 if x == 'N' else 1,ShortType() )



    df_features = spark.read.csv('s3://sagemaker-us-east-2-690391346049/risk/dataset_credit_risk.csv',inferSchema =True, header=True).na.drop()\
    .sort(col('id'),col('loan_date'))\
    .withColumn("loan_date",to_date(col("loan_date"),"yyyy-MM-dd"))\
    .withColumn("nb_previous_loans", dense_rank().over(Window.partitionBy("id").orderBy(col("loan_date")))-1)\
    .sort(col('id'),col('loan_date'))\
    .withColumn('avg_amount_loans_previous', avg(col('loan_amount'))\
    .over(Window.partitionBy(col('id')).orderBy(col("loan_date")).rowsBetween(Window.unboundedPreceding,-1)))\
    .withColumn("age",round_(months_between(current_date(),to_date_(col("birthday"), "yyyy-MM-dd"), True)/12).cast('int'))\
    .withColumn("years_on_the_job",round_(months_between(current_date(),to_date(col("job_start_date"), "yyyy-MM-dd"), True)/12).cast('int'))\
    .withColumn("flag_own_car",udf_flag_own_car(col('flag_own_car')))\
    .select('status', 'age', 'years_on_the_job', 'nb_previous_loans', 'avg_amount_loans_previous', 'flag_own_car')\
    .repartition(1).write.format("csv").option("header", "true").save("s3://sagemaker-us-east-2-690391346049/risk/train")
    
    


if __name__ == "__main__":
    main()

Overwriting risk/preprocessing.py


In [42]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor


#Def PySpark process
pyspark_processor = PySparkProcessor(framework_version='3.0',
                                     base_job_name="sm-spark",
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=1)




run_args = pyspark_processor.get_run_args(
    "risk/preprocessing.py",
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train")
    ],
    arguments=None
)




# Add spark process into processing Step
step_process = ProcessingStep(
    name="RiskProcess",
    processor=pyspark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    job_arguments=run_args.arguments,
    code=run_args.code
)
#/opt/ml/processing/input/dataset_credit_risk.csv;

## Define a Training Step to Train a Model

In [43]:
from sagemaker.estimator import Estimator


model_path = f"s3://{default_bucket}/RiskTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

In [44]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="RiskTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data='s3://sagemaker-us-east-2-690391346049/risk/train_model.csv',
            #step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data= 's3://sagemaker-us-east-2-690391346049/risk/train_model.csv',
            #step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

## Define a Model Evaluation Step to Evaluate the Trained Model

The evaluation script uses xgboost to do the following:

- Load the model.

- Read the test data.

- Issue predictions against the test data.

- Build a classification report, including accuracy and ROC curve.

- Save the evaluation report to the evaluation directory.


After pipeline execution, you can examine the resulting evaluation.json for analysis.

In [45]:
%%writefile risk/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/train/train.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting risk/evaluation.py


In [46]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-risk-eval",
    role=role,
)

In [47]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="RiskEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput( 
            source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="risk/evaluation.py",
    property_files=[evaluation_report],
)

## Define a Create Model Step to Create a Model

 create a SageMaker model using the training model

In [48]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [49]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
    name="RiskCreateModel",
    model=model,
    inputs=inputs,
)

## Define a Transform Step to Perform Batch Transformation

 create a Transformer instance with the appropriate model type, compute instance type, and desired output S3 URI.

In [50]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/RiskTransform",
)

In [51]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="RiskTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

## Define a Register Model Step to Create a Model Package

In [52]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)


step_register = RegisterModel(
    name="RiskRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry

In [53]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0,
)

step_cond = ConditionStep(
    name="RiskMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[],
)

## Define a Pipeline of Parameters, Steps, and Conditions

In [58]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"RiskPipelineSM"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process],#, step_train, step_eval, step_cond],
)

##  Examining the pipeline definition

In [59]:
import json


#definition = json.loads(pipeline.definition())
#definition

## Submit the pipeline to SageMaker and start execution

In [60]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

# batch_data,
# , step_eval, step_cond

## Pipeline Operations: Examining and Waiting for Pipeline Execution


In [61]:
execution.wait()

execution.describe()


WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
execution.list_steps()

## Check directories 

In [63]:

! aws s3 ls sagemaker-us-east-2-690391346049 --recursive

2022-02-14 19:42:51      24622 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/output/model.tar.gz
2022-02-14 19:42:53          0 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/profiler-output/framework/training_job_end.ts
2022-02-14 19:42:49     119051 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/profiler-output/system/incremental/2022021419/1644867660.algo-1.json
2022-02-14 19:42:49     193761 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/profiler-output/system/incremental/2022021419/1644867720.algo-1.json
2022-02-14 19:42:53          0 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/profiler-output/system/training_job_end.ts
2022-02-14 19:44:27     329712 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/rule-output/ProfilerReport-1644867587/profiler-output/profiler-report.html
2022-02-14 19:44:26     171095 RiskTrain/pipelines-i78mx6pndrp1-RiskTrain-GPdaFBNQ84/rule-output/ProfilerReport-1644867587/profiler-output/profiler-report.ipynb
2022-02