In [560]:
import json
import time

import boto3
import sagemaker
from sagemaker import ModelPackage
from sagemaker.deserializers import JSONDeserializer
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.predictor import Predictor
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.serializers import JSONSerializer
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

In [184]:
# IAM role passed to every processor, estimator and model defined below.
role = sagemaker.get_execution_role()

# Regular session for direct SDK calls; its default bucket is looked up once.
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()

# Region of the ambient boto3 configuration.
region = boto3.Session().region_name

# Session handed to the pipeline-step builders (processors, estimator, model).
pipeline_session = PipelineSession()

In [164]:
# S3 location of the raw dataset read by the preprocessing step.
# BUCKET_NAME is also reused for preprocessing outputs and model artifacts.
BUCKET_NAME = "mlops-hot-or-meh2"
DATA_KEY = "ProductHuntProducts.csv"
DATASET_PATH = f"s3://{BUCKET_NAME}/{DATA_KEY}"

In [472]:
# s3 = boto3.resource('s3')
# bucket = s3.Bucket(BUCKET_NAME)
# for object_summary in bucket.objects.filter():
#     print(object_summary)


### Preprocessing

In [523]:
# Estimator class that tells FrameworkProcessor which framework to use
# for the preprocessing job.
est_cls = sagemaker.sklearn.estimator.SKLearn

# Single-instance scikit-learn processor bound to the pipeline session.
sklearn_processor = FrameworkProcessor(
    estimator_cls=est_cls,
    framework_version="1.2-1",
    role=role,
    instance_count=1,
    instance_type="ml.t3.xlarge",
    base_job_name="viralhunt-preprocess",
    sagemaker_session=pipeline_session,
)


In [524]:
# Build the arguments for the preprocessing step. The processor is bound to
# `pipeline_session`, and the value returned by `.run()` is passed on as the
# step's `step_args`.

# Inputs: the raw dataset and everything under the bucket's utils/ prefix.
preprocess_inputs = [
    ProcessingInput(
        source=DATASET_PATH,
        destination="/opt/ml/processing/input",
    ),
    ProcessingInput(
        source=f"s3://{BUCKET_NAME}/utils/",
        destination="/opt/ml/processing/utils/",
    ),
]

# Outputs: one per split, copied from /opt/ml/processing/split/<name>
# to s3://<bucket>/preprocessing/<name>.
preprocess_outputs = [
    ProcessingOutput(
        output_name=split,
        source=f"/opt/ml/processing/split/{split}",
        destination=f"s3://{BUCKET_NAME}/preprocessing/{split}",
    )
    for split in ("train", "test")
]

processor_args = sklearn_processor.run(
    code="processing.py",
    source_dir="scripts/process",
    arguments=["--train-test-split-ratio", "0.2"],
    inputs=preprocess_inputs,
    outputs=preprocess_outputs,
)

step_process = ProcessingStep(name="Preprocess", step_args=processor_args)



### Train

In [525]:
# PyTorch estimator for the training step; model artifacts are written under
# s3://<bucket>/models.
pytorch_estimator = PyTorch(
    entry_point="training.py",
    source_dir="scripts/train",
    role=role,
    instance_count=1,
    instance_type="ml.g4dn.xlarge",
    framework_version="2.3.0",
    py_version="py311",
    output_path=f"s3://{BUCKET_NAME}/models",
    sagemaker_session=pipeline_session,
)

# The "train" channel is wired to the preprocessing step's "train" output,
# so the training step consumes whatever that step produced.
train_data_uri = step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
train_args = pytorch_estimator.fit({"train": train_data_uri})

step_train = TrainingStep(name="Training", step_args=train_args)

### Evaluation

In [526]:
# The evaluation script runs in a PyTorch processing job with the same
# framework and Python versions as the training step.
est_cls = sagemaker.pytorch.estimator.PyTorch

script_processor = FrameworkProcessor(
    estimator_cls=est_cls,
    framework_version="2.3.0",
    py_version="py311",
    role=role,
    instance_count=1,
    instance_type="ml.t3.xlarge",
    sagemaker_session=pipeline_session,
)

# Exposes evaluation.json from the "evaluation" output so that later steps can
# read values from it (see the JsonGet in the condition cell).
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Upstream artifacts: the trained model and the held-out test split.
trained_model_uri = step_train.properties.ModelArtifacts.S3ModelArtifacts
test_data_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri

eval_args = script_processor.run(
    code="evaluate.py",
    source_dir="scripts/evaluate",
    inputs=[
        ProcessingInput(source=trained_model_uri, destination="/opt/ml/processing/model"),
        ProcessingInput(source=test_data_uri, destination="/opt/ml/processing/test"),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
)

step_eval = ProcessingStep(
    name="Evaluate",
    step_args=eval_args,
    property_files=[evaluation_report],
)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


### Register model

In [575]:
# Model definition used for registration: the training step's artifact plus
# the inference code under scripts/inference.
model = sagemaker.pytorch.model.PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir="scripts/inference",
    framework_version="2.3.0",
    py_version="py311",
    role=role,
    sagemaker_session=pipeline_session,
)

# Attach the evaluation report as model statistics. Its location is the
# evaluation step's first configured output plus the report file name.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_output_uri}/evaluation.json",
        content_type="application/json",
    )
)

# Register into the ViralHuntPredictions group, approved on creation.
register_model_step_args = model.register(
    model_package_group_name="ViralHuntPredictions",
    approval_status="Approved",
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_metrics=model_metrics,
)

step_register = ModelStep(name="Register", step_args=register_model_step_args)

INFO:sagemaker.processing:Uploaded scripts/evaluate to s3://sagemaker-us-east-2-471112582765/pytorch-2024-07-24-08-13-43-165/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-2-471112582765/pytorch-2024-07-24-08-13-43-165/source/runproc.sh
INFO:sagemaker:Repacking model artifact (s3://sagemaker-us-east-2-471112582765/pytorch-inference-2024-07-24-06-47-05-473/Register-RepackModel-0-0359cce0ceb40236-lb5xkf3a04bf-bzm2I2VfMb/output/model.tar.gz), script artifact (scripts/inference), and dependencies ([]) into single tar.gz file located at s3://sagemaker-us-east-2-471112582765/pytorch-inference-2024-07-24-08-13-43-355/model.tar.gz. This may take some time depending on model size...


### R2 condition

In [551]:
# Value read from the "r2_score" key of the evaluation report.
evaluated_r2 = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="r2_score",
)

# NOTE(review): presumably r2_score is the coefficient of determination; a
# threshold of -0.6 would then accept models worse than a mean predictor.
# Confirm this is intentional.
cond_ge = ConditionGreaterThanOrEqualTo(left=evaluated_r2, right=-0.6)

# Register the model only when the condition holds; otherwise do nothing.
step_cond = ConditionStep(
    name="R2Condition",
    conditions=[cond_ge],
    if_steps=[step_register],
    else_steps=[],
)

### Pipeline assembly

In [576]:
pipeline_name = "ViralHuntPipeline"

# Pipeline definition settings: use_custom_job_prefix is switched on.
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Registration is not listed directly: it is reached through the condition
# step's if_steps.
pipeline = Pipeline(
    name=pipeline_name,
    pipeline_definition_config=definition_config,
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
    ],
)


In [577]:
# Parse the generated pipeline definition so it is displayed as a dict.
pipeline_definition = json.loads(pipeline.definition())
pipeline_definition




{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Register-RegisterModel',
   'Type': 'RegisterModel',
   'Arguments': {'ModelPackageGroupName': 'ViralHuntPredictions',
    'ModelMetrics': {'ModelQuality': {'Statistics': {'ContentType': 'application/json',
       'S3Uri': 's3://sagemaker-us-east-2-471112582765/pytorch-2024-07-24-06-42-33-348/output/evaluation/evaluation.json'}},
     'Bias': {},
     'Explainability': {}},
    'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-2.amazonaws.com/pytorch-inference:2.3.0-cpu-py311',
       'Environment': {'SAGEMAKER_PROGRAM': 'inference.py',
        'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code',
        'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
        'SAGEMAKER_REGION': 'us-east-2'},
       'ModelDataUrl': 's3://sagemaker-us-east-2-471

In [578]:
# Create or update the pipeline using the notebook's execution role,
# and display the service response.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:471112582765:pipeline/ViralHuntPipeline',
 'ResponseMetadata': {'RequestId': 'ddce0b4d-38be-48b2-86d5-a1e06c46d32f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ddce0b4d-38be-48b2-86d5-a1e06c46d32f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Wed, 24 Jul 2024 08:14:25 GMT'},
  'RetryAttempts': 0}}

In [579]:
# Start a new execution of the pipeline and keep the handle, which the
# following cells use for describe() and list_steps().
execution = pipeline.start()
# Wait on the execution before the following cells inspect it.
execution.wait()

In [556]:
# Display the execution's description (status, timestamps, creator).
execution_summary = execution.describe()
execution_summary

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:471112582765:pipeline/ViralHuntPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-2:471112582765:pipeline/ViralHuntPipeline/execution/lb5xkf3a04bf',
 'PipelineExecutionDisplayName': 'execution-1721803846151',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'viralhuntpipeline',
  'TrialName': 'lb5xkf3a04bf'},
 'CreationTime': datetime.datetime(2024, 7, 24, 6, 50, 46, 87000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 7, 24, 7, 20, 51, 805000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-2:471112582765:user-profile/d-bk1fq07zgp0v/default-20240721T151163',
  'UserProfileName': 'default-20240721T151163',
  'DomainId': 'd-bk1fq07zgp0v',
  'IamIdentity': {'Arn': 'arn:aws:sts::471112582765:assumed-role/AmazonSageMaker-ExecutionRole-20240721T151163/SageMaker',
   'PrincipalId': 'AROAW3MD7QZWZN5J2BDEB:SageMaker'}},
 'LastModifiedBy': {'UserProfi

In [557]:
# Find the model package registered by this execution.
# The step list is fetched once and searched for the entry whose metadata
# carries a RegisterModel record, instead of assuming it is the first element.
# That assumption breaks if the step order differs or if the condition step
# skipped registration.
execution_steps = execution.list_steps()
registered_model_arn = next(
    (
        step["Metadata"]["RegisterModel"]["Arn"]
        for step in execution_steps
        if "RegisterModel" in step.get("Metadata", {})
    ),
    None,
)
if registered_model_arn is None:
    raise RuntimeError(
        "No RegisterModel step found in this pipeline execution; "
        "the R2 condition may have skipped model registration."
    )

# Deployable handle on the registered model package.
model_package = ModelPackage(
    role=role,
    model_package_arn=registered_model_arn,
)

### Deploy model

In [None]:
# Timestamped endpoint name, so repeated deployments do not collide.
endpoint_name = "ViralHunt-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

print(f"EndpointName: {endpoint_name}")

# Deploy the registered model package rather than `model`. `model` is bound to
# the pipeline session and its model_data is a pipeline property that only
# resolves inside a pipeline execution, so it cannot be deployed from here.
# `model_package` comes from the cell above that resolves the RegisterModel ARN.
model_package.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=endpoint_name,
)

# Attach a JSON predictor to the endpoint that was just created.
# It must use the same endpoint name that was passed to deploy().
predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)


In [None]:
# sagemaker_session = sagemaker.Session()

# # Specify the S3 location of your model artifact
# model_data = 's3://sagemaker-us-east-2-471112582765/pytorch-inference-2024-07-24-06-47-05-473/Register-RepackModel-0-0359cce0ceb40236-lb5xkf3a04bf-bzm2I2VfMb/output/model.tar.gz'

# # Define the PyTorch model
# model_ = sagemaker.pytorch.model.PyTorchModel(
#     entry_point='inference.py',
#     source_dir='scripts/inference',
#     dependencies=['scripts/inference/requirements.txt'],
#     model_data=model_data,
#     framework_version="2.3.0",
#     py_version="py311",
#     role=role,
#     sagemaker_session=sagemaker_session
# )

# # Generate a unique endpoint name
# endpoint_name = "ViralHunt-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

# # Deploy the model
# predictor = model_.deploy(
#     instance_type='ml.m5.xlarge',
#     initial_instance_count=1,
#     endpoint_name=endpoint_name
# )

