In [1]:
import boto3
import sagemaker
from sagemaker import get_execution_role
# Shared AWS context reused by later cells:
#   sess   - SageMaker session (used further down for S3 uploads)
#   role   - value returned by get_execution_role(), passed as `role=` below
#   region - region name of the default boto3 session
sess = sagemaker.Session()
role = get_execution_role()
region = boto3.session.Session().region_name

In [2]:
from sagemaker.sklearn.processing import SKLearnProcessor
# Processor that later runs preprocessing.py (see sklearn_processor.run below).
sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    role=role,
    instance_count=1,
    instance_type='ml.t3.medium',
)

In [3]:
import pandas as pd
# S3 location of the raw CSV; also passed as the processing job's input below.
input_data = 's3://steelrawdata/faults.csv'
# Load the same file into the notebook; plot_features() later reads df.columns.
df = pd.read_csv(input_data)

In [4]:
%%writefile preprocessing.py

import argparse
import os
import warnings
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def split_feature_columns(columns, label_columns):
    """Partition column names into feature, categorical and numerical lists.

    Args:
        columns: iterable of all column names in the data set.
        label_columns: names of the label columns to exclude from the features.

    Returns:
        Tuple ``(features, categorical, numerical)`` of lists. ``features`` is
        every column that is not a label, in input order. A feature is treated
        as categorical when its name contains a digit; every other feature is
        numerical.
    """
    label_set = set(label_columns)
    features = [name for name in columns if name not in label_set]
    categorical = [name for name in features if any(ch.isdigit() for ch in name)]
    numerical = [name for name in features if name not in categorical]
    return features, categorical, numerical


def build_targets(df, label_columns):
    """Collapse 0/1 indicator columns into one column of class names.

    For every row, each label column equal to 1 contributes its own column
    name and every other value contributes an empty string; the pieces are
    concatenated in ``label_columns`` order. ``df`` is not modified.

    Args:
        df: DataFrame containing the indicator columns.
        label_columns: names of the indicator columns.

    Returns:
        pandas Series of strings aligned with ``df.index``.
    """
    targets = pd.Series('', index=df.index, dtype=object)
    for label in label_columns:
        piece = pd.Series(np.where(df[label] == 1, label, ''), index=df.index, dtype=object)
        targets = targets + piece
    return targets


if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.2)
    args, _ = parser.parse_known_args()
    print('Received arguments {}'.format(args))

    input_data_path = os.path.join('/opt/ml/processing/input', 'faults.csv')
    print('Reading input data from {}'.format(input_data_path))
    df = pd.read_csv(input_data_path)

    # The last seven columns are used as the labels.
    label_list = df.columns[-7:].tolist()
    features_list, categorical_features, numerical_features = split_feature_columns(
        df.columns, label_list)

    # NOTE(review): the scaler is fitted on the full data set before the
    # split, exactly as the original code did; fitting on the training rows
    # only would avoid leaking test statistics - confirm which is intended.
    df[numerical_features] = StandardScaler().fit_transform(df[numerical_features])

    split_ratio = args.train_test_split_ratio
    print('Splitting data into train and test sets with ratio {}'.format(split_ratio))

    # The original read the indicator columns from an undefined `wdf_1`.
    targets = build_targets(df, label_list)

    X = df[features_list].values
    y = targets.values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=split_ratio, random_state=1, shuffle=True, stratify=y)

    train_dir = '/opt/ml/processing/train'
    test_dir = '/opt/ml/processing/test'
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    train_features_output_path = os.path.join(train_dir, 'train_features.csv')
    train_labels_output_path = os.path.join(train_dir, 'train_labels.csv')
    test_features_output_path = os.path.join(test_dir, 'test_features.csv')
    test_labels_output_path = os.path.join(test_dir, 'test_labels.csv')

    # train_test_split returns NumPy arrays here (X and y are `.values`), so
    # every output is wrapped in a DataFrame before being written.
    print('Saving training features to {}'.format(train_features_output_path))
    pd.DataFrame(X_train).to_csv(train_features_output_path, header=False, index=False)
    print('Saving test features to {}'.format(test_features_output_path))
    pd.DataFrame(X_test).to_csv(test_features_output_path, header=False, index=False)
    print('Saving training labels to {}'.format(train_labels_output_path))
    pd.DataFrame(y_train).to_csv(train_labels_output_path, header=False, index=False)
    print('Saving test labels to {}'.format(test_labels_output_path))
    pd.DataFrame(y_test).to_csv(test_labels_output_path, header=False, index=False)

Overwriting preprocessing.py


In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Run preprocessing.py as a processing job: the raw CSV is the input, and the
# train/ and test/ folders written by the script are the two outputs.
processing_inputs = [
    ProcessingInput(source=input_data, destination='/opt/ml/processing/input'),
]
processing_outputs = [
    ProcessingOutput(
        output_name='train_data',
        source='/opt/ml/processing/train',
        destination='s3://steelrawdata/train/',
    ),
    ProcessingOutput(
        output_name='test_data',
        source='/opt/ml/processing/test',
        destination='s3://steelrawdata/test/',
    ),
]
sklearn_processor.run(
    code='preprocessing.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
    arguments=['--train-test-split-ratio', '0.2'],
)

In [None]:
# Describe the most recent processing job and print each output's S3 URI.
latest_processing_job = sklearn_processor.jobs[-1]
preprocessing_job_description = latest_processing_job.describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    s3_uri = output['S3Output']['S3Uri']
    print(s3_uri)

In [None]:
# Role ARN passed as `role=` to the XGBoost estimator further down.
# NOTE(review): account-specific ARN is hard-coded; consider reading it from
# configuration or the environment before sharing this notebook.
new_role = 'arn:aws:iam::322704388865:role/practice'

In [None]:
%%writefile training.py

import os, argparse
import xgboost as xgb
import pandas as pd
import numpy as np

def model_fn(model_dir):
    """Load the booster stored as ``xgb.model`` inside ``model_dir``.

    Presumably the model-loading hook invoked by the SageMaker XGBoost
    serving container - confirm against the container documentation.

    Args:
        model_dir: directory that contains the ``xgb.model`` file.

    Returns:
        An ``xgb.Booster`` with the saved model loaded into it.
    """
    model_path = os.path.join(model_dir, 'xgb.model')
    booster = xgb.Booster()
    booster.load_model(model_path)
    return booster


CHECKPOINT_PREFIX = 'xgboost-checkpoint.'


def load_checkpoint(checkpoint_dir):
    """Locate the newest checkpoint written by ``save_checkpoint``.

    Args:
        checkpoint_dir: directory that may contain files named
            ``xgboost-checkpoint.<iteration>``.

    Returns:
        Tuple ``(path, completed_rounds)``. ``path`` is the checkpoint file
        with the highest iteration number and ``completed_rounds`` is that
        number plus one. ``(None, 0)`` when the directory does not exist or
        holds no checkpoint.
    """
    if not os.path.isdir(checkpoint_dir):
        return None, 0
    latest_name, latest_iteration = None, -1
    for file_name in os.listdir(checkpoint_dir):
        suffix = file_name[len(CHECKPOINT_PREFIX):]
        if file_name.startswith(CHECKPOINT_PREFIX) and suffix.isdigit():
            if int(suffix) > latest_iteration:
                latest_name, latest_iteration = file_name, int(suffix)
    if latest_name is None:
        return None, 0
    return os.path.join(checkpoint_dir, latest_name), latest_iteration + 1


def save_checkpoint(checkpoint_dir, start_iteration=0):
    """Build a callback that saves the model after every boosting round.

    The callback uses the function-style callback interface (one ``env``
    argument exposing ``model`` and ``iteration``).
    NOTE(review): this is the interface of the XGBoost 1.2 release selected
    by framework_version='1.2-2' in the notebook; newer XGBoost releases use
    ``TrainingCallback`` classes instead - confirm before upgrading.

    Args:
        checkpoint_dir: directory to write checkpoints into (created if
            missing).
        start_iteration: number of rounds completed by earlier runs, added to
            ``env.iteration`` so file names keep increasing across restarts.

    Returns:
        A callable suitable for ``xgb.train(callbacks=[...])``.
    """
    os.makedirs(checkpoint_dir, exist_ok=True)

    def _save(env):
        iteration = start_iteration + env.iteration
        env.model.save_model(
            os.path.join(checkpoint_dir, CHECKPOINT_PREFIX + str(iteration)))

    return _save


def encode_labels(train_labels, validation_labels):
    """Map class labels to consecutive integers starting at 0.

    Multi-class XGBoost objectives need integer labels in
    ``[0, num_class)``; the label files may contain class names instead.

    Args:
        train_labels: sequence of training labels.
        validation_labels: sequence of validation labels.

    Returns:
        Tuple ``(train_codes, validation_codes, classes)`` where ``classes``
        is the sorted list of distinct labels and each code is the index of
        the label in ``classes``.
    """
    classes = sorted(set(train_labels) | set(validation_labels))
    index_of = {label: position for position, label in enumerate(classes)}
    train_codes = [index_of[label] for label in train_labels]
    validation_codes = [index_of[label] for label in validation_labels]
    return train_codes, validation_codes, classes


def read_channel(directory, features_file, labels_file):
    """Read a header-less features CSV and its single-column labels CSV.

    Returns:
        Tuple ``(features, labels)``: a DataFrame and the first column of the
        labels file as a Series.
    """
    features = pd.read_csv(os.path.join(directory, features_file), header=None)
    labels = pd.read_csv(os.path.join(directory, labels_file), header=None).iloc[:, 0]
    return features, labels


if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--num_round', type=int, default=50)
    parser.add_argument('--max_depth', type=int, default=5)
    parser.add_argument('--eta', type=float, default=0.2)
    # Also declared so that values supplied as hyperparameters are honoured
    # instead of being dropped by parse_known_args().
    parser.add_argument('--min_child_weight', type=float, default=1.0)
    parser.add_argument('--alpha', type=float, default=0.0)
    parser.add_argument('--num_class', type=int, default=None)
    parser.add_argument('--objective', type=str, default='multi:softmax')
    parser.add_argument('--early-stopping-rounds', type=int, default=10)
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--training-dir', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    parser.add_argument('--validation-dir', type=str, default=os.environ['SM_CHANNEL_VALIDATION'])

    args, _ = parser.parse_known_args()
    model_dir = args.model_dir
    training_dir = args.training_dir
    validation_dir = args.validation_dir
    chk_dir = '/opt/ml/checkpoints'

    # Load the CSVs explicitly; the original handed the file path to DMatrix
    # and used the path string itself as the label.
    x_train, y_train_raw = read_channel(training_dir, 'train_features.csv', 'train_labels.csv')
    x_test, y_test_raw = read_channel(validation_dir, 'test_features.csv', 'test_labels.csv')
    y_train, y_test, classes = encode_labels(y_train_raw.tolist(), y_test_raw.tolist())
    print('Class index mapping: {}'.format(dict(enumerate(classes))))

    # Booster parameters only; the number of rounds and early stopping are
    # arguments of xgb.train(), not booster parameters.
    train_hp = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'min_child_weight': args.min_child_weight,
        'alpha': args.alpha,
        'objective': args.objective}
    if args.objective.startswith('multi:'):
        train_hp['num_class'] = args.num_class or len(classes)

    dtrain = xgb.DMatrix(x_train.values, label=y_train)
    dval = xgb.DMatrix(x_test.values, label=y_test)
    # Early stopping follows the LAST entry, so validation must come last.
    watchlist = [(dtrain, 'train'), (dval, 'validation')]

    prev_checkpoint, n_iterations_prev_run = load_checkpoint(chk_dir)
    callbacks = [save_checkpoint(chk_dir, start_iteration=n_iterations_prev_run)]
    remaining_rounds = max(args.num_round - n_iterations_prev_run, 0)

    if remaining_rounds > 0 or prev_checkpoint is None:
        bst = xgb.train(
            params=train_hp,
            dtrain=dtrain,
            evals=watchlist,
            num_boost_round=remaining_rounds,
            early_stopping_rounds=args.early_stopping_rounds,
            xgb_model=prev_checkpoint,
            callbacks=callbacks)
    else:
        # Every requested round was already completed by an earlier run.
        bst = xgb.Booster()
        bst.load_model(prev_checkpoint)

    # Emit the line matched by the notebook's tuning metric regex
    # ('val_accuracy: ([0-9\\.]+)').
    predictions = np.asarray(bst.predict(dval))
    if predictions.ndim > 1:
        # Probability outputs (e.g. multi:softprob): take the most likely class.
        predictions = predictions.argmax(axis=1)
    val_accuracy = float(np.mean(predictions == np.asarray(y_test)))
    print('val_accuracy: {:.4f}'.format(val_accuracy))

    bst.save_model(os.path.join(model_dir, 'xgb.model'))

In [None]:
# Use this cell for local mode or managed mode.
# It defines training_path, validation_path, output_path and the instance
# types (train_instance_type / deploy_instance_type) used by later cells.
import os

enable_local_mode_training = False

if enable_local_mode_training:
    train_dir = os.path.join(os.getcwd(), "data/train")
    test_dir = os.path.join(os.getcwd(), "data/test")
    output_dir = os.path.join(os.getcwd(), "model/output")
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    # NOTE(review): X_train, y_train, X_test and y_test are not assigned in
    # the notebook cells visible here; they must exist before enabling local
    # mode. Each split goes to the file that carries its name (the original
    # wrote X_test to train_labels.csv and y_train to test_features.csv).
    # DataFrame wrapping lets the labels be arrays or Series.
    pd.DataFrame(X_train).to_csv(f"{train_dir}/train_features.csv", header=False, index=False)
    pd.DataFrame(y_train).to_csv(f"{train_dir}/train_labels.csv", header=False, index=False)
    pd.DataFrame(X_test).to_csv(f"{test_dir}/test_features.csv", header=False, index=False)
    pd.DataFrame(y_test).to_csv(f"{test_dir}/test_labels.csv", header=False, index=False)

    training_path = f"file://{train_dir}"
    validation_path = f"file://{test_dir}"
    output_path   = f"file://{output_dir}"
    train_instance_type = deploy_instance_type = 'local'

else:
    training_path = "s3://steelrawdata/train/"
    validation_path = "s3://steelrawdata/test/"
    output_path   = 's3://steelrawdata/model/'
    # NOTE(review): confirm 'ml.t3.medium' is an accepted instance type for
    # both training jobs and endpoints in this account/region.
    train_instance_type = deploy_instance_type = 'ml.t3.medium'

If this cell doesn't work, follow the approach used in the Learn SageMaker book.

In [None]:
# TrainingInput was used without being imported anywhere in the notebook.
from sagemaker.inputs import TrainingInput

# CSV channels handed to tuner.fit() as the "train" and "validation" inputs.
s3_input_train = TrainingInput(s3_data=training_path, content_type="csv")
s3_input_test = TrainingInput(s3_data=validation_path, content_type="csv")

In [None]:
from sagemaker.debugger import rule_configs, Rule, CollectionConfig, DebuggerHookConfig

# Built-in Debugger rules attached to the estimator below. Each rule saves the
# "losses" collection at the interval given in its CollectionConfig.
# (The original list was missing the comma between the second and third rule
# and the comma after "collection_names", so the cell did not parse.)
rules = [
    Rule.sagemaker(
        base_config=rule_configs.overfit(),
        rule_parameters={"patience": "10",
                         "ratio_threshold": "0.1"},
        collections_to_save=[
            CollectionConfig(name="losses",
                             parameters={"train.save_interval": "5",
                                         "eval.save_interval": "5"})]),
    Rule.sagemaker(
        base_config=rule_configs.overtraining(),
        rule_parameters={"patience_train": "5",
                         "patience_validation": "10",
                         "delta": "0.01"},
        collections_to_save=[
            CollectionConfig(name="losses",
                             parameters={"save_interval": "5"})]),
    Rule.sagemaker(
        base_config=rule_configs.loss_not_decreasing(),
        rule_parameters={
            "collection_names": "losses",
            "use_losses_collection": "True",
            "num_steps": "10",
            "diff_percent": "0.1",
            "increase_threshold_percent": "5"},
        collections_to_save=[
            CollectionConfig(name="losses",
                             parameters={"save_interval": "5"})]),
]

# Debugger hook: save three collections every 2 steps under the debug prefix.
hook_collections = [
    CollectionConfig(name=collection_name, parameters={"save_interval": '2'})
    for collection_name in ('metrics', 'average_shap', 'feature_importance')
]
debugger_hook_config = DebuggerHookConfig(
    s3_output_path='s3://steelrawdata/debug',
    collection_configs=hook_collections,
)

In [None]:
# Spot-training settings consumed by the estimator cell.
# NOTE(review): SageMaker reads max_run / max_wait in seconds - confirm that a
# 60-second limit is really what is wanted here.
use_spot_instances = True
max_run = 60
if use_spot_instances:
    # Only meaningful for spot training: wait limit and checkpoint location.
    max_wait = 60
    chkp_path = "s3://steelrawdata/checkpoints/"
else:
    max_wait = None
    chkp_path = None

In [None]:
from sagemaker.xgboost import XGBoost


# Script-mode XGBoost estimator running training.py.
# (The original call was missing the commas after `role=new_role` and
# `checkpoint_s3_uri=chkp_path`, so the cell did not parse.)
xgb_estimator = XGBoost(
    entry_point='training.py',
    role=new_role,  # alternatively: sagemaker.get_execution_role()
    instance_count=1,
    instance_type=train_instance_type,
    use_spot_instances=use_spot_instances,
    max_run=max_run,
    max_wait=max_wait,
    checkpoint_s3_uri=chkp_path,
    framework_version='1.2-2',
    # Use the path chosen by the local/managed switch rather than repeating
    # the S3 literal, so local mode writes to its file:// location.
    output_path=output_path,
    # Hyperparameters reach training.py as command-line arguments and override
    # the argparse defaults of the options the script declares; options it
    # does not declare are ignored by parse_known_args().
    hyperparameters={'num_round': 100, 'num_class': 7},
    rules=rules,
    debugger_hook_config=debugger_hook_config)

### hyperparameter tuning

In [None]:
# Tuning objective: maximise the accuracy value parsed from the training log
# by the regex below.
objective_type = 'Maximize'
objective_metric_name = 'validation:accuracy'
metric_definitions = [
    dict(Name='validation:accuracy', Regex='val_accuracy: ([0-9\\.]+)'),
]

In [None]:
from sagemaker.tuner import ContinuousParameter, IntegerParameter

# Search space explored by the tuner.
hyperparameter_ranges = {
    "eta": ContinuousParameter(0.1, 0.5),
    "min_child_weight": ContinuousParameter(1, 10),
    # Logarithmic scaling is only valid for ranges strictly greater than 0;
    # this range starts at 0, so linear scaling is used instead.
    "alpha": ContinuousParameter(0, 2, scaling_type="Linear"),
    "max_depth": IntegerParameter(1, 10)}

In [None]:
from sagemaker.tuner import HyperparameterTuner
# Random-search tuner over hyperparameter_ranges: 30 jobs, 2 at a time.
# (The original call was missing the comma after strategy='Random'.)
tuner = HyperparameterTuner(xgb_estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions=metric_definitions,
                            objective_type=objective_type,
                            strategy='Random',
                            max_jobs=30,
                            max_parallel_jobs=2,
                            early_stopping_type='Auto')

In [None]:
# Launch the tuning job on the train/validation channels.
tuner.fit({"train": s3_input_train, "validation": s3_input_test})
# The original read `sklearn.jobs[-1]`, but no `sklearn` object exists in the
# notebook; describe the tuning job that was just started instead.
training_job_description = tuner.describe()

In [None]:
# Wait for a couple of jobs to start

from sagemaker.analytics import HyperparameterTuningJobAnalytics

# The original called `.dataframe()` on an undefined `exp` and sorted an
# undefined `jobs`; both now refer to the analytics object created here.
hp_analytics = HyperparameterTuningJobAnalytics(
    hyperparameter_tuning_job_name=tuner.latest_tuning_job.name)

# One row per training job launched by the tuner.
hp_results = hp_analytics.dataframe()

# Best objective value first.
hp_results.sort_values('FinalObjectiveValue', ascending=False)

In [None]:
# HyperparameterTuner has no `latest_training_job`; rule summaries belong to an
# estimator's training job, so inspect the best job found by the tuner.
# NOTE(review): swap in another training job here if a different one should
# be inspected.
best_training_job = tuner.best_estimator().latest_training_job
description = best_training_job.rule_job_summary()

for rule in description:
    # Drop two noisy fields before printing; tolerate their absence.
    rule.pop('LastModifiedTime', None)
    rule.pop('RuleEvaluationJobArn', None)
    print(rule)

In [None]:
from smdebug.trials import create_trial

# latest_job_debugger_artifacts_path() is an estimator method, not a tuner
# method, so read the Debugger output of the tuner's best training job.
debugger_artifacts_path = tuner.best_estimator().latest_job_debugger_artifacts_path()
trial = create_trial(debugger_artifacts_path)
trial.tensor_names()

In [None]:
%matplotlib inline 
import matplotlib.pyplot as plt

# Plot the saved train / validation accuracy tensors against the steps at
# which the train tensor was recorded.
train_tensor = trial.tensor("train-acc")
val_tensor = trial.tensor('validation-acc')
steps = train_tensor.steps()
train_acc = [train_tensor.value(step) for step in steps]
val_acc = [val_tensor.value(step) for step in steps]

plt.title('acc over steps')
plt.autoscale()
for series, series_label, series_color in (
        (train_acc, 'train', 'black'),
        (val_acc, 'val', 'grey')):
    plt.plot(steps, series, label=series_label, color=series_color)
plt.legend()

In [None]:
def plot_features(tensor_prefix):
    """Plot one line per feature for the tensors ``<tensor_prefix>/f<i>``.

    Reads the notebook-level ``trial`` (Debugger trial) and ``df`` (used for
    legend labels). Feature ``i`` is labelled with ``df.columns[i + 1]``.
    NOTE(review): this assumes the first column of ``df`` is not a feature
    and that the remaining columns are in training-feature order - confirm
    against the data actually used for training.

    Args:
        tensor_prefix: tensor name prefix, e.g. ``'average_shap'``.
    """
    # The original mixed `df` with an undefined `dataset`; use `df` throughout.
    feature_columns = df.columns[1:]
    for i, column_name in enumerate(feature_columns):
        f_name = tensor_prefix+'/f'+str(i)
        tensor = trial.tensor(f_name)
        steps = tensor.steps()
        v = [tensor.value(s) for s in steps]
        plt.plot(steps, v, label=column_name)
    plt.autoscale()
    plt.title(tensor_prefix)
    plt.legend(loc='upper left')
    plt.show()

In [None]:
# Plot the per-feature tensors saved under the 'average_shap' prefix.
plot_features('average_shap')

In [None]:
# Plot the per-feature tensors saved under the 'feature_importance/weight' prefix.
plot_features('feature_importance/weight')

In [None]:
from time import strftime,gmtime
from sagemaker.model_monitor.data_capture_config import DataCaptureConfig

# Endpoint name is 'xgboost-steel' followed by a UTC timestamp.
xgb_endpoint_name = 'xgboost-steel'+strftime('%Y-%m-%d-%H-%M-%S', gmtime())
capture_path = 's3://steelrawdata/capture/'

# Deploy through the tuner with request/response capture enabled.
# (The original call was missing the comma after
# `instance_type=deploy_instance_type`, so the cell did not parse.)
xgb_predictor = tuner.deploy(
    endpoint_name=xgb_endpoint_name,
    initial_instance_count=1,
    instance_type=deploy_instance_type,
    data_capture_config=DataCaptureConfig(
        enable_capture=True,                     # Capture data
        sampling_percentage=100,
        capture_options=['REQUEST', 'RESPONSE'], # Default value
        destination_s3_uri=capture_path          # Save data here
    )
)

print(xgb_endpoint_name)

In [None]:
# `csv_serializer` / `json_deserializer` are SageMaker SDK v1 names. The rest
# of this notebook uses the v2 API (TrainingInput, instance_count, ...), where
# serializers are classes that carry their own content type.
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

xgb_predictor.serializer = CSVSerializer()        # requests sent as text/csv
xgb_predictor.deserializer = JSONDeserializer()   # responses parsed as JSON

In [None]:
# Send the test features to the endpoint and show the raw response.
# NOTE(review): X_test is not assigned in the notebook cells visible here
# (it only exists inside preprocessing.py) - it has to be loaded first.
predictions = xgb_predictor.predict(X_test)
print(predictions)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score
# Weighted precision, recall and F1 of the endpoint predictions, in that order.
for metric in (precision_score, recall_score, f1_score):
    print(metric(y_test, predictions, average='weighted'))

### monitoring prediction quality

In [None]:
# Build the monitoring baseline from the training features plus their labels.
# NOTE(review): X_train / y_train are not assigned in the notebook cells
# visible here - they have to be loaded before this cell runs.
# Wrapping and re-indexing lets them be arrays, Series or DataFrames and keeps
# the rows aligned. (The original line had a stray ')' and did not parse.)
baseline_features = pd.DataFrame(X_train).reset_index(drop=True)
baseline_labels = pd.DataFrame({'target': pd.Series(y_train).reset_index(drop=True)})
data = pd.concat([baseline_features, baseline_labels], axis=1)
data.to_csv('baseline_data.csv', index=False)

# upload_data() takes a bucket NAME plus a key prefix, not an s3:// URI.
baseline_data = sess.upload_data(
    path='baseline_data.csv',
    bucket='steelrawdata',
    key_prefix='model-monitor')

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Where the baselining job writes its results.
baseline_path = 's3://steelrawdata/model-monitor/'

# Monitor that runs the baselining job (and, later, the monitoring schedule).
xgb_monitor = DefaultModelMonitor(role=role, instance_count=1, instance_type='ml.t3.medium')

# Compute baseline statistics / constraints from the uploaded CSV (with header).
xgb_monitor.suggest_baseline(
    baseline_dataset=baseline_data,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_path,
)

In [None]:
# Results of the baselining job started above.
baseline_results = xgb_monitor.latest_baselining_job

# Flatten the per-feature statistics into a table. `pd.io.json.json_normalize`
# is the deprecated spelling; the top-level `pd.json_normalize` replaces it.
baseline_statistics = baseline_results.baseline_statistics()
schema = pd.json_normalize(baseline_statistics.body_dict["features"])

In [None]:
# Flatten the suggested per-feature constraints into a table
# (`pd.json_normalize` replaces the deprecated `pd.io.json.json_normalize`).
constraints = pd.json_normalize(baseline_results.suggested_constraints().body_dict["features"])

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator

# `prefix` was never defined in the notebook; reuse the stem of the endpoint
# name so the schedule is easy to associate with it.
prefix = 'xgboost-steel'
xgb_monitor_name = prefix+'-mon-'+strftime("%Y-%m-%d-%H-%M-%S", gmtime())
report_path = 's3://steelrawdata/report'

# Hourly monitoring of the deployed endpoint against the baseline.
xgb_monitor.create_monitoring_schedule(
    monitor_schedule_name=xgb_monitor_name,
    endpoint_input=xgb_predictor.endpoint_name,
    output_s3_uri=report_path,
    statistics=xgb_monitor.baseline_statistics(),
    constraints=xgb_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly()
)

In [None]:
# Fetch the latest violations report, then flatten it into a table. Separate
# names keep the report object and the DataFrame apart, and `pd.json_normalize`
# replaces the deprecated `pd.io.json.json_normalize`.
violations_report = xgb_monitor.latest_monitoring_constraint_violations()
violations = pd.json_normalize(violations_report.body_dict["violations"])
violations

In [None]:
# Clean-up: remove the monitoring schedule created above.
xgb_monitor.delete_monitoring_schedule()

In [None]:
# Clean-up: delete the endpoint. `endpoint_name` is the attribute already used
# when creating the monitoring schedule; `.endpoint` is its SDK v1 spelling.
sagemaker.Session().delete_endpoint(xgb_predictor.endpoint_name)

### canary rollout

In [None]:
# Names of the two models referenced by the endpoint variants below.
# NOTE(review): confirm both names match models that exist in the account.
model_name_1, model_name_2 = 'xgb', 'xgb_updated'

In [None]:
# Canary split: variant-1 gets weight 9 and variant-2 weight 1; both variants
# run on one ml.t3.medium instance.
production_variants = [
    {
        'VariantName': variant_name,
        'ModelName': variant_model,
        'InitialInstanceCount': 1,
        'InitialVariantWeight': variant_weight,
        'InstanceType': 'ml.t3.medium',
    }
    for variant_name, variant_model, variant_weight in (
        ('variant-1', model_name_1, 9),
        ('variant-2', model_name_2, 1),
    )
]

In [None]:
import boto3
# Low-level SageMaker client used for the endpoint-config / endpoint calls below.
sm = boto3.client('sagemaker')

In [None]:
import time
# Endpoint-config name: fixed stem plus a UTC timestamp.
config_timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_config_name = 'xgboost-two-models-epc-' + config_timestamp

In [None]:
# Create the two-variant endpoint configuration under the timestamped name
# built above (the original passed the literal string
# 'my_endpoint_config_name' instead of the variable).
endpoint_config = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=production_variants)

In [None]:
# Point the endpoint at the two-variant configuration. The original referenced
# an undefined `my_endpoint_config_name`; `endpoint_config_name` is the name
# defined earlier in this section.
# NOTE(review): 'my_endpoint_name' is a placeholder - replace it with the name
# of the endpoint to update.
sm.update_endpoint(
    EndpointName='my_endpoint_name',
    EndpointConfigName=endpoint_config_name
)

### blue/green deployments

In [None]:
# assuming the new model performs well:
# send all traffic to variant-2 while keeping variant-1 provisioned.
# DesiredWeightsAndCapacities entries accept only VariantName, DesiredWeight
# and DesiredInstanceCount; the original passed endpoint-config style keys
# (with ModelName / InstanceType values swapped), which this call rejects.
updated_endpoint_config=[
    {
     'VariantName': 'variant-1',
     'DesiredWeight': 0,
     'DesiredInstanceCount': 1,
    },
    {
     'VariantName': 'variant-2',
     'DesiredWeight': 1,
     'DesiredInstanceCount': 1,
    }
]
# NOTE(review): 'my_endpoint_name' is a placeholder - replace it with the name
# of the endpoint to update.
sm.update_endpoint_weights_and_capacities(
    EndpointName='my_endpoint_name',
    DesiredWeightsAndCapacities=updated_endpoint_config
)

In [None]:
# Retire variant-1: create a configuration that contains only variant-2 and
# switch the endpoint to it. The original built this list but never used it
# and passed a literal placeholder as the configuration name.
updated_endpoint_config=[
    {
     'VariantName': 'variant-2',
     'ModelName': model_name_2,
     # Same instance type as the variants defined in production_variants.
     # NOTE(review): the original said 'ml.m3.medium' - confirm the type.
     'InstanceType':'ml.t3.medium',
     'InitialInstanceCount': 1,
     'InitialVariantWeight': 1,
    }
]

blue_green_config_name = 'xgboost-blue-green-epc-'+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
sm.create_endpoint_config(
    EndpointConfigName=blue_green_config_name,
    ProductionVariants=updated_endpoint_config
)

# NOTE(review): 'my_endpoint_name' is a placeholder - replace it with the name
# of the endpoint to update.
sm.update_endpoint(
    EndpointName='my_endpoint_name',
    EndpointConfigName=blue_green_config_name
)

### A/B testing

In [None]:
# A/B test: split traffic evenly between the two variants.
# DesiredWeightsAndCapacities entries accept only VariantName, DesiredWeight
# and DesiredInstanceCount; the original passed endpoint-config style keys
# (with ModelName / InstanceType values swapped), which this call rejects.
updated_endpoint_config=[
    {
     'VariantName': 'variant-1',
     'DesiredWeight': 50,
     'DesiredInstanceCount': 1,
    },
    {
     'VariantName': 'variant-2',
     'DesiredWeight': 50,
     'DesiredInstanceCount': 1,
    }
]
# NOTE(review): 'my_endpoint_name' is a placeholder - replace it with the name
# of the endpoint to update.
sm.update_endpoint_weights_and_capacities(
    EndpointName='my_endpoint_name',
    DesiredWeightsAndCapacities=updated_endpoint_config
)

In [None]:
# assuming monitored performance for variant B after weeks is better:
# create a configuration that contains only variant-2 and switch the endpoint
# to it. The original built this list but never used it, swapped the
# ModelName / InstanceType values, and passed a literal placeholder as the
# configuration name.
updated_endpoint_config=[
    {
     'VariantName': 'variant-2',
     'ModelName': model_name_2,
     # Same instance type as the variants defined in production_variants.
     # NOTE(review): confirm the intended instance type.
     'InstanceType':'ml.t3.medium',
     'InitialInstanceCount': 1,
     'InitialVariantWeight': 1,
    }
]

ab_winner_config_name = 'xgboost-ab-winner-epc-'+time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
sm.create_endpoint_config(
    EndpointConfigName=ab_winner_config_name,
    ProductionVariants=updated_endpoint_config
)

# NOTE(review): 'my_endpoint_name' is a placeholder - replace it with the name
# of the endpoint to update.
sm.update_endpoint(
    EndpointName='my_endpoint_name',
    EndpointConfigName=ab_winner_config_name
)