Create or update a `.env` file in the project root that defines the following environment variables:

AWS_DEFAULT_SAGEMAKER_BUCKET

AWS_PROFILE

In [1]:
# True: build the pipeline on a LocalPipelineSession; False: use a regular PipelineSession.
LOCAL = False
# Switch meant to control whether a pipeline execution is started once the pipeline is registered.
START_PIPELINE = True

In [2]:
%load_ext autoreload
%autoreload 1

In [3]:
!pip install python-dotenv
from dotenv import load_dotenv
load_dotenv("../../.env")



True

In [4]:
# Recreate an empty staging directory: make sure it exists, then remove
# whatever a previous run left inside it.
!mkdir -p ../../build
!rm -rf ../../build/*
# Stage the recommender pipeline scripts and the pipeliner package side by
# side in the staging directory.
!cp ../pipelines/recommendations_np/code/* ../../build/
!cp -r ../../src/pipeliner ../../build/

In [5]:
# Staging directory populated by the shell commands above; the pipeline
# definition reads step scripts and the pipeliner package from here.
build_path = "../../build"

In [6]:
import os
# Bucket used as the session's default bucket; it comes from the environment
# (populated from the .env file). Surrounding whitespace is stripped so a
# padded value does not turn into an invalid bucket name.
AWS_DEFAULT_SAGEMAKER_BUCKET = os.environ.get("AWS_DEFAULT_SAGEMAKER_BUCKET", "").strip()
# Key prefix under which the session stores its objects inside the bucket.
DEFAULT_BUCKET_PREFIX = "pipelines"
# An unset variable and an empty value are equally unusable, so fail early on
# both instead of letting a later AWS call fail on an empty bucket name.
if not AWS_DEFAULT_SAGEMAKER_BUCKET:
    raise ValueError("AWS_DEFAULT_SAGEMAKER_BUCKET is not set")

Log in to the Docker registry with ECR credentials.

In [7]:
# Fetch an ECR login password with the AWS CLI and pipe it straight into
# `docker login` for the eu-west-1 registry below.
# NOTE(review): region and registry account are hard-coded; presumably this
# registry hosts the container images pulled for local runs — confirm.
!aws ecr get-login-password --region eu-west-1 | docker login --username AWS --password-stdin 141502667606.dkr.ecr.eu-west-1.amazonaws.com

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [8]:
import os
import sys

# Make the directory two levels above the working directory importable,
# adding it to the import path only if it is not listed there already.
module_path = os.path.abspath("../../")
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

In [9]:
import sagemaker
import boto3

# Resolve the IAM role ARN used for the pipeline and its jobs. The SageMaker
# helper is tried first; when it raises ValueError, the ARN of the IAM role
# named "pipeliner" is looked up instead.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role_description = iam.get_role(RoleName="pipeliner")
    role = role_description["Role"]["Arn"]

# Show the resolved ARN as the cell output.
role



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


'arn:aws:iam::536135653944:role/ca-ml-sagemaker-notebook-production'

In [10]:
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession

# Both session flavours take the same bucket settings; only the class differs.
session_options = {
    "default_bucket": AWS_DEFAULT_SAGEMAKER_BUCKET,
    "default_bucket_prefix": DEFAULT_BUCKET_PREFIX,
}

if not LOCAL:
    session = PipelineSession(**session_options)
else:
    session = LocalPipelineSession(**session_options)
    # NOTE(review): presumably lets local mode use source files from local
    # disk instead of uploading them — confirm against the SDK docs.
    session.config = {"local": {"local_code": True}}

# Values derived from the session and reused by later cells.
region = session.boto_region_name
default_bucket = session.default_bucket()

In [11]:
import pandas as pd
import numpy as np


# Gzipped CSV of user/item interactions; the same file is uploaded to S3 as
# the pipeline input further down.
ratings_data_path = "../pipelines/recommendations_np/data/user_item_interactions.csv.gz"

# Explicit dtypes for the columns. The value column in this file is named
# "interactions" (see the preview below), so that is the key pinned to float32;
# a key for a column the file does not contain would have no effect.
# NOTE(review): the preview also shows an "Unnamed: 0" column, which looks
# like a saved index — consider index_col=0 if nothing relies on it.
data_types = {"user_id": str, "item_id": str, "interactions": np.float32}

user_item_interactions = pd.read_csv(
    ratings_data_path,
    compression="gzip",
    dtype=data_types,
    parse_dates=["date"],
)
# Preview only the first rows instead of rendering the whole frame.
user_item_interactions.head(3)

Unnamed: 0,user_id,item_id,date,interactions
0,U007714,I00372373,2024-09-08,1.0
1,U007714,I00605528,2024-09-08,1.0
2,U013522,I01182960,2024-09-08,2.0


In [12]:
# S3 location, under the session's default bucket and prefix, that receives
# the raw interactions file.
input_data_s3_prefix = f"s3://{default_bucket}/{DEFAULT_BUCKET_PREFIX}/recommender_np/data"

# Upload the local file; the URI of the uploaded object is kept for the
# pipeline's input parameter and shown as the cell output.
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=ratings_data_path,
    desired_s3_uri=input_data_s3_prefix,
)
input_data_uri

's3://cs-production-customer-analytics-sagemaker/pipelines/recommender_np/data/user_item_interactions.csv.gz'

In [13]:
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker.workflow.properties import PropertyFile


class RecommenderPipeline:
    """Builds the SageMaker pipeline for the item-based recommender.

    The pipeline has three steps, in this order: a preprocessing job, a
    training job fed with the preprocessor's item-similarity matrix, and an
    evaluation job that reads the trained model artifacts together with the
    preprocessor's test data.
    """

    def create(
        self,
        role: str,
        name: str,
        session: sagemaker.Session,
        framework_version="1.2-1",
        data_uri=None,
        code_dir=None,
    ) -> Pipeline:
        """Assemble and return the pipeline definition (nothing is started).

        Args:
            role: ARN of the IAM role passed to the processors and estimator.
            name: Name given to the pipeline.
            session: Session the steps are bound to. A ``LocalPipelineSession``
                switches the default instance type to ``"local"``.
            framework_version: scikit-learn framework version used for the
                processors and for the training image lookup.
            data_uri: Default value of the ``user_item_interactions``
                pipeline parameter. Falls back to the notebook-level
                ``input_data_uri`` when omitted.
            code_dir: Directory holding the step scripts and the ``pipeliner``
                package. Falls back to the notebook-level ``build_path`` when
                omitted.

        Returns:
            A ``Pipeline`` with the preprocessing, training and evaluation
            steps and the two parameters ``user_item_interactions`` and
            ``InstanceType``.
        """
        self.local = isinstance(session, LocalPipelineSession)
        self.framework_version = framework_version

        # Keep the notebook globals as defaults so existing callers are
        # unaffected, while allowing explicit overrides.
        if data_uri is None:
            data_uri = input_data_uri
        if code_dir is None:
            code_dir = build_path

        # --- Pipeline parameters -------------------------------------------
        instance_type = ParameterString(
            name="InstanceType",
            default_value="local" if self.local else "ml.m5.24xlarge",
        )
        input_data = ParameterString(
            name="user_item_interactions",
            default_value=data_uri,
        )

        # Look up the training image for the requested framework version
        # rather than a fixed one, so it matches the processors below.
        image_uri = sagemaker.image_uris.retrieve(
            framework="sklearn",
            region=session.boto_region_name,
            version=framework_version,
        )

        # Preprocessing and training results are cached for 30 days.
        thirty_day_cache = CacheConfig(enable_caching=True, expire_after="P30d")

        # --- Preprocessing step --------------------------------------------
        processor = SKLearnProcessor(
            framework_version=framework_version,
            instance_type=instance_type,
            instance_count=1,
            base_job_name="sklearn-processor",
            role=role,
            sagemaker_session=session,
        )

        user_item_interactions_input = ProcessingInput(
            source=input_data,
            input_name="user_item_interactions",
            destination="/opt/ml/processing/input/data",
        )

        # The pipeliner package is mounted inside the code directory of the
        # processing container; the same input is reused by the evaluation step.
        pipeliner_input = ProcessingInput(
            source=code_dir + "/pipeliner",
            input_name="pipeliner",
            destination="/opt/ml/processing/input/code/pipeliner",
        )

        # Each output is collected from a container directory named after it.
        preprocessor_outputs = [
            ProcessingOutput(
                output_name=output_name,
                source=f"/opt/ml/processing/output/{output_name}",
            )
            for output_name in (
                "user_item_matrix",
                "item_similarity_matrix",
                "test_data",
                "user_encoder",
                "item_encoder",
            )
        ]

        preprocessor_step = ProcessingStep(
            name="preprocessor",
            cache_config=thirty_day_cache,
            step_args=processor.run(
                inputs=[
                    user_item_interactions_input,
                    pipeliner_input,
                ],
                outputs=preprocessor_outputs,
                code=code_dir + "/preprocessor.py",
            ),
        )
        preprocessor_output_config = preprocessor_step.properties.ProcessingOutputConfig

        # --- Training step -------------------------------------------------
        sklearn_estimator = SKLearn(
            entry_point="item_based_recommender.py",
            source_dir=code_dir,
            role=role,
            image_uri=image_uri,
            instance_type=instance_type,
            sagemaker_session=session,
            base_job_name="training_job",
            enable_sagemaker_metrics=True,
        )

        item_training_step = TrainingStep(
            name="item_based_recommender",
            estimator=sklearn_estimator,
            cache_config=thirty_day_cache,
            inputs={
                "item_similarity_matrix": TrainingInput(
                    s3_data=preprocessor_output_config.Outputs[
                        "item_similarity_matrix"
                    ].S3Output.S3Uri,
                    content_type="application/x-npz",
                ),
            },
        )

        # --- Evaluation step (not cached) ----------------------------------
        evaluation_processor = SKLearnProcessor(
            framework_version=framework_version,
            instance_type=instance_type,
            instance_count=1,
            base_job_name="sklearn-evaluation",
            role=role,
            sagemaker_session=session,
        )

        evaluation_report = PropertyFile(
            name="item_based_evaluation",
            output_name="evaluation",
            path="evaluation.json",
        )

        evaluation_step = ProcessingStep(
            name="item_based_evaluation",
            step_args=evaluation_processor.run(
                inputs=[
                    ProcessingInput(
                        source=item_training_step.properties.ModelArtifacts.S3ModelArtifacts,
                        destination="/opt/ml/processing/model",
                        input_name="model",
                    ),
                    ProcessingInput(
                        source=preprocessor_output_config.Outputs[
                            "test_data"
                        ].S3Output.S3Uri,
                        destination="/opt/ml/processing/test_data",
                    ),
                    pipeliner_input,
                ],
                outputs=[
                    # NOTE(review): unlike every other path in this pipeline
                    # this one is not under /opt/ml/processing — confirm it
                    # matches where the evaluation script writes its report.
                    ProcessingOutput(
                        output_name="evaluation", source="/opt/ml/outputs/evaluation"
                    ),
                ],
                # Taken from the staged code directory, like the other steps.
                code=code_dir + "/item_based_evaluation.py",
            ),
            property_files=[evaluation_report],
        )

        return Pipeline(
            name=name,
            steps=[
                preprocessor_step,
                item_training_step,
                evaluation_step,
            ],
            sagemaker_session=session,
            parameters=[input_data, instance_type],
        )

In [14]:
# Build the pipeline definition named "recommender" with the role and session
# resolved in the earlier cells.
pipeline = RecommenderPipeline().create(role=role, name="recommender", session=session)





In [15]:
import json

# Parse the pipeline's JSON definition and list each step's name and type as
# a quick sanity check of what was assembled.
definition = json.loads(pipeline.definition())
[
    {field: step.get(field) for field in ("Name", "Type")}
    for step in definition.get("Steps")
]

[{'Name': 'preprocessor', 'Type': 'Processing'},
 {'Name': 'item_based_recommender', 'Type': 'Training'},
 {'Name': 'item_based_evaluation', 'Type': 'Processing'}]

In [16]:
# Register the pipeline definition with SageMaker under the resolved role;
# the service response (including the pipeline ARN) is shown as the output.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:536135653944:pipeline/recommender',
 'ResponseMetadata': {'RequestId': '4fbc6c1f-41a0-4540-9851-75f87713965a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4fbc6c1f-41a0-4540-9851-75f87713965a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '79',
   'date': 'Thu, 10 Apr 2025 10:26:48 GMT'},
  'RetryAttempts': 0}}

In [17]:
# Honour the START_PIPELINE switch from the configuration cell: an execution
# is launched only when it is enabled. `execution` is None otherwise, so it is
# always defined and later code can tell that nothing was started.
execution = pipeline.start() if START_PIPELINE else None

In [None]:
import time

# Seconds to wait between two status checks.
POLL_INTERVAL_SECONDS = 60
# Statuses that mean the execution has not reached a final state yet.
IN_PROGRESS_STATUSES = ("Executing", "Stopping")

# Without an execution handle there is nothing to poll.
if execution is None:
    print("\nno pipeline execution to monitor")
else:
    execution_complete = False

    # Poll until the execution leaves the in-progress statuses, printing the
    # status (and the failure reason, when one is reported) on every check.
    while not execution_complete:
        current_execution = execution.describe()
        current_execution_status = current_execution.get("PipelineExecutionStatus")
        current_execution_failure_reason = current_execution.get("FailureReason")
        execution_complete = current_execution_status not in IN_PROGRESS_STATUSES
        print(f"\nstatus: {current_execution_status}")
        if current_execution_failure_reason:
            print(f"\nFailureReason: {current_execution_failure_reason}")
        if not execution_complete:
            time.sleep(POLL_INTERVAL_SECONDS)

    # Once the execution has finished, print its per-step results.
    steps = execution.list_steps()
    print(f"\n\nexecution steps: {steps}")