# Sesión 2 - SageMaker Pipeline

## Subida de datos

In [2]:
import boto3
import pandas as pd
import numpy as np
import sagemaker
from sklearn.datasets import load_iris


# SageMaker session: gives access to the active region and the default bucket.
sesion = sagemaker.Session()
region = sesion.boto_session.region_name
bucket = sesion.default_bucket()

# Assemble a single table from the iris bunch: the feature columns first,
# followed by the target as a last column named "Species".
data = load_iris()
df = pd.DataFrame(
    np.column_stack((data['data'], data['target'])),
    columns=[*data['feature_names'], 'Species'],
)

# Write the table to a local CSV without the index column.
df.to_csv("iris.csv", index=False)

# Upload the local file under the course prefix; the call returns the S3 URI.
s3_path = sesion.upload_data(path="iris.csv", bucket=bucket, key_prefix="curso_sagemaker/data")

print(s3_path)

s3://sagemaker-eu-west-1-827345860551/curso_sagemaker/data/iris.csv


# Creación de los Steps del Pipeline

## Processing Step

In [3]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput

# S3 prefix holding the uploaded dataset. It is derived from the session's
# default bucket (same key prefix as the upload) instead of a literal URI, so
# the notebook keeps working in a different account or region.
input_data = f"s3://{bucket}/curso_sagemaker/data/"

# IAM role used by the SageMaker jobs and by the pipeline itself.
# NOTE(review): this ARN is account-specific; change it when running elsewhere.
role="arn:aws:iam::827345860551:role/SageMakerExecutionRole"

# scikit-learn processing container; it shares the notebook session, like the
# estimator and the model defined in later cells.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.c4.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=sesion,
)

# Pipeline step that runs src/process.py: the S3 prefix is mounted at
# /opt/ml/processing/input and two named outputs ("train" and "test") are
# collected from the container's output folders.
step_process = ProcessingStep(
    name="PreprocessStep",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/output/train"),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/output/test"),
    ],
    code="src/process.py",
)

## Train Step

In [4]:
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

# Hyperparameters passed to the estimator.
train_hyperparameters = {
    "min_leaf_nodes": 3,
    "n_estimators": 10,
    "target": "Species"
}

# scikit-learn estimator whose entry point is train.py inside ./src.
sklearn_estimator = SKLearn(
    entry_point='train.py',
    source_dir='./src',
    framework_version='0.23-1',
    instance_type="ml.c4.xlarge",
    role=role,
    sagemaker_session=sesion,
    hyperparameters=train_hyperparameters,
)


def _csv_channel(output_name):
    """Return a CSV TrainingInput pointing at the named output of the processing step."""
    processing_outputs = step_process.properties.ProcessingOutputConfig.Outputs
    return TrainingInput(
        s3_data=processing_outputs[output_name].S3Output.S3Uri,
        content_type="text/csv",
    )


# Training step with one input channel per processing output.
step_train = TrainingStep(
    name="TrainStep",
    estimator=sklearn_estimator,
    inputs={channel: _csv_channel(channel) for channel in ("train", "test")},
)

## Create Model

In [5]:
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.step_collections import CreateModelStep

# Reference to the model-artifact property of the training step.
trained_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts

# Model object built on those artifacts; it reuses train.py from ./src as its
# entry point.
model = SKLearnModel(
    model_data=trained_artifacts,
    role=role,
    entry_point='train.py',
    source_dir='./src',
    framework_version='0.23-1',
    sagemaker_session=sesion,
)

# Pipeline step that creates the model, declared for an ml.m5.large instance.
model_step_inputs = CreateModelInput(instance_type="ml.m5.large")
step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=model_step_inputs,
)

## Transform Step

In [6]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.transformer import Transformer

# Destination of the batch-transform results. It is derived from the session's
# default bucket instead of a literal URI, so the notebook keeps working in a
# different account or region.
# NOTE(review): this location sits under the "curso_sagemaker/data/" prefix
# that the processing step reads as its input; consider a separate prefix so
# transform results are not picked up as input by a later run.
output_data = f"s3://{bucket}/curso_sagemaker/data/iris.out"

# Batch transformer bound to the model created by the previous pipeline step;
# it shares the notebook session, like the estimator and the model.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    accept="text/csv",
    instance_count=1,
    output_path=output_data,
    sagemaker_session=sesion,
)

# Transform step fed with the CSV uploaded at the start of the notebook.
# NOTE(review): the recorded run shows this step failing with an
# AlgorithmError. The uploaded CSV was written with its header row and still
# contains the "Species" column; confirm the inference code accepts that
# layout (the job logs hold the actual cause).
step_transform = TransformStep(
    name="TransformStep",
    transformer=transformer,
    inputs=TransformInput(data=s3_path, content_type="text/csv")
)

## RegisterModel Step

In [None]:
from sagemaker.workflow.step_collections import RegisterModel

# Instance types declared for the registered model package.
inference_instance_types = ["ml.t2.medium", "ml.m5.xlarge"]
transform_instance_types = ["ml.m5.xlarge"]

# Registers the model in the "SklearnIris" package group with CSV as both
# request and response format; the package is registered as already approved.
register_step = RegisterModel(
    name="RegistroModelo",
    model=model,
    model_package_group_name="SklearnIris",
    approval_status="Approved",
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=inference_instance_types,
    transform_instances=transform_instance_types,
)


# Ejecución del Pipeline

In [7]:
import json
from pprint import pprint
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline from the steps defined in the previous cells.
pipeline = Pipeline(
    name="SklearnIrisPipeline",
    steps=[
        step_process,
        step_train,
        step_create_model,
        step_transform,
        register_step
    ],
)

# Create or update the pipeline in SageMaker, then keep its definition parsed
# as a dict for inspection.
pipeline.upsert(role_arn=role)
definition = json.loads(pipeline.definition())

# Start the execution outside the try block: if start() itself fails there is
# no execution object to inspect, and the original error should surface
# instead of a NameError raised from the handler.
execution = pipeline.start()
try:
    execution.wait()
except Exception as exc:
    # Best effort: report why waiting ended abnormally, then still show the
    # per-step summary below. KeyboardInterrupt is deliberately not swallowed.
    print(f"Pipeline execution did not complete successfully: {exc}")
finally:
    # Always print the step summary, whether the wait succeeded or not.
    pprint(execution.list_steps())

[{'AttemptCount': 0,
  'EndTime': datetime.datetime(2022, 4, 25, 18, 36, 59, 297000, tzinfo=tzlocal()),
  'FailureReason': 'ClientError: AlgorithmError: See job logs for more '
                   'information',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:eu-west-1:827345860551:transform-job/pipelines-velzm77h9gsp-transformstep-6wqwedzpf8'}},
  'StartTime': datetime.datetime(2022, 4, 25, 18, 32, 28, 377000, tzinfo=tzlocal()),
  'StepName': 'TransformStep',
  'StepStatus': 'Failed'},
 {'AttemptCount': 0,
  'EndTime': datetime.datetime(2022, 4, 25, 18, 32, 27, 571000, tzinfo=tzlocal()),
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:eu-west-1:827345860551:model/pipelines-velzm77h9gsp-createmodel-6alvbpozfk'}},
  'StartTime': datetime.datetime(2022, 4, 25, 18, 32, 26, 418000, tzinfo=tzlocal()),
  'StepName': 'CreateModel',
  'StepStatus': 'Succeeded'},
 {'AttemptCount': 0,
  'EndTime': datetime.datetime(2022, 4, 25, 18, 32, 25, 662000, tzinfo=tzlocal()),
  'Metadata': {'T