# <B> SageMaker pipeline for Anomaly Detection based on AutoEncoder </B>
* Container: conda_pytorch_p310

## AutoReload

In [None]:
# Load IPython's autoreload extension and switch it to mode 2.
%load_ext autoreload
%autoreload 2

## parameter store 설정

In [None]:
import boto3
from utils.ssm import parameter_store

In [None]:
# Region of the default boto3 session; used to open the parameter store.
strRegionName=boto3.Session().region_name
pm = parameter_store(strRegionName)
# "PREFIX" is the key under which the shared name prefix is stored; the
# cells below prepend it to every other parameter key.
strPrefix = pm.get_params(key="PREFIX")

## parameters for tasks

In [None]:
# Look up the task parameters in one pass. Every key is "<prefix>-<suffix>",
# and the lookups run in the same order as the names on the left-hand side.
strAccountId, strBucketName, strExecutionRole, strS3DataPath = (
    pm.get_params(key="-".join([strPrefix, suffix]))
    for suffix in ("ACCOUNT-ID", "BUCKET", "SAGEMAKER-ROLE-ARN", "S3-DATA-PATH")
)

In [None]:
# Echo the resolved parameters, one "label: value" line each.
for label, value in (
    ("prefix", strPrefix),
    ("account_id", strAccountId),
    ("defaulut_bucket", strBucketName),
    ("sagemaker_role", strExecutionRole),
    ("s3_data_path", strS3DataPath),
):
    print(f"{label}: {value}")

## 1. Data manipulation and visualization

In [None]:
import os
import pandas as pd
from utils.util import plot_click_w_fault_and_res, plot_click_w_fault_res_ad, plot_click_w_ad_exp

* load data and derive features

In [None]:
# Load the click data and index it by its parsed "timestamp" column.
clicks_1T = pd.read_csv(
    os.path.join(strS3DataPath, "clicks_1T.csv"),
    parse_dates=["timestamp"],
).set_index("timestamp")

# residual: difference between the "click" and "user" columns.
clicks_1T["residual"] = clicks_1T["click"] - clicks_1T["user"]

# label: the first row of the header-less label file.
# NOTE(review): assumes that row holds exactly one label per timestamp — confirm.
clicks_1T["fault"] = pd.read_csv(
    os.path.join(strS3DataPath, "fault_label_1T.csv"),
    header=None,
).values[0]

# time variable: hour of day, taken directly from the datetime index instead
# of splitting each timestamp's string form row by row.
clicks_1T["time"] = pd.DatetimeIndex(clicks_1T.index).hour

In [None]:
# Summarise the loaded frame: its shape and the time span of its index.
first_ts = clicks_1T.index.min()
last_ts = clicks_1T.index.max()
print(f'data shape: {clicks_1T.shape}')
print(f'timestamp min: {first_ts}, max: {last_ts}')

* visualization

In [None]:
# Plot the merged frame with the project helper imported from utils.util.
plot_click_w_fault_and_res(clicks_1T)

* upload data to s3 and local

In [None]:
# Write the merged frame (index included) to the S3 data path and to ./data.
strTrainDataName = "merged_clicks_1T.csv"
clicks_1T.to_csv(os.path.join(strS3DataPath, strTrainDataName), index=True)  # to s3

# to_csv does not create missing directories, so make sure ./data exists first.
os.makedirs("./data", exist_ok=True)
clicks_1T.to_csv(os.path.join("./data", strTrainDataName), index=True)  # to local

print(f'train_data_name: {strTrainDataName}')

## 2. Pipeline definition
 - [SageMaker Pipeline Execution using Local Mode](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-local-mode.html)

In [None]:
import os
import time
import boto3
import logging
import argparse
from pprint import pprint
from pipeline_config.config import config_handler

from sagemaker.pytorch.estimator import PyTorch
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor
from sagemaker.workflow.retry import StepRetryPolicy, StepExceptionTypeEnum, SageMakerJobExceptionTypeEnum, SageMakerJobStepRetryPolicy

In [None]:
class mlops_pipeline():
    """Build and start the SageMaker pipeline for the anomaly-detection model.

    The constructor reads its settings from ``args.config`` and prepares the
    pipeline session, step cache and retry configuration. ``execution()``
    then builds the individual steps, upserts the pipeline definition and
    starts one run.
    """

    def __init__(self, args):
        """Store ``args`` and initialise the environment.

        Args:
            args: Namespace whose ``config`` attribute exposes
                ``get_value(section, key, dtype=...)``.
        """
        self.args = args

        self.strRegionName = self.args.config.get_value("COMMON", "region")
        self.pm = parameter_store(self.strRegionName)
        self._env_setting()

    # ------------------------------------------------------------------ #
    # small helpers
    # ------------------------------------------------------------------ #
    def _is_local_mode(self):
        """Return the boolean ``LOCALMODE.mode`` setting from the config."""
        return self.args.config.get_value("LOCALMODE", "mode", dtype="boolean")

    @staticmethod
    def _posix_join(*parts):
        """Join path components with forward slashes on every platform.

        S3 URIs and in-container paths always use ``/``; ``os.path.join``
        would insert backslashes when the notebook runs on Windows.
        """
        import posixpath  # local import keeps this class self-contained

        return posixpath.join(*parts)

    def _s3_uri(self, *parts):
        """Return ``s3://<bucket>/<pipeline name>/<parts...>``."""
        return self._posix_join(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            *parts
        )

    @staticmethod
    def _metric_definitions():
        """Return the metric name/regex pairs passed to the estimator.

        The exponent part used to read ``e-?[[01][0-9]``, where the stray
        ``[`` made the bracket itself part of the character class; it is
        now ``[01]``.
        """
        num_re = "([0-9\\.]+)(e-?[01][0-9])?"
        # NOTE(review): "Train cos" and "Val cos" share the same "wer:"
        # pattern, so both metrics would capture the same log values.
        # Confirm against what the training script actually prints.
        return [
            {"Name": "Train loss", "Regex": f"loss={num_re}"},
            {"Name": "Train cos", "Regex": f"wer:{num_re}"},
            {"Name": "Val cos", "Regex": f"wer:{num_re}"},
        ]

    def _local_credentials_env(self):
        """Return the environment variables handed to a local-mode training job.

        Raises:
            RuntimeError: If the boto3 session resolved no credentials.
        """
        if self.credentials is None:
            raise RuntimeError(
                "No AWS credentials were found; local-mode training passes "
                "them to the training container."
            )

        # Take one consistent snapshot so that key, secret and token always
        # belong together even if the credentials are refreshed meanwhile.
        frozen = self.credentials.get_frozen_credentials()
        environment = {
            'AWS_ACCESS_KEY_ID': frozen.access_key,
            'AWS_SECRET_ACCESS_KEY': frozen.secret_key,
            'AWS_REGION': self.strRegionName,
        }
        # Long-term credentials carry no session token; leave the variable
        # out rather than forwarding None.
        if frozen.token:
            environment['AWS_SESSION_TOKEN'] = frozen.token
        return environment

    @staticmethod
    def _print_step_args(title, step):
        """Print a header for ``title`` and pretty-print each argument of ``step``."""
        print(f"  \n== {title} Step ==")
        print("   \nArgs: ")

        for key, value in step.arguments.items():
            print("===========================")
            print(f'key: {key}')
            pprint(value)

    # ------------------------------------------------------------------ #
    # environment
    # ------------------------------------------------------------------ #
    def _env_setting(self):
        """Read the common settings and create session, cache and retry objects.

        Also stores the pipeline name in the parameter store under
        ``<prefix>-PIPELINE-NAME``.
        """
        self.strPrefix = self.args.config.get_value("COMMON", "prefix")
        self.strExecutionRole = self.args.config.get_value("COMMON", "role")
        self.strBucketName = self.args.config.get_value("COMMON", "bucket")
        self.strModelName = self.args.config.get_value("COMMON", "model_name")
        self.strImageUri = self.args.config.get_value("COMMON", "image_uri")
        self.strPrepImageUri = self.args.config.get_value("COMMON", "image_uri_prep")
        self.strPipelineName = "-".join([self.strPrefix, self.strModelName])

        self.cache_config = CacheConfig(
            enable_caching=self.args.config.get_value("PIPELINE", "enable_caching", dtype="boolean"),
            expire_after=self.args.config.get_value("PIPELINE", "expire_after")
        )

        self.retry_policies = [
            # retry when resource limit quota gets exceeded
            SageMakerJobStepRetryPolicy(
                exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
                expire_after_mins=180,
                interval_seconds=60,
                backoff_rate=1.0
            ),
        ]

        # self.git_config = {
        #     'repo': f'https://{self.pm.get_params(key="-".join([self.strPrefix, "CODE-REPO"]))}',
        #     'branch': 'main',
        #     'username': self.pm.get_params(key="-".join([self.strPrefix, "CODECOMMIT-USERNAME"]), enc=True),
        #     'password': self.pm.get_params(key="-".join([self.strPrefix, "CODECOMMIT-PWD"]), enc=True)
        # }

        if self._is_local_mode():
            self.pipeline_session = LocalPipelineSession()
        else:
            self.pipeline_session = PipelineSession()

        # Kept on the instance; read again by the local-mode training step.
        session = boto3.Session()
        self.credentials = session.get_credentials()

        self.pm.put_params(key="-".join([self.strPrefix, "PIPELINE-NAME"]), value=self.strPipelineName, overwrite=True)

        print(" == Envrionment parameters == ")
        print(f"   SAGEMAKER-ROLE-ARN: {self.strExecutionRole}")
        print(f"   PREFIX: {self.strPrefix}")
        print(f"   BUCKET: {self.strBucketName}")
        print(f"   IMAGE-URI: {self.strImageUri}")

    # ------------------------------------------------------------------ #
    # steps
    # ------------------------------------------------------------------ #
    def _step_preprocessing(self):
        """Create ``self.preprocessing_process``, the preprocessing ProcessingStep."""
        if self._is_local_mode():
            instance_type = "local"
        else:
            instance_type = self.args.config.get_value("PREPROCESSING", "instance_type")

        strPrefixPrep = "/opt/ml/processing/"
        strDataPath = self.args.config.get_value("PREPROCESSING", "data_path")
        strTrainDataName = self.args.config.get_value("PREPROCESSING", "data_name")

        # network settings can be supplied through network_config
        prep_processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.args.config.get_value("PREPROCESSING", "framework_version"),
            py_version="py310",
            image_uri=None,
            instance_type=instance_type,
            instance_count=self.args.config.get_value("PREPROCESSING", "instance_count", dtype="int"),
            role=self.strExecutionRole,
            # name shown in the bucket (inside a pipeline, the name defined
            # by the pipeline is shown instead)
            base_job_name="preprocessing",
            sagemaker_session=self.pipeline_session
        )

        step_args = prep_processor.run(
            # job_name="preprocessing",  # needed for caching to work; otherwise a date/time
            #                            # suffix is appended to base_job_name and the cache misses
            # git_config=git_config,
            code='preprocessing.py',  # file path inside the source directory
            # source directory relative to this file
            # add processing.py and requirements.txt here
            source_dir="./src/preprocessing",
            inputs=[
                ProcessingInput(
                    input_name="input-data",
                    source=strDataPath,
                    destination=self._posix_join(strPrefixPrep, "input")
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output-data",
                    source=self._posix_join(strPrefixPrep, "output"),
                    destination=self._s3_uri("preprocessing", "output")
                ),
            ],
            arguments=[
                "--proc_prefix", strPrefixPrep,
                "--shingle_size", str(self.args.config.get_value("PREPROCESSING", "shingle_size", dtype="int")),
                "--train_data_name", strTrainDataName
            ]
        )

        self.preprocessing_process = ProcessingStep(
            name="PreprocessingProcess",  # processing job name
            step_args=step_args,
            cache_config=self.cache_config,
        )

        self._print_step_args("Preprocessing", self.preprocessing_process)

        print("  \n== Preprecessing Step ==")
        print("   \nArgs: ", self.preprocessing_process.arguments.items())
        print(type(self.preprocessing_process.properties))

    def _step_training(self):
        """Create ``self.estimator`` and ``self.training_process``.

        Both input channels read the "output-data" output of the
        preprocessing step, so ``_step_preprocessing`` must run first.
        """
        if self._is_local_mode():
            instance_type = "local_gpu"
            environment = self._local_credentials_env()
        else:
            instance_type = self.args.config.get_value("TRAINING", "instance_type")
            environment = {}

        dicHyperParams = {
            "epochs": "50",
            "batch_size": "128",
            "lr": "1e-2",
            "shingle_size": str(self.args.config.get_value("PREPROCESSING", "shingle_size", dtype="int")),
            "num_features": "4",
            "emb_size": "4",
            "workers": "2",
        }

        strOutputPath = self._s3_uri("training", "model-output")
        strCodeLocation = self._s3_uri("training", "backup_codes")

        metric_definitions = self._metric_definitions()

        bSpotTraining = False
        nMaxRun = 1*60*60
        # max_wait is only passed for spot training
        nMaxWait = 1*60*60 if bSpotTraining else None

        # Warm pool: the training image is not downloaded again, so jobs
        # start faster. Kept alive for at most one hour; a service-quota
        # request for warm pools is required. Warm pools cannot be used
        # together with spot instances.
        bUseTrainWarmPool = False
        if bUseTrainWarmPool and not bSpotTraining:
            nKeepAliveSeconds = 3600
        else:
            nKeepAliveSeconds = None

        self.estimator = PyTorch(
            entry_point="main.py",  # the script we want to run
            source_dir="./src/training",  # where our conf/script is
            # git_config=git_config,
            role=self.strExecutionRole,
            instance_type=instance_type,
            instance_count=self.args.config.get_value("TRAINING", "instance_count", dtype="int"),
            image_uri=None,
            framework_version=self.args.config.get_value("TRAINING", "framework_version"),
            py_version="py310",
            volume_size=128,
            code_location=strCodeLocation,
            output_path=strOutputPath,
            # disable_profiler=True,
            # debugger_hook_config=False,
            hyperparameters=dicHyperParams,  # {'config-path': 'conf'},
            # distribution={"smdistributed":{"dataparallel":{"enabled":True, "fp16": True}}},
            sagemaker_session=self.pipeline_session,
            metric_definitions=metric_definitions,
            max_run=nMaxRun,
            use_spot_instances=bSpotTraining,  # use spot instances
            max_wait=nMaxWait,
            keep_alive_period_in_seconds=nKeepAliveSeconds,
            enable_sagemaker_metrics=True,
            environment=environment
            # container_log_level=logging.DEBUG  # set the log level to DEBUG
        )

        strPrepOutputUri = self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri
        step_training_args = self.estimator.fit(
            job_name="training",
            inputs={
                "train": strPrepOutputUri,
                "validation": strPrepOutputUri,
            },
            logs="All",
        )

        self.training_process = TrainingStep(
            name="TrainingProcess",
            step_args=step_training_args,
            cache_config=self.cache_config,
            # depends_on=[self.preprocessing_process],
            retry_policies=self.retry_policies
        )

        self._print_step_args("Training", self.training_process)
        print(self.training_process.arguments.items())

    def _step_model_registration(self):
        """Create ``self.register_process``, the model-registration ModelStep.

        Also stores the model package group name in the parameter store
        under ``<prefix>-MODEL-GROUP-NAME``.
        """
        self.strModelPackageGroupName = "-".join(["MPG", self.strPrefix, self.strModelName])
        self.pm.put_params(key="-".join([self.strPrefix, "MODEL-GROUP-NAME"]), value=self.strModelPackageGroupName, overwrite=True)

        # model_metrics = ModelMetrics(
        #     model_statistics=MetricsSource(
        #         s3_uri=Join(
        #             on="/",
        #             values=[
        #                 self.evaluation_process.properties.ProcessingOutputConfig.Outputs["evaluation-metrics"].S3Output.S3Uri,
        #                 # can be checked with print(self.evaluation_process.arguments.items())
        #                 f"evaluation-{self.strModelName}.json"
        #             ],
        #         ),
        #         content_type="application/json")
        # )

        model = PyTorchModel(
            source_dir="./src/deploy",
            entry_point="inference.py",
            model_data=self.training_process.properties.ModelArtifacts.S3ModelArtifacts,
            role=self.strExecutionRole,
            framework_version=self.args.config.get_value("MODEL_REGISTER", "framework_version"),
            py_version='py310',
            model_server_workers=1,
            code_location=self._s3_uri("inference", "model"),
            sagemaker_session=self.pipeline_session,
        )

        step_args = model.register(
            content_types=["application/json", "file-path/raw-bytes", "text/csv"],
            response_types=["application/json"],
            inference_instances=self.args.config.get_value("MODEL_REGISTER", "inference_instances", dtype="list"),
            transform_instances=self.args.config.get_value("MODEL_REGISTER", "transform_instances", dtype="list"),
            model_package_group_name=self.strModelPackageGroupName,
            # "Approved", "Rejected", or "PendingManualApproval" (default: "PendingManualApproval").
            approval_status=self.args.config.get_value("MODEL_REGISTER", "model_approval_status_default"),
            # model_metrics=model_metrics
        )

        self.register_process = ModelStep(
            name="ModelRegisterProcess",
            step_args=step_args,
            # depends_on=[self.evaluation_process]
        )

    def _step_deploy(self):
        """Create ``self.deploy_process``, the ProcessingStep that runs ``deploy.py``.

        Requires ``_step_model_registration`` to have run, because it reads
        ``self.strModelPackageGroupName`` and depends on
        ``self.register_process``. Also stores the generated endpoint name
        in the parameter store under ``<prefix>-ENDPOINT-NAME``.
        """
        strInstanceType = self.args.config.get_value("DEPLOY", "processing_instance_type")
        nInstanceCount = self.args.config.get_value("DEPLOY", "processing_instance_count", dtype="int")
        strDepolyInstanceType = self.args.config.get_value("DEPLOY", "instance_type")
        # the current epoch second makes the endpoint name differ per build
        strEndpointName = f'endpoint--{self.strPipelineName}-{int(time.time())}'
        strProcPrefixPath = "/opt/ml/processing"

        deploy_processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.args.config.get_value("DEPLOY", "processing_framework_version"),
            py_version="py310",
            image_uri=None,
            role=self.strExecutionRole,
            instance_type=strInstanceType,
            instance_count=nInstanceCount,
            # name shown in the bucket (inside a pipeline, the name defined
            # by the pipeline is shown instead)
            base_job_name="deploy",
            sagemaker_session=self.pipeline_session
        )

        step_deploy_args = deploy_processor.run(
            code="deploy.py",
            source_dir="src/deploy/",
            arguments=[
                "--prefix_deploy", strProcPrefixPath,
                "--region", self.strRegionName,
                "--instance_type", strInstanceType,
                "--depoly_instance_type", strDepolyInstanceType,
                "--model_package_group_name", self.strModelPackageGroupName,
                "--endpoint_name", strEndpointName,
                "--execution_role", self.strExecutionRole,
            ],
            job_name="deploy",
        )

        self.pm.put_params(key=self.strPrefix + "-ENDPOINT-NAME", value=strEndpointName, overwrite=True)

        self.deploy_process = ProcessingStep(
            name="DeployProcess",  # processing job name
            step_args=step_deploy_args,
            depends_on=[self.register_process],
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )

        self._print_step_args("Deploy", self.deploy_process)

    # ------------------------------------------------------------------ #
    # pipeline
    # ------------------------------------------------------------------ #
    def _get_pipeline(self):
        """Return a Pipeline made of the preprocessing and training steps."""
        # NOTE(review): the register and deploy steps are built by
        # execution() but are not part of the pipeline; the full step list
        # is kept below for when they are wired in.
        pipeline = Pipeline(
            name=self.strPipelineName,
            # steps=[self.preprocessing_process, self.training_process, self.register_process, self.deploy_process],
            steps=[self.preprocessing_process, self.training_process],
            sagemaker_session=self.pipeline_session
        )

        return pipeline

    def execution(self):
        """Build every step, upsert the pipeline, start a run and record its ARN.

        The pipeline ARN is stored in the parameter store under
        ``<prefix>-PIPELINE-ARN``.
        """
        self._step_preprocessing()
        self._step_training()
        # NOTE(review): these two still write MODEL-GROUP-NAME and
        # ENDPOINT-NAME to the parameter store although their steps are not
        # in the pipeline returned by _get_pipeline(). Confirm this is intended.
        self._step_model_registration()
        self._step_deploy()

        pipeline = self._get_pipeline()
        # Submit the pipeline definition to the SageMaker Pipelines service
        pipeline.upsert(role_arn=self.strExecutionRole)
        execution = pipeline.start()
        desc = execution.describe()

        self.pm.put_params(
            key="-".join([self.strPrefix, "PIPELINE-ARN"]),
            value=desc["PipelineArn"],
            overwrite=True
        )
        print("PipelineArn: ", desc["PipelineArn"])
        # reuse the description fetched above instead of a second describe() call
        print(desc)

In [None]:
# Build the argument namespace. parse_known_args() is used so that unknown
# command-line arguments are returned separately rather than rejected.
parser = argparse.ArgumentParser()
args, _ = parser.parse_known_args()
args.config = config_handler()

print(f"Received arguments {args}")

# Export the configured region before the pipeline object is created.
os.environ['AWS_DEFAULT_REGION'] = args.config.get_value("COMMON", "region")

# Assemble the pipeline and start one run.
pipe_tr = mlops_pipeline(args)
pipe_tr.execution()