In [15]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os
import pytz
from datetime import datetime

import boto3
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model


sess = sagemaker.Session()

In [16]:
def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.
    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

In [54]:
default_bucket = sess.default_bucket()
account_id = boto3.client("sts").get_caller_identity().get("Account")
region = boto3.session.Session().region_name

In [None]:
input_path = f"s3://{default_bucket}/mobile_price_pred/input/"

In [57]:
import pandas as pd
df = pd.read_csv("data/data.csv")

df.to_csv(input_path)

## Pipeline for Preprocess Job + Training Job

In [45]:
def get_pipeline(
    region,
    #subnets, #if your orginization concern about security
    #security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="MobilePricePred-ModelPackageGroup",  # Choose any name
    pipeline_name="MobilePricePred-pipeline-",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="MobilePricePred",  # Choose any name
    env='dev' #Choose the enviornment
):
    """Gets a SageMaker ML Pipeline instance working with on CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    #working input data path
    input_data = "s3://{default_bucket}/mobile_price_pred/input/"

    directory_name = "mobile-price-pred"
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
    )
    processing_instance_type = ParameterString(
        name="ProcessingInstanceType", default_value="ml.m5.2xlarge"
    )
      
    preprocess_output_data = f"s3://{default_bucket}/mobile_price_pred/preprocess_output_data/{date_folder}/X_train/"
    preprocess_output_data1 = f"s3://{default_bucket}/mobile_price_pred/preprocess_output_data/{date_folder}/X_test/"
    preprocess_output_data2 = f"s3://{default_bucket}/mobile_price_pred/preprocess_output_data/{date_folder}/y_train/"
    preprocess_output_data3 = f"s3://{default_bucket}/mobile_price_pred/preprocess_output_data/{date_folder}/y_test/"
  
    training_instance_type = ParameterString(
        name="TrainingInstanceType", default_value="ml.m5.xlarge"
    )
    
    model_path = ParameterString(
        name="ModelPath",
        default_value=f"s3://{default_bucket}/mobile_price_pred/preprocess_output_data/{date_folder}/model/xgboost/", 
    )
    
    # --------------------- PREPROCESSING --------------------------------------------------------------------
    ecr_repository = "mobile-price-pred-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = processing_instance_type,
         #network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name="mobile-price-pred-preprocessing",
        processor=script_processor, 
        code="mobile_price_preprocessing/preprocessing.py",
        inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
        outputs=[
            ProcessingOutput(output_name="trainX", destination=preprocess_output_data, source="/opt/ml/processing/train-x"),
            ProcessingOutput(output_name="testX", destination=preprocess_output_data1, source="/opt/ml/processing/test-x"),
            ProcessingOutput(output_name="trainy", destination=preprocess_output_data2, source="/opt/ml/processing/train-y"),
            ProcessingOutput(output_name="testy", destination=preprocess_output_data3, source="/opt/ml/processing/test-y"),
            
        ]
        #job_arguments=["--env", env],
    )
    
    
    # --------------------- TRAINING --------------------------------------------------------------------
    
    # Training step for generating model artifacts
    
    ecr_repository = "mobile-price-pred-training-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    recommender_image_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
    estimator = Estimator(image_uri=recommender_image_uri,
                      role=role,
                      sagemaker_session=sess,                                  # Technical object
                      output_path=model_path,
                      base_job_name='Mobile-price-pred-training-job',
                      input_mode='File',                                       # Copy the dataset and then train    
                      train_instance_count=1,
                      train_instance_type=training_instance_type,
                      debugger_hook_config=False,
                      disable_profiler = True,
                      metric_definitions=[
                          # Only 40 Metrics can be accomodated
                            {'Name': 'Training Accuracy:' , 'Regex': 'Training Accuracy: ([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'MAE' , 'Regex': 'MAE: ([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'MSE' , 'Regex': 'MSE: ([-+]?[0-9]*\.?[0-9]+)'},
                            {'Name': 'RMSE' , 'Regex': 'RMSE: ([-+]?[0-9]*\.?[0-9]+)'},
                            #{'Name': 'F1 score:' , 'Regex': 'F1 score:([-+]?[0-9]*\.?[0-9]+)'}
                      ],
                      #subnets = subnets.split(':'),
                      #security_group_ids = security_group_ids.split(':')
                         )

    # start training
    step_train = TrainingStep(
        name="Mobile-price-pred-training",
        estimator=estimator,
        inputs = {
            "train-x": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["trainX"].S3Output.S3Uri,
                content_type="text/csv",
            ),
           "train-y": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["trainy"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "test-x": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["testX"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "test-y": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["testy"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        }
    )
    
 
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name+env,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_path,
        ],
        steps=[
            step_preprocess, 
               step_train, 
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline


## Execute the Pipeline

In [46]:
region = region
role = sagemaker.get_execution_role()
#dev_subnets = dev_subnets
#dev_sg = dev_sg
model_package_group_name="MobilePricePred-ModelPackageGroup"
pipeline_name="MobilePricePred-pipeline-"
default_bucket = sagemaker.session.Session().default_bucket()

default_bucket='pipeline-sagemaker-test'

pipeline_def = get_pipeline(
    region=region,
    #subnets = dev_subnets,
    #security_group_ids = dev_sg,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    env='dev'
)

train_instance_count has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
train_instance_type has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [None]:
pipeline_def.upsert(role_arn=role)

In [48]:
execution = pipeline_def.start()

## Clean Up

In [None]:
import boto3
client = boto3.client('sagemaker')

response = client.delete_pipeline(
PipelineName='mobilepricepred-pipeline-dev',
ClientRequestToken='12312312312312312312312312312311'
)