# SageMaker Pipeline with EMR Steps

Sample code that builds a SageMaker pipeline which runs an EMR step.

For details, refer to the documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps-types.html#step-type-emr

In [1]:
import os
import json

import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Set up the sessions and account details shared by the rest of the notebook.

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()  # role ARN later passed to pipeline.upsert
region = sagemaker_session.boto_region_name

# boto3 session pinned to the same region as the SageMaker session.
boto_session = boto3.Session(region_name=region)
sagemaker_client = sagemaker_session.sagemaker_client
default_bucket = sagemaker_session.default_bucket()

# Account ID of the caller; used below to build the EMR role ARNs.
sts_client = boto_session.client("sts")
account = sts_client.get_caller_identity()["Account"]

# Session handed to the Pipeline object; it reuses the client and bucket
# obtained from the regular SageMaker session above.
pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket,
)

## Parameters

In [3]:
# Settings for the EMR step and the pipeline that wraps it.

# IAM identities handed to the EMR cluster, built from the caller's account ID:
# the EC2 instance profile (JobFlowRole) and the EMR service role (ServiceRole).
service_role = f"arn:aws:iam::{account}:role/EMR_DefaultRole"
job_flow_role = f"arn:aws:iam::{account}:instance-profile/EMR_EC2_DefaultRole"

# Pipeline name and the S3 key prefix under which the step writes its output.
base_job_prefix = "my-emr-step-pipeline"
pipeline_name = "MyEMRStepPipeline"

In [4]:
# Inputs and outputs of the EMR preprocessing step.

# S3 location of the PySpark application that is handed to spark-submit.
# NOTE(review): the bucket name embeds a specific account and region; change
# it to wherever preprocess.py has been uploaded in your own account.
script = "s3://sagemaker-ap-southeast-3-117019135262/emr-step-pipeline/app/preprocess.py"

# Pipeline parameter holding the input dataset URL. The default is a plain
# string literal (no placeholders, so no f-prefix is needed).
input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://sagemaker-example-files-prod-ap-southeast-1/datasets/tabular/uci_abalone/abalone.csv",
)

# Destination prefix for the preprocessed data, inside the session's default bucket.
output_path = f"s3://{default_bucket}/{base_job_prefix}/prep"

## RAPIDS Spark GPU Accelerator

This example uses the RAPIDS Accelerator for Apache Spark to accelerate Spark jobs on GPU instances. The cluster configuration below is based on the parameters described in https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-rapids.html

In [5]:
# EMR configuration classifications for running Spark with the RAPIDS
# accelerator, expressed in the list-of-classifications format used by EMR.
rapids_configurations = [
    # Turn on the RAPIDS integration for Spark.
    {
        "Classification": "spark",
        "Properties": {"enableSparkRapids": "true"},
    },
    # YARN NodeManager: register GPUs as a resource type and use the Linux
    # container executor with the dedicated cgroup mount.
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.resource-plugins": "yarn.io/gpu",
            "yarn.resource-types": "yarn.io/gpu",
            "yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices": "auto",
            "yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables": "/usr/bin",
            "yarn.nodemanager.linux-container-executor.cgroups.mount": "true",
            "yarn.nodemanager.linux-container-executor.cgroups.mount-path": "/spark-rapids-cgroup",
            "yarn.nodemanager.linux-container-executor.cgroups.hierarchy": "yarn",
            "yarn.nodemanager.container-executor.class": "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
        },
    },
    # Container executor: no top-level properties, only the nested gpu and
    # cgroups sections (paths match the yarn-site values above).
    {
        "Classification": "container-executor",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "gpu",
                "Properties": {"module.enabled": "true"},
            },
            {
                "Classification": "cgroups",
                "Properties": {
                    "root": "/spark-rapids-cgroup",
                    "yarn-hierarchy": "yarn",
                },
            },
        ],
    },
    # Spark defaults: load the NVIDIA SQL plugin and size executors/tasks
    # against the GPU resource.
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.plugins": "com.nvidia.spark.SQLPlugin",
            "spark.executor.resource.gpu.discoveryScript": "/usr/lib/spark/scripts/gpu/getGpusResources.sh",
            "spark.executor.extraLibraryPath": "/usr/local/cuda/targets/x86_64-linux/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/compat/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native",
            "spark.submit.pyFiles": "/usr/lib/spark/jars/xgboost4j-spark_3.0-1.4.2-0.3.0.jar",
            "spark.rapids.sql.concurrentGpuTasks": "1",
            "spark.executor.resource.gpu.amount": "1",
            "spark.executor.cores": "2",
            "spark.task.cpus": "1",
            # 0.5 GPU per task lets two tasks share the single executor GPU.
            "spark.task.resource.gpu.amount": "0.5",
            "spark.rapids.memory.pinnedPool.size": "0",
            "spark.executor.memoryOverhead": "2G",
            "spark.locality.wait": "0s",
            "spark.sql.shuffle.partitions": "200",
            "spark.sql.files.maxPartitionBytes": "512m",
        },
    },
    # Capacity scheduler: use the dominant-resource calculator.
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator",
        },
    },
]


## SageMaker Pipeline EMR Step

Based on the following references:
- https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
- https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps-types.html#step-type-emr
- https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#emrstep
- https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb

In [9]:
# Describe the work the EMR step performs: a spark-submit (cluster deploy mode)
# of the PySpark preprocessing script, launched through command-runner.jar.
# The script receives the input dataset (a pipeline parameter) and the S3
# output prefix as command-line arguments.
spark_submit_args = [
    "spark-submit",
    "--deploy-mode",
    "cluster",
    script,
    "--input",
    input_data,
    "--output",
    output_path,
]

emr_config = EMRStepConfig(jar="command-runner.jar", args=spark_submit_args)


# Pipeline step that runs `emr_config` on an EMR cluster described by
# `cluster_config`. cluster_id is None because no existing cluster is targeted;
# the cluster is defined inline instead (see the SageMaker EMR step docs).
step_emr = EMRStep(
    name="EMRStep",
    cluster_id=None,
    step_config=emr_config,
    display_name="Preprocess",
    description="preprocess data for XGBoost",
    cluster_config={
        "Applications": [
            {
                "Name": "Spark",
            }
        ],
        "Instances": {
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.xlarge"},
                # Core nodes use GPU instances so Spark executors can run on RAPIDS.
                {"InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "g5.xlarge"},
            ]
        },
        # Apply the RAPIDS / YARN GPU classifications defined earlier in this
        # notebook. Without this key `rapids_configurations` is never used and
        # the cluster starts with default (CPU-only) Spark settings.
        "Configurations": rapids_configurations,
        "BootstrapActions": [
            {
                "Name": "Install packages",
                "ScriptBootstrapAction": {
                    "Path": "s3://emr-test-117019135262/xldemo/service-catalog/bootstrap-emr-script.sh",
                    "Args": ["s3://emr-test-117019135262/xldemo/service-catalog/requirements-numpy.txt"]
                }
            },
            {
                "Name": "RAPIDS bootstrap",
                "ScriptBootstrapAction": {
                    "Path": "s3://emr-test-117019135262/xldemo/service-catalog/bootstrap-rapids-script.sh"
                }
            }
        ],
        "ReleaseLabel": "emr-7.3.0",
        "JobFlowRole": job_flow_role,
        "ServiceRole": service_role,
        # NOTE(review): bootstrap script paths and the log bucket are hard-coded
        # to a specific account/region; adjust them for your environment.
        "LogUri": "s3://aws-logs-117019135262-ap-southeast-3/elasticmapreduce"
    },
)


In [10]:
# Assemble the pipeline: a single EMR step, with the input dataset URL exposed
# as the only pipeline parameter.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data],
    steps=[step_emr],
    sagemaker_session=pipeline_session,
)


## Execute the pipeline

In [11]:
# Parse the pipeline's JSON definition into a Python dict and display it,
# so the generated steps and arguments can be inspected before deploying.
definition = json.loads(pipeline.definition())
definition


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-example-files-prod-ap-southeast-1/datasets/tabular/uci_abalone/abalone.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'EMRStep',
   'Type': 'EMR',
   'Arguments': {'StepConfig': {'HadoopJarStep': {'Jar': 'command-runner.jar',
      'Args': ['spark-submit',
       '--deploy-mode',
       'cluster',
       's3://sagemaker-ap-southeast-3-117019135262/emr-step-pipeline/app/preprocess.py',
       '--input',
       {'Get': 'Parameters.InputDataUrl'},
       '--output',
       's3://sagemaker-ap-southeast-3-117019135262/my-emr-step-pipeline/prep']}},
    'ClusterConfig': {'Applications': [{'Name': 'Spark'}],
     'Instances': {'InstanceGroups': [{'InstanceRole': 'MASTER',
        'InstanceCount': 1,
        'InstanceType': 'm5.xlarge'}

In [None]:
# Register the pipeline with SageMaker under the notebook's execution role
# (upsert: presumably creates it, or updates it if it already exists).
pipeline.upsert(role_arn=role)


In [12]:
# Start a pipeline execution; no parameter overrides are passed here.
execution = pipeline.start()


In [None]:
# Wait on the execution started above before inspecting its status.
execution.wait()


In [18]:
# Show the execution's details (status, ARNs, timestamps).
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-3:117019135262:pipeline/MyEMRStepPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-southeast-3:117019135262:pipeline/MyEMRStepPipeline/execution/6rib0w45wsxm',
 'PipelineExecutionDisplayName': 'execution-1731397501288',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'myemrsteppipeline',
  'TrialName': '6rib0w45wsxm'},
 'CreationTime': datetime.datetime(2024, 11, 12, 7, 45, 1, 243000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 11, 12, 8, 0, 33, 776000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-southeast-3:117019135262:user-profile/d-i1nji19a0nxy/studio-user',
  'UserProfileName': 'studio-user',
  'DomainId': 'd-i1nji19a0nxy',
  'IamIdentity': {'Arn': 'arn:aws:sts::117019135262:assumed-role/SMEMR-EMR-SageMakerExecutionRole/SageMaker',
   'PrincipalId': 'AROARWPXCJUPIGXUMCZKV:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sage