# MLOps pipeline examples

## AutoReload

In [47]:
# Automatically reload imported modules before each cell runs, so edits to
# local helpers (e.g. utils/ssm.py) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 1. Parameter Store 세팅

In [48]:
import boto3
from utils.ssm import parameter_store

In [49]:
# Region of the default boto3 session (from the local AWS config / environment).
strRegionName=boto3.Session().region_name
# Project helper from utils.ssm; presumably wraps SSM Parameter Store reads for this region — confirm in utils/ssm.py.
pm = parameter_store(strRegionName)
# Shared key prefix; other parameters are looked up as strPrefix + "<NAME>" (e.g. "BUCKET").
strPrefix = pm.get_params(key="PREFIX")

## 2. MLOps pipeline 만들기
* SageMaker Model Building Pipelines documentation: https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#id2

In [50]:
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TuningStep
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

In [53]:
class pipeline():
    """Build and run a two-step SageMaker pipeline: preprocessing, then HPO training.

    Steps:
      1. ``PreprocessingProcess``: an SKLearnProcessor job running
         ``./sources/preprocessing/preprocessing.py``. It emits ``train_data``,
         ``validation_data`` and ``test_data`` outputs.
      2. ``TrainWithHyperParamTuningProcess``: a HyperparameterTuner over the
         built-in BlazingText image, trained on the preprocessing outputs.

    Account, bucket, role and data-path settings are read from SSM Parameter Store
    through ``utils.ssm.parameter_store``, using keys prefixed by the ``PREFIX``
    parameter. Every setting is kept on ``self``, so the class does not depend on
    notebook-level globals and survives Restart & Run All.
    """

    def __init__(self, ):
        # Region of the default boto3 session; reused for SSM and the image lookup.
        self.strRegionName = boto3.Session().region_name
        self.pm = parameter_store(self.strRegionName)
        # Shared prefix prepended to every other parameter key. Stored on `self` so the
        # step builders below do not rely on a notebook-level `strPrefix` / `pm`.
        self.strPrefix = self.pm.get_params(key="PREFIX")

        self.account_id = self.pm.get_params(key=self.strPrefix + "ACCOUNT-ID")
        self.default_bucket = self.pm.get_params(key=self.strPrefix + "BUCKET")  # alternative: sagemaker.Session().default_bucket()
        self.role = self.pm.get_params(key=self.strPrefix + "SAGEMAKER-ROLE-ARN")
        # Root of the processing container's input/output mount points.
        self.strInOutPrefix = '/opt/ml/processing'
        self.sm_client = boto3.client('sagemaker')
        # With a PipelineSession, processor.run() / tuner.fit() return step arguments
        # for the pipeline definition instead of launching jobs immediately.
        self.pipeline_session = PipelineSession()
        # Reuse results of identical step runs for up to 48 hours.
        self.cache_config = CacheConfig(
            enable_caching=True,
            expire_after="T48H"
        )
        # Container image URI of the built-in BlazingText algorithm for this region.
        self.model_image_uri = sagemaker.image_uris.retrieve("blazingtext", self.strRegionName)

        print (f'  Account-ID: {self.account_id}, \nRegion: {self.strRegionName}, \nRole: {self.role}, \nDefault_bucket: {self.default_bucket}')
        print (f'  pipeline_session: {self.pipeline_session}')

    def _step_preprocessing(self, ):
        """Create ``self.preprocessing_process``, the SKLearn processing step.

        Inputs are the raw review data (S3 path from the ``<PREFIX>REVIEW-DATA-PATH``
        parameter) and ``requirements.txt``. Outputs are the ``train_data``,
        ``validation_data`` and ``test_data`` channels consumed by the tuning step.
        """
        sklearn_processor = SKLearnProcessor(
            framework_version="0.20.0",
            role=self.role,
            instance_type="ml.m5.xlarge",
            instance_count=1,
            # Name shown in the S3 bucket; when wrapped in a pipeline, the bucket
            # shows the name defined by the pipeline instead.
            base_job_name="preprocessing",
            sagemaker_session=self.pipeline_session,
        )

        step_args = sklearn_processor.run(
            code='./sources/preprocessing/preprocessing.py',
            inputs=[
                ProcessingInput(
                    source=self.pm.get_params(key=self.strPrefix + "REVIEW-DATA-PATH"),
                    destination=f'{self.strInOutPrefix}/input'
                ),
                ProcessingInput(
                    input_name="requirements",
                    source='./sources/preprocessing/requirements.txt',
                    destination=f"{self.strInOutPrefix}/input/requirements"
                ),
            ],
            outputs=[
                ProcessingOutput(output_name="train_data", source=f'{self.strInOutPrefix}/output/train'),
                ProcessingOutput(output_name="validation_data", source=f'{self.strInOutPrefix}/output/validation'),
                ProcessingOutput(output_name="test_data", source=f'{self.strInOutPrefix}/output/test')
            ],
            arguments=["--input_name", "reviews.tsv.gz", "--region", self.strRegionName],
            job_name="preprocessing",
        )

        self.preprocessing_process = ProcessingStep(
            name="PreprocessingProcess",  # processing step / job name
            step_args=step_args,
            cache_config=self.cache_config,
        )

    def _step_training(self, ):
        """Create ``self.tuning_process``, a BlazingText hyperparameter-tuning step.

        Must be called after ``_step_preprocessing``: the training and validation
        channels are the S3 URIs of that step's ``train_data`` / ``validation_data``
        outputs, resolved at pipeline run time.
        """
        max_jobs = 6
        max_parallel_jobs = 3
        # Objective metric for the tuner; HyperparameterTuner maximizes by default.
        objective_name = "validation:accuracy"

        self.estimator=Estimator(
            image_uri=self.model_image_uri,
            role=self.role,
            instance_count=1,
            instance_type="ml.m5.xlarge",
            volume_size=30,
            max_run=360000,  # seconds
            input_mode= 'File',
            sagemaker_session=self.pipeline_session,
        )

        # Static hyperparameters. Keys that also appear in `hyperparameter_ranges`
        # below are sampled by the tuner, so the SDK drops their static values for tuning jobs.
        self.estimator.set_hyperparameters(
            mode="supervised",
            epochs=10,
            min_epochs=5,  # min epochs before early stopping is introduced
            early_stopping=True,
            patience=2,
            learning_rate=0.01,
            min_count=2,  # words that appear less than min_count times are discarded
            word_ngrams=1,  # number of word n-gram features to use
            vector_dim=16,  # dimensions of the embedding layer
        )

        hyperparameter_ranges={
            'epochs': IntegerParameter(5, 50),
            'learning_rate': ContinuousParameter(0.005, 0.01),
            'min_count': IntegerParameter(0, 100),
            'vector_dim': IntegerParameter(32, 300),
            'word_ngrams': IntegerParameter(1, 3)
        }

        tuner = HyperparameterTuner(
            self.estimator,
            objective_name,
            hyperparameter_ranges,
            max_jobs=max_jobs,
            max_parallel_jobs=max_parallel_jobs,
            base_tuning_job_name="hyper-param-tune",
        )

        # Step properties are placeholders that resolve to the preprocessing
        # outputs' S3 URIs when the pipeline executes.
        step_tuner_args = tuner.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
                    content_type="text/csv"
                ),
                "validation": TrainingInput(
                    s3_data=self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri,
                    content_type="text/csv"
                )
            }
        )

        self.tuning_process = TuningStep(
            name="TrainWithHyperParamTuningProcess",
            step_args=step_tuner_args,
            cache_config=self.cache_config,
            # Explicit ordering; the property references above already imply it.
            depends_on=[self.preprocessing_process]
        )

    def _get_pipeline(self, pipeline_name='pipeline-demo'):
        """Assemble the Pipeline from the previously built steps.

        Args:
            pipeline_name: Name registered in SageMaker Pipelines (default ``'pipeline-demo'``).

        Returns:
            A ``sagemaker.workflow.pipeline.Pipeline`` with the preprocessing and tuning steps.
        """
        sm_pipeline = Pipeline(
            name=pipeline_name,
            steps=[self.preprocessing_process, self.tuning_process],
        )
        return sm_pipeline

    def execution(self, ):
        """Build both steps, upsert the pipeline definition and start a run.

        Returns:
            The execution handle from ``Pipeline.start()``. Call ``.describe()``,
            ``.list_steps()`` or ``.wait()`` on it to follow progress.
        """
        self._step_preprocessing()
        self._step_training()  # needs self.preprocessing_process from the call above

        sm_pipeline = self._get_pipeline()
        # Create or update the pipeline definition in the SageMaker Pipelines service.
        sm_pipeline.upsert(role_arn=self.role)
        return sm_pipeline.start()

In [54]:
# Instantiate: reads account/role/bucket settings from Parameter Store and prints them.
pipe = pipeline()
# Build the preprocessing and tuning steps, upsert the pipeline definition, and start an execution.
pipe.execution()

  Account-ID: 419974056037, 
Region: ap-northeast-2, 
Role: arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436, 
Default_bucket: sagemaker-ap-northeast-2-419974056037
  pipeline_session: <sagemaker.workflow.pipeline_context.PipelineSession object at 0x7f66ef68cbe0>


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config



Job Name:  preprocessing
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-419974056037/reviews-helpfulness-pipeline/data/reviews.tsv.gz', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'requirements', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-419974056037/preprocessing/input/requirements/requirements.txt', 'LocalPath': '/opt/ml/processing/input/requirements', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-419974056037/preprocessing/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated