In [22]:
import os
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.pytorch import PyTorchProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

# Shared SageMaker context used by every step below: one SDK session and the
# execution role that the processing jobs and the pipeline will assume.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# S3 layout for this experiment. `base_path` keeps its trailing slash.
bucket = "lxeml"
prefix = "CH_Test"
base_path = "s3://{}/{}/".format(bucket, prefix)

# -------------------------------------------------
# STEP 1: Preprocess Test Data
# -------------------------------------------------
# S3 URIs always use "/" as the separator, independent of the host OS, so they
# are assembled with plain string formatting instead of os.path.join (which is
# meant for local filesystem paths). Stripping the trailing slash first keeps
# the result correct whether or not `base_path` ends with "/".
s3_base = base_path.rstrip("/")

# scikit-learn container that runs the preprocessing script on a single
# ml.m5.xlarge instance. The notebook's session is passed explicitly so the
# processor and the pipeline share one SageMaker session.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    base_job_name="test-preprocess",
    sagemaker_session=sagemaker_session,
)

# Pipeline step: mounts the raw test CSV and the category list into the
# container, runs preprocess_test_file_sagemaker.py, and publishes whatever
# the script writes to /opt/ml/processing/output as the "test_data" output.
preprocess_test_step = ProcessingStep(
    name="PreprocessTestData",
    processor=sklearn_processor,
    code="preprocess_test_file_sagemaker.py",
    inputs=[
        ProcessingInput(
            source=f"{s3_base}/TEST.csv",
            destination="/opt/ml/processing/input/test",
        ),
        ProcessingInput(
            source=f"{s3_base}/list_of_categories.csv",
            destination="/opt/ml/processing/input/categories",
        ),
        # If needed for encoders:
        # ProcessingInput(
        #     source=f"{s3_base}/encoders",
        #     destination="/opt/ml/processing/input/encoders",
        # ),
    ],
    outputs=[
        ProcessingOutput(
            # The output name is how downstream steps look up this S3 location.
            output_name="test_data",
            source="/opt/ml/processing/output",
            destination=f"{s3_base}/preprocessed_test",
        )
    ],
)

# -------------------------------------------------
# STEP 2: Predict
# -------------------------------------------------
# Trained model artifact produced by an earlier training pipeline run.
# Kept as an absolute URI because it points at one specific training job.
model_s3_uri = "s3://lxeml/CH_Test/results/pipelines-f6evmcmrynvn-TrainModel-p0Z3pUjfMG/output/model.tar.gz"

# S3 URIs always use "/" regardless of host OS, so the destination is built
# with string formatting rather than os.path.join (a filesystem-path helper).
predictions_s3_uri = f"{base_path.rstrip('/')}/test_predictions"

# PyTorch container that runs the prediction script on a single ml.m5.xlarge
# instance. The notebook's session is passed explicitly so the processor and
# the pipeline share one SageMaker session.
predict_processor = PyTorchProcessor(
    framework_version="1.7.1",
    py_version="py3",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    base_job_name="test-predict",
    # NOTE(review): SAGEMAKER_PROGRAM repeats the script already given via
    # `code=` on the step below — confirm predict.py actually relies on it.
    env={"SAGEMAKER_PROGRAM": "predict.py"},
    sagemaker_session=sagemaker_session,
)

# Pipeline step: consumes the "test_data" output of the preprocessing step
# (which makes this step run after it) together with the model archive, runs
# predict.py, and publishes /opt/ml/processing/output as "predictions".
predict_step = ProcessingStep(
    name="PredictTestData",
    processor=predict_processor,
    code="predict.py",
    inputs=[
        ProcessingInput(
            source=preprocess_test_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/test_data",
        ),
        ProcessingInput(
            source=model_s3_uri,
            destination="/opt/ml/processing/input/model",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="predictions",
            source="/opt/ml/processing/output",
            destination=predictions_s3_uri,
        )
    ],
)

# -------------------------------------------------
# BUILD & RUN the Pipeline
# -------------------------------------------------
# Assemble the two processing steps into a single pipeline definition.
test_inference_pipeline = Pipeline(
    name="CHTestInferencePipeline",
    steps=[preprocess_test_step, predict_step],
    sagemaker_session=sagemaker_session
)

# Create the pipeline, or update it if one with this name already exists.
test_inference_pipeline.upsert(role_arn=role)
print("Inference pipeline definition uploaded.")

# Start an execution and block until it finishes. If waiting fails (the
# execution failed, was stopped, or the wait timed out), print each step's
# status and failure reason before re-raising, so the cell output shows which
# step broke instead of only a generic waiter error.
execution = test_inference_pipeline.start()
try:
    execution.wait()
except Exception:
    for step_info in execution.list_steps():
        print(
            f"{step_info.get('StepName')}: {step_info.get('StepStatus')}"
            f" {step_info.get('FailureReason', '')}".rstrip()
        )
    raise
print("Inference pipeline execution complete.")

Inference pipeline definition uploaded.
