In [7]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

In [8]:
# 1. Initial configuration
role = get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()

# S3 URIs where the 3 .py scripts must be uploaded beforehand (manually).
uri_script_limpieza = f"s3://{default_bucket}/scripts/TEST_PS_MX_1_limpieza.py"
uri_script_modelado = f"s3://{default_bucket}/scripts/TEST_PS_MX_2_modelado.py"
uri_script_reglas = f"s3://{default_bucket}/scripts/TEST_PS_MX_3_reglas_negocio.py"

# Fail fast if any script has not been uploaded yet. These S3 URIs are passed
# as `code` without any existence check in this notebook, so a missing object
# would presumably only surface once the pipeline is already running (possibly
# after earlier steps have consumed compute time).
_prefijo_s3 = f"s3://{default_bucket}/"
_claves_en_s3 = set(sagemaker_session.list_s3_files(default_bucket, "scripts/"))
_scripts_faltantes = [
    uri
    for uri in (uri_script_limpieza, uri_script_modelado, uri_script_reglas)
    if uri[len(_prefijo_s3):] not in _claves_en_s3
]
if _scripts_faltantes:
    raise FileNotFoundError(
        f"Upload these scripts to S3 before defining the pipeline: {_scripts_faltantes}"
    )

# --- STEP 1: CLEANING ---
# Generic Python container (the scikit-learn image is a lightweight standard).
processor_limpieza = ScriptProcessor(
    command=['python3'],
    image_uri=sagemaker.image_uris.retrieve("sklearn", sagemaker_session.boto_region_name, "1.2-1"),
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge"  # Standard instance for the data joins
)

step_limpieza = ProcessingStep(
    name="PS-Mexico-Paso1-Limpieza",
    processor=processor_limpieza,
    code=uri_script_limpieza,
    outputs=[
        ProcessingOutput(
            output_name="output_limpieza",
            source="/opt/ml/processing/output/limpieza"  # Path defined in limpieza.py
        )
    ]
)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [9]:
# --- STEP 2: MODELLING (ALS WITH PYSPARK) ---
# Spark-enabled processing container; the r5 family is memory-optimised,
# which suits ALS.
processor_modelado = PySparkProcessor(
    role=role,
    framework_version="3.3",
    base_job_name="sm-spark-als",
    instance_type="ml.r5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=7200,
)

# Input: step 1's S3 output, mounted at the directory modelado.py reads from.
# Using step 1's output property as the source is what makes this step wait
# for step 1 to finish.
entrada_rutas = ProcessingInput(
    source=step_limpieza.properties.ProcessingOutputConfig.Outputs["output_limpieza"].S3Output.S3Uri,
    destination="/opt/ml/processing/input/rutas",
)

# Output: the local directory modelado.py writes its results to.
salida_modelado = ProcessingOutput(
    output_name="output_modelado",
    source="/opt/ml/processing/output/modelado",
)

step_modelado = ProcessingStep(
    name="PS-Mexico-Paso2-Modelado",
    processor=processor_modelado,
    code=uri_script_modelado,
    inputs=[entrada_rutas],
    outputs=[salida_modelado],
)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [10]:
# --- STEP 3: BUSINESS RULES ---
# Same scikit-learn base image as step 1, looked up once into a local name.
imagen_sklearn_reglas = sagemaker.image_uris.retrieve("sklearn", sagemaker_session.boto_region_name, "1.2-1")
processor_reglas = ScriptProcessor(
    role=role,
    image_uri=imagen_sklearn_reglas,
    command=['python3'],
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Cleaned base produced by step 1.
entrada_limpieza = ProcessingInput(
    source=step_limpieza.properties.ProcessingOutputConfig.Outputs["output_limpieza"].S3Output.S3Uri,
    destination="/opt/ml/processing/input/limpieza",
)
# Predictions produced by step 2.
entrada_modelado = ProcessingInput(
    source=step_modelado.properties.ProcessingOutputConfig.Outputs["output_modelado"].S3Output.S3Uri,
    destination="/opt/ml/processing/input/modelado",
)

# No ProcessingOutput is declared: per the original note, the script uploads
# its final results to S3 itself via awswrangler.
# NOTE(review): confirm awswrangler is available in the sklearn 1.2-1 image
# (or installed by the script); it is not installed anywhere in this notebook.
step_reglas = ProcessingStep(
    name="PS-Mexico-Paso3-ReglasNegocio",
    processor=processor_reglas,
    code=uri_script_reglas,
    inputs=[entrada_limpieza, entrada_modelado],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [11]:
# --- PIPELINE DEFINITION AND REGISTRATION ---
# The three processing steps defined above, bundled into one named pipeline.
pasos_pipeline = [step_limpieza, step_modelado, step_reglas]
pipeline = Pipeline(
    name="Pipeline-PedidoSugerido-Mexico",
    sagemaker_session=sagemaker_session,
    steps=pasos_pipeline,
)

# Create the pipeline definition in AWS, or update it if it already exists.
pipeline.upsert(role_arn=role)
print("Pipeline registrado con éxito.")

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


Pipeline registrado con éxito.


In [14]:
# Launch a run of the registered pipeline and block until it finishes.
# Set EJECUTAR_PIPELINE = False to only register/update the pipeline (previous
# cell) without starting a new, billable execution on every "Run All".
EJECUTAR_PIPELINE = True

# Polling budget for execution.wait(). Its defaults (delay=30, max_attempts=60)
# stop waiting after roughly 30 minutes, but step 2 alone is allowed
# max_runtime_in_seconds=7200, so the defaults could raise while the pipeline
# is still running normally. 4 h is an assumed upper bound for all three
# steps; adjust it if the jobs' runtimes change.
INTERVALO_SONDEO_SEG = 60
ESPERA_MAXIMA_SEG = 4 * 60 * 60

if EJECUTAR_PIPELINE:
    execution = pipeline.start()
    try:
        execution.wait(
            delay=INTERVALO_SONDEO_SEG,
            max_attempts=max(1, ESPERA_MAXIMA_SEG // INTERVALO_SONDEO_SEG),
        )
    finally:
        # Print every step's status even when wait() raises (e.g. the execution
        # failed or the polling budget ran out), so the failing step is visible
        # here. A timeout only stops the polling; the execution keeps running.
        for paso in execution.list_steps():
            print(paso["StepName"], paso["StepStatus"], paso.get("FailureReason", ""))

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
