In [2]:
import warnings
import sagemaker
import json
import boto3
import time
from sagemaker.model import Model
from sagemaker import ModelPackage
from sagemaker.sklearn.processing import (
    SKLearnProcessor,
    ScriptProcessor,
)
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [3]:
warnings.filterwarnings(action="ignore", category=UserWarning)

In [4]:
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket()
region =  boto3.Session().region_name
pipeline_session = PipelineSession()

In [5]:
BUCKET_NAME = 'mlops-ucu-2024'
DATA_KEY = 'popular_movies.csv'
DATASET_PATH = f"s3://{BUCKET_NAME}/{DATA_KEY}"
ENCODERS_FOLDER = f"encoders"
TRACKING_SERVER_ARN='arn:aws:sagemaker:eu-north-1:381492095903:mlflow-tracking-server/letterboxd-predictions'
EXPERIMENT_NAME = 'letterboxd-predictions-lr'
MODEL_PACKAGE_GROUP_NAME = "LetterboxdRartingPredictions"
MODEL_NAME_PREFIX = "LLR"
SKLEARN_FRAMEWORK_VERSION='1.2-1'

#### Define pipeline parameters

In [6]:
input_data = ParameterString(
    name="InputData",
    default_value=DATASET_PATH,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved"
)

encoders_bucket_name = ParameterString(
    name="EncodersBucketName",
    default_value=BUCKET_NAME,
)

encoders_folder = ParameterString(
    name="EncodersFolder",
    default_value=ENCODERS_FOLDER,
)

mlflow_arn = ParameterString(
    name="MLFlowTrackingUri",
    default_value=TRACKING_SERVER_ARN,
)

mlflow_experiment_name = ParameterString(
    name="MLFlowExperimentName",
    default_value=EXPERIMENT_NAME,
)

### 1. Preprocess step

In [7]:
sklearn_processor = SKLearnProcessor(
    framework_version=SKLEARN_FRAMEWORK_VERSION,
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name='letterboxd-preprocess',
    sagemaker_session=pipeline_session,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [8]:
processor_args = sklearn_processor.run(
    code="scripts/preprocessing.py",
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="encoders", source="/opt/ml/processing/encoders"),
    ],
    arguments=[
        "--encoders-bucket-name", encoders_bucket_name,
        "--encoders-folder", encoders_folder
    ]
)


step_process = ProcessingStep(
    name="Preprocess",
    step_args=processor_args
)

### 2. Training step

In [9]:
model_path = f's3://{default_bucket}/LetterboxdTrain'

In [10]:
sklearn_estimator = SKLearn(
    entry_point="scripts/train.py",
    framework_version=SKLEARN_FRAMEWORK_VERSION,
    instance_type="ml.m5.xlarge",
    role=role,
    sagemaker_session=pipeline_session,
)

In [11]:
train_args = sklearn_estimator.fit({"train": step_process.properties.ProcessingOutputConfig.Outputs[
    "train"
].S3Output.S3Uri})

step_train = TrainingStep(
    name="Training",
    step_args = train_args
)

### 3. Evaluation step

In [12]:
est_cls = sagemaker.sklearn.estimator.SKLearn

script_processor = FrameworkProcessor(
    role=role,
    instance_count=1,
    instance_type="ml.t3.medium",
    estimator_cls=est_cls,
    framework_version=SKLEARN_FRAMEWORK_VERSION,
    base_job_name='letterboxd-evaluate',
    sagemaker_session=pipeline_session,
    env={
        'MLFLOW_TRACKING_ARN': mlflow_arn,
        'MLFLOW_EXPERIMENT_NAME': mlflow_experiment_name,
    }
)

In [13]:
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

In [14]:
eval_args = script_processor.run(
    code='evaluation.py',
    source_dir='scripts/evaluation',
    inputs=[
        ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model"),
        ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri, destination="/opt/ml/processing/test"),
        ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs[
                "encoders"
            ].S3Output.S3Uri, destination="/opt/ml/processing/encoders"),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
)

step_eval = ProcessingStep(
    name="Eval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

### 4. Register model

In [15]:
model = Model(
    name=MODEL_NAME_PREFIX,
    image_uri=sklearn_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    entry_point='scripts/inference.py',
)

In [16]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

INFO:sagemaker.processing:Uploaded scripts/evaluation to s3://sagemaker-eu-north-1-381492095903/letterboxd-evaluate-2024-07-19-21-57-55-937/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-eu-north-1-381492095903/letterboxd-evaluate-2024-07-19-21-57-55-937/source/runproc.sh


In [17]:
register_model_step_args = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=MODEL_PACKAGE_GROUP_NAME,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

In [18]:
step_register = ModelStep(
   name="Register",
   step_args=register_model_step_args,
)

### 5. Add condition

In [19]:
cond_ge = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="r2_score"
    ),
    right=0.3,
)

In [20]:
step_cond = ConditionStep(
    name="R2Cond",
    conditions=[cond_ge],
    if_steps=[step_register],
    else_steps=[], 
)

### Create pipeline

In [21]:
pipeline_name = f"LetterboxdPipeline"
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        mlflow_arn,
        mlflow_experiment_name,
        encoders_bucket_name,
        encoders_folder
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    pipeline_definition_config=definition_config,
)

#### Examine pipeline

In [22]:
json.loads(pipeline.definition())

INFO:sagemaker.processing:Uploaded scripts/evaluation to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/1045eea45104324e6075f4f53099f4c7/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://mlops-ucu-2024/popular_movies.csv'},
  {'Name': 'MLFlowTrackingUri',
   'Type': 'String',
   'DefaultValue': 'arn:aws:sagemaker:eu-north-1:381492095903:mlflow-tracking-server/letterboxd-predictions'},
  {'Name': 'MLFlowExperimentName',
   'Type': 'String',
   'DefaultValue': 'letterboxd-predictions-lr'},
  {'Name': 'EncodersBucketName',
   'Type': 'String',
   'DefaultValue': 'mlops-ucu-2024'},
  {'Name': 'EncodersFolder', 'Type': 'String', 'DefaultValue': 'encoders'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Preprocess',
   'Type': 'Processing',
   'Arguments': {'Processing

#### Submit pipeline

In [23]:
pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded scripts/evaluation to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/1045eea45104324e6075f4f53099f4c7/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh
INFO:sagemaker.processing:Uploaded scripts/evaluation to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/1045eea45104324e6075f4f53099f4c7/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-eu-north-1-381492095903/LetterboxdPipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:381492095903:pipeline/LetterboxdPipeline',
 'ResponseMetadata': {'RequestId': '517fc19f-4f2f-4b73-9a06-b9ed7c5ba000',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '517fc19f-4f2f-4b73-9a06-b9ed7c5ba000',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Fri, 19 Jul 2024 21:57:58 GMT'},
  'RetryAttempts': 0}}

#### Start execution

In [24]:
execution = pipeline.start()

In [None]:
execution.wait()

### Create model package

In [None]:
execution.list_steps()

In [None]:
registered_model_arn = execution.list_steps()[0]['Metadata']['RegisterModel']['Arn']

In [None]:
model_package = ModelPackage(
    role=role,
    model_package_arn=registered_model_arn,
)

### Deploy model

In [None]:
endpoint_name = "DEMO-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

print(f"EndpointName: {endpoint_name}")
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=endpoint_name
)

In [None]:
predictor = Predictor(endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer())

### Test model

In [None]:
input_data = {
    "Year": 1990,
    "Minutes": 103,
    "Language": "English",
    "Genres": ["Comedy", "Family", "Adventure"],
    "Countries": ["USA"],
    "Directors": ["Chris Columbus"],
    "Cast": ["Macaulay Culkin", "Daniel Stern", "Joe Pesci", "Catherine O'Hara", "John Heard"]
}

In [None]:
prediction = predictor.predict(input_data)

### Delete endpoint

In [None]:
# predictor.delete_endpoint()