In [15]:
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TrainingInput
from sagemaker.sklearn import SKLearn

from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline_context import PipelineSession

In [16]:
sagemaker_session = sagemaker.Session()
role = "arn:aws:iam::557690604891:role/service-role/AmazonSageMaker-ExecutionRole-20241021T080625"

# Input Data

input_uri = "s3://projects-abdimatin/LoanDefaultprediction/Data/Raw"
train_uri = "s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/train"
validation_uri = "s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/validation"
test_uri = "s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/test"
evaluation_uri = "s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/evaluation"
model_uri = "s3://projects-abdimatin/LoanDefaultprediction/models"

In [17]:
Processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
    sagemaker_session=PipelineSession(),
    base_job_name="DataPreProcessingJob"
)

processing_step = ProcessingStep(
    name="DataProcessingStep",
    processor=Processor,
    inputs=[ProcessingInput(source=input_uri, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/output/train", 
            destination="s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/train"
            ),
        ProcessingOutput(
            source="/opt/ml/processing/output/validation", 
            destination="s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/validation",
            output_name="validation"
            ),
        ProcessingOutput(
            source="/opt/ml/processing/output/test", 
            destination="s3://projects-abdimatin/LoanDefaultprediction/Data/Proccessed/test",
            output_name="test"
            ),
    ],
    job_arguments=[
        '--input-data', '/opt/ml/processing/input',
        '--output-train', '/opt/ml/processing/output/train',
        '--output-validation', '/opt/ml/processing/output/validation',
        '--output-test', '/opt/ml/processing/output/test',
    ],
    code="../Scripts/Processing/Processor-Loan-Default.py"
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [18]:
# Training Step
sklearn_estimator = SKLearn(
    entry_point="../Scripts/Training/Train.py",  # Your training script
    framework_version="0.23-1",
    role=role,
    instance_type="ml.m4.xlarge",
    instance_count=1,
    sagemaker_session=PipelineSession(),
    output_path=model_uri,
)

training_step = TrainingStep(
    name="ModelTrainingStepMain",
    estimator=sklearn_estimator,
    inputs={
        "train":TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="csv"
            ),
        "validation":TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="csv"
            )
    }
)

In [19]:
# Evaluation Step: Evalaute the model performance and save to a file called loan-default-evaluation-V1.json
Evaluation_Processor = ScriptProcessor(
    command=["python3"],
    image_uri=sklearn_estimator.image_uri,
    instance_count=1,
    instance_type="ml.t3.medium",
    sagemaker_session=PipelineSession(),
    role=role,
    base_job_name="LoanDefaultModelEvaluation"
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

Evaluation_Step = ProcessingStep(
    name="ModelEvaluationStep",
    processor=Evaluation_Processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=validation_uri,
            destination="/opt/ml/processing/input/validation"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/output/evaluation",
            destination=evaluation_uri
            )
    ],
    code="../Scripts/Evaluation/Evaluation.py",
    property_files=[evaluation_report],
    job_arguments=[
        "--validation-data", "/opt/ml/processing/input/validation",
        "--model-dir", "/opt/ml/processing/model",
        "--output-evaluation", "/opt/ml/processing/output/evaluation"
    ]
)

In [20]:
# Creating the model step
from sagemaker.model import Model
from sagemaker.workflow.model_step import CreateModelStep
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.pipeline_context import PipelineSession

model = Model(
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=sklearn_estimator.image_uri,
    sagemaker_session=PipelineSession(),
    role=role    
)

create_model_step = CreateModelStep(
    name="LoanDefaultModelCreationStep",
    model=model
)

In [21]:
# Transformer step 
from sagemaker.transformer import Transformer
transfomer_ouput = "s3://projects-abdimatin/LoanDefaultprediction/Data/BatchTransform"


transfomer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=transfomer_ouput

)

In [22]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer_step = TransformStep(
    name="TransformStep", transformer=transfomer, inputs=TransformInput(data=validation_uri)
)

In [23]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep

# Assuming Evaluation_Step is a valid pipeline variable holding the
# ProcessingOutputConfig information
evaluation_s3_uri = Evaluation_Step.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

model_package_group_name = "LoanDefaultPredictionProject"
model_approval_status = "PendingManualApproval"

# No need to convert these strings, they are already valid arguments
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

Model_RegistrationStep = ModelStep(name="LoanDefaultModelRegistration", step_args=register_args)



In [24]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
steps = [
    processing_step,
    training_step,
    Evaluation_Step,
    create_model_step,
    Model_RegistrationStep,
]

pipeline_name = "LoanDefaultPredictionPipeline-Two"
pipeline = Pipeline(
    name=pipeline_name,
    steps=steps,
    sagemaker_session=PipelineSession()
)

In [25]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:557690604891:pipeline/LoanDefaultPredictionPipeline-Two',
 'ResponseMetadata': {'RequestId': 'c4e111b4-db31-4b6f-a3ca-8e4e70db79ba',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c4e111b4-db31-4b6f-a3ca-8e4e70db79ba',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '101',
   'date': 'Thu, 14 Nov 2024 20:17:27 GMT'},
  'RetryAttempts': 0}}

In [26]:
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:557690604891:pipeline/LoanDefaultPredictionPipeline-Two/execution/a140aor36icg', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x000001D72F2B5CA0>)

<!-- WARNING:sagemaker.workflow.utilities:Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ModelName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow._utils:Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
WARNING:sagemaker.workflow.utilities:Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ModelName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
WARNING:sagemaker.workflow.utilities:Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired. -->


In [27]:
import json
import pandas as pd
eval_data = pd.read_json(evaluation_s3_uri)
eval_data

ValueError: Invalid file path or buffer object type: <class 'sagemaker.workflow.properties.Properties'>