In [1]:
from sagemaker.workflow.pipeline_context import LocalPipelineSession

# Session object passed as `sagemaker_session` to the SKLearnProcessor and the
# Pipeline in later cells of this notebook.
local_pipeline_session = LocalPipelineSession()

In [2]:
import boto3
from pathlib import Path
import sagemaker
from sagemaker import get_execution_role
from sagemaker import Session
from sagemaker.sklearn.processing import SKLearnProcessor

# A single boto3 session backs every client created here, so the region and
# credentials are resolved exactly once.
boto_session = boto3.Session()
region = boto_session.region_name

sagemaker_client = boto_session.client("sagemaker")
sagemaker_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
)

# Reuse the session built above instead of letting get_execution_role()
# construct a second, implicit one.
role = get_execution_role(sagemaker_session=sagemaker_session)
sagemaker_bucket = sagemaker_session.default_bucket()

ClientError: An error occurred (ExpiredToken) when calling the GetCallerIdentity operation: The security token included in the request is expired

In [None]:
import pandas as pd

# Download the census sample from the region-specific sample-data bucket;
# `region` is set in the session-setup cell.
s3 = boto3.client("s3")
s3.download_file(
    "sagemaker-sample-data-{}".format(region),
    "processing/census/census-income.csv",
    "census-income.csv",
)
df = pd.read_csv("census-income.csv")

# index=False: without it the row index is written as an unnamed first column,
# and preprocessing.py (which calls pd.read_csv with no index_col) would read
# that column back as if it were part of the data.
df.to_csv("dataset.csv", index=False)
df.head()

In [None]:
%%writefile preprocessing.py
import pandas as pd
import os
from pathlib import Path
from sklearn.model_selection import train_test_split

# Fixed seed so the train/validation/test split is identical on every run.
RANDOM_STATE = 42

input_data_path = os.path.join("/opt/ml/processing/input", "dataset.csv")
df = pd.read_csv(input_data_path)
print("Shape of data is:", df.shape)

# Hold out 20% as the test set, then 20% of the remainder as validation.
train, test = train_test_split(df, test_size=0.2, random_state=RANDOM_STATE)
train, validation = train_test_split(train, test_size=0.2, random_state=RANDOM_STATE)

output_root = Path("/opt/ml/processing/output")
splits = {"train": train, "validation": validation, "test": test}

# Each split gets its own directory: <output_root>/<name>/<name>.csv
for split_name in splits:
    (output_root / split_name).mkdir(parents=True, exist_ok=True)

try:
    for split_name, split_df in splits.items():
        split_df.to_csv(output_root / split_name / "{}.csv".format(split_name))
    print("Wrote files successfully")
except Exception as e:
    print("Failed to write the files")
    print(e)
    # Re-raise so the script exits with an error instead of reporting
    # completion when one or more output files were not written.
    raise

print("Completed running the processing job")

In [None]:
# %%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Local-mode scikit-learn processor; no explicit session is passed in this cell.
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="local",
    instance_count=1,
)

# dataset.csv is placed in the directory preprocessing.py reads from.
job_inputs = [
    ProcessingInput(source="dataset.csv", destination="/opt/ml/processing/input"),
]

# One output per directory that preprocessing.py writes into.
job_outputs = [
    ProcessingOutput(source="/opt/ml/processing/output/train"),
    ProcessingOutput(source="/opt/ml/processing/output/validation"),
    ProcessingOutput(source="/opt/ml/processing/output/test"),
]

processor_args = sklearn_processor.run(
    code="preprocessing.py",
    # arguments = ["arg1", "arg2"], # Arguments can optionally be specified here
    inputs=job_inputs,
    outputs=job_outputs,
)

# Describe the most recent job recorded on the processor and keep its
# output configuration for inspection.
latest_job = sklearn_processor.jobs[-1]
preprocessing_job_description = latest_job.describe()
output_config = preprocessing_job_description["ProcessingOutputConfig"]

print(output_config)


In [None]:
# Display the 'ProcessingOutputConfig' entry taken from the processing job
# description in the previous cell.
output_config

In [None]:
# %%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Same processor configuration as the standalone run, but bound to
# local_pipeline_session; the value returned by run() is handed to
# ProcessingStep as its step_args.
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="local",
    instance_count=1,
    sagemaker_session=local_pipeline_session,
)

# Directories that preprocessing.py writes into, in train/validation/test order.
output_sources = (
    "/opt/ml/processing/output/train",
    "/opt/ml/processing/output/validation",
    "/opt/ml/processing/output/test",
)

processor_args = sklearn_processor.run(
    code="preprocessing.py",
    # arguments = ["arg1", "arg2"], # Arguments can optionally be specified here
    inputs=[ProcessingInput(source="dataset.csv", destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source=src) for src in output_sources],
)

step_process = ProcessingStep(name="TestProcessor", step_args=processor_args)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Single-step pipeline built on the same local pipeline session that the
# processing step's processor was created with.
pipeline_steps = [step_process]
pipeline = Pipeline(
    name="MyPipeline",
    steps=pipeline_steps,
    sagemaker_session=local_pipeline_session,
)

# pipeline.create(
#     role_arn=sagemaker.get_execution_role(), 
#     description="local pipeline example"
# )

#  # pipeline will execute locally
# execution = pipeline.start()

# steps = execution.list_steps()

# training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']

# step_outputs = pipeline_session.sagemaker_client.describe_training_job(TrainingJobName = training_job_name)

In [None]:
# Upsert the pipeline, passing the execution role obtained during session
# setup as its role ARN.
pipeline.upsert(role_arn=role)

In [None]:
# Start the pipeline and keep the returned execution object; a later cell
# calls list_steps() on it.
execution = pipeline.start()

In [None]:
# Display the pipeline definition as the cell output.
pipeline.definition()

In [None]:
# Display the steps reported for the execution started above.
execution.list_steps()

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


# Lineage visualizer built on `sagemaker_session` from the session-setup cell.
# NOTE(review): the pipeline above was created with `local_pipeline_session`,
# not `sagemaker_session` -- confirm this session can see lineage for it.
viz = LineageTableVisualizer(sagemaker_session)

In [None]:
# execution.list_steps()["PipelineExecutionSteps"]
# for execution_step in reversed(execution.list_steps()["PipelineExecutionSteps"]):
#     print(execution_step)
#     display(viz.show(pipeline_execution_step=execution_step))
#     time.sleep(5)