#### Setup

In [13]:
import json
import logging
import sys
from pathlib import Path
import ipytest
import os
import sagemaker
import boto3


# Folder holding the project's helper modules (e.g. `config.py`) and step scripts.
CODE_FOLDER = Path("code")

# Make the helper modules importable. Guarded so that re-running this cell does not
# keep appending duplicate entries to sys.path.
if f"./{CODE_FOLDER}" not in sys.path:
    sys.path.append(f"./{CODE_FOLDER}")

from config import Configuration


# Local path of the raw penguins CSV.
# NOTE(review): not referenced by the cells shown here; presumably used elsewhere.
DATA_FILEPATH = "data/penguins.csv"

# Make in-notebook pytest runs raise on failure so "Run All" stops at a failing test.
ipytest.autoconfig(raise_on_error=True)

# Hide non-error messages emitted by the `sagemaker.config` logger.
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)


In [14]:
# Execution mode passed to `Configuration` in the next cell. True selects the local
# setup (a LocalPipelineSession and the 'local' instance type, per the printed config);
# False presumably targets SageMaker-managed infrastructure.
LOCAL_MODE: bool = True

In [15]:
# Resolve the mode-specific settings (session, instance type, image, framework and
# Python versions) and unpack the pieces later cells rely on.
config_instance = Configuration(LOCAL_MODE)

bucket, role, config = (
    config_instance.bucket,
    config_instance.role,
    config_instance.config,
)

# Show the resolved settings.
config

{'session': <sagemaker.workflow.pipeline_context.LocalPipelineSession at 0x10474ebf0>,
 'instance_type': 'local',
 'image': 'sagemaker-tensorflow-toolkit-local',
 'framework_version': '2.12',
 'py_version': 'py310'}

In [16]:
# S3 prefix under which this notebook keeps the input dataset (`.../data`) and the
# preprocessing outputs (`.../preprocessing/<name>`).
S3_LOCATION = f"s3://{bucket}/penguins"

# Direct AWS handles built from the default boto3 credentials/region.
# NOTE(review): these are created even when LOCAL_MODE is True, and none of them is used
# by the cells in this section; presumably later cells need them — confirm before moving.
# Statement order is kept as-is: boto3.client() initialises boto3's default session,
# which a subsequently created sagemaker Session may pick up.
sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

#### Preprocessing

In [17]:
# Ensure the folder for the processing script exists.
(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)

# Make it importable (presumably the processing tests import the script from here).
# Guarded so that re-running this cell does not append duplicate sys.path entries.
if f"./{CODE_FOLDER}/processing" not in sys.path:
    sys.path.append(f"./{CODE_FOLDER}/processing")

In [18]:
# Run the processing script's unit tests before wiring the script into the pipeline.
# The test module presumably runs through ipytest (see the pytest summary it prints);
# with raise_on_error=True set in the setup cell, a failing test stops "Run All".
%run test/test_processing_script.py

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                     [100%][0m
[32m[32m[1m8 passed[0m[32m in 0.14s[0m[0m


In [19]:
# Step caching for SageMaker Pipelines: allow a step to reuse a previous successful
# execution with the same arguments if it is younger than `expire_after`.
from sagemaker.workflow.steps import CacheConfig

# `expire_after` is documented as an ISO 8601 duration string. "P30D" is the valid
# ISO 8601 spelling of 30 days; the previous "30d" is not ISO 8601.
cache_config = CacheConfig(enable_caching=True, expire_after="P30D")

In [20]:
# Pipeline-wide definition settings and runtime parameters.
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

# Let jobs launched by the pipeline use each processor's own `base_job_name` as their
# name prefix.
pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)


# Pipeline parameter for the input dataset location; defaults to the notebook's S3
# data prefix and can be overridden when an execution is started.
dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

In [21]:
# Scikit-learn processor that will run the preprocessing script. Instance type and
# session come from `config`, so LOCAL_MODE decides where the job runs.
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type=config["instance_type"],
    instance_count=1,
    base_job_name="preprocess-data",
    sagemaker_session=config["session"],
)

In [None]:
# Preprocessing step: run code/processing/script.py on the dataset mounted at
# /opt/ml/processing/input and upload each of its output folders to S3.
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Every output follows the same convention: the script writes to
# /opt/ml/processing/<name> in the container, and the folder is uploaded to
# {S3_LOCATION}/preprocessing/<name>. Keep this order stable; it is part of the step
# definition.
PREPROCESSING_OUTPUTS = (
    "train",
    "validation",
    "test",
    "model",
    "train-baseline",
    "test-baseline",
)

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=(CODE_FOLDER / "processing" / "script.py").as_posix(),
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name=output_name,
                source=f"/opt/ml/processing/{output_name}",
                destination=f"{S3_LOCATION}/preprocessing/{output_name}",
            )
            for output_name in PREPROCESSING_OUTPUTS
        ],
    ),
    cache_config=cache_config,
)

In [None]:
# Assemble the single-step preprocessing pipeline and register it, creating it or
# updating the existing definition.
from sagemaker.workflow.pipeline import Pipeline

preprocess_pipeline = Pipeline(
    name="preprocess-pipeline",
    steps=[preprocessing_step],
    parameters=[dataset_location],
    sagemaker_session=config["session"],
    pipeline_definition_config=pipeline_definition_config,
)

# Whatever upsert() returns is displayed as this cell's output.
preprocess_pipeline.upsert(role_arn=role)

In [None]:
# Start an execution of the preprocessing pipeline. Off by default so "Run All" only
# (re)defines the pipeline; set RUN_PIPELINE = True to actually execute it.
RUN_PIPELINE = False

if RUN_PIPELINE:
    preprocess_execution = preprocess_pipeline.start()