# <B> SageMaker pileline with `MLflow` for Anormaly Detection based on AutoEncoder </B>
* Container: codna_pytorch_p310
* [Example codes](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-mlflow)

## [중요] Pipeline에 사용되는 주요 사용자 파라미터는 config.ini 파일(./pipeline_config/config.ini)로 관리되고 있습니다. 
 - 특히!! mlflow arn도 config.ini 에 넣어 주어야 합니다!!

## AutoReload

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import boto3
from utils.ssm import parameter_store

In [3]:
strRegionName=boto3.Session().region_name
pm = parameter_store(strRegionName)
strPrefix = pm.get_params(key="PREFIX")

## pramamters for tasks

In [4]:
strAccountId = pm.get_params(key="-".join([strPrefix, "ACCOUNT-ID"]))
strBucketName = pm.get_params(key="-".join([strPrefix, "BUCKET"]))
strExecutionRole = pm.get_params(key="-".join([strPrefix, "SAGEMAKER-ROLE-ARN"]))
strS3DataPath = pm.get_params(key="-".join([strPrefix, "S3-DATA-PATH"]))
tracking_server_arn = pm.get_params(key="-".join([strPrefix, "MLFLOW-TRACKING-SERVER-ARN"]))

In [5]:
print (f"prefix: {strPrefix}")
print (f"account_id: {strAccountId}")
print (f"defaulut_bucket: {strBucketName}")
print (f"sagemaker_role: {strExecutionRole}")
print (f"s3_data_path: {strS3DataPath}")
print (f"tracking_server_arn: {tracking_server_arn}")

prefix: ad-ts-jdj
account_id: 615299776985
defaulut_bucket: sm-anomaly-detection-jdj
sagemaker_role: arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050
s3_data_path: s3://sm-anomaly-detection-jdj/data
tracking_server_arn: arn:aws:sagemaker:us-west-2:615299776985:mlflow-tracking-server/mlflow-tracking-ramp


## 1. Pipeline definition

##### 1. "./pipeline_config/config.ini" 파일열기
##### 2. "tracking_server_arn"을 자신의 mlflow arn으로 반드시 수정할 것 !!!

In [6]:
import os
import time
import boto3
import mlflow
import argparse
from pprint import pprint
from pipeline_config.config import config_handler

from sagemaker.pytorch.estimator import PyTorch
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor
from sagemaker.workflow.retry import StepRetryPolicy, StepExceptionTypeEnum, SageMakerJobExceptionTypeEnum, SageMakerJobStepRetryPolicy
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.fail_step import FailStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

from mlflow.tracking import MlflowClient



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [7]:
class mlops_pipeline():

    def __init__(self, args):

        self.args = args

        self.strRegionName = self.args.config.get_value("COMMON", "region")
        self.pm = parameter_store(self.strRegionName)
        self._env_setting()        

    def _env_setting(self, ):

        self.strPrefix = self.args.config.get_value("COMMON", "prefix")
        self.strExecutionRole = self.args.config.get_value("COMMON", "role")
        self.strBucketName = self.args.config.get_value("COMMON", "bucket")
        self.strModelName = self.args.config.get_value("COMMON", "model_name")
        self.strPrepImageUri = self.args.config.get_value("COMMON", "image_uri_prep")
        self.strTrImageUri = self.args.config.get_value("COMMON", "image_uri_tr")
        self.strInfImageUri = self.args.config.get_value("COMMON", "image_uri_inf")
        self.strPipelineName = "-".join([self.strPrefix, self.strModelName])
        self.tracking_server_arn = self.args.config.get_value("COMMON", "tracking_server_arn")
        self.mlflow_exp_name = self.args.config.get_value("COMMON", "mlflow_exp_name")
        #self.mlflow_run_name = ExecutionVariables.PIPELINE_EXECUTION_ID ## PIPELINE_EXECUTION_ID는 runtime별로 변경 -> cache 안된다!      
        self.mlflow_run_name = "sagemaker-ramp"
        
        ## mlflow
        client = MlflowClient()
        mlflow.set_tracking_uri(self.tracking_server_arn)
        
        try:
            # experiment 존재 여부 확인
            experiment = client.get_experiment_by_name(self.mlflow_exp_name)

            if experiment is None:
                # experiment가 없으면 새로 생성
                experiment_id = mlflow.create_experiment(self.mlflow_exp_name)
                print(f"Created new experiment '{self.mlflow_exp_name}' with ID: {experiment_id}")
            else:
                experiment_id = experiment.experiment_id
                print(f"Using existing experiment '{self.mlflow_exp_name}' with ID: {experiment_id}")

            # experiment 설정
            mlflow.set_experiment(self.mlflow_exp_name)

        except Exception as e:
            print(f"Error in run_under_experiment: {str(e)}")
                            
        session = boto3.Session()
        self.credentials = session.get_credentials()

        # SageMaker PipeLine Caching: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching-enabling.html
        self.cache_config = CacheConfig(
            enable_caching=self.args.config.get_value("PIPELINE", "enable_caching", dtype="boolean"),
            expire_after=self.args.config.get_value("PIPELINE", "expire_after")
        )

        self.retry_policies=[
            # retry when resource limit quota gets exceeded
            SageMakerJobStepRetryPolicy(
                exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
                expire_after_mins=180,
                interval_seconds=60,
                backoff_rate=1.0
            ),
        ]

        if self.args.config.get_value("LOCALMODE", "mode", dtype="boolean"): self.pipeline_session = LocalPipelineSession()
        else: self.pipeline_session = PipelineSession()

        self.pm.put_params(key="-".join([self.strPrefix, "PIPELINE-NAME"]), value=self.strPipelineName, overwrite=True)

        print (f" == Envrionment parameters == ")
        print (f"   SAGEMAKER-ROLE-ARN: {self.strExecutionRole}")
        print (f"   PREFIX: {self.strPrefix}")
        print (f"   BUCKET: {self.strBucketName}")
        print (f"   IMAGE-URI-PREP: {self.strPrepImageUri}")

    def _step_preprocessing(self, ):
        
        if self.args.config.get_value("LOCALMODE", "mode", dtype="boolean"): instance_type = "local"
        else: instance_type = self.args.config.get_value("PREPROCESSING", "instance_type")
        
        strPrefixPrep = "/opt/ml/processing/"
        strDataPath = self.args.config.get_value("PREPROCESSING", "data_path")
        strTrainDataName = self.args.config.get_value("PREPROCESSING", "data_name")
        
        # network_config로 받으면 된다
        prep_processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.args.config.get_value("PREPROCESSING", "framework_version"),
            image_uri=self.strPrepImageUri,
            instance_type=instance_type,
            instance_count=self.args.config.get_value("PREPROCESSING", "instance_count", dtype="int"),
            role=self.strExecutionRole,
            base_job_name="preprocessing", # bucket에 보이는 이름 (pipeline으로 묶으면 pipeline에서 정의한 이름으로 bucket에 보임)
            sagemaker_session=self.pipeline_session,
            #env={"MLFLOW_TRACKING_ARN": self.tracking_server_arn}
        )
        
        step_args = prep_processor.run(
            job_name="preprocessing", ## 이걸 넣어야 캐시가 작동함, 안그러면 프로세서의 base_job_name 이름뒤에 날짜 시간이 붙어서 캐시 동작 안함
            #git_config=git_config,
            code='preprocessing-mlflow.py', #소스 디렉토리 안에서 파일 path
            source_dir= "./src/preprocessing", #현재 파일에서 소스 디렉토리 상대경로 # add processing.py and requirements.txt here
            inputs=[
                ProcessingInput(
                    input_name="input-data",
                    source=strDataPath,
                    destination=os.path.join(strPrefixPrep, "input")
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output-data",
                    source=os.path.join(strPrefixPrep, "output"),
                    destination=os.path.join(
                        "s3://{}".format(self.strBucketName),
                        self.strPipelineName,
                        "preprocessing",
                        "output"
                    )
                ),
            ],
            arguments=[
                "--proc_prefix", strPrefixPrep, \
                "--shingle_size", str(self.args.config.get_value("PREPROCESSING", "shingle_size", dtype="int")), \
                "--train_data_name", strTrainDataName,
                "--mlflow_tracking_arn", self.tracking_server_arn,
                "--experiment_name", self.mlflow_exp_name,
                "--mlflow_run_name", self.mlflow_run_name
            ]
        )

        self.preprocessing_process = ProcessingStep(
            name="PreprocessingProcess", ## Processing job이름
            step_args=step_args,
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )
        
        print ("  \n== Preprocessing Step ==")
        print ("   \nArgs: ")

        
        for key, value in self.preprocessing_process.arguments.items():
            print ("===========================")
            print (f'key: {key}')
            pprint (value)
            
        print (type(self.preprocessing_process.properties))
            

    def _step_training(self, ):
        
        if self.args.config.get_value("LOCALMODE", "mode", dtype="boolean"):
            instance_type = "local_gpu"
            environment = {
                "AWS_ACCESS_KEY_ID": self.credentials.access_key,
                "AWS_SECRET_ACCESS_KEY": self.credentials.secret_key,
                "AWS_SESSION_TOKEN": self.credentials.token,
                "AWS_REGION": self.strRegionName
            }
        else:
            instance_type = self.args.config.get_value("TRAINING", "instance_type")
            environment={
                "MLFLOW_TRACKING_ARN": self.tracking_server_arn,
                "EXPERIMENT_NAME": self.mlflow_exp_name,
                "MLFLOW_RUN_NAME": self.mlflow_run_name
            }
        
        dicHyperParams = {
            "epochs":"50",
            "batch_size":"32",
            "lr":"0.04",
            "shingle_size":str(self.args.config.get_value("PREPROCESSING", "shingle_size", dtype="int")),
            "num_features":"4",
            "emb_size":"4",
            "workers":"2"
        }

        strOutputPath = os.path.join(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "training",
            "model-output"
        )

        strCodeLocation = os.path.join(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "training",
            "backup_codes"
        )

        num_re = "([0-9\\.]+)(e-?[[01][0-9])?"
        metric_definitions = [
            {"Name": "Train loss", "Regex": f"loss={num_re}"},
            {"Name": "Train cos", "Regex": f"wer:{num_re}"},
            {"Name": "Val cos", "Regex": f"wer:{num_re}"}
        ]

        bSpotTraining = False
        if bSpotTraining:
            nMaxWait = 1*60*60
            nMaxRun = 1*60*60

        else:
            nMaxWait = None
            nMaxRun = 1*60*60

        ## 비용 추가!!!!!!! 반드시 확인!!!!!!
        bUseTrainWarmPool = True ## training image 다운받지 않음, 속도 빨라진다
        
        if bUseTrainWarmPool: nKeepAliveSeconds = 3600 ## 최대 1시간 동안!!, service quota에서 warmpool을 위한 request 필요
        else: nKeepAliveSeconds = None
        if bSpotTraining:
            bUseTrainWarmPool = False # warmpool은 spot instance 사용시 활용 할 수 없음
            nKeepAliveSeconds = None
        
        self.estimator = PyTorch(
            entry_point="main-mlflow.py", # the script we want to run
            source_dir="./src/training", # where our conf/script is
            #git_config=git_config,
            role=self.strExecutionRole,
            instance_type=instance_type,
            instance_count=self.args.config.get_value("TRAINING", "instance_count", dtype="int"),
            image_uri=self.strTrImageUri,
            framework_version=self.args.config.get_value("TRAINING", "framework_version"),
            volume_size=125, ## cache 적용 안된다. 
            code_location=strCodeLocation,
            output_path=strOutputPath,
            disable_profiler=True,
            debugger_hook_config=False,
            hyperparameters=dicHyperParams, #{'config-path': 'conf'},
            #distribution={"smdistributed":{"dataparallel":{"enabled":True, "fp16": True}}},
            sagemaker_session=self.pipeline_session,
            metric_definitions=metric_definitions,
            max_run=nMaxRun,
            use_spot_instances=bSpotTraining,  # spot instance 활용
            max_wait=nMaxWait,
            keep_alive_period_in_seconds=nKeepAliveSeconds,
            enable_sagemaker_metrics=True,
            environment=environment
        )

        step_training_args = self.estimator.fit(
            job_name="training",
            inputs={
                "train": self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri,
                "validation": self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri,
            },
            logs="All",
        )
        
        self.training_process = TrainingStep(
            name="TrainingProcess",
            step_args=step_training_args,
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )

        print ("  \n== Training Step ==")
        print ("   \nArgs: ")

        for key, value in self.training_process.arguments.items():
            print ("===========================")
            print (f'key: {key}')
            pprint (value)
    
    def _step_evaluation(self, ):
        
        if self.args.config.get_value("LOCALMODE", "mode", dtype="boolean"): instance_type = "local"
        else: instance_type = self.args.config.get_value("EVALUATION", "instance_type")
        
        strPrefixPrep = "/opt/ml/processing/"
        
        # network_config로 받으면 된다
        eval_processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.args.config.get_value("EVALUATION", "framework_version"),
            image_uri=self.strPrepImageUri,
            instance_type=instance_type,
            instance_count=self.args.config.get_value("EVALUATION", "instance_count", dtype="int"),
            role=self.strExecutionRole,
            base_job_name="evaluation", # bucket에 보이는 이름 (pipeline으로 묶으면 pipeline에서 정의한 이름으로 bucket에 보임)
            sagemaker_session=self.pipeline_session,
            #env={"MLFLOW_TRACKING_ARN": self.tracking_server_arn}
        )
        
        step_args = eval_processor.run(
            job_name="evaluation", ## 이걸 넣어야 캐시가 작동함, 안그러면 프로세서의 base_job_name 이름뒤에 날짜 시간이 붙어서 캐시 동작 안함
            #git_config=git_config,
            code='evaluation-mlflow.py', #소스 디렉토리 안에서 파일 path
            source_dir= "./src/evaluation", #현재 파일에서 소스 디렉토리 상대경로
            inputs=[
                ProcessingInput(
                    input_name="test-data",
                    source=self.preprocessing_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri,
                    destination=os.path.join(strPrefixPrep, "test")
                ),
                ProcessingInput(
                    input_name="model_artifact",
                    source=self.training_process.properties.ModelArtifacts.S3ModelArtifacts,
                    destination=os.path.join(strPrefixPrep, "model")
                )
            ],
            outputs=[
                ProcessingOutput(
                    output_name="evaluation",
                    source=os.path.join(strPrefixPrep, "output"),
                    destination=os.path.join(
                        "s3://{}".format(self.strBucketName),
                        self.strPipelineName,
                        "evaluation",
                        "output"
                    )
                ),
            ],
            arguments=[
                "--proc_prefix", strPrefixPrep, \
                "--mlflow_tracking_arn", self.tracking_server_arn,
                "--experiment_name", self.mlflow_exp_name,
                "--mlflow_run_name", self.mlflow_run_name
            ]
        )

        self.evaluation_report = PropertyFile(
            name="EvaluationReport",
            output_name="evaluation", ## evaluation의 ProcessingOutput의 output_name
            path="evaluation.json", ## evaluate.py 에서 write하는 부분
        )
        
        self.evaluation_process = ProcessingStep(
            name="EvaluationProcess", ## Processing job이름
            step_args=step_args,
            property_files=[self.evaluation_report],
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )
        
        print ("  \n== Evaluation Step ==")
        print ("   \nArgs: ")

        
        for key, value in self.evaluation_process.arguments.items():
            print ("===========================")
            print (f'key: {key}')
            pprint (value)
            
        print (type(self.evaluation_process.properties))
        
    def _step_model_registration(self, ):
        
        self.strModelPackageGroupName = "-".join(["MPG", self.strPrefix, self.strModelName])
        self.pm.put_params(key="-".join([self.strPrefix, "MODEL-GROUP-NAME"]), value=self.strModelPackageGroupName, overwrite=True)
                                                                              
        model_metrics = ModelMetrics(
            model_statistics=MetricsSource(
                s3_uri=Join(
                    on="/",
                    values=[
                        self.evaluation_process.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                        #print (self.evaluation_process.arguments.items())로 확인가능
                        f"evaluation-{self.strModelName}.json"
                    ],
                ),
                content_type="application/json")
        )
        
        model = PyTorchModel(
            source_dir="./src/deploy",
            entry_point="inference.py",
            model_data=self.training_process.properties.ModelArtifacts.S3ModelArtifacts,
            role=self.strExecutionRole,
            framework_version=self.args.config.get_value("MODEL_REGISTER", "framework_version"),
            image_uri=self.strInfImageUri,
            model_server_workers=1,
            code_location=os.path.join(
                "s3://",
                self.strBucketName,
                self.strPipelineName,
                "inference",
                "model"
            ),
            sagemaker_session=self.pipeline_session,
        )
        
        print ("=======================")
        print ("=======================")
        print ("model.image_uri", model.image_uri)
        print ("self.strInfImageUri", self.strInfImageUri)
        model.image_uri = self.strInfImageUri
        print ("model.image_uri", model.image_uri)

        step_args = model.register(
            content_types=["application/json", "file-path/raw-bytes", "text/csv"],
            response_types=["application/json"],
            inference_instances=self.args.config.get_value("MODEL_REGISTER", "inference_instances", dtype="list"),
            transform_instances=self.args.config.get_value("MODEL_REGISTER", "transform_instances", dtype="list"),
            model_package_group_name=self.strModelPackageGroupName,
            approval_status=self.args.config.get_value("MODEL_REGISTER", "model_approval_status_default"),
            ## “Approved”, “Rejected”, or “PendingManualApproval” (default: “PendingManualApproval”).
            model_metrics=model_metrics
        )

        self.register_process = ModelStep(
            name="ModelRegisterProcess",
            step_args=step_args,
            #depends_on=[self.evaluation_process]
        )
        
    def _step_fail(self, ):
            
        self.fail_process = FailStep(
            name="ConditionFail",
            error_message=Join(
                on=" ",
                values=["Execution failed due to performance threshold"]
            ),
        )
        
    def _step_condition(self, ):
                
        # https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition
        # 조건문 종류: https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions
        
        self.condition_acc = ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=self.evaluation_process.name,
                property_file=self.evaluation_report,
                json_path="metrics.accuracy.value" ## evaluation.py에서 json으로 performance를 기록한 대로 한다. 
                                                   ## 즉, S3에 저장된 evaluation-<model_name>.json 파일안에 있는 값을 적어줘야 한다. 
            ),
            right=self.args.config.get_value("CONDITION", "thesh_accuracy", dtype="float"),
        )
        
        self.condition_prec = ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=self.evaluation_process.name,
                property_file=self.evaluation_report,
                json_path="metrics.precision.value" ## evaluation.py에서 json으로 performance를 기록한 대로 한다. 
                                                    ## 즉, S3에 저장된 evaluation-<model_name>.json 파일안에 있는 값을 적어줘야 한다. 
            ),
            right=self.args.config.get_value("CONDITION", "thesh_precision", dtype="float"),
        )
        
        self.condition_process = ConditionStep(
            name="CheckCondition",
            display_name="CheckCondition",
            conditions=[self.condition_acc, self.condition_prec], ## 여러 조건 함께 사용할 수 있음
            if_steps=[self.register_process],
            else_steps=[self.fail_process]
        )
        
        print ("  \n== Condition Step ==")
        print ("   \nArgs: ")
        for key, value in self.condition_process.arguments.items():
            print ("===========================")
            print (f'key: {key}')
            pprint (value)

    def _step_deploy(self, ):
        
        strInstanceType = self.args.config.get_value("DEPLOY", "instance_type")
        nInstanceCount = self.args.config.get_value("DEPLOY", "instance_count", dtype="int")
        strDepolyInstanceType = self.args.config.get_value("DEPLOY", "instance_type")
        strEndpointName = f'endpoint--{self.strPipelineName}-{int(time.time())}'
        strProcPrefixPath = "/opt/ml/processing"
        
        deploy_processor = FrameworkProcessor(
            estimator_cls=PyTorch,
            framework_version=self.args.config.get_value("DEPLOY", "processing_framework_version"),
            #py_version="py310",
            image_uri=self.strInfImageUri,
            role=self.strExecutionRole,
            instance_type=strInstanceType,
            instance_count=nInstanceCount,
            base_job_name="deploy", # bucket에 보이는 이름 (pipeline으로 묶으면 pipeline에서 정의한 이름으로 bucket에 보임)
            sagemaker_session=self.pipeline_session
        )
        
        step_deploy_args = deploy_processor.run(
            code="deploy.py",
            source_dir="src/deploy/",
            arguments=[
                "--prefix_deploy", strProcPrefixPath, \
                "--region", self.strRegionName, \
                "--instance_type", strInstanceType, \
                "--depoly_instance_type", strDepolyInstanceType, \
                "--model_package_group_name", self.strModelPackageGroupName, \
                "--endpoint_name", strEndpointName, \
                "--execution_role", self.strExecutionRole, \
            ],
            job_name="deploy",
        )
        
        self.pm.put_params(key=self.strPrefix + "-ENDPOINT-NAME", value=strEndpointName, overwrite=True)
        
        self.deploy_process = ProcessingStep(
            name="DeployProcess", ## Processing job이름
            step_args=step_deploy_args,
            depends_on=[self.register_process],
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )
        
        print ("  \n== Deploy Step ==")
        print ("   \nArgs: ")

        for key, value in self.deploy_process.arguments.items():
            print ("===========================")
            print (f'key: {key}')
            pprint (value)
            
    def _get_pipeline(self, ):

        pipeline = Pipeline(
            name=self.strPipelineName,
            #steps=[self.preprocessing_process, self.training_process, self.register_process, self.deploy_process],
            steps=[self.preprocessing_process, self.training_process, self.evaluation_process, self.condition_process, self.deploy_process],
            sagemaker_session=self.pipeline_session
        )

        return pipeline

    def execution(self, ):

        self._step_preprocessing()
        self._step_training()
        self._step_evaluation()
        self._step_model_registration()
        self._step_fail()
        self._step_condition()
        self._step_deploy()

        pipeline = self._get_pipeline()
        pipeline.upsert(role_arn=self.strExecutionRole) ## Submit the pipeline definition to the SageMaker Pipelines service 
        execution = pipeline.start()
        desc = execution.describe()

        self.pm.put_params(
            key="-".join([self.strPrefix, "PIPELINE-ARN"]),
            value=desc["PipelineArn"],
            overwrite=True
        )
        print ("PipelineArn: ", desc["PipelineArn"])
        print (execution.describe())

In [8]:
parser = argparse.ArgumentParser()
args, _ = parser.parse_known_args()
args.config = config_handler(strConfigPath="config.ini")

print("Received arguments {}".format(args))
os.environ['AWS_DEFAULT_REGION'] = args.config.get_value("COMMON", "region")

pipe_tr = mlops_pipeline(args)
pipe_tr.execution()

  LOCALMODE: mode:False
  COMMON: prefix:DJ-SM-PIPELINE
  COMMON: region:us-west-2
  COMMON: role:arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050
  COMMON: bucket:sm-anomaly-detection-jdj
  COMMON: model_name:RAPP
  COMMON: image_uri_prep:615299776985.dkr.ecr.us-west-2.amazonaws.com/prep-docker-image
  COMMON: image_uri_tr:615299776985.dkr.ecr.us-west-2.amazonaws.com/tr-docker-image
  COMMON: image_uri_inf:615299776985.dkr.ecr.us-west-2.amazonaws.com/inf-docker-image
  COMMON: tracking_server_arn:arn:aws:sagemaker:us-west-2:615299776985:mlflow-tracking-server/mlflow-tracking-ramp
  COMMON: mlflow_exp_name:anomaly-detection-exp-trial
  PIPELINE: enable_caching:True
  PIPELINE: expire_after:T24H
  PREPROCESSING: data_path:s3://sm-anomaly-detection-jdj/data
  PREPROCESSING: data_name:merged_clicks_1T.csv
  PREPROCESSING: framework_version:2.1
  PREPROCESSING: instance_type:ml.g4dn.xlarge
  PREPROCESSING: instance_count:1
  PREPROCESSING: shingle_s

Error in run_under_experiment: RESOURCE_ALREADY_EXISTS: Experiment(name=anomaly-detection-exp-trial) already exists. Error: (raised as a result of Query-invoked autoflush; consider using a session.no_autoflush block if this flush is occurring prematurely)
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "experiments_name_key"
DETAIL:  Key (name)=(anomaly-detection-exp-trial) already exists.

[SQL: INSERT INTO experiments (name, artifact_location, lifecycle_stage, creation_time, last_update_time) VALUES (%(name)s, %(artifact_location)s, %(lifecycle_stage)s, %(creation_time)s, %(last_update_time)s) RETURNING experiments.experiment_id]
[parameters: {'name': 'anomaly-detection-exp-trial', 'artifact_location': '', 'lifecycle_stage': 'active', 'creation_time': 1749214356765, 'last_update_time': 1749214356765}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)


 == Envrionment parameters == 
   SAGEMAKER-ROLE-ARN: arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050
   PREFIX: DJ-SM-PIPELINE
   BUCKET: sm-anomaly-detection-jdj
   IMAGE-URI-PREP: 615299776985.dkr.ecr.us-west-2.amazonaws.com/prep-docker-image
  
== Preprocessing Step ==
   
Args: 




key: ProcessingResources
{'ClusterConfig': {'InstanceCount': 1,
                   'InstanceType': 'ml.g4dn.xlarge',
                   'VolumeSizeInGB': 30}}
key: AppSpecification
{'ContainerArguments': ['--proc_prefix',
                        '/opt/ml/processing/',
                        '--shingle_size',
                        '4',
                        '--train_data_name',
                        'merged_clicks_1T.csv',
                        '--mlflow_tracking_arn',
                        'arn:aws:sagemaker:us-west-2:615299776985:mlflow-tracking-server/mlflow-tracking-ramp',
                        '--experiment_name',
                        'anomaly-detection-exp-trial',
                        '--mlflow_run_name',
                        'sagemaker-ramp'],
 'ContainerEntrypoint': ['/bin/bash',
                         '/opt/ml/processing/input/entrypoint/runproc.sh'],
 'ImageUri': '615299776985.dkr.ecr.us-west-2.amazonaws.com/prep-docker-image'}
key: RoleArn
'arn:aws:iam

  
== Training Step ==
   
Args: 


key: AlgorithmSpecification
{'EnableSageMakerMetricsTimeSeries': True,
 'MetricDefinitions': [{'Name': 'Train loss',
                        'Regex': 'loss=([0-9\\.]+)(e-?[[01][0-9])?'},
                       {'Name': 'Train cos',
                        'Regex': 'wer:([0-9\\.]+)(e-?[[01][0-9])?'},
                       {'Name': 'Val cos',
                        'Regex': 'wer:([0-9\\.]+)(e-?[[01][0-9])?'}],
 'TrainingImage': '615299776985.dkr.ecr.us-west-2.amazonaws.com/tr-docker-image',
 'TrainingInputMode': 'File'}
key: OutputDataConfig
{'S3OutputPath': 's3://sm-anomaly-detection-jdj/DJ-SM-PIPELINE-RAPP/training/model-output'}
key: StoppingCondition
{'MaxRuntimeInSeconds': 3600}
key: ResourceConfig
{'InstanceCount': 1,
 'InstanceType': 'ml.g4dn.xlarge',
 'KeepAlivePeriodInSeconds': 3600,
 'VolumeSizeInGB': 125}
key: RoleArn
'arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050'
key: InputDataConfig
[{'ChannelName': 'train',
  'DataSource': {'S3

key: ProcessingResources
{'ClusterConfig': {'InstanceCount': 1,
                   'InstanceType': 'ml.g4dn.xlarge',
                   'VolumeSizeInGB': 30}}
key: AppSpecification
{'ContainerArguments': ['--proc_prefix',
                        '/opt/ml/processing/',
                        '--mlflow_tracking_arn',
                        'arn:aws:sagemaker:us-west-2:615299776985:mlflow-tracking-server/mlflow-tracking-ramp',
                        '--experiment_name',
                        'anomaly-detection-exp-trial',
                        '--mlflow_run_name',
                        'sagemaker-ramp'],
 'ContainerEntrypoint': ['/bin/bash',
                         '/opt/ml/processing/input/entrypoint/runproc.sh'],
 'ImageUri': '615299776985.dkr.ecr.us-west-2.amazonaws.com/prep-docker-image'}
key: RoleArn
'arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050'
key: ProcessingInputs
[{'AppManaged': False,
  'InputName': 'test-data',
  'S3Input'

key: Conditions
[{'LeftValue': JsonGet(step_name='EvaluationProcess', property_file=PropertyFile(name='EvaluationReport', output_name='evaluation', path='evaluation.json'), json_path='metrics.accuracy.value', s3_uri=None, step=None),
  'RightValue': 0.5,
  'Type': 'GreaterThanOrEqualTo'},
 {'LeftValue': JsonGet(step_name='EvaluationProcess', property_file=PropertyFile(name='EvaluationReport', output_name='evaluation', path='evaluation.json'), json_path='metrics.precision.value', s3_uri=None, step=None),
  'RightValue': 0.5,
  'Type': 'GreaterThanOrEqualTo'}]
key: IfSteps
[{'Arguments': {'AlgorithmSpecification': {'TrainingImage': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
                                           'TrainingInputMode': 'File'},
                'DebugHookConfig': {'CollectionConfigurations': [],
                                    'S3OutputPath': 's3://sm-anomaly-detection-jdj/DJ-SM-PIPELINE-RAPP/inference/model/inf-docker-image-

key: ProcessingResources
{'ClusterConfig': {'InstanceCount': 1,
                   'InstanceType': 'ml.g4dn.xlarge',
                   'VolumeSizeInGB': 30}}
key: AppSpecification
{'ContainerArguments': ['--prefix_deploy',
                        '/opt/ml/processing',
                        '--region',
                        'us-west-2',
                        '--instance_type',
                        'ml.g4dn.xlarge',
                        '--depoly_instance_type',
                        'ml.g4dn.xlarge',
                        '--model_package_group_name',
                        'MPG-DJ-SM-PIPELINE-RAPP',
                        '--endpoint_name',
                        'endpoint--DJ-SM-PIPELINE-RAPP-1749214358',
                        '--execution_role',
                        'arn:aws:iam::615299776985:role/service-role/AmazonSageMaker-ExecutionRole-20241014T132050'],
 'ContainerEntrypoint': ['/bin/bash',
                         '/opt/ml/processing/input/entrypoint/ru

PipelineArn:  arn:aws:sagemaker:us-west-2:615299776985:pipeline/DJ-SM-PIPELINE-RAPP
{'PipelineArn': 'arn:aws:sagemaker:us-west-2:615299776985:pipeline/DJ-SM-PIPELINE-RAPP', 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:615299776985:pipeline/DJ-SM-PIPELINE-RAPP/execution/drekhgey82zn', 'PipelineExecutionDisplayName': 'execution-1749214360981', 'PipelineExecutionStatus': 'Executing', 'PipelineExperimentConfig': {'ExperimentName': 'dj-sm-pipeline-rapp', 'TrialName': 'drekhgey82zn'}, 'CreationTime': datetime.datetime(2025, 6, 6, 12, 52, 40, 883000, tzinfo=tzlocal()), 'LastModifiedTime': datetime.datetime(2025, 6, 6, 12, 52, 40, 883000, tzinfo=tzlocal()), 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::615299776985:assumed-role/AmazonSageMaker-ExecutionRole-20241014T132050/SageMaker', 'PrincipalId': 'AROAY6QVZQHM46BISOX35:SageMaker'}}, 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::615299776985:assumed-role/AmazonSageMaker-ExecutionRole-20241014T132050/SageMaker', 'Princi

<!-- 실제 Repack 구조 변화:
원본 (훈련 후 생성된 model.tar.gz):
model.tar.gz
├── pytorch_model.bin          # 실제 모델 가중치
├── config.json               # 모델 설정
└── (기타 훈련 관련 파일들)
Repack 후:
model.tar.gz
├── pytorch_model.bin          # 동일한 모델 파일
├── config.json               # 동일한 설정
├── inference.py              # 추가: 추론 코드
├── requirements.txt          # 추가: 의존성
├── .sagemaker-metadata       # 추가: SageMaker 메타데이터
│   ├── model-config.json     # 모델 서빙 설정
│   ├── input-schema.json     # 입력 스키마
│   └── output-schema.json    # 출력 스키마
└── code/                     # 추가: 코드 디렉토리 (선택적)
    └── inference.py -->

In [92]:
'''
원본 (훈련 후 생성된 model.tar.gz):
model.tar.gz
├── pytorch_model.bin          # 실제 모델 가중치
├── config.json               # 모델 설정
└── (기타 훈련 관련 파일들)

Repack 후:
model.tar.gz
├── pytorch_model.bin          # 동일한 모델 파일
├── config.json               # 동일한 설정
├── inference.py              # 추가: 추론 코드
├── requirements.txt          # 추가: 의존성
├── .sagemaker-metadata       # 추가: SageMaker 메타데이터
│   ├── model-config.json     # 모델 서빙 설정
│   ├── input-schema.json     # 입력 스키마
│   └── output-schema.json    # 출력 스키마
└── code/                     # 추가: 코드 디렉토리 (선택적)
    └── inference.py
'''

'\n원본 (훈련 후 생성된 model.tar.gz):\nmodel.tar.gz\n├── pytorch_model.bin          # 실제 모델 가중치\n├── config.json               # 모델 설정\n└── (기타 훈련 관련 파일들)\n\nRepack 후:\nmodel.tar.gz\n├── pytorch_model.bin          # 동일한 모델 파일\n├── config.json               # 동일한 설정\n├── inference.py              # 추가: 추론 코드\n├── requirements.txt          # 추가: 의존성\n├── .sagemaker-metadata       # 추가: SageMaker 메타데이터\n│   ├── model-config.json     # 모델 서빙 설정\n│   ├── input-schema.json     # 입력 스키마\n│   └── output-schema.json    # 출력 스키마\n└── code/                     # 추가: 코드 디렉토리 (선택적)\n    └── inference.py\n'