# Automatizando la preparación del Dataset y el entrenamiento del modelo con SageMaker Pipelines

Vamos a crear un flujo de trabajo de producción repetible que normalmente se ejecuta fuera de los notebooks. Para demostrar la automatización del workflow, utilizaremos [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines) para la orquestación. SageMaker Pipelines nos ayuda a automatizar los diferentes pasos del workflow de ML, incluyendo el procesamiento de datos, el entrenamiento del modelo y la predicción por lotes (scoring), y a aplicar condiciones como aprobación de la calidad del modelo. También incluye un registro de modelos y un rastreador de linaje de los modelos.

## Workflow automatizado con SageMaker Pipelines <a class="anchor" id="WorkflowAutomation">

En los pasos anteriores hemos prototipado varios pasos de un proyecto TensorFlow dentro del propio notebook, con algunos pasos ejecutados en recursos externos de SageMaker (entrenamiento alojado, ajuste de modelos, endpoints alojados). Los notebook son excelentes para la creación de prototipos, pero por lo general no se utilizan en la producción de pipelines de aprendizaje automático.  

Un pipeline sencillo en SageMaker incluye el procesamiento del Dataset para prepararlo para el entrenamiento, el propio entrenamiento y, a continuación, el uso del modelo para realizar algún tipo de inferencia, como predicción por lotes (scoring). Utilizaremos SageMaker Pipelines para automatizar estos pasos, manteniendo el pipeline lo más sencillo posible por ahora: se puede extender a un pipeline mucho más complejo.

### Parámetros del Pipeline <a class="anchor" id="PipelineParameters">

Antes de empezar a crear el pipeline en sí, deberíamos pensar en cómo parametrizarlo. Por ejemplo, podemos utilizar diferentes tipos de instancia para diferentes propósitos, como tipos basados en CPU para el procesamiento de datos y tipos basados en GPU o más potentes para el entrenamiento del modelo.  Todos estos son "nudos" del pipeline que podemos parametrizar. La parametrización permite realizar ejecuciones y programaciones personalizadas sin tener que modificar la definición del pipeline.

In [None]:
!pip install --upgrade sagemaker=='v2.90.0'

In [None]:
import os
import sagemaker

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 

raw_s3 = f"s3://{bucket}/tf-2-workflow/data/raw"    # sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)


In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)

### Step de preprocesado <a class="anchor" id="ProcessingStep">

El primer paso en el pipeline será el preprocesamiento de los datos para prepararlos para el entrenamiento. Creamos un objeto `SKLearnProcessor` similar al visto anteriormente, pero ahora parametrizado para que podamos hacer un seguimiento por separado y cambiar la configuración del job según sea necesario, por ejemplo, para aumentar el tamaño del tipo de instancia para trabajar con un Dataset que va creciendo.

In [None]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    x_train = np.load(os.path.join('/opt/ml/processing/input', 'x_train.npy'))
    scaler.fit(x_train)
    for file in input_files:
        raw = np.load(file)
        # Solo transformamos las columnas con features
        if 'y_' not in file:
            transformed = scaler.transform(raw)
        if 'train' in file:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/train', 'y_train.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TRAINING DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/test', 'y_test.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TEST DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TEST DATA FILE\n')

In [None]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf-2-workflow-process",
    sagemaker_session=sess,
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="TF2Process",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./preprocessing.py",
)

### Steps de entrenamiento y creación de modelo <a class="anchor" id="TrainingModelCreation">

El siguiente código configura un setp del pipeline para realizar el entrenamiento. Comenzamos especificando qué contenedor de entrenamiento TensorFlow 2 preconstruido por SageMaker se utilizará para el trabajo.

In [None]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel

tensorflow_version = '2.3.1'
python_version = 'py37'

image_uri_train = sagemaker.image_uris.retrieve(
  framework="tensorflow",
  region=region,
  version=tensorflow_version,
  py_version=python_version,
  instance_type=training_instance_type,
  image_scope="training"
)

A continuación, especificamos un objeto `Estimator`, y definimos un `TrainingStep` para insertar el job de entrenamiento en el pipeline recibiendo los inputs del paso anterior de preprocesamiento. Nótese que hemos utilizado los hiperparámetros del mejor estimador que encontramos en el notebook. La integración de AutoTuning con los Pipelines de SageMaker ya está disponible, pero aún no ha sido integrada en este notebook (se actualizará pronto). 

Deberíamos usarlo para encontrar el mejor modelo.

In [None]:
import time

model_path = f"s3://{bucket}/TF2WorkflowTrain"
training_parameters = {'epochs': 21, 'batch_size': 247, 'learning_rate': 0.138448, 'for_pipeline': 'true'}

estimator = TensorFlow(
    image_uri=image_uri_train,
    source_dir='code',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    base_job_name="tf-2-workflow-train",
    output_path=model_path,
    hyperparameters=training_parameters
)

In [None]:
step_train = TrainingStep(
    name="TF2WorkflowTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        )
    },
)

Como paso adicional, creamos un objeto `Model` para envolver el artefacto del modelo, y lo asociamos con un contenedor de inferencia TensorFlow pre-construido por SageMaker para usarlo potencialmente más tarde.

In [None]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

image_uri_inference = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    py_version=python_version,
    instance_type=batch_instance_type,
    image_scope="inference"
)

model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

inputs_model = CreateModelInput(
    instance_type=batch_instance_type
)

step_create_model = CreateModelStep(
    name="TF2WorkflowCreateModel",
    model=model,
    inputs=inputs_model,
)

### Paso de puntuación por lotes <a class="anchor" id="BatchScoringStep">

El último paso de este proceso es la puntuación por lotes (inferencia/predicción). El inputde este paso serán el modelo que hemos entrenado anteriormente y los datos de prueba. Para realizar la inferencia por lotes solamente necesitamos un sencillo script de Python.

In [None]:
%%writefile batch-score.py

import os
import subprocess
import sys
import numpy as np
import pathlib
import tarfile

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

if __name__ == "__main__":
    
    install('tensorflow==2.3.1')
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, 'r:gz') as tar:
        tar.extractall('./model')
    import tensorflow as tf
    model = tf.keras.models.load_model('./model/1')
    test_path = "/opt/ml/processing/test/"
    x_test = np.load(os.path.join(test_path, 'x_test.npy'))
    y_test = np.load(os.path.join(test_path, 'y_test.npy'))
    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)
    
    output_dir = "/opt/ml/processing/batch"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/score-report.txt"
    with open(evaluation_path, 'w') as writer:
        writer.write(f"Test MSE : {scores}")

Aquí utilizaremos SageMaker Processing para realizar la puntuación por lotes.

In [None]:
batch_scorer = SKLearnProcessor(
                    framework_version=framework_version,
                    instance_type=batch_instance_type,
                    instance_count=batch_instance_count,
                    base_job_name="tf-2-workflow-batch",
                    sagemaker_session=sess,
                    role=role )

step_batch = ProcessingStep(
                    name="TF2WorkflowBatchScoring",
                    processor=batch_scorer,
                    inputs=[
                        ProcessingInput(
                            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                            destination="/opt/ml/processing/model"
                        ),
                        ProcessingInput(
                            source=step_process.properties.ProcessingOutputConfig.Outputs[
                                "test"
                            ].S3Output.S3Uri,
                            destination="/opt/ml/processing/test"
                        )
                    ],
                    outputs=[
                        ProcessingOutput(output_name="batch", source="/opt/ml/processing/batch"),
                    ],
                    code="./batch-score.py" )

### Creando y ejecutando el pipeline <a class="anchor" id="CreatingExecutingPipeline">

Una vez definidos todos los pasos del pipeline, podemos definir el propio pipeline como un objeto `Pipeline` que comprende una serie de esos pasos.  También son posibles los pasos paralelos y condicionales.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"TF2Workflow"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_type, 
        processing_instance_count, 
        training_instance_type, 
        training_instance_count,
        batch_instance_type,
        batch_instance_count
    ],
    steps=[step_process, 
        step_train, 
        step_create_model,
        step_batch
    ],
    sagemaker_session=sess
)

Podemos inspeccionar la definición del pipeline en formato JSON

In [None]:
import json

definition = json.loads(pipeline.definition())
definition

After upserting its definition, we can start the pipeline with the `Pipeline` object's `start` method:

Cuando estemos de acuerdo con la definición producida, podemos iniciar el pipeline con el método `start` del objeto `Pipeline`

In [None]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

Ahora podemos confirmar que el pipeline se está ejecutando. En la salida del log de la siguiente celda confirmamos que `PipelineExecutionStatus` es igual a `Executing`.

In [None]:
execution.describe()

#### Revisar el Pipeline

After the pipeline started executing, you can view the pipeline run. 

To view them, choose the SageMakers Components and registries button.
On the Components and registires drop down, select Pipelines.

Una vez que el pipeline ha comenzado a ejecutarse, podemos ver su ejecución en la consola Web.

Para verlo, seleccionamos el botón Componentes y registros de SageMaker. En el menú desplegable Componentes y registros, seleccionamos Pipelines.

![sageMakers_components_and_registries_button](../media/img/pipelines_execution_1.png)

Hacemos click en el pipeline `TF2Workflow`, y después doble click sobre su ejecución.

![click_the_pipeline_execution](../media/img/pipelines_execution_2.png)

Ahora podemos ver el pipeline ejecutándose. Podemos hacer click en el step `TF2Process` para ver detalles adicionales.

![view_pipeline_execution](../media/img/pipelines_execution_3.png)

En este step específico, podremos ver el output, logs e información adicional.

![view_step_details](../media/img/pipelines_execution_4.png)

Normalmente, este proceso debería tardar unos 10 minutos en completarse. Podemos esperar a que se complete invocando `wait()`. Una vez completada la ejecución, podemos listar el estado de los pasos del pipeline.

In [None]:
execution.wait()
execution.list_steps()

### Comprobación del informe de resultados

Una vez completado el trabajo de puntuación por lotes en el pipeline, éste se sube a S3. Por simplicidad este informe simplemente indica el MSE de la prueba, pero generalmente estos informes pueden incluir tantos detalles como se desee. También pueden ser formateados para su uso en pasos de aprobación condicional en pipelines de SageMaker. Por ejemplo, el pipeline podría tener un paso condicional que sólo permite continuar si el MSE es inferior a un umbral determinado.

In [None]:
report_path = f"{step_batch.outputs[0].destination}/score-report.txt"
!aws s3 cp {report_path} ./score-report.txt --quiet && cat score-report.txt

## Seguimiento del linaje  <a class="anchor" id="LineageOfPipelineArtifacts">

El seguimiento del linaje de SageMaker crea y almacena información sobre los pasos de un flujo de trabajo de ML desde la preparación de los datos hasta el despliegue del modelo. Con la información de seguimiento podemos reproducir los pasos del flujo de trabajo, realizar un seguimiento del linaje del modelo y del Dataset y establecer estándares de auditoría y gobernanza del modelo.

Comprobemos ahora el linaje del modelo generado por el pipeline anterior. La tabla de linaje identifica los recursos utilizados en el entrenamiento, incluyendo las fuentes de datos de entrenamiento y de prueba con marca de tiempo, y la versión específica del contenedor TensorFlow 2 en uso durante el trabajo de entrenamiento.  

In [None]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'TF2WorkflowTrain':
        display(viz.show(pipeline_execution_step=execution_step))

## Extensiones <a class="anchor" id="Extensions">

We've covered a lot of content in this notebook:  SageMaker Processing for data transformation, Automatic Model Tuning, and SageMaker hosted training and inference.  These are central elements for most deep learning workflows in SageMaker.  Additionally, we examined how SageMaker Pipelines helps automate deep learning workflows after completion of the prototyping phase of a project.

Besides all of the SageMaker features explored above, there are many other features that may be applicable to your project.  For example, to handle common problems during deep learning model training such as vanishing or exploding gradients, **SageMaker Debugger** is useful.  To manage common problems such as data drift after a model is in production, **SageMaker Model Monitor** can be applied.

Hemos cubierto mucho contenido en este notebook:

Procesamiento de SageMaker para la transformación de datos, ajuste automático de modelos y entrenamiento e inferencia alojados en SageMaker. Estos elementos son centrales para la mayoría de los workflows de ML en SageMaker.

Además, hemos examinado como automatizar dichos workflows con SageMaker Pipelines tras completar la fase de creación de prototipos de un proyecto.

Además de todas las características de SageMaker anteriores, hay muchas otras características que pueden ser aplicables a nuestros proyecto. Por ejemplo, para solventar problemas comunes durante el entrenamiento del modelo, como problemas con gradientes, es útil **SageMaker Debugger**. Para gestionar problemas como el data drift tras poner un modelo en producción podemos usar **SageMaker Model Monitor**.