# MLOps with SageMaker Pipelines
[Checklist]
- Execution role: sagemaker role arn: arn:aws:iam::353411055907:role/service-role/AmazonSageMaker-ExecutionRole-20230315T235247
- The role must have both the `AmazonSageMakerFullAccess` and `AmazonSageMakerPipelinesIntegrations` policies attached.


In [27]:
# The required packages are normally installed by a script in the Studio
# Lifecycle Configuration that runs when the SageMaker Studio Notebook Job
# starts, so installation here is disabled by default.
# Set install_needed = True only for a manual setup: the cell then pip-installs
# the pinned versions below and restarts the kernel; afterwards set it back to
# False and run this cell again.
%load_ext autoreload
%autoreload 2
import sys
import IPython

#install_needed = True
install_needed = False

if install_needed:
    print("===> Installing deps and restarting kernel. Please change 'install_needed = False' and run this code cell again.")
    # Install into the interpreter that runs this kernel (sys.executable).
    !{sys.executable} -m pip install -U "nbformat" "argparse" "torchvision==0.14.1"  "awscli==1.27.68" "boto3==1.26.68" "botocore==1.29.68" "datasets==1.18.4" "sagemaker==2.143.0" "s3fs==0.4.2" "s3transfer==0.6.0" "transformers==4.17.0" "nvidia-cublas-cu11==11.10.3.66" "nvidia-cuda-nvrtc-cu11==11.7.99" "nvidia-cuda-runtime-cu11==11.7.99" "nvidia-cudnn-cu11==8.5.0.96"  
    # Restart the kernel (restart=True) so the newly installed versions are imported.
    IPython.Application.instance().kernel.do_shutdown(True)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [28]:
import boto3
import os
import numpy as np
import sagemaker
import sys
import time

import sagemaker
import sagemaker.huggingface
from sagemaker.huggingface import HuggingFace, HuggingFaceModel

from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

from sagemaker.lambda_helper import Lambda

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.huggingface.processing import HuggingFaceProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo,ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

In [29]:
# `sess` is used only to resolve the default bucket and region; the pinned
# `sagemaker_session` created below is the one handed to every processor,
# estimator, model and pipeline in this notebook.
sess = sagemaker.Session()
region = sess.boto_region_name

# S3 bucket for data, models and logs. Leave as None to fall back to the
# session's default bucket (SageMaker creates it if it does not exist yet).
sagemaker_session_bucket = None
if sess is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(default_bucket=sagemaker_session_bucket)

for label, value in (
    ("sagemaker role arn", role),
    ("sagemaker bucket", sagemaker_session.default_bucket()),
    ("sagemaker session region", sagemaker_session.boto_region_name),
):
    print(f"{label}: {value}")

sagemaker role arn: arn:aws:iam::353411055907:role/service-role/AmazonSageMaker-ExecutionRole-20230315T235247
sagemaker bucket: sagemaker-us-east-1-353411055907
sagemaker session region: us-east-1


In [30]:
# Bring the necessary src/ and utils/ code from the S3 bucket into the current
# working directory, for the scheduled SageMaker Studio Notebook Job.
import os
import zipfile
from urllib.parse import urlparse

import boto3

# The archive lives under the session's default bucket
# (sagemaker-<region>-<account>), so no account ID is hard-coded here.
s3_uri = f"s3://{sagemaker_session.default_bucket()}/GP-LJP-mlops/LJP_MLops.zip"

current_dir = os.getcwd()

# Parse the URI into distinct names so the notebook-level `bucket` variable
# is not overwritten by this cell.
parsed_uri = urlparse(s3_uri)
src_bucket = parsed_uri.netloc
src_key = parsed_uri.path.lstrip("/")

# Local path of the downloaded archive (computed once, used twice).
zip_file = os.path.join(current_dir, os.path.basename(src_key))

s3 = boto3.client("s3")
s3.download_file(src_bucket, src_key, zip_file)

# Unpack into the working directory so ./src and ./utils resolve from here.
with zipfile.ZipFile(zip_file, "r") as zip_ref:
    zip_ref.extractall(current_dir)


## Defining the Pipeline
---


###  Pipeline parameters

In [31]:
# --- Storage ----------------------------------------------------------------
# Every asset and artifact is stored under this prefix of the session bucket.
s3_prefix = "GP-LJP-mlops"
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name

# SageMaker job names (training, processing, inference) start with this prefix.
base_job_prefix = s3_prefix

# Step caching for the workflow; cached results expire after 7 days.
cache_config = CacheConfig(enable_caching=True, expire_after="7d")

# --- Framework versions for the Hugging Face containers ---------------------
transformers_version, pytorch_version, py_version = "4.17.0", "1.10.2", "py38"

# --- Model, tokenizer and dataset identifiers -------------------------------
model_id_ = tokenizer_id_ = "lawcompany/KLAID_LJP_base"
dataset_name_ = "lawcompany/KLAID"

# Pipeline parameters (overridable per execution) defaulting to the values above.
model_id = ParameterString(name="ModelId", default_value=model_id_)
tokenizer_id = ParameterString(name="TokenizerId", default_value=tokenizer_id_)
dataset_name = ParameterString(name="DatasetName", default_value=dataset_name_)

### Processing Step

In [32]:
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.steps import ProcessingStep
import os


# S3 bucket used by this step; `bucket` and `s3_prefix` are set in the
# pipeline-parameters cell.
s3_bucket = bucket

# Data-collection script and the S3 location that receives its outputs.
data_processing_script = "./src/collecting_data.py"
output_data_path = f"s3://{s3_bucket}/{s3_prefix}/data/collected_data"

# Pipeline parameter for the collected data file name.
# NOTE(review): FileName is registered with the pipeline but no step consumes
# it (it is commented out of ProcessDataForTraining's job_arguments), so
# overriding it at execution time currently has no effect.
file_name = ParameterString(name="FileName", default_value="final_data.csv")

# scikit-learn processing container that runs the data-collection script.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",  # pre-built scikit-learn container version
    role=role,  # execution role ARN obtained earlier in the notebook
    instance_count=1,
    instance_type="ml.m5.xlarge",
)


# Processing step: reads labels.csv from S3 and uploads the script's outputs.
step_data_collection = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=f"s3://{s3_bucket}/{s3_prefix}/data/labels.csv",
            destination="/opt/ml/processing/input",
        ),
        # Add further ProcessingInput entries here if the script needs them.
    ],
    outputs=[
        # Consumed by ProcessDataForTraining through the "file_name" output name.
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=output_data_path,
            output_name="file_name",
        ),
        # NOTE(review): this output uploads to the same S3 prefix as
        # "file_name". Identically named files from the two local directories
        # therefore overwrite each other, and the downstream step receives both
        # sets of files. Give it its own destination if that is unintended.
        ProcessingOutput(
            source="/opt/ml/processing/processed_output",
            destination=output_data_path,
        ),
    ],
    code=data_processing_script,
)



In [33]:
# Processing-step pipeline parameters.
# NOTE(review): only ProcessingInstanceCount is consumed by the processing step.
# The processor hard-codes its instance type and the step hard-codes its
# script path, so overriding ProcessingInstanceType or ProcessingScript
# currently has no effect.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge",
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
processing_script = ParameterString(
    name="ProcessingScript",
    default_value="./src/processing_sklearn.py",
)

In [34]:
# Every split produced by this step is uploaded under this S3 prefix.
processing_output_destination = f"s3://{bucket}/{s3_prefix}/data"

sklearn_processor = SKLearnProcessor(
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    framework_version="1.0-1",
    base_job_name=base_job_prefix + "-preprocessing",
    sagemaker_session=sagemaker_session,
    role=role
)

step_process = ProcessingStep(
    name="ProcessDataForTraining",
    #cache_config=cache_config,
    processor=sklearn_processor,
    inputs=[
        # Pass step_data_collection's "file_name" output to this step.
        # Referencing its property also makes this step depend on that one.
        ProcessingInput(
            input_name='file_name',
            source=step_data_collection.properties.ProcessingOutputConfig.Outputs['file_name'].S3Output.S3Uri,
            destination="/opt/ml/processing/file_name",
        ),
    ],
    # Use the registered pipeline parameters (not their plain-string defaults),
    # so that overriding ModelId / TokenizerId / DatasetName at execution time
    # reaches this step too. The default values are unchanged.
    job_arguments=["--model_id", model_id,
                   "--tokenizer_id", tokenizer_id,
                   "--dataset_name", dataset_name,
                   "--transformers_version", transformers_version,
                   "--pytorch_version", pytorch_version,
                   #"--file_name", file_name  # add file_name to job_arguments
                  ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            destination=f"{processing_output_destination}/train",
            source="/opt/ml/processing/train",
        ),
        # Each split gets its own prefix. Validation must not share the test
        # prefix, or files with the same name would overwrite each other.
        ProcessingOutput(
            output_name="validation",
            destination=f"{processing_output_destination}/validation",
            source="/opt/ml/processing/validation",
        ),
        ProcessingOutput(
            output_name="test",
            destination=f"{processing_output_destination}/test",
            source="/opt/ml/processing/test",
        )
    ],
    code="./src/processing_sklearn.py"
)

###  Model Training Step

In [35]:
# Training-step pipeline parameters.
# NOTE(review): the HuggingFace estimator hard-codes entry_point, source_dir and
# instance_type. Of these four parameters, only TrainingInstanceCount is
# actually consumed.
training_entry_point = ParameterString(name="TrainingEntryPoint", default_value="train.py")
training_source_dir = ParameterString(name="TrainingSourceDir", default_value="./src")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.8xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# Hyperparameters passed into the training job (declared as string parameters).
n_gpus = ParameterString(name="NumGPUs", default_value="1")
epochs = ParameterString(name="Epochs", default_value="1")
seed = ParameterString(name="Seed", default_value="42")
train_batch_size = ParameterString(name="TrainBatchSize", default_value="1")
eval_batch_size = ParameterString(name="EvalBatchSize", default_value="2")
learning_rate = ParameterString(name="LearningRate", default_value="5e-5")

# ModelId / TokenizerId / DatasetName are defined once, in the pipeline-parameters
# cell (same names and defaults). They are not re-created here, so there is a
# single definition to edit and no risk of the two copies drifting apart.

In [36]:
# Hyperparameters forwarded to train.py. Pipeline parameters among them are
# resolved to their execution-time values.
hyperparameters = {
    'n_gpus': n_gpus,                       # number of GPUs per instance
    'epochs': epochs,                       # number of training epochs
    'seed': seed,                           # random seed
    'train_batch_size': train_batch_size,   # batch size for training
    'eval_batch_size': eval_batch_size,     # batch size for evaluation
    'warmup_steps': 0,                      # warmup steps
    'learning_rate': learning_rate,         # learning rate used during training
    'tokenizer_id': tokenizer_id,           # pre-trained tokenizer (TokenizerId parameter)
    'model_id': model_id,                   # pre-trained model (ModelId parameter)
}

# S3 location that the training job's checkpoints are synced to.
chkpt_s3_path = f's3://{bucket}/{s3_prefix}/processing/checkpoints'

In [37]:
# Hugging Face training job. Entry point, source dir and instance type are
# fixed literals; only the instance count comes from a pipeline parameter.
huggingface_estimator = HuggingFace(
    # training code
    entry_point="train.py",
    source_dir="./src",
    # container versions
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    # compute and identity
    instance_type="ml.p3.8xlarge",
    instance_count=training_instance_count,
    base_job_name=base_job_prefix + "-training",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters=hyperparameters,
    # profiler and debugger hooks are switched off
    disable_profiler=True,
    debugger_hook_config=False,
    # files under checkpoint_local_path are synced with checkpoint_s3_uri
    checkpoint_s3_uri=chkpt_s3_path,
    checkpoint_local_path='/opt/ml/checkpoints',
)

# Map the "train" and "test" outputs of step_process onto same-named channels.
training_channels = {
    channel: TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri
    )
    for channel in ("train", "test")
}

step_train = TrainingStep(
    name="TrainModel",
    estimator=huggingface_estimator,
    inputs=training_channels,
    cache_config=cache_config,
)

### Model evaluation Step

In [38]:
# Evaluation-step pipeline parameters.
# NOTE(review): the evaluation processor hard-codes its instance type and the
# step hard-codes its script, so only EvaluationInstanceCount is consumed.
evaluation_script = ParameterString(
    name="EvaluationScript",
    default_value="./src/evaluate.py",
)
evaluation_instance_type = ParameterString(
    name="EvaluationInstanceType",
    default_value="ml.m5.xlarge",
)
evaluation_instance_count = ParameterInteger(
    name="EvaluationInstanceCount",
    default_value=1,
)

In [40]:
# scikit-learn container that runs the evaluation script on the trained model.
script_eval = SKLearnProcessor(
    role=role,
    sagemaker_session=sagemaker_session,
    base_job_name=base_job_prefix + "-evaluation",
    framework_version="1.0-1",
    instance_type="ml.m5.xlarge",
    instance_count=evaluation_instance_count,
)

# evaluation.json inside the "evaluation" output. It is presumably written by
# evaluate.py and later read with JsonGet by the deployment condition.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# The model artifacts from the training step are mounted into the container.
model_artifacts_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
# The local evaluation directory is uploaded to S3 as the "evaluation" output.
evaluation_output = ProcessingOutput(
    output_name="evaluation",
    source="/opt/ml/processing/evaluation",
    destination=f"s3://{bucket}/{s3_prefix}/evaluation_report",
)

step_eval = ProcessingStep(
    name="EvalLoss",
    processor=script_eval,
    inputs=[model_artifacts_input],
    outputs=[evaluation_output],
    code="./src/evaluate.py",
    property_files=[evaluation_report],
    cache_config=cache_config,
)

### Register the model

In [41]:
# Inference model built from the training step's artifacts. It uses the same
# transformers/PyTorch/Python versions as training.
model = HuggingFaceModel(
    role=role,
    sagemaker_session=sagemaker_session,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
)

# Model registry group that receives each new model version.
model_package_group_name = "LJPModelPackageGroup"

# Instance types allowed for real-time inference and batch transform.
supported_instance_types = ("ml.m5.xlarge", "ml.g4dn.xlarge")

# Register the model as "Approved" right away (no manual approval gate).
step_register = RegisterModel(
    name="RegisterModel",
    model=model,
    model_package_group_name=model_package_group_name,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=list(supported_instance_types),
    transform_instances=list(supported_instance_types),
    approval_status="Approved",
)

### Model Deployment


In [43]:
# Custom helper step (from utils.deploy_step) that deploys the registered model
# to an endpoint.
from utils.deploy_step import ModelDeployment

# The notebook session's IAM role is reused as the endpoint role. It needs
# permissions such as reading the model assets from S3.
sagemaker_endpoint_role = sagemaker.get_execution_role()

# Model name: the last path segment of model_n_ plus a local timestamp.
# NOTE(review): the timestamp is fixed when this cell runs, not per pipeline
# execution. Re-running the pipeline without re-running this cell reuses the
# same name; confirm that ModelDeployment tolerates this.
model_n_ = "lawcompany/LJP"
_deploy_timestamp = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())
model_name = f"{model_n_.rsplit('/', 1)[-1]}-{_deploy_timestamp}"

step_deployment = ModelDeployment(
    model_name=model_name,
    # first sub-step of the RegisterModel step collection
    registered_model=step_register.steps[0],
    endpoint_instance_type="ml.m5.xlarge",
    sagemaker_endpoint_role=sagemaker_endpoint_role,
    autoscaling_policy=None,
)

Using ARN from existing role: sagemaker-pipelines-model-deployment-role


### Condition for deployment


In [44]:
# Minimum eval_accuracy the model must reach before it is registered and deployed.
threshold_accuracy = ParameterFloat(default_value=0.95, name="ThresholdAccuracy")

In [45]:
# eval_accuracy as reported in the evaluation step's EvaluationReport file.
reported_accuracy = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="eval_accuracy",
)

# True when the reported accuracy is at least the configured threshold.
cond_gte = ConditionGreaterThanOrEqualTo(left=reported_accuracy, right=threshold_accuracy)

# Register and deploy only when the condition holds; otherwise run nothing.
step_cond = ConditionStep(
    name="CheckEvalAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register, step_deployment],
    else_steps=[],
)



## Pipeline definition and execution
---

In [46]:
# Every pipeline parameter, in the order in which it appears in the definition.
pipeline_parameters = [
    # data / model identifiers
    file_name, model_id, tokenizer_id, dataset_name,
    # processing
    processing_instance_type, processing_instance_count, processing_script,
    # training
    training_entry_point, training_source_dir, training_instance_type, training_instance_count,
    # evaluation and deployment condition
    evaluation_script, evaluation_instance_type, evaluation_instance_count, threshold_accuracy,
    # training hyperparameters
    n_gpus, epochs, seed, eval_batch_size, train_batch_size, learning_rate,
]

# Top-level steps only: the register and deployment steps are nested in step_cond.
pipeline_steps = [step_data_collection, step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name="LJP-Pipeline",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

In [47]:
# Render the generated pipeline definition (a JSON string) as a dict for inspection.
import json

pipeline_definition_json = pipeline.definition()
definition = json.loads(pipeline_definition_json)
definition

Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'FileName',
   'Type': 'String',
   'DefaultValue': 'final_data.csv'},
  {'Name': 'ModelId',
   'Type': 'String',
   'DefaultValue': 'lawcompany/KLAID_LJP_base'},
  {'Name': 'TokenizerId',
   'Type': 'String',
   'DefaultValue': 'lawcompany/KLAID_LJP_base'},
  {'Name': 'DatasetName',
   'Type': 'String',
   'DefaultValue': 'lawcompany/KLAID'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ProcessingScript',
   'Type': 'String',
   'DefaultValue': './src/processing_sklearn.py'},
  {'Name': 'TrainingEntryPoint', 'Type': 'String', 'DefaultValue': 'train.py'},
  {'Name': 'TrainingSourceDir', 'Type': 'String', 'DefaultValue': './src'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.p3.8xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'Default

In [48]:
# Create the pipeline in SageMaker, or update it if one with this name already exists.
pipeline.upsert(role_arn=role)

Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:353411055907:pipeline/ljp-pipeline',
 'ResponseMetadata': {'RequestId': '5f45c8c6-b246-42f6-b166-9a094ca61349',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5f45c8c6-b246-42f6-b166-9a094ca61349',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '80',
   'date': 'Tue, 05 Sep 2023 17:03:14 GMT'},
  'RetryAttempts': 0}}

In [49]:
# Run the pipeline: start a new execution. No parameters are passed, so every
# pipeline parameter uses its default value.
execution = pipeline.start()

In [50]:
# Show the execution's current status and metadata.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:353411055907:pipeline/ljp-pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:353411055907:pipeline/ljp-pipeline/execution/gyhdro4q6e7l',
 'PipelineExecutionDisplayName': 'execution-1693933395497',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2023, 9, 5, 17, 3, 15, 380000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 9, 5, 17, 3, 15, 380000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:353411055907:user-profile/d-irdpcbrlhtyb/default-1680148488767',
  'UserProfileName': 'default-1680148488767',
  'DomainId': 'd-irdpcbrlhtyb'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:353411055907:user-profile/d-irdpcbrlhtyb/default-1680148488767',
  'UserProfileName': 'default-1680148488767',
  'DomainId': 'd-irdpcbrlhtyb'},
 'ResponseMetadata': {'RequestId': '0a25a826-5422-4d9f-89d5-8da77aa78b40',
  'HTTPStatusCode': 200,
  'HTTPHeade

In [51]:
# Block until the pipeline execution finishes.
# NOTE(review): wait() polls with the SDK's default delay/max_attempts. For long
# training runs, consider passing a larger max_attempts; confirm the defaults
# for the installed SDK version.
execution.wait()

In [52]:
# List the execution's steps with status and metadata (the recorded output
# shows the most recently started step first).
execution.list_steps()

[{'StepName': 'ModelDeployment',
  'StartTime': datetime.datetime(2023, 9, 5, 17, 27, 13, 222000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 9, 5, 17, 27, 17, 180000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Lambda': {'Arn': 'arn:aws:lambda:us-east-1:353411055907:function:sagemaker-pipelines-model-deployment-09-05-17-03-09',
    'OutputParameters': [{'Name': 'other_key', 'Value': 'example_value'},
     {'Name': 'body', 'Value': '"Created Endpoint!"'},
     {'Name': 'statusCode', 'Value': '200.0'}]}}},
 {'StepName': 'RegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2023, 9, 5, 17, 27, 10, 958000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 9, 5, 17, 27, 12, 354000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:353411055907:model-package/ljpmodelpackagegroup/10'}}},
 {'StepName': 'CheckEvalAccuracy',
  'StartTime': 


## Clean up
---

In [53]:
# Clean up resources created for the pipeline run.
sm_client = boto3.client("sagemaker")

# Delete the Lambda function used by the ModelDeployment step.
step_deployment.func.delete()

# The endpoint is intentionally kept; delete it later once it is no longer used.
# NOTE(review): `hf_predictor` is not defined anywhere in this notebook. Deleting
# the endpoint needs its name (e.g. sm_client.delete_endpoint(EndpointName=...));
# confirm the name that ModelDeployment gives the endpoint.
# Delete the endpoint
#hf_predictor.delete_endpoint()

{'ResponseMetadata': {'RequestId': '715ee90a-d0e7-4a93-af7e-81daf7f75bf0',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'date': 'Tue, 05 Sep 2023 17:27:22 GMT',
   'content-type': 'application/json',
   'connection': 'keep-alive',
   'x-amzn-requestid': '715ee90a-d0e7-4a93-af7e-81daf7f75bf0'},
  'RetryAttempts': 0}}