In [None]:
import sys

! {sys.executable} -m pip install sagemaker -U
! {sys.executable} -m pip install sagemaker-utils -U
! {sys.executable} -m pip install datetime
! {sys.executable} -m pip install matplotlib.pyplot
! {sys.executable} -m pip install imblearn


In [None]:
! wget http://amazon-sagemaker.com/dependencies/dependencies.zip -O dependencies.zip
! unzip -o dependencies.zip

In [None]:
import os
import datetime
import sagemaker
import sagemaker_utils
import numpy as np
import matplotlib.pyplot as plt
from time import gmtime, strftime
from sklearn.metrics import confusion_matrix
from sagemaker import Session, get_execution_role
from sagemaker.estimator import Estimator
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter, CategoricalParameter
from sagemaker.inputs import TrainingInput, CreateModelInput, TransformInput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

In [5]:
#Verificamos que la versión del SDK de SageMaker sea 2.76.0 o superior.

sagemaker.__version__

'2.88.2'

In [None]:
session = Session()
sagemaker_role = get_execution_role()

data_file = 'dataset_credit_risk.csv'

region = session.boto_region_name
account_id = session.account_id()
bucket = session.default_bucket()

prefix = 'credit_risk'
datasets_prefix = f'{prefix}/datasets'
processed_data_prefix = f'{prefix}/processed'
eval_prefix = f'{prefix}/eval'
transformed_data_prefix = f'{prefix}/transformed'
images_directory = f'{prefix}/images'
code_prefix = f'{prefix}/code'
model_prefix = f'{prefix}/models'

In [None]:
#Para crear los contenedores Docker utilizaremos el servicio AWS Code Build

secret_name = 'dockerhub'
sagemaker_utils.create_secret(secret_name,'gonzalosaravia','demo')

In [None]:
#Necesitaremos un rol de ejecución para ser utilizado en el proyecto de AWS Code Build. 
# Si estamos ejecutando el Notebook con permisos suficientes para crear un rol de IAM, 
# podemos crear el rol simplemente ejecutando el siguiente método, de lo contrario tendría que ser creado de forma manual.

In [None]:
policy_document={
        "Version": "2012-10-17",
        "Statement": [               
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:BatchCheckLayerAvailability",
                    "ecr:CompleteLayerUpload",
                    "ecr:GetAuthorizationToken",
                    "ecr:InitiateLayerUpload",
                    "ecr:PutImage",
                    "ecr:UploadLayerPart",
                    "ecr:BatchGetImage",
                    "ecr:GetDownloadUrlForLayer",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetObjectVersion",
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": "*"
            }
        ]
    }

codebuild_role = sagemaker_utils.create_codebuild_execution_role('CodeBuildExecutionRole', policy_document)

In [None]:
#Especificamos las dependencias requeridas para cada uno de los contenedores Docker que crearemos.

In [None]:
docker_images = {'Processing':{'libraries':{'pandas':'1.2.4',
                                            'numpy':'1.20.2',
                                            'scikit-learn':'0.24.2'}},
                 'Training':{'libraries':{'pandas':'1.2.4',
                                          'numpy':'1.20.2',
                                          'scikit-learn':'0.24.2',
                                          'sagemaker-training':'3.9.2'}},
                 'Inference':{'libraries':{'pandas':'1.2.4',
                                           'numpy':'1.20.2',
                                           'scikit-learn':'0.24.2',
                                           'multi-model-server':'1.1.8',                            
                                           'sagemaker-inference':'1.5.11',
                                           'boto3':'1.21.43',
                                           'itsdangerous':'2.0.1'},
                              'dependencies':[('serving','/opt/ml/serving')],
                              'others':['RUN pip install -e /opt/ml/serving',
                                        'LABEL com.amazonaws.sagemaker.capabilities.multi-models=false',
                                        'LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true'],
                              'entrypoint':['python','/opt/ml/serving/custom_inference/serving.py'],
                              'cmd':['serve']}}

In [None]:
#Creamos y publicamos las imágenes Docker en Amazon Elastic Container Registry 
# para posteriormente poder ser utilizados en los jobs que crearemos y lanzaremos en Amazon SageMaker.

In [None]:
for image in docker_images:
    parameters = {'image_name': f'{prefix}-{image.lower()}',
                  'base_image': 'python:3.7.6-slim-buster',
                  's3_path': f's3://{bucket}/{images_directory}',
                  'role': codebuild_role,  
                  'secret': secret_name,
                  'wait': False}
    
    parameters.update(docker_images[image])
    
    docker_images[image]['build_id'] = sagemaker_utils.create_docker_image(**parameters)

In [None]:
#Debido a que la creación de los containers ocurre de manera asíncrona,
# esperamos a que termine la creación de los tres contenedores

In [None]:
image_uris = sagemaker_utils.wait_for_build([docker_images[image]['build_id'] for image in docker_images])
for image in docker_images:
    docker_images[image]['image_uri'] = image_uris[docker_images[image]['build_id']]

In [None]:
#Preparación de los datos

In [None]:
#primero debemos subir los datos a Amazon S3.
# Es lo que haremos en este capítulo y posteriormente crearemos un Job de Procesamiento en SageMaker
# el cual nos permitirá realizar las transformaciones necesarias a nuestro dataset como preparación
# para el entrenamiento de nuestros modelos de Machine Learning.

Los jobs de procesamiento de Amazon SageMaker pueden ser utilizados para preparar los datasets o evaluar modelos de Machine Learning. ingeniería de características, validación de datos, evaluación de modelos e interpretación de modelos. Incluso Amazon SageMaker Processing pudiera ser utilizado para evaluar el desempeño de un modelo una vez que este ha sido desplegado.

SageMaker Processing Job

Para crear un job de procesamiento utilizando Amazon SageMaker Processing, se debe proporcionar la siguiente información:

Script Python a ser utilizado por el job de procesamiento
Los recursos de cómputo que queremos que Amazon SageMaker utilice para el procesamiento
La URL del bucket de Amazon S3 en dónde se encuentran los datos que serán utilizados en el job de procesamiento
La URL del bucket de Amazon S3 en el cual queremeos que se guarde el resultado del job de procesamiento
La ruta de Amazon Elastic Container Registry de la imagen Docker a ser utilizada para ejecutar el script del job de procesamiento. Puede ser alguno de los proporcionados por SageMaker o una imagen Docker creada personalizada
La infraestructura utilizada durante la ejecución del job de procesamiento es totalmente administrada por Amazon SageMaker. Los recursos del clúster se aprovisionan para la duración del job y eliminados posteriormente. El resultado del job de procesamiento es almacenado en el bucket de Amazon S3 especificado.


In [None]:
data_s3_path = sagemaker_utils.upload(data_file, f's3://{bucket}/{datasets_prefix}')

Para crear un Job de procesamiento de Amazon SageMaker primero crearemos un script python el cual nombraremos processing.py y tendrá toda la lógica necesaria para realizar las mismas transformaciones que en el Jupyter Notebook de ejemplo descargado

In [None]:
data_prep_script_file = 'code/data_prep.py'
sagemaker_utils.make_dirs(data_prep_script_file)

In [None]:

    # Cargar dataset
    df = pd.read_csv(data_path)

    df = df.sort_values(by=["id", "loan_date"])
    df = df.reset_index(drop=True)
    df["loan_date"] = pd.to_datetime(df.loan_date)

    #Feature nb_previous_loans
    df_grouped = df.groupby("id")
    df["nb_previous_loans"] = df_grouped["loan_date"].rank(method="first") - 1

    # Feature avg_amount_loans_previous
    df['avg_amount_loans_previous'] = (df.groupby('id')['loan_amount'].apply(lambda x: x.shift().expanding().mean()))

    # Feature age
    from datetime import datetime, date

    df['birthday'] = pd.to_datetime(df['birthday'], errors='coerce')
    df['age'] = (pd.to_datetime('today').normalize() - df['birthday']).dt.days // 365

    # Feature years_on_the_job

    df['job_start_date'] = pd.to_datetime(df['job_start_date'], errors='coerce')
    df['years_on_the_job'] = (pd.to_datetime('today').normalize() - df['job_start_date']).dt.days // 365

    # Feature flag_own_car

    df['flag_own_car'] = df.flag_own_car.apply(lambda x : 0 if x == 'N' else 1)

    # Selección de columnas
    columns = ['id', 'age', 'years_on_the_job', 'nb_previous_loans', 'avg_amount_loans_previous', 'flag_own_car', 'status']
    
    df = df[columns]

    cust_df = df.copy()
    cust_df.fillna(0, inplace=True)

    Y = cust_df['status'].astype('int')

    cust_df.drop(['status'], axis=1, inplace=True)
    cust_df.drop(['id'], axis=1, inplace=True)


    X = cust_df
        
    import pandas as pd

    import matplotlib.pyplot as plt
    %matplotlib inline

    from imblearn.over_sampling import SMOTE
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import (
        accuracy_score, confusion_matrix, recall_score, 
        plot_confusion_matrix, precision_score, plot_roc_curve
    )

    from sklearn.ensemble import RandomForestClassifier

    X_train, X_test, y_train, y_test = train_test_split(X, Y, stratify=Y, test_size=0.3, random_state = 123)

    # Using Synthetic Minority Over-Sampling Technique(SMOTE) to overcome sample imbalance problem.
    X_train, y_train = SMOTE().fit_resample(X_train, y_train)
    X_train = pd.DataFrame(X_train, columns=X.columns)

    # Guardar los dataframes resultantes y el encoder
    X_train.to_csv(os.path.join(output_path, 'train_data', args.train_data_file), index=False)
    y_train.to_csv(os.path.join(output_path, 'train_target', args.train_target_file), index=False)
    X_test.to_csv(os.path.join(output_path, 'test_data', args.test_data_file), index=False)
    y_test.to_csv(os.path.join(output_path, 'test_target', args.test_target_file), index=False)
    to_pkl(encoder, os.path.join(output_path, 'encoder', args.encoder_file))

    print(f'INFO: {script_name}: Finalizando la preparación de los datos')

subimos el script creado a un bucket de Amazon S3

In [None]:
data_prep_script_path = sagemaker_utils.upload(data_prep_script_file, f's3://{bucket}/{code_prefix}')

Utilizaremos uno de los contenedores previamente creados para correr nuestro script que acabamos de crear, para esto utilizamos la clase Processor. Y debemos especificar los recursos requeridos para la instancia en dónde se ejecutará el proceso, así cómo la imagen Docker a utilizar y la ubicación del script Python a ejecutar con la lógica para la preparación del conjunto de datos.

In [None]:
processor = Processor(
    image_uri=docker_images['Processing']['image_uri'],
    role=sagemaker_role,
    instance_count=1,
    instance_type='ml.m5.4xlarge',
    entrypoint=['python3',f'/opt/ml/processing/input/code/{os.path.basename(data_prep_script_file)}'],
    volume_size_in_gb=5,
    max_runtime_in_seconds=60*60*2)# dos horas 


In [None]:
train_data_file = 'train_data.csv'
train_target_file = 'train_target.csv'
test_data_file = 'test_data.csv'
test_target_file = 'test_target.csv'
encoder_file = 'encoder.pkl'

finalmente ejecutamos el Job utilizando el metodo run del objeto creado mediante la clase Processor. Debemos pasar las rutas de los buckets de Amazon S3 tanto para inputs (entradas) como para outputs(salidas). De esta forma SageMaker sabe de dónde tomar los datos de entrada y en dónde colocar los archivos resultantes de ejecutar el Job de procesamiento.

In [None]:
data_prep_parameters = {
    'inputs':[ProcessingInput(input_name='input',
                    source=f's3://{bucket}/{datasets_prefix}',
                    destination='/opt/ml/processing/input'),
              ProcessingInput(input_name='code',
                    source=data_prep_script_path,
                    destination='/opt/ml/processing/input/code')],
    'outputs':[ProcessingOutput(output_name='train_data',
                    source=f'/opt/ml/processing/output/train_data',
                    destination=f's3://{bucket}/{processed_data_prefix}/train_data'),
               ProcessingOutput(output_name='train_target',
                    source=f'/opt/ml/processing/output/train_target',
                    destination=f's3://{bucket}/{processed_data_prefix}/train_target'),
               ProcessingOutput(output_name='test_data',
                    source=f'/opt/ml/processing/output/test_data',
                    destination=f's3://{bucket}/{processed_data_prefix}/test_data'),
               ProcessingOutput(output_name='test_target',
                    source=f'/opt/ml/processing/output/test_target',
                    destination=f's3://{bucket}/{processed_data_prefix}/test_target'),
               ProcessingOutput(output_name='encoder',
                    source=f'/opt/ml/processing/output/encoder',
                    destination=f's3://{bucket}/{processed_data_prefix}/encoder')],
    'arguments':['--test-size', '0.1',
                 '--data-file', 'churn.txt',
                 '--train-data-file', train_data_file,
                 '--train-target-file', train_target_file,
                 '--test-data-file', test_data_file,
                 '--test-target-file', test_target_file,
                 '--encoder-file', encoder_file]}

processor.run(**data_prep_parameters)

Entrenamiento del modelo

In [None]:
Para entrenar un modelo en Amazon SageMaker, se debe crear un job de entrenamiento con la siguiente información:

La URL del bucket de Amazon S3 en dónde se encuentran los datos que serán utilizados para el entrenamiento
Los recursos de cómputo que queremos que Amazon SageMaker utilice para el entrenamiento del modelo. Los recursos de cómputo son instancias para Machine Learning administradas por Amazon SageMaker
La URL del bucket de Amazon S3 en el cual queremos que se guarde el resultado del proceso de entrenamiento
La ruta de Amazon Elastic Container Registry en dónde el código de entrenamiento está almacenado. Para conocer más acerca de las rutas de los algoritmos incluidos en Amazon SageMaker consultar la documentación
Existen las siguientes alternativas para entrenar un modelo utilizando Amazon SageMaker:

Utilizar un algoritmo incluido en Amazon SageMaker - si alguno de estos cubre tus necesidades, basta con proporcionarle los datos e hiperpárametros que este require. Para obtener una lista de los algoritmos incluidos consultar la documentación

Utilizar tu propio código de entrenamiento con algún framework soportado por Amazon SageMaker - los frameworks soportados son: TensorFlow, Pytorch, MXNet, Chainer, Scikit-learn y la biblioteca XGBoost

Utilizar tu propio algoritmo - colocando tu código en una imagen Docker con todas las dependencias necesarias. Para mayor información consultar la documentación

Utilizar algún algoritmo mediante una suscripción a través de AWS Marketplace - para mayor información acerca de como funcionan de estas suscripciones consultar la documentación

En lugar de crear tres scripts distintos para cada algoritmo, crearemos uno sólo el cual a partir de los parámetros recibidos, entrenará el modelo con el algoritmo indicado. Utilizaremos la misma técnica de k-fold Cross-Validation utilizada en el Jupyter Notebook descargado de Introducción .

Adicionalmente, este script nos servirá no solamente para el entrenamiento sino también para el despliegue posterior de nuestro modelo.

In [None]:
training_script_file = 'code/train_and_serve.py'


In [None]:
 %%writefile $training_script_file
import argparse
import pickle
import os
import io
import json
import pandas as pd
import numpy as np
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier

# Carga el modelo en memoria
def model_fn(model_dir):
    print('Cargando modelo: model_fn')
    clf = read_pkl(os.path.join(model_dir, "model.pkl"))
    return clf

# Deserealiza el body de la petición para poder generar las predicciones
def input_fn(request_body, request_content_type):

    if request_content_type == 'application/json':
        input_data = json.loads(request_body)
        input_data = pd.DataFrame.from_dict(input_data)
        # TODO: Es importante asegurarse de que las columnas se encuentran en el orden adecuado
        return input_data
        
    elif request_content_type == 'text/csv':      
        input_data = io.StringIO(request_body)        
        return pd.read_csv(input_data, header=None)
    else:
        raise ValueError()
                
# Genera la predicción sobre el objeto deserializado, con el modelo previamente cargado en memoria
def predict_fn(input_data, model):
    predict_proba = getattr(model, 'predict_proba', None)
    if callable(predict_proba):
        return predict_proba(input_data)[:, 1]
    else:
        return model.predict(input_data)

# Serializa el resultado de la predicción al correspondiente content type deseado
def output_fn(predictions, response_content_type):
    if response_content_type == 'application/json':        
        return json.dumps(predictions.tolist())
    elif response_content_type == 'text/csv':
        predictions_response = io.StringIO()
        np.savetxt(predictions_response, predictions, delimiter=',')
        return predictions_response.getvalue()
    else:
        raise ValueError("El endpoint del modelo solamente soporta Content-Types: 'application/json' o 'text/csv' como respuesta")        
        
def read_pkl(file):
    with open(file, 'rb') as f:
        return pickle.load(f)
    
def to_pkl(data, file):
    with open(file, 'wb') as f:
        pickle.dump(data, f)

def random_forest(**hyperparameters):
    return RandomForestClassifier(n_jobs=-1, 
                                  min_samples_split=hyperparameters['min_samples_split'],
                                  n_estimators=hyperparameters['n_estimators'],
                                  max_depth=hyperparameters['max_depth'],
                                  max_features=hyperparameters['max_features'])

def invalid_algorithm(**hyperparameters):
    raise Exception('Invalid Algorithm')
    
def algorithm_selector(algorithm, **hyperparameters):
    algorithms = {
        'RandomForest': random_forest
        }
    
    clf = algorithms.get(algorithm, invalid_algorithm)    
    return clf(**hyperparameters)


if __name__=='__main__':
    script_name = os.path.basename(__file__)
    print(f'INFO: {script_name}: Iniciando entrenamiento del modelo')
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--output-data-dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train-data', type=str, default=os.environ.get('SM_CHANNEL_TRAIN_DATA'))
    parser.add_argument('--train-target', type=str, default=os.environ.get('SM_CHANNEL_TRAIN_TARGET'))
    
    parser.add_argument('--algorithm', type=str)
    parser.add_argument('--splits', type=int, default=10)
    parser.add_argument('--target-metric', type=str)
    
    parser.add_argument('--learning-rate', type=float)
    parser.add_argument('--min-samples-split', type=int)
    parser.add_argument('--n-estimators', type=int)
    parser.add_argument('--max-depth', type=int)
    parser.add_argument('--max-features', type=int)
    
            
    args, _ = parser.parse_known_args()
    
    print(f'INFO: {script_name}: Parametros recibidos: {args}')

    # Cargar datasets
    files = os.listdir(args.train_data)
    if len(files) == 1:
        train_data = pd.read_csv(os.path.join(args.train_data, files[0]))
    else:
        raise Exception('Mas de un archivo recibido para el channel Data')
    
    files = os.listdir(args.train_target)
    if len(files) == 1:
        train_target = pd.read_csv(os.path.join(args.train_target, files[0]))
        train_target = train_target['Churn'].tolist()
    else:
        raise Exception('Mas de un archivo recibido para el channel Target')

     
    clf = algorithm_selector(args.algorithm,
                             min_samples_split=args.min_samples_split,
                             n_estimators=args.n_estimators,
                             max_depth=args.max_depth,
                             max_features=args.max_features)
    
    skf = StratifiedKFold(n_splits=args.splits)    
    cv_scores = cross_validate(clf, train_data, train_target, cv=skf, scoring=args.target_metric, n_jobs=-1)
    print('{} = {}%'.format(args.target_metric, cv_scores['test_score'].mean().round(4)*100))
    
    # Entrenar el modelo
    clf.fit(train_data, train_target) 
    
    # Guardar modelo
    to_pkl(clf, os.path.join(args.model_dir, 'model.pkl'))

    print(f'INFO: {script_name}: Finalizando el entrenamiento del modelo')   



Empaquetamos el script en un archvio .tar.gz y lo subimos a un bucket de Amazon S3 para poder utilizarlo en el job de entrenamiento y posteriormente para el despliegue del modelo.

In [None]:
training_script_tar_file = os.path.join('code',os.path.splitext(os.path.basename(training_script_file))[0] + '.tar.gz')

sagemaker_utils.create_tar_gz(training_script_file, training_script_tar_file)

training_script_path = sagemaker_utils.upload(training_script_tar_file, f's3://{bucket}/{code_prefix}')


Para crear el Job de entrenamiento en Amazon SageMaker utilizamos la clase CustomEstimator (descargada con las dependencias) para crear un Estimator el cual permita integrar nuestro script train_and_serve.py con el contenedor Docker que previamente creamos para el entrenamiento de nuestros modelos.

También utilizaremos la expresión regular recall = (\d+\.\d{1,2})? para definir una métrica de desempeño de nuestro algoritmo, en este caso por tratarse de un problema de riesgo, lo que buscamos es incrementar el Recall, esta métrica es calculada mediante el uso de k-Fold Cross-Validation. Amazon SageMaker aplica esta expresión regular a los mensajes de la salida estándar. Posteriormente mediante el uso de esta métrica podremos crear un Job de optimización de hiperparámetros.

Para ejecutar los procesos de entrenamiento lo haremos llamando el método fit del estimator previamente creado.

We’ve identified earlier that we are dealing with an imbalanced dataset and so we need to make sure we’re using the appropriate evaluation metrics for our case. For this reason, we’ll be looking at the common Accuracy metric with a grain of salt. To illustrate why this is the case, accuracy calculates the ratio of total truly predicted values to the total number of input samples, meaning that our model would get pretty high accuracy by predicting the majority class but would fail to capture the minority class, default, which is no bueno. This is why the evaluation metrics that we’ll be focusing on to assess the classification performance of our models are Precision, Recall and F1 score.

Firstly, Precision gives us the ratio of true positives to the total positives predicted by a classifier where positives denote default cases in our context. Given that they’re the minority class in our dataset, we can see that our models do a good job at correctly predicting those minor instances. Moreover, Recall, a.k.a true positive rate, gives us the number of true positives divided by the total number of elements that actually belong to the positive class. In our case, Recall is a more important metric as opposed to Precision given that we’re more concerned with false negatives (our model predicting that someone is not gonna default but they do) than false positives (our model predicting that someone is gonna default but they don’t). Lastly, F1 Score provides a single score to measure both Precision and Recall. Now that we know what to look for, we can clearly see that XGboost performs the best across all 3 metrics. Although it scored better on Precision as opposed to Recall, it still has a pretty good F1 score of 0.81.

We’ll now have a look at ROC which is a probability curve with False Positive Rate (FPR) on the x-axis and True Positive Rate (TPR, recall) on the y-axis. The best model should maximize the TPR to 1 and minimize FPR to 0. With this said, we can compare classifiers using the area under the curve of the ROC curve, AUC, where the higher its value, the better the model is at predicting 0s as 0s and 1s as 1s.

In [None]:
estimators = {'RandomForest':{}}
metric_name = 'cross-val:recall'
metric_regex = 'recall = (\d+\.\d{1,2})?'

for algorithm in estimators:   
    estimators[algorithm] = Estimator(
        image_uri = docker_images['Training']['image_uri'],        
        entry_point = os.path.basename(training_script_file),
        source_dir = training_script_path,
        role = sagemaker_role,
        instance_count = 1,
        instance_type = 'ml.m5.xlarge',
        output_path = f's3://{bucket}/{model_prefix}',
        metric_definitions = [{'Name': metric_name, 'Regex': metric_regex}],
        volume_size = 5,
        max_run = 60*60*2, # dos horas
        hyperparameters={
            'algorithm':algorithm,
            'splits':5,
            'target-metric':'recall',
            'learning-rate': 0.1, 
            'min-samples-split': 3, 
            'n-estimators': 300,
            'max-depth': 25,
            'max-features':20})
    
    estimators[algorithm].fit(
        {'train_data': sagemaker_utils.get_processor_output_path(processor, 'train_data'),
        'train_target': sagemaker_utils.get_processor_output_path(processor, 'train_target')},
        wait=False)


Para esperar a que terminen de ejecutarse los procesos de entrenamiento en Amazon SageMaker, podemos utilizar el siguiente método:

In [None]:
sagemaker_utils.wait_for_training_jobs(estimators)

In [None]:
for estimator in estimators:
    metrics = estimators[estimator].training_job_analytics.dataframe()
    test_recall = metrics[metrics['metric_name'] == metric_name]['value'].values[0]
    print(f'{estimator}: cross-val:recall = {test_recall}%')


Optimización de hiperparámetros

Amazon SageMaker nos ofrece la funcionalidad de optimización de hiperpárametros, ya sea utilizando una búsqueda aleatoria o un método bayesiano, en este caso vamos a utilizar el segundo método el cual permite entrenar un modelo de forma iterativa e ir identificando que combinación de hiperparámetros, nos permite minimizar o maximizar más la métrica objetivo

In [None]:
tuners = {}

total_jobs = 16
parallel_jobs = 2

tuners['RandomForest'] = HyperparameterTuner(
        estimator=estimators['RandomForest'],
        objective_metric_name=metric_name,
        objective_type='Maximize',
        hyperparameter_ranges={'min-samples-split': IntegerParameter(3,10), 
                               'n-estimators': IntegerParameter(150,300),
                               'max-depth': IntegerParameter(20,35),
                               'max-features': IntegerParameter(15,30)},
        metric_definitions=[{'Name': metric_name, 
                              'Regex': metric_regex}],
        max_jobs=total_jobs,
        max_parallel_jobs=parallel_jobs)

La variable total_jobs especifíca el número total de procesos de entrenamiento (combinaciones distintas de valores de hiperparámetros) a ejecutar y la variable parallel_jobs especifíca el número máximo de procesos a ejecutar en paralelo.

Para ejecutar cada uno de los procesos utilizamos el método fit() pasando como parámetros la ubicación de los datasets y mediante el parámetro wait=False es que le indicamos que ejecute el proceso de manera asíncrona (sin esperar a que cada uno de los procesos termine).

In [None]:
for tuner in tuners:
    tuners[tuner].fit({'train_data': sagemaker_utils.get_processor_output_path(processor, 'train_data'),
                       'train_target': sagemaker_utils.get_processor_output_path(processor, 'train_target')}, 
                      job_name= f'{prefix}-{tuner}-{strftime("%M-%S", gmtime())}',
                      wait=False)

In [None]:
sagemaker_utils.wait_for_optmimization_jobs(tuners)

Mediante la invocación del método describe() podemos obtener el proceso de entrenamiento cuyo modelo resultante obtuvo el mejor desempeño, en este caso maximizando la métrica objetivo Recall. Posteriormente de los metadatos devueltos del mejor proceso de entrenamiento, podemos obtener los valores de los hiperparámetros así cómo el valor obtenido en la métrica objetivo.

In [None]:
hyperparameters = {}
for tuner in tuners:
    best_training_job = tuners[tuner].describe()['BestTrainingJob']
    objective_metric = best_training_job['FinalHyperParameterTuningJobObjectiveMetric']
    
    hyperparameters[tuner] = best_training_job['TunedHyperParameters']
    print(tuner)
    print(f"\thyper parameters: {hyperparameters[tuner]}")
    print(f"\t{objective_metric['MetricName']}: {objective_metric['Value']}\n")

Ahora que tenemos el mejor candidato (modelo con mejor desempeño) para cada uno de los tres algoritmos, podemos pasar a comparar el desempeño entre estos para seleccionar el mejor.

In [None]:
Evaluación de desempeño

Al igual que cómo hicimos con la preparación de los datos, utilizaremos un Processing Job de Amazon SageMaker pero en este caso para buscar el umbral de clasificación, es decir el modelo nos devuelve la probabilidad de que el cliente haga Churn y lo que queremos en este caso es encontrar el valor a partir del cual clasificaremos como 1 (churn) o 0 (no-churn). Esto lo haremos buscando maximizar la métrica Recall pero manteniendo un mínimo valor para la métrica Precision, que recibiremos como parámetro.

En el caso del Jupyter Notebook descargado podemos encontrar que esto se logra mediante la invocación de la siguiente función:

En la que básicamente se obtienen las predicciones del modelos y posteriormente mediante el uso de la función precision_recall_curve() del Framework Scikit-learn y después a través del método argmax() de la librería numpy obtenemos el punto en el cual la métrica Recall se maximiza manteniendo el mínimo de la métrica Precision buscado.

El programa que crearemos para el Processing Job implementará una lógica similar y al final guardaremos los umbrales encontrados para cada modelo en un archivo separado por comas.

In [None]:
evaluate_models_script_file = 'code/evaluate_models.py'


In [None]:
%%writefile $evaluate_models_script_file
import argparse
import pickle
import os
import json
import tarfile
import pandas as pd
import numpy as np

from sklearn.metrics import precision_recall_curve

def load_model(file, model_file='model.pkl'):
    if file.endswith('tar.gz'):
        with tarfile.open(file, 'r:gz') as tar:
            for name in tar.getnames():
                if name == model_file:
                    f = tar.extractfile(name)
                    return pickle.load(f)
            return None
    elif file.endswith('pkl'):
        with open(file, 'rb') as f:
            return pickle.load(f)
    else:
        return None

if __name__=='__main__':
    script_name = os.path.basename(__file__)
    print(f'INFO: {script_name}: Iniciando la evaluación de los modelos')
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--algos', type=str, required=True)
    parser.add_argument('--min-precision', type=float, required=True)    
    parser.add_argument('--test-data-file', type=str, required=True)
    parser.add_argument('--test-target-file', type=str, required=True)
    parser.add_argument('--thresholds-file', type=str, required=True)   
    parser.add_argument('--metrics-report-file', type=str, required=True)    
    
    args, _ = parser.parse_known_args()    
    
    print(f'INFO: {script_name}: Parámetros recibidos: {args}')
    
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'
    
    # Cargar datasets
    test_target_path = os.path.join(input_path, 'target', args.test_target_file)     
    test_target = pd.read_csv(test_target_path)
    
    test_data_path = os.path.join(input_path, 'data', args.test_data_file)     
    test_data = pd.read_csv(test_data_path)
    
    # Umbrales de decision por algoritmo
    algo_metrics = {'Algorithm':[], 'Threshold':[], 'Precision':[], 'Recall':[]}
    
    metrics_report = {}
    
    algos = args.algos.split(',')
    for algo in algos:
        model_path = os.path.join(input_path, algo, 'model.tar.gz')         

        # Carga modelo en memoria
        print(f'Cargando modelo: {model_path}')
        clf = load_model(model_path)
        
        # Obtiene predicciones con dataset para pruebas
        predictions = clf.predict_proba(test_data)[:, 1]
        
        # Busca umbral de decision
        precision, recall, thresholds = precision_recall_curve(test_target, predictions)
        operating_point_idx = np.argmax(precision>=args.min_precision)
        
        algo_metrics['Threshold'].append(thresholds[operating_point_idx])
        algo_metrics['Precision'].append(precision[operating_point_idx])
        algo_metrics['Recall'].append(recall[operating_point_idx])
        algo_metrics['Algorithm'].append(algo)
        
        metrics_report[algo] = {
            'precision': {'value': precision[operating_point_idx], 'standard_deviation': 'NaN'},
            'recall': {'value': recall[operating_point_idx], 'standard_deviation': 'NaN'}}
         
    
    # Guardar Thresholds    
    metrics = pd.DataFrame(algo_metrics)
    print(f'INFO: {script_name}: Thresholds encontrados')
    print(metrics)
    metrics.to_csv(os.path.join(output_path, args.thresholds_file), index=False)    
    
    # Guardar reporte de metricas para cada modelo
    for algo in metrics_report:
        with open(os.path.join(output_path, f'{algo}_metrics.json'), 'w') as f:
            json.dump({'binary_classification_metrics':metrics_report[algo]},f)        

    
    print(f'INFO: {script_name}: Finalizando la evaluación de los modelos')


Subimos el script a un bucket de Amazon S3 para poder utilizarlo con el Processing Job

In [None]:
evaluate_models_script_path = sagemaker_utils.upload(evaluate_models_script_file, f's3://{bucket}/{code_prefix}')


Ya que tenemos el script listo en S3, podemos pasar a crear el Processing Job

Al igual que lo hicimos cuando creamos el Processing Job para la preparación de los datos, en este caso creamos otro nuevamente utilizando la clase Processor.

In [None]:
evaluation_processor = Processor(
    image_uri=docker_images['Processing']['image_uri'],
    role=sagemaker_role,
    instance_count=1,
    instance_type='ml.m5.large',
    entrypoint=['python3',f'/opt/ml/processing/input/code/{os.path.basename(evaluate_models_script_file)}'],
    volume_size_in_gb=5,
    max_runtime_in_seconds=60*60*2)# dos horas 


ejecutamos el proceso con el método run()

In [None]:
thresholds_file = 'thresholds.csv'
metrics_report_file = 'metrics_report.json'

eval_parameters = {
    'inputs':[ProcessingInput(
                  input_name='code',
                  source=evaluate_models_script_path,
                  destination='/opt/ml/processing/input/code'),
              ProcessingInput(
                  source=sagemaker_utils.get_processor_output_path(processor, 'test_target'), 
                  destination='/opt/ml/processing/input/target'),
              ProcessingInput(
                  source=sagemaker_utils.get_processor_output_path(processor, 'test_data'), 
                  destination='/opt/ml/processing/input/data'),
              ProcessingInput(
                  source=sagemaker_utils.get_tuner_best_model_artifacts_path(tuners['GradientBoosting']), 
                  destination='/opt/ml/processing/input/GradientBoosting'),
              ProcessingInput(
                  source=sagemaker_utils.get_tuner_best_model_artifacts_path(tuners['RandomForest']),
                  destination='/opt/ml/processing/input/RandomForest'),
              ProcessingInput(
                  source=sagemaker_utils.get_tuner_best_model_artifacts_path(tuners['ExtraTrees']), 
                  destination='/opt/ml/processing/input/ExtraTrees')],
    'outputs':[ProcessingOutput(
                   output_name='eval',
                   source='/opt/ml/processing/output',
                   destination=f's3://{bucket}/{eval_prefix}')],
    'arguments':['--algos', ','.join(estimators.keys()),
                 '--min-precision', '0.85',
                 '--test-data-file', test_data_file,
                 '--test-target-file', test_target_file,
                 '--thresholds-file', thresholds_file,
                 '--metrics-report-file', metrics_report_file]}

evaluation_processor.run(**eval_parameters)


Para seleccionar automaticamente el mejor modelo podemos descargar uno de los archivos resultantes del proceso de evaluación, en el cual se almacenaron los umbrales de decisión y seleccionar aquel que haya obtenido el valor más alto de Recall

In [None]:
thresholds_path = sagemaker_utils.get_processor_output_path(evaluation_processor,'eval')
metrics = sagemaker_utils.read_csv(f'{thresholds_path}/{thresholds_file}')

max_recall = metrics[metrics['Recall']==metrics['Recall'].max()]
best_model_found = max_recall.loc[max_recall['Precision'].idxmax()]

best_model_found

Despliegue del mejor modelo

Desplegar el modelo exponiendolo como un endpoint el cual permita recibir peticiones HTTPs tipo REST mediante el uso de Amazon SageMaker

Desplegar un modelo con Amazon SageMaker consiste en tres pasos:

Crear un modelo en SageMaker – al crear un modelo, se le especifica a Amazon SageMaker en dónde encontrar los componentes del modelo. Esto incluye la ruta del bucket de Amazon S3 en dónde los artefactos del modelo se encuentran almacenados. Estos deben estar empaquetados en un archivo con nombre model.tar.gz. Así como la ruta de Amazon Elastic Container Registry en dónde se encuentra la imagen Docker que contiene el código para la inferencia
Crear una configuración para el endpoint – se especifíca el nombre del modelo a utilizar y las instancias de cómputo para Machine Learning que se desea que Amazon SageMaker lance para hostear el modelo. Es posible configurar el endpoint para que automáticamente escale el número de instancias aprovisionadas. Cuando se especifican dos o más instancias, Amazon SageMaker las lanza en Zonas de Disponibilidad distintas y gestiona el reemplazo de las instancias cuando es necesario, esto asegura una disponibilidad continua
Crear un endpoint HTTPS – a través de la configuración del endpoint proporcionada, Amazon SageMaker aprovisiona las instancias de cómputo requeridas y despliega el modelo conforme a la especificación de la configuración. Para obtener predicciones, la aplicación cliente puede realizar una petición al endpoint a través del protocolo HTTPS.

Para hacer el deployment del modelo seleccionado basta con invocar el método deploy() del objeto Estimator, del proceso de entrenamiento que mejor desempeño tuvo para ese algoritmo, proporcionando los siguientes parámetros:

endpoint_name – nombre del endpoint a crear
initial_instance_count – número inicial de instancias a utilizar en el clúster del endpoint
instance_type – tipo de instancia(s) a utilizar
image_uri – imagen Docker a utilizar. En este caso la creada para el despliegue del modelo

In [None]:
endpoint_name = f'{prefix}-best-model-{strftime("%M-%S", gmtime())}'

best_estimator = tuners[best_model_found['Algorithm']].best_estimator()

predictor = best_estimator.deploy(endpoint_name=endpoint_name,
                                  initial_instance_count=1, 
                                  instance_type='ml.m5.large',
                                  entry_point = os.path.basename(training_script_file),
                                  source_dir = training_script_path,
                                  image_uri=docker_images['Inference']['image_uri'])

print(f'\nEndpoint Name: {endpoint_name}')


Obtención de predicciones

Obtener predicciones mediante la invocación del endpoint creado a partir del despligue del modelo realizado, con la finalidad de evaluar el desempeño del modelo

Antes de realizar las peticiones al endpoint, descargaremos de Amazon S3 el dataset que utilizaremos para esta prueba. Para esto utilizaremos el método S3Downloader.download indicandole la ruta del archivo en el bucket de Amazon S3 así como la ruta local en dónde queremos que sea descargado el archivo.

In [None]:
test_data = sagemaker_utils.read_csv(f'{sagemaker_utils.get_processor_output_path(processor, "test_data")}/{test_data_file}')
test_target = sagemaker_utils.read_csv(f'{sagemaker_utils.get_processor_output_path(processor, "test_target")}/{test_target_file}')

test_data.head()


Una vez que tenemos el dataset que utilizaremos para obtener predicciones del endpoint, podemos realizar peticiones a este utilizando el cliente de runtime de Amazon SageMaker mediante el uso del SDK para Python de AWS llamado boto3.

El código a continuación es completamente independiente y podría ser ejecutado desde cualquier otro servicio de AWS, como por ejemplo AWS Lambda o Amazon EC2. O desde la PC local, instalando previamente el SDK boto3.

In [None]:
import boto3
import json

sagemaker_runtime = boto3.client('sagemaker-runtime')

request_body = json.dumps(test_data.values.tolist())

response = sagemaker_runtime.invoke_endpoint(EndpointName = endpoint_name,
                                             ContentType = 'application/json',
                                             Body = request_body)

predictions = json.loads(response['Body'].read())

Una vez que hemos obtenido las predicciones utilizando el dataset, podemos comparar estas contra las etiquetas reales y medir el desempeño

Comparar predicción vs real

In [None]:
def plot_confusion_matrix(test_set, predictions, classes, title):
    classes=np.array(classes)
    cm = confusion_matrix(test_set, predictions)
    
    fig, (ax1,ax2) = plt.subplots(1,2, gridspec_kw={'width_ratios': [2, 3]})
    im = ax1.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax1.figure.colorbar(im, ax=ax1)
    
    # We want to show all ticks...
    ax1.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title="Confusion Matrix",
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment.
    plt.setp(ax1.get_yticklabels(), rotation=90, ha="center", rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax1.text(j, i, format(cm[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    
    tn, fp, fn, tp = cm.ravel()
    accuracy = (tp+tn)/(tn+fp+fn+tp)
    precision = tp/(fp+tp)
    recall = tp/(fn+tp)
    specifity = tn/(tn+fp)
    
    ax2.axis('off')
    ax2.text(0,0.9, s='The overall model accuracy is {}% [ACCURACY]'.format(round(accuracy*100,2)), 
             size='12', ha='left', va='center')
    
    ax2.text(0,0.7, s='Out of the customers the model predicted as will churn, {}% will actually churn [PRECISION]'.format(round(precision*100,2)), 
             size='12', ha='left', va='center')
    
    ax2.text(0,0.5, s='The model will catch {}% of the customers who will actually churn [RECALL / SENSITIVITY]'.format(round(recall*100,2)), 
             size='12', ha='left', va='center')
    
    ax2.text(0,0.3, s='The model will catch {}% of the customers who will actually NOT churn [SPECIFITY]'.format(round(specifity*100,2)), 
             size='12', ha='left', va='center')
    
    fig.set_figheight(3)
    fig.set_figwidth(16)
    fig.suptitle(title, fontsize=16)
    plt.show()


Antes de poder graficar la matriz de confusión, debemos cambiar las predicciones obtenidas las cuales representan el porcentaje de probabilidad de hacer Churn, por un 0 o un 1 utilizando el umbral obtenido previamente.

In [None]:
decision_threshold = best_model_found['Threshold']
predictions=[1 if prediction >= decision_threshold else 0 for prediction in predictions]


In [None]:
Y finalmente podemos crear la matriz de confusión.

In [None]:
labels = ['Not Churn','Churn']
plot_confusion_matrix(test_target, predictions, labels, 'Best Model')

Automatización del Pipeline

Integrar todos los componentes previamente construidos en un workflow orquestado por Amazon SageMaker Pipelines y que permita ejecutar todo el pipeline sin necesidad de depender del Jupyter Notebook.

Una vez que terminemos, el resultado será el pipeline que se muestra en la imagen, la representación gráfica es generada por Amazon SageMaker Pipelines. Lo crearemos reutilizando los componentes previamente creados.

Amazon SageMaker Model Building Pipelines permite construir pipelines de Machine Learning tomando ventaja de su integración directa con SageMaker, gracias a esto es posible crear un pipeline y configurar SageMaker Projects para orquestar el despliegue de los modelos.

Utilizaremos Amazon SageMaker Model Building Pipelines para crear un workflow que nos permita integrar y automatizar todos los pasos antes creados.

Definición de parámetros del pipeline

In [None]:
dataset_path = ParameterString(name='DatasetPath', default_value=f's3://{bucket}/{datasets_prefix}')
model_approval_status = ParameterString(name='ModelApprovalStatus', default_value='PendingManualApproval')  # "Approved" Si no se requiere aprobación manual
minimum_precision = ParameterFloat(name='MinimumPrecision', default_value=0.85)

parameters_list = [dataset_path, model_approval_status, minimum_precision]


Agregar paso al pipeline para ejecutar Processing Job para la preparación del dataset.

In [None]:
data_prep_step_parameters = {
    'name':'Preparacion-de-Datos',
    'processor':processor}

data_prep_step_parameters.update(data_prep_parameters)
data_prep_step_parameters['job_arguments'] = data_prep_step_parameters.pop('arguments')

data_prep_step_parameters['inputs']=[ProcessingInput(input_name='input',
                                         source=dataset_path,
                                         destination='/opt/ml/processing/input'),
                                     ProcessingInput(input_name='code',
                                         source=data_prep_script_path,
                                         destination='/opt/ml/processing/input/code')]

data_prep_step = ProcessingStep(**data_prep_step_parameters)
pipeline_steps = [data_prep_step]


Agregar paso al pipeline para entrenamiento de los modelos utilizando Training Jobs

In [None]:
training_steps = {}
for algorithm in estimators:       
    training_steps[algorithm] = TrainingStep(
        name=f'Entrenamiento-con-{algorithm}',
        estimator=tuners[algorithm].best_estimator(),
        inputs={
            'train_data': TrainingInput(
                data_prep_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri),
            'train_target': TrainingInput(
                data_prep_step.properties.ProcessingOutputConfig.Outputs['train_target'].S3Output.S3Uri)})
    
    pipeline_steps.append(training_steps[algorithm])


Agregar paso al pipeline para evaluación de desempeño de los modelos, utilizando un Processing Job

In [None]:
property_files = {}

for algorithm in estimators:
    property_file = PropertyFile(
        name=f'{algorithm}Metrics',
        output_name="eval",
        path=f'{algorithm}_metrics.json')
        
    property_files[algorithm] = property_file

eval_step_parameters = {
    'name':'Evaluacion-de-modelos',
    'processor':evaluation_processor,
    'property_files':[property_files[file] for file in property_files]}

eval_step_parameters.update(eval_parameters)
eval_step_parameters['job_arguments'] = eval_step_parameters.pop('arguments')

eval_step_parameters['inputs'] = [
    ProcessingInput(
        input_name='code',
        source=evaluate_models_script_path,
        destination='/opt/ml/processing/input/code'),
    ProcessingInput(
        source=data_prep_step.properties.ProcessingOutputConfig.Outputs['test_target'].S3Output.S3Uri, 
        destination='/opt/ml/processing/input/target'),
    ProcessingInput(
        source=data_prep_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri, 
        destination='/opt/ml/processing/input/data'),
    ProcessingInput(
        source=training_steps['GradientBoosting'].properties.ModelArtifacts.S3ModelArtifacts, 
        destination='/opt/ml/processing/input/GradientBoosting'),
    ProcessingInput(
        source=training_steps['RandomForest'].properties.ModelArtifacts.S3ModelArtifacts,
        destination='/opt/ml/processing/input/RandomForest'),
    ProcessingInput(
        source=training_steps['ExtraTrees'].properties.ModelArtifacts.S3ModelArtifacts, 
        destination='/opt/ml/processing/input/ExtraTrees')]

eval_step = ProcessingStep(**eval_step_parameters)
pipeline_steps.append(eval_step)


Agregar condición para registrar modelo en el Model Registry.

In [None]:
package_group_name = f'{prefix}-PackageGroup'

for algorithm in estimators:   
    model_metrics = ModelMetrics(
        model_statistics = MetricsSource(
            s3_uri="{}/{}_metrics.json".format(
                eval_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"],
                algorithm),
            content_type="application/json"))
    
    register_step = RegisterModel(
        name=f"Registra{algorithm}",
        estimator=estimators[algorithm],
        model_data=training_steps[algorithm].properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=package_group_name,
        approval_status=model_approval_status,
        description=f'Churn prediction using {algorithm}',
        model_metrics=model_metrics,
        image_uri=docker_images['Inference']['image_uri'],
        entry_point = training_script_file
    )
    
    condition = ConditionGreaterThanOrEqualTo(
        left = JsonGet(
            step_name = 'Evaluacion-de-modelos',
            property_file = property_files[algorithm],
            json_path = f'binary_classification_metrics.precision.value'),
        right = minimum_precision)
    
    condition_step = ConditionStep(
        name=f"{algorithm}Precision",
        conditions=[condition],
        if_steps=[register_step],
        else_steps=[])
    
    pipeline_steps.append(condition_step)
    
print(f'Package Group Name: {package_group_name}')


Al ejecutar la celda veremos un resultado como el siguiente: Package Group Name: churn-clf-PackageGroup

Ejecución del pipeline

In [None]:
pipeline = Pipeline(name=f'{prefix}-pipeline-{strftime("%M-%S", gmtime())}',
                    parameters=parameters_list,
                    steps=pipeline_steps)


Genera definición del pipeline para ver que no exista ningún problema, si no arroja ningún error la ejecución de la siguiente celda, todo está bien.

In [None]:
definition = json.loads(pipeline.definition())

Crea o actualiza un pipeline en SageMaker

In [None]:
pipeline.upsert(role_arn=sagemaker_role)

Inicia la ejecución del pipeline creado. Al no especificar valores de parámetros al invocar el método start, el pipeline es ejecutado con los valores por default definidos.

In [None]:
execution = pipeline.start()

Lista los pasos que se han ejecutado del pipeline

In [None]:
execution.list_steps()

##Despliegue del modelo(ver 10.4)


Después de haberse ejecutado el pipeline, tendremos 3 versiones del modelo listas para nuestra revisión en el Model Registry y queremos que una vez que alguna de estas versiones sea aprobada, se despliegue como un Endpoint de SageMaker. Esto lo lograremos mediante el uso de SageMaker Projects.

Un SageMaker Project es en realidad un producto aprovisionado mediante un template de AWS Service Catalog con el cual se pueden construir soluciones de punta a punta con Machine Learning.

Es posible utilizar Amazon SageMaker Projects para generar modelos basados en triggers, como por ejemplo cuando alguien realiza un check in de un cambio de código.

En este caso utilizaremos uno de los Project templates que SageMaker proporciona, llamado MLOps template for model deployment.

Debemos seleccionar el último ícono que aparece en el menú de la izquierda “SageMaker resources”.

Calendarización del Workflow

Calendarizar la ejecución automática del workflow previamente creado mediante la integración con Amazon EventBridge

Crear rol de ejecución
Creación de rol de ejecución a ser utilizado con la regla de EventBridge que en seguida crearemos para poder calendarizar la ejecución del pipeline

In [None]:
policy_document={
        "Version": "2012-10-17",
        "Statement": [               
            {
                "Effect": "Allow",
                "Action": [
                    "sagemaker:StartPipelineExecution"
                ],
                "Resource": "*"
            }
        ]
    }

pipeline_execution_role = sagemaker_utils.create_pipeline_execution_role('test_pipeline', policy_document)


Una vez creado el rol que utilizaremos, podemos pasar a crear la regla.

Para crear la regla de Amazon EventBridge utilizamos el SDK de AWS para Python boto3 y mediante la invocación del método put_rule( ) creamos la regla, debemos proporcionar los siguientes parámetros:

Name – nombre de la regla a crear
ScheduleExpression – expresión que indica la frecuencia de ejecución de la regla. Para conocer mas detalles sobre la definición de estas expresiones, consultar la documentación
State– estado de la regla, en este caso ENABLED para indicar que está activa
Description – descripción de la regla a crear
RoleArn – rol de ejecución a ser utilizado por la regla
En este caso la regla será ejecutada 10 minutos después de su creación.

In [None]:
events = boto3.client('events')

schedule_expression = 'cron({date:%M %H} * * ? *)'.format( date=datetime.datetime.now() + datetime.timedelta(minutes = 10) )

rule_name = f'{prefix}-pipeline-execution'
put_rule_response = events.put_rule(
                        Name=rule_name,
                        ScheduleExpression=schedule_expression,
                        State='ENABLED',
                        Description='Rule for scheduling pipeline execution',
                        RoleArn=pipeline_execution_role)


Hasta el momento sólo creamos la regla de Amazon EventBridge pero no hemos indicado que es lo que debe ejecutar la regla, que en este caso será el pipeline de SageMaker que creamos previamente.

El último paso consiste en agregar el target a la regla, es decir lo que la regla ejecutará de acuerdo a su calendarización. En este caso lo que nos interesa ejecutar el es pipline previamente creado.

Para agregarlo utilizamos el método put_targets como se muestra a continuación.

In [None]:
events.put_targets(
    Rule=rule_name,
    Targets=[{
        'Id': pipeline.name,
        'Arn': pipeline.describe()['PipelineArn'],
        'RoleArn': pipeline_execution_role,
        'SageMakerPipelineParameters': {
            'PipelineParameterList': [
                {
                    'Name': 'DatasetPath',
                    'Value': f's3://{bucket}/{datasets_prefix}'
                },
                {
                    'Name': 'ModelApprovalStatus',
                    'Value': 'PendingManualApproval'
                },
                {
                    'Name': 'MinimumPrecision',
                    'Value': '0.85'
                },
            ]
        },
    }]
)


Creación y publicación de API para invocación de endpoint

Objetivo

Crear y publicar un API REST que permita simplificar la invocación del endpoint del modelo expuesto, permitiendo realizar las transformaciones necesarias a los datos previo a la invocación del endpoint para obtener las predicciones.

Creación de script con lógica para la función Lambda

Creación de layer para Sklearn y encoder

In [None]:
! rm -rf lambda_function; mkdir lambda_function


In [None]:
%%writefile lambda_function/app.py
import boto3
import sklearn
import pickle
import json
import os
import pandas as pd
from collections.abc import Mapping
        
sagemaker_runtime = boto3.client('sagemaker-runtime')
s3 = boto3.client('s3')

encoder = None
endpoint_name = None
decision_threshold = None

def read_pkl(file):
    with open(file, 'rb') as f:
        return pickle.load(f)
    
def download_s3_file(s3_path):
    cmpnts = s3_path.split('/')
    bucket = cmpnts[2]
    key = '/'.join(cmpnts[3:])
    local_file = '/tmp/{}'.format(cmpnts[-1])

    s3.download_file(bucket, key, local_file)

    return local_file
    
def init():
    global encoder
    global endpoint_name
    global decision_threshold
    
    if encoder == None or endpoint_name == None or decision_threshold == None:             
        endpoint_name = os.environ['ENDPOINT_NAME']
        encoder_s3_path = os.environ['ENCODER_S3_PATH']
        thresholds_s3_path = os.environ['THRESHOLDS_S3_PATH']
        
        print(f'endpoint_name={endpoint_name}')
        print(f'encoder_s3_path={encoder_s3_path}')
        print(f'thresholds_s3_path={thresholds_s3_path}')

        encoder = read_pkl(download_s3_file(encoder_s3_path))
        thresholds = pd.read_csv(download_s3_file(thresholds_s3_path))
        
        decision_threshold = thresholds.max()['Threshold']
    

def handler(event, context):
    init()
    response = {
        'statusCode': 500} #Internal server error

    
    print(event)
    
    try:
        body = json.loads(event['body'])
        
        # Crear DataFrame con los datos recibidos
        if isinstance(body, Mapping):
            data = pd.DataFrame(body.items()).transpose()
            header = data.iloc[0]
            data = pd.DataFrame(data[1:].values, columns=header)
        elif isinstance(body, list):
            data = pd.DataFrame(body)        
        else:
            print('ERROR: Input type not supporte: it should be a dictionary or a list')
            return response                
        
        # Realizar one hot encoding de variables categóricas
        columns = ['State','Area_Code']        
        transformed = encoder.transform(data[columns]).toarray()
        
        data.drop(columns,axis=1, inplace=True)
        data = pd.concat([data,pd.DataFrame(transformed, columns=encoder.get_feature_names())],axis=1)        
        
        # Reemplazar yes/no por 1/0 en columnas Int_l_Plan y VMail_Plan
        data['Int_l_Plan'] = data['Int_l_Plan'].map(dict(yes=1, no=0))
        data['VMail_Plan'] = data['VMail_Plan'].map(dict(yes=1, no=0))        

        # Ordenar columnas
        columns = 'Account_Length,Int_l_Plan,VMail_Plan,VMail_Message,Day_Mins,Day_Calls,'+\
              'Eve_Mins,Eve_Calls,Night_Mins,Night_Calls,Intl_Mins,Intl_Calls,CustServ_Calls,'+\
              'x0_AK,x0_AL,x0_AR,x0_AZ,x0_CA,x0_CO,x0_CT,x0_DC,x0_DE,x0_FL,x0_GA,x0_HI,x0_IA,'+\
              'x0_ID,x0_IL,x0_IN,x0_KS,x0_KY,x0_LA,x0_MA,x0_MD,x0_ME,x0_MI,x0_MN,x0_MO,x0_MS,'+\
              'x0_MT,x0_NC,x0_ND,x0_NE,x0_NH,x0_NJ,x0_NM,x0_NV,x0_NY,x0_OH,x0_OK,x0_OR,x0_PA,'+\
              'x0_RI,x0_SC,x0_SD,x0_TN,x0_TX,x0_UT,x0_VA,x0_VT,x0_WA,x0_WI,x0_WV,x0_WY,x1_408,x1_415,x1_510'

        columns = columns.split(',')
        data = data[columns]
        
        # Obtener predicciones
        request_body = json.dumps(data.values.tolist())

        response = sagemaker_runtime.invoke_endpoint(EndpointName = endpoint_name,
                                                     ContentType = 'application/json',
                                                     Body = request_body)

        predictions = json.loads(response['Body'].read())
        
        predictions=[1 if prediction >= decision_threshold else 0 for prediction in predictions]
        
        response = {
            'statusCode': 200,
            'body':json.dumps({'predictions': predictions})}

    except Exception as e:
        print('ERROR: Executing endpoint: {}'.format(endpoint_name))
        print(e)
            
    return response


Creación de contenedor para función Lambda

In [None]:
docker_images['Lambda']={'libraries':{'awslambdaric':'1.0.0',
                                      'boto3':'1.17.77'},
                         'dependencies':[('lambda_function','/function')],
                         'others':['WORKDIR /function'],
                         'entrypoint':['python','-m','awslambdaric'],
                         'cmd':['app.handler']}

lambda_docker_parameters = {'image_name': f'{prefix}-lambda',
                            'base_image': docker_images['Processing']['image_uri'],
                            's3_path': f's3://{bucket}/{images_directory}',
                            'role': codebuild_role,  
                            'secret': secret_name}

lambda_docker_parameters.update(docker_images['Lambda'])
docker_images['Lambda']['image_uri'] = sagemaker_utils.create_docker_image(**lambda_docker_parameters)  


In [None]:
Creación de función

In [None]:
policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sagemaker:*",
                    "s3:*",
                    "states:StartExecution",
                    "iam:PassRole",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "*"
            }
        ]
    }

lambda_role = sagemaker_utils.create_lambda_execution_role(f'{prefix}-LambdaExecutionRole', policy_document)


In [None]:
endpoint_function_name = f'{prefix}-endpoint-function'
lambda_function_params = {
    'FunctionName': endpoint_function_name,
    'Role': lambda_role,
    'Code': {'ImageUri':docker_images['Lambda']['image_uri']},
    'Description': 'Function for invoking SageMaker Endpoints',
    'Timeout': 300, # 5 mins
    'MemorySize': 256, # MB
    'PackageType': 'Image',
    'Environment': {
        'Variables': {
            'ENDPOINT_NAME': 'Colocar aqui el nombre del endpoint',
            'ENCODER_S3_PATH': f"{sagemaker_utils.get_processor_output_path(processor,'encoder')}/{encoder_file}",
            'THRESHOLDS_S3_PATH': f"{sagemaker_utils.get_processor_output_path(evaluation_processor,'eval')}/{thresholds_file}"
        }
    }
}



lambda_response = sagemaker_utils.create_lambda_function(**lambda_function_params)


Creación de API

Creación de API para exponer la función Lambda creada

In [None]:
apigateway = boto3.client('apigateway')

api_creation_response = apigateway.create_rest_api(name='{}-api-{}'.format(prefix,strftime("%M-%S", gmtime())),
                           description='API for exposing Lambda function to invoke SageMaker Endpoint',
                           endpointConfiguration={'types':['REGIONAL']})
api_creation_response['id']


Crear método POST para el API

In [None]:
root_resource = apigateway.get_resources(restApiId=api_creation_response['id'])['items'][-1]

apigateway.put_method(restApiId=api_creation_response['id'],
                      resourceId=root_resource['id'],
                      httpMethod='POST',
                      authorizationType='NONE'
                     )



In [None]:
lambda_uri = 'arn:aws:apigateway:{}:lambda:path/2015-03-31/functions/{}/invocations'.format(session.boto_region_name,
                                                                        lambda_response['FunctionArn'])
apigateway.put_integration(restApiId=api_creation_response['id'],
                           resourceId=root_resource['id'],
                           httpMethod='POST',
                           integrationHttpMethod='POST',
                           type='AWS_PROXY',
                           uri=lambda_uri)


Dar permisos a la función Lambda para poder ser invocada desde API Gateway

In [None]:
lambda_client = boto3.client('lambda')
lambda_client.add_permission(FunctionName=lambda_response['FunctionName'],
                             StatementId='apigateway-post',
                             Action='lambda:InvokeFunction',
                             Principal='apigateway.amazonaws.com'
                            )

Despliegue de API

In [None]:
stage_name = 'v1'
apigateway.create_deployment(restApiId=api_creation_response['id'],
                             stageName=stage_name,
                             stageDescription='API v1 for Invoking SageMaker Endpoint'
                            )


In [None]:
invoke_url = 'https://{}.execute-api.{}.amazonaws.com/{}'.format(api_creation_response['id'], 
                                                                 session.boto_region_name,
                                                                 stage_name)

print('invoke URL: {}'.format(invoke_url))


Probar el API REST para obtener predicciones del modelo

In [None]:
import urllib

payload={
  "State": "LA",
  "Account_Length": 52,
  "Area_Code": 408,
  "Int_l_Plan": "yes",
  "VMail_Plan": "no",
  "VMail_Message": 31,
  "Day_Mins": 223,
  "Day_Calls": 98,
  "Eve_Mins": 264,
  "Eve_Calls": 136,
  "Night_Mins": 158,
  "Night_Calls": 73,
  "Intl_Mins": 14,
  "Intl_Calls": 6,
  "CustServ_Calls": 3
}

data = json.dumps(payload).encode('utf8')
req =  urllib.request.Request(invoke_url, data=data, headers={'content-type': 'application/json'})
resp = urllib.request.urlopen(req)
predictions = json.loads(resp.read())['predictions']
predictions


Cómo se obtienen las predicciones en modo batch con Amazon SageMaker

Depués de entrenar un modelo, se puede hacer el despligue del mismo con Amazon SageMaker para obtener predicciones de dos formas:

Creando un endpoint persistente el cual nos permite obtener predicciones en tiempo-real, utilizando los servicios de hosting de Amazon SageMaker
Obteniendo predicciones en modo batch para un dataset completo, utilizando Amazon SageMaker batch transform
Para poder obtener predicciones utilizando un proceso batch transform de Amazon SageMaker, el dataset para el cual se desea generar predicciones debe ser previamente almacenado en Amazon S3 y el resultado de las predicciones generadas también será almacenado en un bucket de Amazon S3. Amazon SageMaker batch transform gestiona todos los recursos de cómputo requeridos para obtener las predicciones, esto incluye el lanzar las instancias de cómputo y su eliminación una vez que el proceso batch transform haya terminado. Batch transform administra las interacciones entre el dataset y el modelo a través de un objeto dentro del nodo de la instancia llamado agente.

Para crear un proceso batch transform se debe proporcionar lo siguiente:

La ruta al bucket de Amazon S3 en dónde están almacenados los datos que se desea transformar
Los recursos de cómputo que se desea que Amazon SageMaker utilice para ejecutar el proceso batch de transformación. Los recursos son instancias de cómputo para Machine Learning que son administradas por Amazon SageMaker
La ruta del bucket de Amazon S3 en el cúal se desea almacenar el resultado generado por el proceso
El nombre del modelo en Amazon SageMaker que se desea utilizar para crear las inferencias. Se debe utilizar un modelo que previamente haya sido creado en Amazon SageMaker

Para crear un proceso de transformación (para obtener predicciones) en modo batch, primero debemos crear o registrar el modelo en SageMaker, esto lo hacemos mediante la clase SKLearnModel proporcionando los siguientes parámetros:

image – ruta de Amazon Elastic Container Registry de la imagen Docker a utilizar
model_data – ruta de Amazon S3 en dónde se encuentra el archivo model.tar.gz a desplegar
role – rol de Identity and Access Management a utilizar
entry_point – script Python a utilizar con la lógica para cargar el modelo en memoria y generar predicciones
framework_version – versión del Scikit-learn a utilizar
name – nombre del modelo
Una vez creado el modelo, es posible crear el proceso batch mediante la invocación del método transform() del objeto modelo previamente creado. Debemos proporcionar los siguientes parámetros:

instance_count – número de instancias de cómputo a utilizar
instance_type – tipo de instancia de cómputo a tulizar
strategy – la forma como deben enviarse los registros del dataset del cual se desean obtener las predicciones
assemble_with – el delimitador para separar los registros del archivo de predicciones resultante
accept – el content-type esperado como respuesta
output_path – ruta de algún bucket de Amazon S3 en dónde queramos que se guarden las predicciones generadas
Y finalmente podemos ejecutar el proceso mediante la invocación del método transform() del objeto transformer previamente creado. Debemos proporcionar los siguientes parámetros:

data – ruta en Amazon S3 del archivo a utilizar para generar las predicciones
split_type – delimitador de cada registro
content_type – content type correspondiente al archivo a utilizar
wait – indica si queremos esperar a que termine la ejecución del proceso

In [None]:
from sagemaker.sklearn.model import SKLearnModel

models = {}
transformers = {}
id = '{date:%f}'.format( date=datetime.datetime.now() )[-4:]

for tuner in tuners:
    training_job_description = sess.describe_training_job(job_name=tuners[tuner].best_training_job())
    model_artifacts = training_job_description['ModelArtifacts']['S3ModelArtifacts']
    training_image = training_job_description['AlgorithmSpecification']['TrainingImage']

    models[tuner] = SKLearnModel(image_uri=training_image,
                                 model_data=model_artifacts,
                                 role=sagemaker_role,
                                 entry_point='train_and_deploy.py',
                                 framework_version='0.23-1',
                                 name=f'{prefix}-{tuner}-{id}')

    transformers[tuner] = models[tuner].transformer(instance_count=1,
                                                    instance_type='ml.m5.large',
                                                    strategy='MultiRecord',
                                                    assemble_with='Line',
                                                    accept='text/csv',
                                                    output_path=f's3://{bucket}/{transformed_data_prefix}/{tuner}')

    transformers[tuner].transform(data=f's3://{bucket}/{processed_data_prefix}/{test_data_file}', 
                                  split_type='Line', content_type='text/csv', wait=False)


Y con el siguiente método podemos esperar a que termine la ejecución de los tres procesos.

In [None]:
utils.wait_for_transform_jobs(transformers)


Automatización del Pipeline

Integrar todos los componentes previamente construidos en un workflow orquestado por AWS Step Functions y que permita ejecutar todo el pipeline sin necesidad de depender del Jupyter Notebook.

Una vez que terminemos, el resultado será el pipeline que se muestra en la animación, la representación gráfica es generada por AWS Step Functions. Lo crearemos reutilizando los componentes previamente creados.

AWS Step Functions permite coordinar múltiples servicios AWS en un workflow serverless. Utilizando AWS Step Functions, es posible ejecutar workflows que combinen servicios como Amazon SageMaker y AWS Lambda, entre otros. Los workflows constan de pasos, en los cuales la salida de un paso se convierte en la entrada del siguiente.

El SDK de Step Functions para Data Science permite crear y ejecutar workflows de Machine Learning con AWS Step Functions directamente desde Python, y desde un Jupyter Notebook. Cuando la definición del workflow esta terminada, es subido al servicio AWS Step Functions para su ejecución en el Cloud. De esa forma, una vez creado o actualizado el workflow, este vive en el Cloud y puede ser reutilizado.

El workflow puede ser ejecutado tantas veces como sea necesario, y opcionalmente es posible cambiar la entrada de datos del workflow en cada ejecución. Cada que el workflow es ejecutado, este crea una nueva instancia de ejecución en el Cloud. Es posible ejecutarlo varias veces en paralelo.

A través del SDK es posible crear los pasos, encadenarlos juntos para un workflow, crear el workflow en AWS StepFunctions, y ejecutar el workflow en AWS cloud

Lo primero que tenemos que hacer antes de crear el workflow para automatizar el pipeline, es crear dos roles que serán utilizados por los servicios AWS Step Functions y AWS Lambda.

Empezamos por importar las dependencias que utilizaremos para la definición y ejecución del workflow.

In [None]:
import datetime
import stepfunctions
import logging
import time

from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

stepfunctions.set_stream_logger(level=logging.INFO)


In [None]:
workflow_role_name = '{}-StepFunctionsWorkflowExecutionRole'.format(prefix) 
lambda_role_name = '{}-LambdaExecutionRole'.format(prefix)
model_function_name = '{}-ModelFunction'.format(prefix)
select_model_function_name = '{}-SelectModelFunction'.format(prefix)



A continuación definimos la política de permisos para el Rol de Ejecución para AWS Step Functions, es decir para el workflow como tal.

In [None]:
step_functions_policy_document={
    "Version": "2012-10-17",
    "Statement": [           
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "states:*",
                "sagemaker:*",
                "lambda:*",
                "cloudwatch:*",
                "events:*"
            ],
            "Resource": "*"
        }
    ]
}


In [None]:
step_functions_asume_role_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": "states.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }]
}


Y utilizando el siguiente método para crear el Rol de Ejecución para AWS Step Functions.

In [None]:
workflow_role = utils.create_or_update_iam_role(role_name = workflow_role_name, 
                          role_desc = 'Execution role for Step Functions workflow', 
                          asume_role_policy_document = step_functions_asume_role_document,
                          policy_name = 'StepFunctionsWorkflowExecutionPolicy',
                          policy_document = step_functions_policy_document)


Siguiendo el mismo procedimiento creamos el Rol de Ejecución para AWS Lambda.

In [None]:
lambda_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:*",
                "s3:*",
                "states:StartExecution",
                "iam:PassRole",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}


In [None]:
lambda_asume_role_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }]
}


In [None]:
lambda_role = utils.create_or_update_iam_role(role_name = lambda_role_name, 
                                        role_desc = 'Execution role for Lambda functions', 
                                        asume_role_policy_document = lambda_asume_role_document,
                                        policy_name = 'LambdaExecutionPolicy',
                                        policy_document = lambda_policy_document)


Parámetros del workflow

Empezamos por definir los parámetros de entrada que recibirá el workflow, estos parámetros podremos posteriormente utilizarlos en la ejecución de los pasos del workflow y de esta forma parametrizar algunas cosas y volver más flexible nuestro pipeline. Esto lo hacemos mediante el uso de la función ExecutionInput() .

In [None]:
execution_input = ExecutionInput(schema={
    'ProcessingJobName': str,           
    'GradientBoostingTuningJobName': str,
    'RandomForestTuningJobName': str,
    'ExtraTreesTuningJobName': str,
    'GradientBoostingTransformJobName': str,
    'RandomForestTransformJobName': str,
    'ExtraTreesTransformJobName': str,
    'GradientBoostingModelName': str,
    'RandomForestModelName': str,
    'ExtraTreesModelName': str,
    'GradientBoostingEvaluationJobName': str,
    'RandomForestEvaluationJobName': str,
    'ExtraTreesEvaluationJobName': str,
    'ModelLambdaFunctionName': str, 
    'EndpointConfigName': str,
    'EndpointName': str
})


Subir scripts a S3
Para poder agregar la creación de los Jobs de Amazon SageMaker para el procesamiento, entrenamiento, evaluación y despliegue; necesitamos subir a Amazon S3 los scripts necesarios para cada uno de estos procesos. Para esto utilizamos el método S3Uploader.upload( ) .

In [None]:
import tarfile
import os.path

processing_code_path = S3Uploader.upload(local_path='processing.py',
                                         desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                         sagemaker_session=sess)

with tarfile.open('train_and_deploy.tar.gz', 'w:gz') as tar:
    tar.add('train_and_deploy.py')
    
train_and_deploy_code_path = S3Uploader.upload(local_path='train_and_deploy.tar.gz',
                                         desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                         sagemaker_session=sess)

model_evaluation_code_path = S3Uploader.upload(local_path='evaluate_models.py',
                                         desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                         sagemaker_session=sess)


Una vez teniendo los scripts arriba, podemos empezar a crear cada uno de los pasos que agregaremos al workflow.

Para crear un primer paso en el workflow, para el pre-procesamiento de los datos, utilizamos el método steps.sagemaker.ProcessingStep( )

Proporcionando los siguientes parámetros:

state_id – Identificador del estado (paso) del workflow, este aparecerá como el nombre en el diagrama del workflow
processor – El processor que será utilizado para ejecutar el job, en este caso el que fue creado en 3.4 Crear Processing Job
job_name – Nombre del job a crear, en este caso será el valor recibido en le parámetro ProcessingJobName del workflow
inputs – Rutas de Amazon S3 de las cuales se tomarán los archivos de entrada para el proceso
outputs – Ruta de Amazon S3 en la cual queremos que se deposite el resultado de la ejecución del proceso
container_arguments – Parámetros que recibirá el container en el cual se ejecutará el script
container_entrypoint – Ruta del script a ejecutar dentro del container

In [None]:
processing_step = steps.sagemaker.ProcessingStep(
    state_id='Preparación de Datos',
    processor=sklearn_processor,
    job_name=execution_input['ProcessingJobName'],
    inputs=[ProcessingInput(input_name='input',
                            source='s3://{}/{}'.format(bucket, datasets_prefix), 
                            destination='/opt/ml/processing/input'),
            ProcessingInput(input_name='code',
                            source=processing_code_path, 
                            destination='/opt/ml/processing/input/code')],
    outputs=[ProcessingOutput(output_name='output',
                              source='/opt/ml/processing/output/data',
                              destination='s3://{}/{}'.format(bucket, processed_data_prefix))],
    container_arguments=['--test-size', '0.1',
                         '--data-file', 'churn.txt',
                         '--train-data-file', train_data_file,
                         '--train-target-file', train_target_file,
                         '--test-data-file', test_data_file,
                         '--test-target-file', test_target_file,
                         '--encoder-file', encoder_file],
    container_entrypoint=['python3','/opt/ml/processing/input/code/processing.py'])
