In [None]:
import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TuningStep, TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CreateModelStep, TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.parameters import ParameterString, ParameterInteger



# IAM role used by the processors, the estimator, the model and the pipeline.
role = 'arn:aws:iam::339712893183:role/SagemakerNotebookRole'

# Two sessions are kept side by side: the regular one is handed to the
# estimator, the pipeline one to the evaluation processor, the model and the
# pipeline object.
sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()

# S3 bucket that holds the input CSV and every pipeline output, and the key
# prefix passed to get_top_model_s3_uri().
bucket = 'stock-news-sentiment'
prefix = 'models'

# Pipeline parameters (overridable per execution).
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.4xlarge",
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.p3.2xlarge",
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1,
)
input_data_uri = ParameterString(
    name="InputDataURI",
    default_value=f"s3://{bucket}/Combined_News_DJIA.csv",
)

# Preprocessing: run scripts/preprocessing.py in the scikit-learn container.
script_processor = ScriptProcessor(
    image_uri='683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3',
    command=['python3'],
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

preprocessing_step = ProcessingStep(
    name="PreprocessData",
    processor=script_processor,
    code='scripts/preprocessing.py',
    # The raw CSV (InputDataURI parameter) is mounted under /opt/ml/processing/input.
    inputs=[
        ProcessingInput(
            source=input_data_uri,
            destination='/opt/ml/processing/input',
        ),
    ],
    # One output per data split; each local folder is uploaded to
    # s3://<bucket>/output/<split>.
    outputs=[
        ProcessingOutput(
            output_name=split,
            source=f'/opt/ml/processing/{split}',
            destination=f's3://{bucket}/output/{split}',
        )
        for split in ("train", "test", "validation")
    ],
)

# Training jobs write their model artifacts below this S3 location.
model_path = f"s3://{bucket}/models"

# PyTorch estimator that runs scripts/train.py; it is the base estimator for
# the hyperparameter tuner.
# NOTE(review): instance type and count are fixed here; the
# TrainingInstanceType / TrainingInstanceCount pipeline parameters are
# declared but not used by this estimator.
pytorch_estimator = PyTorch(
    source_dir='scripts',
    entry_point='train.py',
    framework_version='1.8.1',
    py_version='py3',
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
    output_path=model_path,
    # Static hyperparameter: the bucket name is forwarded to the training script.
    hyperparameters={'bucket_name': bucket},
)

# Search space explored by the tuner.
hyperparameter_ranges = dict(
    lr=ContinuousParameter(1e-6, 1e-4),      # learning rate
    batch_size=IntegerParameter(16, 64),     # batch size
)

# Tuner that maximizes the "accuracy" metric, which is scraped from the
# training logs with the regex below.
tuner = HyperparameterTuner(
    estimator=pytorch_estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type='Maximize',  # use 'Minimize' when optimizing a loss metric
    objective_metric_name='accuracy',
    metric_definitions=[
        {'Name': 'accuracy', 'Regex': 'Val accuracy ([0-9\\.]+)'},
    ],
    max_jobs=1,           # total number of training jobs
    max_parallel_jobs=1,  # number of jobs run concurrently
)


# Tuning step: every data channel is fed from the preprocessing output of the
# same name, all as CSV.
tuning_step = TuningStep(
    name='TuneHyperparameters',
    display_name="HyperparameterTuning",
    tuner=tuner,
    inputs={
        channel: TrainingInput(
            s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "test", "validation")
    },
)




# S3 URI of the model artifact produced by the top-ranked tuning job (index 0).
top_model_s3_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=bucket,
    prefix=prefix,
)

# Processor for the evaluation script; the scikit-learn image is resolved for
# the region of the regular SageMaker session.
evaluation_script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve(
        framework='sklearn',
        region=sagemaker_session.boto_region_name,
        version='0.23-1',
    ),
    command=['python3'],
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

# Exposes evaluation.json from the "evaluation" output so that later steps can
# read values out of it with JsonGet.
evaluation_report = PropertyFile(
    name="NewsSentimentEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Evaluation step: runs scripts/evaluate.py against the top model and the
# test split produced by preprocessing.
evaluation_step = ProcessingStep(
    name='Evaluation',
    processor=evaluation_script_processor,
    code='scripts/evaluate.py',
    inputs=[
        ProcessingInput(
            source=top_model_s3_uri,
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocessing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source='/opt/ml/processing/evaluation',
            destination=f's3://{bucket}/evaluation',
        ),
    ],
    property_files=[evaluation_report],
)

# Model built from the top tuning artifact, served with the same image the
# estimator trains with.
model = Model(
    model_data=top_model_s3_uri,
    image_uri=pytorch_estimator.training_image_uri(),
    role=role,
    sagemaker_session=pipeline_session,
)

# Step that creates the SageMaker model; the instance type comes from the
# ProcessingInstanceType pipeline parameter.
create_model_step = CreateModelStep(
    name='CreateModel',
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(
        instance_type=processing_instance_type,
    ),
)

# Model metrics attached to the registered model: they point at the
# evaluation.json report that the evaluation step uploads to
# s3://<bucket>/evaluation.
# The URI must be an f-string; without the prefix the literal text "{bucket}"
# would be sent instead of the bucket name.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"s3://{bucket}/evaluation/evaluation.json",
        content_type="application/json"
    )
)


# Registration: registers the top tuned model artifact together with the
# evaluation metrics. Requests and responses are CSV; the instance lists for
# inference and batch transform both use the ProcessingInstanceType parameter.
register_model_step = RegisterModel(
    name="RegisterModel",
    estimator=pytorch_estimator,
    model_data=top_model_s3_uri,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[processing_instance_type],
    transform_instances=[processing_instance_type],
)



# Quality gate: the model is created and registered only when the AUC value
# read from the evaluation report is at least 0.50; otherwise nothing runs.
condition_step = ConditionStep(
    name="CheckAUC",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Value looked up inside evaluation.json of the Evaluation step.
            left=JsonGet(
                step_name=evaluation_step.name,
                property_file=evaluation_report,
                json_path="classification_metrics.auc_score.value",
            ),
            right=0.50,
        ),
    ],
    if_steps=[create_model_step, register_model_step],
    else_steps=[],
)

# Define the pipeline.
# PreprocessData and TuneHyperparameters are listed explicitly: the tuning
# inputs, the evaluation step and the model steps all read properties of those
# two steps, so they have to be part of the pipeline definition. The
# create/register steps are reached through the condition step's if_steps.
# Execution order is derived from the property references, not from the order
# of this list.
pipeline = Pipeline(
    name='StockNewsSentimentPipeline2',
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count,
        input_data_uri
    ],
    steps=[preprocessing_step, tuning_step, evaluation_step, condition_step],
    sagemaker_session=pipeline_session
)
# Register the pipeline definition with SageMaker under the given role.
pipeline.create(role_arn=role)
# Start an execution using the default parameter values.
pipeline.start()
