## Procesamiento de datos usando un contenedor Scikit-Learn deSageMaker

import boto3
import sagemaker
from sagemaker import get_execution_role
region = boto3.session.Session().region_name
role = get_execution_role()

In [2]:
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
 role=role,
 instance_type='ml.m5.xlarge',
 instance_count=1)

In [91]:
!aws s3 mb s3://data-processing-mlops

make_bucket: data-processing-mlops2


In [26]:
import pandas as pd
input_data = 's3://data-processing-mlops/Train.csv'
df = pd.read_csv(input_data)
df.head()

Unnamed: 0,Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
0,FDA15,9.3,Low Fat,0.016047,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
1,DRC01,5.92,Regular,0.019278,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
2,FDN15,17.5,Low Fat,0.01676,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
3,FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
4,NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052


In [34]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action='ignore', category=DataConversionWarning)

# Aquí definimos todas las columnas del dataset
columns = ['Item_Identifier', 'Item_Weight', 'Item_Fat_Content', 
'Item_Visibility','Item_Type', 'Item_MRP', 'Outlet_Identifier',
'Outlet_Establishment_Year', 'Outlet_Size', 'Outlet_Location_Type',
'Outlet_Type', 'Item_Outlet_Sales']

def print_shape(df):
    print('Data shape: {}'.format(df.shape))

if __name__=='__main__':
    # En el momento de la ejecución del contenedor, usaremos este parser para definir nuestra división de validación de
    # entrenamiento. El valor predeterminado que se mantiene es el 10%
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.1)
    args, _ = parser.parse_known_args()
    
    print('Received arguments {}'.format(args))
    
    # Esta es la ruta de datos dentro del contenedor donde se descargará y guardará Train.csv
    input_data_path = os.path.join('/opt/ml/processing/input', 'Train.csv')
    
    print('Reading input data from {}'.format(input_data_path))
    data = pd.read_csv(input_data_path)
    data = pd.DataFrame(data=data, columns=columns)
    for i in data.Item_Type.value_counts().index:
        data.loc[(data['Item_Weight'].isna()) & (data['Item_Type'] == i), ['Item_Weight']] = \
        data.loc[data['Item_Type'] == 'Fruits and Vegetables', ['Item_Weight']].mean()[0]
    cat_data = data[['Item_Identifier','Item_Fat_Content','Item_Type','Outlet_Identifier','Outlet_Size','Outlet_Location_Type','Outlet_Type']]
    num_data = data[['Item_Weight','Item_Visibility','Item_MRP','Outlet_Establishment_Year','Item_Outlet_Sales']]
    cat_data.loc[(cat_data['Outlet_Size'].isna()) & (cat_data['Outlet_Type'] == 'Grocery Store'), ['Outlet_Size']] = 'Small'
    cat_data.loc[(cat_data['Outlet_Size'].isna()) & (cat_data['Outlet_Type'] == 'Supermarket Type1'), ['Outlet_Size']] = 'Small'
    cat_data.loc[(cat_data['Outlet_Size'].isna()) & (cat_data['Outlet_Type'] == 'Supermarket Type2'), ['Outlet_Size']] = 'Medium'
    cat_data.loc[(cat_data['Outlet_Size'].isna()) & (cat_data['Outlet_Type'] == 'Supermarket Type3'), ['Outlet_Size']] = 'Medium'
    cat_data.loc[cat_data['Item_Fat_Content'] == 'LF' , ['Item_Fat_Content']] = 'Low Fat'
    cat_data.loc[cat_data['Item_Fat_Content'] == 'reg' , ['Item_Fat_Content']] = 'Regular'
    cat_data.loc[cat_data['Item_Fat_Content'] == 'low fat' , ['Item_Fat_Content']] = 'Low Fat'
    
    le = LabelEncoder()
    cat_data = cat_data.apply(le.fit_transform)
    ss = StandardScaler()
    num_data = pd.DataFrame(ss.fit_transform(num_data.drop(['Item_Outlet_Sales'], axis=1)), \
                            columns = num_data.drop(['Item_Outlet_Sales'],axis=1).columns)
    cat_data = pd.DataFrame(ss.fit_transform(cat_data.drop(['Item_Identifier'], axis=1)), \
                            columns = cat_data.drop(['Item_Identifier'], axis=1).columns)
    final_data = pd.concat([num_data,cat_data],axis=1)
    print('Data after cleaning: {}'.format(final_data.shape))
    X = final_data
    y = data['Item_Outlet_Sales']
    split_ratio = args.train_test_split_ratio
    print('Splitting data into train and test sets with ratio {}'.format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=split_ratio, random_state=0)
    
    # Esto define la ruta de salida dentro del contenedor desde donde se tomarán todos los csv y se cargarán en S3.
    train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_features.csv')
    train_labels_output_path = os.path.join('/opt/ml/processing/train','train_labels.csv')
    test_features_output_path = os.path.join('/opt/ml/processing/test','test_features.csv')
    test_labels_output_path = os.path.join('/opt/ml/processing/test', 'test_labels.csv')
    print('Saving training features to {}'.format(train_features_output_path))
    pd.DataFrame(X_train).to_csv(train_features_output_path, header=False, index=False)
    print('Saving test features to {}'.format(test_features_output_path))
    pd.DataFrame(X_test).to_csv(test_features_output_path, header=False, index=False)
    print('Saving training labels to {}'.format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)
    print('Saving test labels to {}'.format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)

Overwriting preprocessing.py


In [35]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code='preprocessing.py',
                      inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input')],
                      outputs=[
                          ProcessingOutput(output_name='train_data',source='/opt/ml/processing/train',
                                                destination='s3://data-processing-mlops/'),
                          ProcessingOutput(output_name='test_data', source='/opt/ml/processing/test',
                                                destination='s3://data-processing-mlops/')
                      ],
                      arguments=['--train-test-split-ratio', '0.1']
                     )


Job Name:  sagemaker-scikit-learn-2021-09-04-07-40-07-689
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://data-processing-mlops/Train.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-962542737140/sagemaker-scikit-learn-2021-09-04-07-40-07-689/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train_data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://data-processing-mlops/', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test_data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://data-processing-mlops/', 'LocalPath': '/opt/m

In [36]:
preprocessing_job_description = sklearn_processor.jobs[-1].describe()

In [37]:
preprocessing_job_description

{'ProcessingInputs': [{'InputName': 'input-1',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://data-processing-mlops/Train.csv',
    'LocalPath': '/opt/ml/processing/input',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'code',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-962542737140/sagemaker-scikit-learn-2021-09-04-07-40-07-689/input/code/preprocessing.py',
    'LocalPath': '/opt/ml/processing/input/code',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train_data',
    'S3Output': {'S3Uri': 's3://data-processing-mlops/',
     'LocalPath': '/opt/ml/processing/train',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False},
   {'OutputName': 'test_data',
    'S3Output': {'S3Uri': 's3://data

In [38]:
output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'train_data':
        preprocessed_training_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'test_data':
        preprocessed_test_data = output['S3Output']['S3Uri']

In [39]:
training_features = pd.read_csv(preprocessed_training_data + 'train_features.csv', nrows=10, header=None)
print('Training features shape: {}'.format(training_features.shape))
training_features.head(10)

Training features shape: (10, 10)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,0.071924,4.22395,-0.56897,-1.532846,-0.738147,1.371418,-0.25459,0.799954,-1.369334,-1.508289
1,-0.619814,0.075491,1.96928,0.736822,-0.738147,-0.766479,0.450371,0.799954,-0.138882,-0.252658
2,0.751946,-0.350031,-0.232154,1.09519,1.354743,-0.528935,-0.959551,0.799954,-0.138882,-0.252658
3,0.071924,-0.335116,-1.224896,-1.532846,-0.738147,1.608963,0.097891,-0.66408,1.091569,2.258603
4,0.964806,1.359713,0.480442,1.334103,-0.738147,-0.291391,-0.607071,-0.66408,1.091569,1.002972
5,1.603384,-0.248602,-1.32966,0.139541,-0.738147,-0.291391,1.507813,-0.66408,-1.369334,-0.252658
6,1.16584,1.553906,-0.752339,-1.293934,-0.738147,-1.479112,-1.312032,-2.128115,1.091569,-0.252658
7,1.556082,-0.977235,0.656289,-1.293934,-0.738147,0.421242,-1.312032,-2.128115,1.091569,-0.252658
8,1.319572,-0.075335,0.077869,0.736822,1.354743,-0.528935,0.450371,0.799954,-0.138882,-0.252658
9,-1.623801,-0.786506,0.281015,1.09519,-0.738147,-0.291391,-0.959551,0.799954,-0.138882,-0.252658


## Creación de contener propio con ScriptProcessor

In [40]:
%%writefile Dockerfile

FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3 scikit-learn==0.21.3
ENV PYTHONUNBUFFERED=TRUE
ENTRYPOINT ["python3"]

Writing Dockerfile


In [61]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
ecr_repository = 'sagemaker-processing-container'
tag = ':latest'
region = boto3.session.Session().region_name

In [83]:
%cd ~/SageMaker/docker/

/home/ec2-user/SageMaker/docker


In [84]:
! pwd

/home/ec2-user/SageMaker/docker


In [86]:
processing_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

# Crear un repositorio de ECR y pushear imagen de docker
! docker build -t $ecr_repository . # Esto construye la imagen
! $(aws ecr get-login --region $region --registry-ids $account_id --no-include-email) # Logs en AWS
! aws ecr create-repository --repository-name $ecr_repository # Crea el repositorio de ECR
! docker tag {ecr_repository + tag} $processing_repository_uri # Etiqueta la imagen para diferenciarla de otras imágenes
! docker push $processing_repository_uri # Pushea la imagen a ECR

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-east-1:962542737140:repository/sagemaker-processing-container",
        "registryId": "962542737140",
        "repositoryName": "sagemaker-processing-container",
        "repositoryUri": "962542737140.dkr.ecr.us-east-1.amazonaws.com/sagemaker-processing-container",
        "createdAt": 1630743246.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}
The push refers to repository [962542737140.dkr.ecr.us-east-1.amazonaws.com/sagemaker-processing-container]

[1Bc59eefbd: Preparing 
[1B6309f96d: Preparing 
[1Bf11ed098: Preparing 
[1B6d9d854d: Preparing 
[1B5d8b8f0f: Preparing 
[6Bc59eefbd: Pushed     307MB/301.1MB[4A[2K[5A[2K[6A[2K[2A[2K[3A[2K[2A[2K[3

In [87]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
role = get_execution_role()
script_processor = ScriptProcessor(command=['python3'],
                                   image_uri=processing_repository_uri,
                                   role=role,
                                   instance_count=1,
                                   instance_type='ml.m5.xlarge')

In [88]:
input_data = 's3://data-processing-mlops/Train.csv'
script_processor.run(code='preprocessing.py',
                     inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input')],
                     outputs=[ProcessingOutput(source='/opt/ml/processing/train', destination='s3://data-processing-mlops/'),
                              ProcessingOutput(source='/opt/ml/processing/test', destination='s3://data-processing-mlops/')])


Job Name:  sagemaker-processing-container-2021-09-04-08-16-56-620
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://data-processing-mlops/Train.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-962542737140/sagemaker-processing-container-2021-09-04-08-16-56-620/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://data-processing-mlops/', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'output-2', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://data-processing-mlops/', 'LocalP