## Introduction

This is our fourth notebook (Lab 2), which dives deep into automating machine learning workflows to create a more repeatable path to production.

Here, we will put on the hat of an `ML Engineer` and automate the tasks within our machine learning workflows as well as orchestrate the steps. To do this, we'll build pipeline steps that combine the components from the previous notebooks into a single entity. This pipeline provides a repeatable ML workflow with some reliability built in through minimal quality gates.

For this task we will use Amazon SageMaker Pipelines to build an end-to-end machine learning pipeline.

![Notebook4](images/Notebook-4.png)

Keep in mind that CI/CD practices are typically more aligned with the *Reliable* stage, so you'll notice we have not yet considered a more robust set of pipelines that accounts for the lifecycle of each stage (build vs. deploy), source/version control, automated triggers, or additional quality gates.

Let's get started!

In [54]:
!pip install -U sagemaker

In [55]:
%store -r

In [56]:
# Processing imports
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# SageMaker Pipeline imports
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep
from sagemaker.workflow.model_step import ModelStep

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Other imports
import json
import time
from time import gmtime, strftime
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.tuner import IntegerParameter, HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

# To test the endpoint once it's deployed
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer, CSVDeserializer
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
import boto3
from sagemaker.model_metrics import ModelMetrics, MetricsSource
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup
from helper_library import *

from sagemaker.workflow.steps import CacheConfig

**Session variables**

In [57]:
# Useful SageMaker variables
session = PipelineSession()
bucket = session.default_bucket()
role_arn= sagemaker.get_execution_role()
region = session.boto_region_name
sagemaker_client = boto3.client('sagemaker')
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')
# Data paths in S3
s3_prefix = 'aws-sm-ray-workshop'
bucket_prefix = f'{s3_prefix}/data/feature-store'
output_path = f's3://{bucket}/{s3_prefix}/data/sm_processed'
fs_s3_path = f's3://{bucket}/{s3_prefix}/data/feature-store'

Using ARN from existing role: LambdaSageMakerExecutionRole
Done


## Model Build pipeline with SageMaker Pipelines

[Amazon SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) lets you create a directed acyclic graph (DAG) containing the pipeline steps needed to build and/or deploy machine learning models. Each pipeline, created through the provided Python SDK, is a series of interconnected steps. The same pipeline can also be exported as a JSON pipeline definition.

The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is a pipeline DAG that we'll be creating for our training pipeline:

![](images/sagemaker-pipelines-dag.png)
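
As a rough illustration of how data dependencies imply an execution order, the sketch below topologically sorts a hand-written dependency map with Python's standard `graphlib` module (Python 3.9+). The step names mirror the ones defined later in this notebook, but the map itself is an assumption written out for illustration; SageMaker Pipelines derives the real graph from the step properties you pass between steps.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step lists the steps whose outputs it needs.
# Written by hand for illustration; it is not produced by the SageMaker SDK.
dependencies = {
    "PreprocessData": set(),
    "FeatureStoreIngestion": {"PreprocessData"},
    "TrainModel": {"FeatureStoreIngestion"},
    "EvaluateModel": {"TrainModel", "PreprocessData"},
    "CheckEvaluation": {"EvaluateModel"},
    "RegisterModel": {"CheckEvaluation"},
}

# static_order() yields every step only after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every step here depends on the one before it, there is only one valid order. In a wider graph, independent branches could run in parallel.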

#### Pipeline Parameters

SageMaker Pipelines supports [pipeline parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), which let you provide runtime values for each run of your pipeline. This allows you to change key inputs (e.g., the raw input data) without changing your pipeline code.

Here, we'll identify the parameters and set their default values. Parameters also make the pipeline reusable: you can override these defaults when executing the pipeline later in the notebook.

In [58]:
# Upload raw data to S3
local_data_path_ray = "data/raw/ray/house_pricing.csv"
raw_data_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = session.upload_data(path=local_data_path_ray, key_prefix=raw_data_s3_prefix)

In [59]:
processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
"""
train_feature_group_name = ParameterString(
    name='train_feature_group_name',
    default_value='fs-train-synthetic-housing-data'
)

validation_feature_group_name = ParameterString(
    name='validation_feature_group_name',
    default_value='fs-val-synthetic-housing-data'
)

test_feature_group_name = ParameterString(
    name='test_feature_group_name',
    default_value='fs-test-synthetic-housing-data'
)
"""
bucket_prefix = ParameterString(
    name='bucket_prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)

train_feature_group_name = 'fs-train-synthetic-housing-data'
validation_feature_group_name = 'fs-val-synthetic-housing-data'
test_feature_group_name = 'fs-test-synthetic-housing-data'
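
To make the idea of defaults and per-run overrides concrete, here is a minimal plain-Python sketch. The `resolve_parameters` helper is hypothetical and is not part of the SageMaker SDK; rejecting unknown names is a choice made for this sketch, not a statement about how the service behaves.

```python
def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one run (illustrative helper)."""
    overrides = overrides or {}
    # Treat a name that was never declared as a mistake rather than ignoring it.
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"Unknown pipeline parameter(s): {unknown}")
    # Overrides win over defaults; the defaults dict itself is left untouched.
    return {**defaults, **overrides}


defaults = {
    "ProcessingInstanceCount": 1,
    "bucket_prefix": "aws-ray-mlops-workshop/feature-store",
}

default_run = resolve_parameters(defaults)
scaled_run = resolve_parameters(defaults, {"ProcessingInstanceCount": 2})
print(default_run)
print(scaled_run)
```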

#### Setup Step Caching Configuration

This configuration can be enabled on pipeline steps so that SageMaker Pipelines automatically checks for a previous successful run of a step with the same values for specific parameters. If one is found, Pipelines propagates the results of that step to the next step without re-running it, saving both time and compute costs. The `expire_after` value used below, `PT12H`, is an ISO 8601 duration of 12 hours.

In [60]:
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")
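
The sketch below is a simplified model of the expiry part of caching, not SageMaker's implementation. It parses a small subset of ISO 8601 durations and checks whether an earlier run is still inside the window. The helper names `parse_duration` and `cache_is_fresh` are made up for this example, and the real service also compares the step's arguments before reusing a result.

```python
import re
from datetime import datetime, timedelta, timezone

# Matches a small subset of ISO 8601 durations: days, hours, minutes, seconds.
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(text):
    """Convert a duration such as 'PT12H' or 'P30D' into a timedelta."""
    match = _DURATION.match(text)
    parts = {k: int(v) for k, v in match.groupdict().items() if v} if match else {}
    if not parts:
        raise ValueError(f"Unsupported duration: {text!r}")
    return timedelta(**parts)


def cache_is_fresh(finished_at, now, expire_after):
    """True if a previous successful run is still within the expiry window."""
    return now - finished_at <= parse_duration(expire_after)


now = datetime(2023, 7, 9, 12, 0, tzinfo=timezone.utc)
recent_run = now - timedelta(hours=3)   # finished 3 hours ago
stale_run = now - timedelta(hours=13)   # finished 13 hours ago

print(cache_is_fresh(recent_run, now, "PT12H"))
print(cache_is_fresh(stale_run, now, "PT12H"))
```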

#### SageMaker Processing step

This should look very similar to the SageMaker Processing Job you configured in notebook 2. The only new line of code is the `ProcessingStep` line at the bottom of the cell below which allows us to take the Processing Job configuration and include it as a pipeline step. 

In [61]:
preprocess_data_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type='ml.m5.xlarge',
    instance_count=processing_instance_count,
    base_job_name='preprocess-data',
    sagemaker_session=session,
    
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    code='./pipeline_scripts/preprocessing/script.py',
    processor=preprocess_data_processor,
    inputs=[
        ProcessingInput(
            source=raw_s3,
            destination='/opt/ml/processing/input',
            s3_data_distribution_type='ShardedByS3Key'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='train',
            destination=f'{output_path}/train',
            source='/opt/ml/processing/output/train'
        ),
        ProcessingOutput(
            output_name='validation',
            destination=f'{output_path}/validation',
            source='/opt/ml/processing/output/validation'
        ),
        ProcessingOutput(
            output_name='test',
            destination=f'{output_path}/test',
            source='/opt/ml/processing/output/test'
        )
    ],
    cache_config=cache_config
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [62]:
"""
inputs={
        'train': ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'train'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/train',
            s3_data_distribution_type='ShardedByS3Key'
        ),
        'validation': ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'validation'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/validation',
            s3_data_distribution_type='ShardedByS3Key'
        ),
        'test': ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            
            destination='/opt/ml/processing/input/test',
            s3_data_distribution_type='ShardedByS3Key'
        )
    },
    
    
    inputs=[
        ProcessingInput(
            source=f'{output_path}/train/',
            destination='/opt/ml/processing/input/train',
            s3_data_distribution_type='ShardedByS3Key'
        ),
        ProcessingInput(
            source=f'{output_path}/validation/',
            destination='/opt/ml/processing/input/validation',
            s3_data_distribution_type='ShardedByS3Key'
        ),
        ProcessingInput(
            source=f'{output_path}/test/',
            destination='/opt/ml/processing/input/test',
            s3_data_distribution_type='ShardedByS3Key'
        )
    ],
"""

"""

        ProcessingOutput(output_name="validation", source=f"fs_s3_path/validation/{aws_account_id}/sagemaker/{region}/offline-store/{validation_feature_group_name}/data"),
        ProcessingOutput(output_name="test", source=f"fs_s3_path/test/{aws_account_id}/sagemaker/{region}/offline-store/{test_feature_group_name}/data")
        
        
            outputs=[
        ProcessingOutput(output_name="train", source=Join(on='/', values=[fs_s3_path,'train', aws_account_id, 'sagemaker', region, 'offline-store', train_feature_group_name, 'data'])),
        ProcessingOutput(output_name="validation", source=Join(on='/', values=[fs_s3_path, 'validation', aws_account_id, 'sagemaker', region, 'offline-store', validation_feature_group_name, 'data'])),
        ProcessingOutput(output_name="test", source=Join(on='/', values=[fs_s3_path, 'test', aws_account_id, 'sagemaker', region, 'offline-store', test_feature_group_name, 'data'])),      
    ],
    
    
    
    arguments=[
        Join(
            on=' ',
            values=[
                '--train_feature_group_name',
                train_feature_group_name,
            ],
        ),
        Join(
            on=' ',
            values=[
                '--validation_feature_group_name',
                validation_feature_group_name,
            ],
        ),
        Join(
            on=' ',
            values=[
                '--test_feature_group_name',
                test_feature_group_name,
            ],
        ),
        Join(
            on=' ',
            values=[
                '--bucket_prefix',
                bucket_prefix,
            ],
        ),
        Join(
            on=' ',
            values=[
                '--region',
                region,
            ],
        ),
    ]
    
"""

In [63]:
from sagemaker.workflow.functions import Join

feature_store_ingestion = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type='ml.m5.2xlarge',
    instance_count=processing_instance_count,
    base_job_name='feature-store-ingestion',
    sagemaker_session=session
)

processor_args = feature_store_ingestion.run(
    code="./pipeline_scripts/feature-store/script.py",
    inputs=[
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'train'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/train'
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'validation'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/validation'
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/test'
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", s3_upload_mode='Continuous', app_managed=True, feature_store_output = sagemaker.processing.FeatureStoreOutput(feature_group_name = train_feature_group_name)),
        ProcessingOutput(output_name="validation", s3_upload_mode='Continuous', app_managed=True, feature_store_output = sagemaker.processing.FeatureStoreOutput(feature_group_name = validation_feature_group_name)),
    ],  
    arguments=['--train_feature_group_name', train_feature_group_name,
                '--validation_feature_group_name', validation_feature_group_name,
                '--test_feature_group_name', test_feature_group_name,
                '--bucket_prefix', bucket_prefix,
                '--role_arn', role_arn,
                '--region', region,
    ]
)
 
feature_store_ingestion_step = ProcessingStep(
    name='FeatureStoreIngestion',
    step_args=processor_args,
    cache_config=cache_config
)
    
    
            

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


#### SageMaker Training step

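This configuration should also look very similar to the SageMaker Training job you ran in notebook 2. The main addition is the `TrainingStep` at the bottom of the cell below, which lets us run the training job as a step in our pipeline. Because the training script receives the feature group names as hyperparameters rather than consuming an output property of an earlier step, there is no implicit data dependency, so the cell also calls `add_depends_on` to make training wait for the feature store ingestion step.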

In [64]:
"""
        inputs={
        'train': TrainingInput(feature_store_ingestion_step.properties.ProcessingOutputConfig.Outputs["train"].FeatureStoreOutput.FeatureGroupName),
        'validation': TrainingInput(feature_store_ingestion_step.properties.ProcessingOutputConfig.Outputs["validation"].FeatureStoreOutput.FeatureGroupName)
    },
"""

In [65]:
from sagemaker.xgboost.estimator import XGBoost


hyperparams = {
    # Tuned hyperparameters
    "max_depth": "7",
    "eta": "0.389778",
    "min_child_weight": "65",
    "subsample": "0.732408",
    "objective": "reg:linear",
    # Training job params
    "train_feature_group_name": train_feature_group_name,
    "validation_feature_group_name": validation_feature_group_name,
    "role_arn": role_arn,
    "region": region,
}

train_instance_type = 'ml.c5.2xlarge'

estimator_parameters = {
    'source_dir': './pipeline_scripts/train/',
    'entry_point': 'script-pipeline.py',
    'framework_version': '1.7-1',
    'instance_type': train_instance_type,
    'instance_count': 2,
    'hyperparameters': hyperparams,
    'role': role_arn,
    'base_job_name': 'XGBoost-model',
    'output_path': f's3://{bucket}/{s3_prefix}/',
    'image_scope': 'training'
}

estimator = XGBoost(**estimator_parameters)

training_step = TrainingStep(
    name='TrainModel',
    estimator=estimator,
    cache_config=cache_config
)
training_step.add_depends_on([feature_store_ingestion_step])

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.c5.2xlarge.


#### Model evaluation step

After the training step in our pipeline, we'll want to evaluate our model's performance. To do that, we create a SageMaker Processing step that runs the evaluation code we specify (`./pipeline_scripts/evaluate/script.py`) against the hold-out test dataset produced by the preprocessing step configured above.

In [66]:
dir(feature_store_ingestion_step.properties.ProcessingOutputConfig.Outputs["train"].FeatureStoreOutput)

['FeatureGroupName',
 '__abstractmethods__',
 '__add__',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__float__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__int__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slotnames__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_abc_impl',
 '_referenced_steps',
 '_shape_names',
 'expr',
 'path',
 'step_name',
 'to_string']

In [67]:
evaluation_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role_arn,
    instance_type='ml.m5.xlarge',
    instance_count=processing_instance_count,
    base_job_name='evaluation',
    sagemaker_session=session,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [68]:
%%writefile ./pipeline_scripts/evaluate/script.py
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'ray', 'modin[ray]', 'pydantic==1.10.10', 'xgboost_ray'])
import os
import time
import tarfile
import argparse
import json
import logging
import boto3
import sagemaker
import glob

import pathlib
import numpy as np
from math import sqrt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Sagemaker specific imports
from sagemaker.session import Session
from sagemaker.experiments.run import load_run
import modin.pandas as pd
# Ray specific imports
import ray
from ray.air.checkpoint import Checkpoint
from ray.train.xgboost import XGBoostCheckpoint, XGBoostPredictor


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

if __name__ == "__main__":
    logger.debug('Starting evaluation.')
    
    model_dir = '/opt/ml/processing/model'
    for file in os.listdir(model_dir):
        logger.info(file)
        
    model_path = os.path.join(model_dir, 'model.tar.gz')
    # Open the .tar.gz file
    with tarfile.open(model_path, 'r:gz') as tar:
        # Extract all files to the model directory
        tar.extractall(path=model_dir)

    for file in os.listdir(model_dir):
        logger.debug(file)
        
    logger.debug('Loading model.')
    checkpoint = XGBoostCheckpoint.from_directory(f'{model_dir}/model.xgb')
    predictor = XGBoostPredictor.from_checkpoint(checkpoint)

    logger.debug('Reading test data.')
    test_path = "/opt/ml/processing/test/"
    all_files = glob.glob(os.path.join(test_path , "*.csv"))
    frames = []
    for filename in all_files:
        frame = pd.read_csv(filename, index_col=None, header=0)
        frames.append(frame)
    df = pd.concat(frames, axis=0, ignore_index=True)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = df.to_numpy()

    
    logger.info('Performing predictions against test data.')
    predictions = predictor.predict(X_test)

    # See the regression metrics
    # see: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    logger.debug('Calculating metrics.')
    mae = mean_absolute_error(y_test, predictions)
    mse = mean_squared_error(y_test, predictions)
    rmse = sqrt(mse)
    r2 = r2_score(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        'regression_metrics': {
            'mae': {
                'value': mae,
                'standard_deviation': std,
            },
            'mse': {
                'value': mse,
                'standard_deviation': std,
            },
            'rmse': {
                'value': rmse,
                'standard_deviation': std,
            },
            'r2': {
                'value': r2,
                'standard_deviation': std,
            },
        },
    }

    output_dir = '/opt/ml/processing/evaluation'
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info('Writing out evaluation report with mse: %f', mse)
    evaluation_path = f'{output_dir}/evaluation.json'
    with open(evaluation_path, 'w') as f:
        f.write(json.dumps(report_dict))

Overwriting ./pipeline_scripts/evaluate/script.py
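
For reference, the regression metrics written to the evaluation report can be reproduced by hand on a tiny example. The sketch below uses plain Python instead of scikit-learn, and the numbers are made-up toy values rather than results from this pipeline.

```python
from math import sqrt


def regression_metrics(y_true, y_pred):
    """Compute MAE, MSE, RMSE and R^2 for two equal-length sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    # R^2 compares the residual error with the variance around the mean.
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return {"mae": mae, "mse": mse, "rmse": sqrt(mse), "r2": 1 - ss_res / ss_tot}


# Toy values chosen so the arithmetic is easy to check by hand.
metrics = regression_metrics([3.0, 5.0, 7.0, 9.0], [2.0, 5.0, 8.0, 9.0])
print(metrics)
```

RMSE is in the same units as the target, which is why the quality gate later in the notebook can compare it against a threshold expressed in those units.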


In [69]:
# Specify where we'll store the model evaluation results so
# that other steps can access those results
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

In [70]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri='{}/evaluation.json'.format(
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][
                'S3Uri'
            ]
        ),
        content_type='application/json',
    )
)

model = Model(
    image_uri=estimator.training_image_uri(),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    source_dir=estimator.source_dir,
    entry_point=estimator.entry_point,
    role=role_arn,
    sagemaker_session=session
)

model_registry_args = model.register(
    content_types=['text/csv'],
    response_types=['application/json'],
    inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],
    transform_instances=['ml.m5.xlarge'],
    model_package_group_name=model_package_group_name,
    approval_status='PendingManualApproval',
    model_metrics=model_metrics
)

register_step = ModelStep(
    name='RegisterModel',
    step_args=model_registry_args
)



In [71]:
# Condition step for evaluating model quality and branching execution

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=13000.0,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[],
)
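
To see what the condition above evaluates, the sketch below resolves the same dotted path against a sample report and applies the same threshold. The `json_get` helper is a hypothetical stand-in that only handles dotted dictionary keys, and the report values are invented for illustration.

```python
import json


def json_get(document, json_path):
    """Walk a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Invented sample in the same shape as the evaluation.json written by the script.
report = json.loads(
    '{"regression_metrics": {"rmse": {"value": 11875.4, "standard_deviation": 950.2}}}'
)

rmse = json_get(report, "regression_metrics.rmse.value")
should_register = rmse <= 13000.0  # same threshold as the condition step
print(rmse, should_register)
```

With an empty `else_steps` list, a model that misses the threshold is simply not registered and the pipeline finishes without further action.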

In [72]:
# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-training-pipeline-ray'
step_list = [preprocess_dataset_step,
             feature_store_ingestion_step,
             training_step,
             evaluation_step,
             condition_step]

training_pipeline = Pipeline(
    name=pipeline_name,
    # Only Parameter objects belong here. The feature group names are
    # plain Python strings in this notebook, so they are not listed.
    parameters=[
        processing_instance_count,
        bucket_prefix
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug the pipeline. It is commented out here due to length.
#json.loads(training_pipeline.definition())



Using provided s3_resource

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:523914011708:pipeline/synthetic-housing-training-pipeline-ray',
 'ResponseMetadata': {'RequestId': 'f00b3d08-2db6-4a0c-b3dc-36990465e166',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f00b3d08-2db6-4a0c-b3dc-36990465e166',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '107',
   'date': 'Sun, 09 Jul 2023 04:11:52 GMT'},
  'RetryAttempts': 0}}
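
If you do print the definition, a quick way to get an overview is to list the declared parameters and step types. The sketch below works on a hand-written, heavily abbreviated JSON string standing in for `training_pipeline.definition()`. The field names reflect my understanding of the definition format and should be checked against your own output.

```python
import json

# Hand-written, abbreviated stand-in for the real pipeline definition.
# Real definitions carry full step arguments; they are left empty here.
definition_json = """
{
  "Version": "2020-12-01",
  "Parameters": [
    {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}
  ],
  "Steps": [
    {"Name": "PreprocessData", "Type": "Processing", "Arguments": {}},
    {"Name": "TrainModel", "Type": "Training", "Arguments": {}}
  ]
}
"""

definition = json.loads(definition_json)

# Map each declared parameter to its default and each step to its type.
parameter_defaults = {p["Name"]: p.get("DefaultValue") for p in definition["Parameters"]}
step_types = {s["Name"]: s["Type"] for s in definition["Steps"]}

print(parameter_defaults)
print(step_types)
```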

In [73]:
# This is where you could optionally override parameter defaults, e.g.
# training_pipeline.start(parameters={'ProcessingInstanceCount': 2})
execution = training_pipeline.start()

In [74]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:523914011708:pipeline/synthetic-housing-training-pipeline-ray',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:523914011708:pipeline/synthetic-housing-training-pipeline-ray/execution/29twllgnctji',
 'PipelineExecutionDisplayName': 'execution-1688875913757',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2023, 7, 9, 4, 11, 53, 690000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 7, 9, 4, 11, 53, 690000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:523914011708:user-profile/d-zbtbfrmc31iz/user-1',
  'UserProfileName': 'user-1',
  'DomainId': 'd-zbtbfrmc31iz'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:523914011708:user-profile/d-zbtbfrmc31iz/user-1',
  'UserProfileName': 'user-1',
  'DomainId': 'd-zbtbfrmc31iz'},
 'ResponseMetadata': {'RequestId': '1f8acb19-5a13-4d37-bc07-b3bc9616591f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {

In [None]:
go = 0
if go == 1:
    print("Delete the pipeline")
    sagemaker_client.delete_pipeline(PipelineName=pipeline_name)


In [32]:
fs_train_group = FeatureGroup(
        name="fs-train-synthetic-housing-data", 
        sagemaker_session=session
    )

fs_train_data_loc = fs_train_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
fs_train_data_loc



's3://sagemaker-us-east-1-523914011708/aws-ray-mlops-workshop/feature-store/train/523914011708/sagemaker/us-east-1/offline-store/fs-train-synthetic-housing-data-1688849115/data'

In [None]:
# Resolved offline store location reported by describe() (note the numeric suffix on the feature group name):
# s3://sagemaker-us-east-1-523914011708/aws-ray-mlops-workshop/feature-store/train/523914011708/sagemaker/us-east-1/offline-store/fs-train-synthetic-housing-data-1688849115/data
#
# The same path without that suffix, for comparison:
# s3://sagemaker-us-east-1-523914011708/aws-ray-mlops-workshop/feature-store/train/523914011708/sagemaker/us-east-1/offline-store/fs-train-synthetic-housing-data/data

In [None]:
import subprocess
import sys
#subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'ray', 'modin[ray]', 'pydantic==1.10.10'])
import os
import time
import tarfile
import argparse
import json
import logging
import boto3
import sagemaker
import glob

import pickle
import pathlib
import numpy as np
from math import sqrt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Sagemaker specific imports
from sagemaker.session import Session
from sagemaker.experiments.run import load_run
import modin.pandas as pd
# Ray specific imports
import ray
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
from ray.data import Dataset
from ray.air.result import Result
from ray.air.checkpoint import Checkpoint

In [None]:
fs_test_data_loc

In [None]:
print('Starting evaluation.')
#ray.init()  
#model_dir = '/opt/ml/processing/model'
model_dir = './common/'
model_path = os.path.join(model_dir, 'model.tar.gz')
print(model_path)

# Open the .tar.gz file
with tarfile.open(model_path, 'r:gz') as tar:
    # Extract all files
    tar.extractall(path=model_dir)

# Optional: Print the list of extracted files
with tarfile.open(model_path, 'r:gz') as tar:
    file_names = tar.getnames()
    print("Extracted files:")
    for name in file_names:
        print(name)

checkpoint = ray.train.xgboost.XGBoostCheckpoint.from_directory(f'{model_dir}model.xgb/')
predictor = ray.train.xgboost.XGBoostPredictor.from_checkpoint(checkpoint)
print(dir(predictor))


#test_path = "/opt/ml/processing/test/test.csv"
test_path = fs_test_data_loc

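# The pattern must not start with "/": os.path.join would then discard the directory.
# glob only matches local files, so fs_test_data_loc has to be a local directory here.
all_files = glob.glob(os.path.join(fs_test_data_loc, "*.csv"))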

li = []

for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=0)
    li.append(df)

df = pd.concat(li, axis=0, ignore_index=True)
# df = pd.read_csv(test_path, header=0)
df.head(5)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = df.to_numpy()
predictions = predictor.predict(X_test)



In [None]:
predictions

In [None]:
y_test

In [None]:
# See the regression metrics
# see: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
import pathlib
import numpy as np
import pandas as pd
from math import sqrt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

mae = mean_absolute_error(y_test, predictions)
mse = mean_squared_error(y_test, predictions)
rmse = sqrt(mse)
r2 = r2_score(y_test, predictions)
std = np.std(y_test - predictions)
report_dict = {
    'regression_metrics': {
        'mae': {
            'value': mae,
            'standard_deviation': std,
        },
        'mse': {
            'value': mse,
            'standard_deviation': std,
        },
        'rmse': {
            'value': rmse,
            'standard_deviation': std,
        },
        'r2': {
            'value': r2,
            'standard_deviation': std,
        },
    },
}

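# The pipeline script writes to /opt/ml/processing/evaluation inside the container.
# In this local scratch run the report is written next to the model instead.
output_dir = model_dir
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

evaluation_path = os.path.join(output_dir, 'evaluation.json')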
with open(evaluation_path, 'w') as f:
    f.write(json.dumps(report_dict))

In [None]:
import joblib


In [None]:
!python ./pipeline_scripts/evaluate/script.py

In [None]:
# Download the model artifact referenced by a SageMaker Model Registry package
from urllib.parse import urlparse

def download_model(model_name, download_dir):
    # Create a Boto3 SageMaker client
    sagemaker_client = boto3.client('sagemaker')

    # Get the details of the model package (accepts a name or an ARN)
    response = sagemaker_client.describe_model_package(
        ModelPackageName=model_name
    )

    # Retrieve the S3 URI of the model artifact (s3://<bucket>/<key>)
    model_data_url = response['InferenceSpecification']['Containers'][0]['ModelDataUrl']
    print(model_data_url)

    # download_file expects the bucket and key separately, so split the URI
    parsed = urlparse(model_data_url)
    model_bucket, model_key = parsed.netloc, parsed.path.lstrip('/')

    # Save the artifact as model.tar.gz inside the requested directory
    local_path = os.path.join(download_dir, 'model.tar.gz')
    s3_client = boto3.client('s3')
    s3_client.download_file(model_bucket, model_key, local_path)

    print("Model downloaded successfully.")
    return local_path

In [None]:
download_model(model_package_arn, './common/')