# Create Python code and container images for a SageMaker Pipeline
## Steps
0. Set up paths for data and code on S3
1. Search the prebuilt image list
2. Container creation and upload to Amazon ECR
3. Local ML development and remote training job with Amazon SageMaker
4. Generate preprocess.py for preprocessing and copy the code file to S3
5. Generate training.py for the training job
6. Generate evaluate.py for the evaluation processing job

## Step 0. Setup Path for Data and Codes on S3

In [None]:
import pandas as pd

# Show which dataset location is being loaded, then read it with the first
# CSV column used as the DataFrame index.
print(INPUT_DATA)
df = pd.read_csv(INPUT_DATA, index_col=0)

# Report (rows, columns) as a quick sanity check on the load.
print(df.shape)

In [None]:
df.head()

## Step 1: If you need a new Docker image, build and upload it to Amazon ECR

### Creating a SageMaker-compatible Catboost container
We derive our Dockerfile from the SageMaker Scikit-Learn Dockerfile: https://github.com/aws/sagemaker-scikit-learn-container/blob/master/docker/0.20.0/base/Dockerfile.cpu

### Sending the container to ECR

In [None]:
# Install the image-build package; the build script written below invokes its `sm-docker` command.
!pip install sagemaker-studio-image-build

In [None]:
%%writefile build_and_push.sh
# Build the Docker image from the current directory with sm-docker and push it
# to the given Amazon ECR repository under the "latest" tag.
#
# Usage: bash build_and_push.sh <ecr-repository-name>

# Fail fast on errors, unset variables and failed pipeline stages.
set -euo pipefail

# Without this check an empty name would silently build ":latest".
if [ "$#" -lt 1 ] || [ -z "$1" ]; then
    echo "Usage: $0 <ecr-repository-name>" >&2
    exit 1
fi

REPO_NAME="$1"

# Quote the repository so unusual characters are not word-split by the shell.
sm-docker build --repository "${REPO_NAME}:latest" .



In [None]:
# Run the build script, passing the notebook variable `ecr_repository_name` as the repository name.
! bash build_and_push.sh $ecr_repository_name

In [None]:
# Assemble the image URI (<account>.dkr.ecr.<region>.amazonaws.com/<repo>:latest)
# for the repository the build script pushed to.
container_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository_name}:latest"
# The value is an image URI, not an ARN, so label it accordingly.
print(f"ECR container image URI: {container_image_uri}")



The Docker image is now pushed to ECR and is ready for consumption! In the next section, we step into the shoes of an ML practitioner who develops an XGBoost model and runs it remotely on Amazon SageMaker.

## Step 2: local ML development and remote training job with Amazon SageMaker

We install XGBoost locally for local development.

In [None]:
! pip install xgboost

In [None]:
import xgboost as xgb
from xgboost import XGBClassifier

### Data processing
We use pandas to split a small local dataset into training and testing sets.

We could also design code that loads all the data and runs cross-validation within the script. 

In [None]:
# !aws s3 cp s3://drivingdata/data_raw/df_dataset.csv $INPUT_DATA

## Step 4. Generate preprocess.py for preprocessing and copy code file to S3

In [None]:
%%writefile preprocess.py

import argparse
import logging
import os
import pathlib
import requests
import tempfile

import boto3
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":

    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    #input_data = "s3://drivingdata/data_raw/df_dataset.csv"
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/df_dataset.csv"

    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)
    
    logger.debug("Reading downloaded data.")
    df_dataset = pd.read_csv(fn, index_col = 0)
    
    test_size=0.3
    random_state=42
    
    df_dataset['Y'] = df_dataset['driving_style']
    df_dataset= df_dataset.drop('driving_style', axis=1)
    
    df_train, df_test = train_test_split(df_dataset, test_size=test_size, random_state=random_state)
    
    
    df_test, df_valid = train_test_split(df_test, test_size=test_size, random_state=random_state)
    
    if not os.path.exists(f"{base_dir}/train/"):
        os.makedirs(f"{base_dir}/train/")
    if not os.path.exists(f"{base_dir}/test/"):
        os.makedirs(f"{base_dir}/test/")
    if not os.path.exists(f"{base_dir}/validation/"):
        os.makedirs(f"{base_dir}/validation/")
    
    df_train.to_csv(f"{base_dir}/train/train.csv", index=False)
    df_test.to_csv(f"{base_dir}/test/test.csv", index=False)
    df_valid.to_csv(f"{base_dir}/validation/validation.csv", index=False)
    logger.info("finish data preprocessing")



## Test preprocess Script Locally

In [None]:
!echo $INPUT_DATA

In [None]:
# local test
! python preprocess.py \
    --input-data $INPUT_DATA

In [None]:
!pwd

In [None]:
!ls /opt/ml/processing/train

In [None]:
!ls /opt/ml/processing/validation

In [None]:
!ls /opt/ml/processing/test

In [None]:
# Copy the local test split into the notebooks data directory (hard-coded path; adjust if the repository lives elsewhere).
!cp /opt/ml/processing/test/test.csv /root/mmspml-spm/notebooks/data

In [None]:
!aws s3 cp preprocess.py $PREPROCESS_CODE

### Copy local test file to s3 (for the test)

In [None]:
# Upload the locally generated splits to $DATA_BUCKET_PREFIX/data/{train,test,validation}/,
# the S3 locations the training and evaluation jobs later in this notebook read from.
!aws s3 cp /opt/ml/processing/train/train.csv $DATA_BUCKET_PREFIX/data/train/
!aws s3 cp /opt/ml/processing/test/test.csv $DATA_BUCKET_PREFIX/data/test/
!aws s3 cp /opt/ml/processing/validation/validation.csv $DATA_BUCKET_PREFIX/data/validation/

## Step 5. Generate training.py for train job
### Developing a local training script

In [None]:
%%writefile $TRAINING_PROGRAM

import argparse
import logging
import os

from xgboost import XGBClassifier
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

if __name__ =='__main__':

    print('extracting arguments')
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--train-file', type=str, default='train.csv')
    parser.add_argument('--validation-file', type=str, default='validation.csv')
    parser.add_argument('--model-name', type=str, default='xgboost_classifiation_model.dump')
#     parser.add_argument('--features', type=str)  # in this script we ask user to explicitly name features
    parser.add_argument('--target', type=str) # in this script we ask user to explicitly name the target
    

    args, _ = parser.parse_known_args()

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    logging.info('reading data')
    train_df = pd.read_csv(os.path.join(args.train, args.train_file), index_col = 0)
    validation_df = pd.read_csv(os.path.join(args.validation, args.validation_file), index_col = 0)

    print(train_df.head(5))
    logging.info('building training and testing datasets')
    X_train = train_df.drop(columns=[args.target])
    X_validation = validation_df.drop(columns=[args.target])
    y_train = train_df[args.target]
    y_validation = validation_df[args.target]
        
    # define and train model
    model = XGBClassifier(objective="multi:softprob", random_state=42, eval_metric=["auc"], n_estimators=500)
    
    model.fit(X_train, y_train, early_stopping_rounds=5, eval_set=[(X_validation, y_validation)])

    
    # print abs error
    logging.info('validating model')
#     abs_err = np.abs(model.predict(X_validation) - y_validation)
    y_pred = model.predict(X_validation)
    accuracy_score = accuracy_score(y_validation, y_pred)

    # print couple perf metrics
    for q in [10, 50, 90]:
        logging.info('AE-at-' + str(q) + 'th-percentile: '
              + str(np.percentile(a=accuracy_score, q=q)))
    
    # persist model
    path = os.path.join(args.model_dir, args.model_name)
    logging.info('saving to {}'.format(path))
    model.save_model(path)

In [None]:
!pip install xgboost

### Testing our script locally

In [None]:
# local test
! python $TRAINING_PROGRAM \
    --train /opt/ml/processing/train/ \
    --validation /opt/ml/processing/validation/ \
    --model-dir ./ \
    --target Y

## Remote training in SageMaker

### Option 1: Launch a SageMaker training job from code uploaded to S3

With that option, we first need to send code to S3. This could also be done automatically by a build system.

In [None]:
# Package the training script as source.tar.gz and upload it to the S3 location
# that is passed to the estimator below as `sagemaker_submit_directory`.
!tar zcvf source.tar.gz $TRAINING_PROGRAM
!aws s3 cp source.tar.gz $TRAINING_PROGRAM_SUBMIT

In [None]:
!ls -alh


In [None]:
import sagemaker
sagemaker.__version__

We then launch a training job with the `Estimator` class

In [None]:
from sagemaker.estimator import Estimator

# S3 prefix handed to the estimator as its output path.
output_path = f"{DATA_BUCKET_PREFIX}/training_jobs"

# Hyperparameters: the script to run, the S3 location of its packaged source,
# and the label column that the training script receives as --target.
training_hyperparameters = {
    'sagemaker_program': TRAINING_PROGRAM,
    'sagemaker_submit_directory': TRAINING_PROGRAM_SUBMIT,
    'target': 'Y',
}

estimator = Estimator(
    image_uri=TRAINING_DOCKER_IMAGE,
    role=role,
    instance_count=1,
    instance_type=TRAINING_INSTANCE_TYPE,
    output_path=output_path,
    hyperparameters=training_hyperparameters,
)

In [None]:
# S3 prefixes of the train and validation splits uploaded earlier in this notebook.
train_location = f"{DATA_BUCKET_PREFIX}/data/train/"
validation_location = f"{DATA_BUCKET_PREFIX}/data/validation/"

In [None]:
# Launch the training job; each dict key names an input channel and maps to its S3 location.
estimator.fit({'train':train_location, 'validation': validation_location}, logs=True)

## Step 6. Generate evaluate code for evaluate processing job

In [None]:
%%writefile evaluate.py
import argparse
import logging
import pathlib
import os
import json

import pickle
import tarfile

import numpy as np
import pandas as pd
from xgboost import XGBClassifier

from sklearn.metrics import accuracy_score

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())



if __name__ == "__main__":
    print('extracting arguments')
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))
    parser.add_argument('--test-file', type=str, default='test.csv')
    parser.add_argument('--model-name', type=str, default='xgboost_classifiation_model.dump')
    parser.add_argument('--target', type=str, default='Y') # in this script we ask user to explicitly name the target
    

    args, _ = parser.parse_known_args()

    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    model = XGBClassifier(objective="multi:softprob", random_state=42, eval_metric="auc")
    model.load_model(args.model_name)

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, index_col = 0)

    y_test = df[args.target]
    X_test = df.drop(columns=args.target)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.debug("Calculating mean squared error.")
    accuracy = accuracy_score(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
            "binary_classification_metrics": {
                "accuracy": {
                    "value": accuracy,
                    "standard_deviation": std
                    },
                },
        }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with accuracy: %f", accuracy)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))


In [None]:
import boto3

client = boto3.client('sagemaker')

# Request the newest job explicitly instead of relying on the API's default
# ordering, and restrict the search to completed jobs.
response = client.list_training_jobs(
    SortBy="CreationTime",
    SortOrder="Descending",
    StatusEquals="Completed",
    MaxResults=1,
)
summaries = response["TrainingJobSummaries"]
if not summaries:
    raise RuntimeError("No completed SageMaker training job found in this account/region.")

latest_job = summaries[0]
job_desc = client.describe_training_job(TrainingJobName=latest_job["TrainingJobName"])

# S3 URI of the model.tar.gz produced by that training job.
MODEL_PATH = job_desc["ModelArtifacts"]["S3ModelArtifacts"]
print(MODEL_PATH)
# CLI equivalent:
#!aws sagemaker list-training-jobs --sort-by CreationTime --sort-order Descending --status-equals Completed --output text --max-items 1 --query "TrainingJobSummaries[*].TrainingJobName"

In [None]:
# Download the model artifact to the local path that evaluate.py extracts (/opt/ml/processing/model/model.tar.gz).
!aws s3 cp $MODEL_PATH /opt/ml/processing/model/model.tar.gz

In [None]:
!python evaluate.py

In [None]:
!cat /opt/ml/processing/evaluation/evaluation.json

In [None]:
!aws s3 cp evaluate.py $EVALUATE_CODE_URI

In [None]:
!aws s3 ls $DATA_BUCKET_PREFIX/data/test/test.csv

In [None]:
from sagemaker.processing import ScriptProcessor 
from sagemaker.processing import ProcessingInput
from sagemaker.processing import ProcessingOutput


# Container-local paths used by evaluate.py.
input_data = "/opt/ml/processing/test/test.csv"
# evaluate.py writes evaluation.json here, so this is the directory to upload.
output_data = "/opt/ml/processing/evaluation"
# ProcessingOutput.destination must be an S3 URI, not a container path.
evaluation_s3_uri = DATA_BUCKET_PREFIX + "/evaluation"
s3_model_path = MODEL_PATH

script_processor = ScriptProcessor(command=['python3'],
                image_uri=EVALUATE_DOCKER_IMAGE,
                role=ROLE_ARN,
                instance_count=1,
                instance_type='ml.m5.xlarge')

script_processor.run(code='evaluate.py',
                      inputs=[
                          # Test split -> /opt/ml/processing/test/test.csv in the container.
                          # FullyReplicated so every instance would get the single test file.
                          ProcessingInput(
                        source= DATA_BUCKET_PREFIX + "/data/test/test.csv",
                        destination='/opt/ml/processing/test',
                        s3_data_distribution_type='FullyReplicated'),
                          # Model artifact -> /opt/ml/processing/model/model.tar.gz in the container.
                          ProcessingInput(
                          source=s3_model_path,
                          destination='/opt/ml/processing/model')
                      ],
                      # Upload the directory evaluate.py actually writes to.
                      outputs=[ProcessingOutput(output_name='evaluation',
                                                source=output_data,
                                                destination=evaluation_s3_uri,
                                                s3_upload_mode = 'Continuous')],
                         
                      arguments=['--target', "Y"]
                     )
script_processor_job_description = script_processor.jobs[-1].describe()
print(script_processor_job_description)