In [8]:
import os
import sagemaker
import json
import boto3 # Asegúrate de que boto3 está importado
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterString,
)

def get_sagemaker_pipeline(
    region: str,
    role: str,
    default_bucket: str,
    base_job_prefix: str,
    model_package_group_name: str,
    model_accuracy_threshold: float = 0.95,  # Accuracy threshold
    sample_rate_for_batch_predict: float = 0.1,  # Sampling rate for batch predictions
) -> Pipeline:
    """Build the Iris MLOps SageMaker pipeline definition.

    The pipeline chains GetData -> PreprocessData -> SplitData -> TrainModel
    -> EvaluateModel -> CheckModelAccuracyAndRegister. The final condition
    step runs RegisterModel and GenerateBatchPredictions only when the
    ``metrics.test_accuracy.value`` entry of ``evaluation.json`` is greater
    than or equal to the ``ModelAccuracyThreshold`` pipeline parameter.

    Side effects at definition time: a ``register_config.json`` object is
    uploaded to ``s3://{default_bucket}/pipeline-configs/{base_job_prefix}/``
    and its URI is printed. Step scripts are resolved from ``../ml_code/``
    relative to the current working directory.

    Args:
        region: AWS region used for the boto3/SageMaker sessions and for
            resolving the scikit-learn container image.
        role: IAM role ARN assigned to every processing and training job;
            also written into the register config.
        default_bucket: S3 bucket holding all data, artifacts and configs.
        base_job_prefix: Prefix for job names, the config S3 key and the
            pipeline name.
        model_package_group_name: Written into the register config that is
            mounted into the RegisterModel step.
        model_accuracy_threshold: Default for the ``ModelAccuracyThreshold``
            pipeline parameter.
        sample_rate_for_batch_predict: Default for the
            ``SampleRateForBatchPredict`` pipeline parameter.

    Returns:
        The assembled ``Pipeline``. It is neither upserted nor started here.
    """
    # Bind the SageMaker session to the same region-scoped boto3 session that
    # is used for S3, so jobs, image URIs and uploads all target `region`.
    boto_session = boto3.Session(region_name=region)
    sagemaker_session = sagemaker.Session(
        boto_session=boto_session,
        default_bucket=default_bucket,
    )

    # Every processing step runs in the same scikit-learn container, so the
    # image URI is resolved once instead of once per step.
    sklearn_image_uri = sagemaker.image_uris.retrieve(
        framework="sklearn", region=region, version="1.0-1"
    )

    def _script_processor(job_suffix: str) -> ScriptProcessor:
        """Return a fresh single-instance python3 processor for one step."""
        return ScriptProcessor(
            image_uri=sklearn_image_uri,
            role=role,
            instance_type="ml.m5.large",
            instance_count=1,
            base_job_name=f"{base_job_prefix}-{job_suffix}",
            sagemaker_session=sagemaker_session,
            command=["python3"],
        )

    def _script_path(file_name: str) -> str:
        """Return the path of a step script under ../ml_code, relative to the cwd."""
        return os.path.join(os.getcwd(), f"../ml_code/{file_name}")

    # Pipeline parameters (can be overridden when an execution is started).
    pipeline_model_accuracy_threshold = ParameterFloat(
        name="ModelAccuracyThreshold",
        default_value=model_accuracy_threshold,
    )
    # NOTE(review): this parameter is registered on the pipeline but is not
    # passed to any step below, so overriding it currently has no effect.
    pipeline_sample_rate_for_batch_predict = ParameterFloat(
        name="SampleRateForBatchPredict",
        default_value=sample_rate_for_batch_predict,
    )

    # --- Step 1: Get data ---
    get_data_step = ProcessingStep(
        name="GetData",
        processor=_script_processor("get-data"),
        outputs=[
            ProcessingOutput(
                # Named explicitly: unnamed outputs are auto-named
                # "output-<n>" by the SDK, which would not match the
                # Outputs["output"] lookup in the next step.
                output_name="output",
                source="/opt/ml/processing/output",
                destination=f"s3://{default_bucket}/iris-data/raw",
            )
        ],
        code=_script_path("get_data.py"),
    )

    # --- Step 2: Preprocess data ---
    preprocess_step = ProcessingStep(
        name="PreprocessData",
        processor=_script_processor("preprocess"),
        inputs=[
            ProcessingInput(
                source=get_data_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
                destination="/opt/ml/processing/input",
            )
        ],
        outputs=[
            ProcessingOutput(
                # Referenced as Outputs["output"] by SplitData and
                # GenerateBatchPredictions.
                output_name="output",
                source="/opt/ml/processing/output",
                destination=f"s3://{default_bucket}/iris-data/processed",
            ),
            ProcessingOutput(
                output_name="scaler_model",
                source="/opt/ml/model",
                destination=f"s3://{default_bucket}/iris-artifacts/scaler",
            ),
        ],
        code=_script_path("preprocess.py"),
    )

    # --- Step 3: Split data ---
    split_data_step = ProcessingStep(
        name="SplitData",
        processor=_script_processor("split-data"),
        inputs=[
            ProcessingInput(
                source=preprocess_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
                destination="/opt/ml/processing/input",
            )
        ],
        outputs=[
            ProcessingOutput(
                # Referenced as Outputs["output"] by TrainModel and EvaluateModel.
                output_name="output",
                source="/opt/ml/processing/output",
                destination=f"s3://{default_bucket}/iris-data/split",
            )
        ],
        code=_script_path("split_data.py"),
    )

    # --- Step 4: Train model ---
    sklearn_estimator = SKLearn(
        entry_point=_script_path("train_model.py"),
        role=role,
        instance_type="ml.m5.large",
        framework_version="1.0-1",
        base_job_name=f"{base_job_prefix}-train",
        sagemaker_session=sagemaker_session,
        output_path=f"s3://{default_bucket}/iris-artifacts/model",
    )
    train_step = TrainingStep(
        name="TrainModel",
        estimator=sklearn_estimator,
        inputs={
            "training": sagemaker.inputs.TrainingInput(
                s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
                content_type="text/csv",
            )
        },
    )

    # --- Step 5: Evaluate model ---
    # The property file exposes evaluation.json from the "evaluation" output
    # so the condition step can read the accuracy metric from it.
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    evaluate_step = ProcessingStep(
        name="EvaluateModel",
        processor=_script_processor("evaluate"),
        inputs=[
            ProcessingInput(
                source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=split_data_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/output",
                destination=f"s3://{default_bucket}/iris-artifacts/evaluation_report",
                output_name="evaluation",
            )
        ],
        code=_script_path("evaluate_model.py"),
        property_files=[evaluation_report],
    )

    # --- Configuration for the model registration step ---
    # The registration script reads its settings from a JSON file mounted as
    # a processing input rather than from job arguments, so the file has to
    # exist in S3 before the pipeline runs.
    register_config_data = {
        "model_package_group_name": model_package_group_name,
        "region": region,
        "role_arn": role,  # Passed explicitly even though the script could infer it
    }
    register_config_file_name = "register_config.json"
    register_config_key = f"pipeline-configs/{base_job_prefix}/{register_config_file_name}"
    s3_config_uri = f"s3://{default_bucket}/{register_config_key}"

    # Upload the serialized config directly; no temporary file is left in
    # the working directory.
    s3_client = boto_session.client("s3")
    s3_client.put_object(
        Bucket=default_bucket,
        Key=register_config_key,
        Body=json.dumps(register_config_data).encode("utf-8"),
    )
    print(f"Archivo de configuración de registro subido a S3: {s3_config_uri}")

    # --- Step 6: Register model ---
    # No job arguments here: the script reads the mounted config file instead.
    register_model_step = ProcessingStep(
        name="RegisterModel",
        processor=_script_processor("register-model"),
        inputs=[
            # Trained model artifact produced by the training step.
            ProcessingInput(
                source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model_data",
                input_name="model_data",
            ),
            # Registration config uploaded above.
            ProcessingInput(
                source=s3_config_uri,
                destination="/opt/ml/processing/config",
                input_name="config",
            ),
            # Evaluation report produced by the evaluation step.
            ProcessingInput(
                source=evaluate_step.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                destination="/opt/ml/processing/evaluation",
                input_name="evaluation",
            ),
        ],
        code=_script_path("register_model.py"),
    )

    # --- Step 7: Generate batch predictions ---
    batch_predict_step = ProcessingStep(
        name="GenerateBatchPredictions",
        processor=_script_processor("batch-predict"),
        inputs=[
            ProcessingInput(
                source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=preprocess_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
                destination="/opt/ml/processing/input",
            ),
            ProcessingInput(
                # Assumes this 'config' prefix already exists or is created separately.
                source=f"s3://{default_bucket}/config",
                destination="/opt/ml/processing/config",
            ),
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/output",
                destination=f"s3://{default_bucket}/iris-predictions/batch",
            )
        ],
        code=_script_path("batch_predict.py"),
    )

    # --- Conditional gate on model accuracy ---
    model_accuracy = JsonGet(
        step_name=evaluate_step.name,
        property_file=evaluation_report,
        json_path="metrics.test_accuracy.value",
    )
    # "threshold <= accuracy" is the same test as "accuracy >= threshold".
    accuracy_meets_threshold = ConditionLessThanOrEqualTo(
        left=pipeline_model_accuracy_threshold,
        right=model_accuracy,
    )
    condition_step = ConditionStep(
        name="CheckModelAccuracyAndRegister",
        conditions=[accuracy_meets_threshold],
        # Both steps run when the condition holds; nothing runs otherwise.
        if_steps=[register_model_step, batch_predict_step],
        else_steps=[],
    )

    # --- Main pipeline definition ---
    pipeline = Pipeline(
        name=f"{base_job_prefix}-IrisMLOpsPipeline",
        parameters=[
            pipeline_model_accuracy_threshold,
            pipeline_sample_rate_for_batch_predict,
        ],
        steps=[
            get_data_step,
            preprocess_step,
            split_data_step,
            train_step,
            evaluate_step,
            condition_step,
        ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline

if __name__ == "__main__":
    # Hard-coded settings for this run of the pipeline.
    region = "us-east-1"
    role = "arn:aws:iam::503427799533:role/SGMLOPS"
    default_bucket = "albertstats19-mlops-iris-bucket"
    base_job_prefix = "iris-mlops-p"
    model_package_group_name = "IrisModelGroup"

    # Echo the effective configuration before touching any AWS resource.
    print(
        "Executing locally with: \n"
        f"  Region: {region}\n"
        f"  Role: {role}\n"
        f"  Bucket: {default_bucket}\n"
        f"  Model Package Group: {model_package_group_name}"
    )

    # Diagnostic: report the installed SageMaker SDK version.
    print(f"SageMaker SDK Version: {sagemaker.__version__}")

    # Build the pipeline definition from the settings above.
    pipeline = get_sagemaker_pipeline(
        region=region,
        role=role,
        default_bucket=default_bucket,
        base_job_prefix=base_job_prefix,
        model_package_group_name=model_package_group_name,
    )

    # Create or update the pipeline definition under the given role.
    print(f"Upserting SageMaker Pipeline: {pipeline.name}")
    pipeline.upsert(role_arn=role)

    # Start an execution and report its ARN.
    print(f"Starting SageMaker Pipeline execution for: {pipeline.name}")
    execution = pipeline.start()
    print(
        f"SageMaker Pipeline execution ARN: {execution.arn}\n"
        "SageMaker Pipeline execution initiated. Check SageMaker Pipelines console for status."
    )

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Executing locally with: 
  Region: us-east-1
  Role: arn:aws:iam::503427799533:role/SGMLOPS
  Bucket: albertstats19-mlops-iris-bucket
  Model Package Group: IrisModelGroup
SageMaker SDK Version: 2.247.0


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


Archivo de configuración de registro subido a S3: s3://albertstats19-mlops-iris-bucket/pipeline-configs/iris-mlops-p/register_config.json
Upserting SageMaker Pipeline: iris-mlops-p-IrisMLOpsPipeline




Starting SageMaker Pipeline execution for: iris-mlops-p-IrisMLOpsPipeline
SageMaker Pipeline execution ARN: arn:aws:sagemaker:us-east-1:503427799533:pipeline/iris-mlops-p-IrisMLOpsPipeline/execution/ycoj0fbwqziw
SageMaker Pipeline execution initiated. Check SageMaker Pipelines console for status.
