# EMR Serverless Step in SageMaker Pipelines

This notebook demonstrates how to use the `EMRServerlessStep` to run Spark jobs on EMR Serverless within a SageMaker Pipeline.

## Prerequisites
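- An AWS account with EMR Serverless access
- Permission to create IAM roles and attach policies (the notebook creates `EMRServerlessExecutionRole`)
- A SageMaker execution role that `get_execution_role()` can resolve, and access to the session's default S3 bucket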

## What This Notebook Does
1. Creates an IAM role for EMR Serverless (if it doesn't exist)
2. Creates a sample PySpark script and uploads it to S3
3. Copies sample data to your S3 bucket
4. Creates an EMR Serverless step that provisions a new application
5. Creates and executes a SageMaker Pipeline

In [None]:
from sagemaker.mlops.workflow.emr_serverless_step import (
    EMRServerlessStep,
    EMRServerlessJobConfig,
)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.core.workflow.parameters import ParameterString
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.helper.session_helper import Session, get_execution_role

In [None]:
# Create the SageMaker Session
sagemaker_session = Session()
pipeline_session = PipelineSession()
region = sagemaker_session.boto_region_name
account_id = sagemaker_session.account_id()

print(f"Region: {region}")
print(f"Account ID: {account_id}")

In [None]:
# Define variables and parameters needed for the Pipeline steps
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
s3_prefix = "v3-emr-serverless-pipeline"

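# Pipeline parameters. They are registered on the pipeline below; the job
# configuration in this notebook uses literal values rather than these parameters.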
emr_execution_role = ParameterString(
    name="EMRServerlessExecutionRole",
    default_value=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole"
)

spark_script_uri = ParameterString(
    name="SparkScriptUri",
    default_value=f"s3://{default_bucket}/{s3_prefix}/scripts/spark_job.py"
)

print(f"Role: {role}")
print(f"Default Bucket: {default_bucket}")

## Create IAM Role for EMR Serverless

The EMR Serverless job needs an execution role with permissions to access S3 and CloudWatch Logs.
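
As a hedged sketch of what those permissions could look like when scoped down, the helper below builds an inline policy document limited to one bucket prefix and to CloudWatch Logs. The function name `build_scoped_policy`, the choice of actions, and the log resource pattern are illustrative assumptions, not taken from this notebook; adjust them to what your job actually reads and writes.

```python
import json


def build_scoped_policy(bucket: str, prefix: str) -> dict:
    """Build an IAM policy document limited to one S3 prefix and CloudWatch Logs.

    Hypothetical helper: the action list is an assumption for illustration.
    """
    prefix = prefix.strip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Allow listing only the keys under the given prefix
                "Sid": "ListBucketUnderPrefix",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
            },
            {
                # Read the script and input, write output and logs
                "Sid": "ReadWriteObjectsUnderPrefix",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/{prefix}/*"],
            },
            {
                "Sid": "WriteLogs",
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams",
                ],
                "Resource": ["arn:aws:logs:*:*:*"],
            },
        ],
    }


# Example bucket name is a placeholder
policy = build_scoped_policy("my-example-bucket", "v3-emr-serverless-pipeline")
print(json.dumps(policy, indent=2))
```

A document like this could be attached as an inline policy with `iam_client.put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=json.dumps(policy))` in place of broad managed policies.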

In [None]:
import boto3
import json

# Create IAM role for EMR Serverless (if it doesn't exist)
iam_client = boto3.client('iam')

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "emr-serverless.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

try:
    iam_client.create_role(
        RoleName="EMRServerlessExecutionRole",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Execution role for EMR Serverless"
    )
    print("Role created!")
except iam_client.exceptions.EntityAlreadyExistsException:
    print("Role already exists")

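# Attach managed policies. These grant broad access and keep the demo simple;
# a production role should be scoped to the specific bucket and log groups.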
for policy_arn in [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
]:
    iam_client.attach_role_policy(
        RoleName="EMRServerlessExecutionRole",
        PolicyArn=policy_arn
    )

print("EMRServerlessExecutionRole is ready!")
print(f"Role ARN: arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole")

In [None]:
!mkdir -p code

In [None]:
%%writefile code/spark_job.py

"""Sample PySpark job for EMR Serverless."""
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Input S3 path")
    parser.add_argument("--output", type=str, required=True, help="Output S3 path")
    args = parser.parse_args()

    # Create Spark session
    spark = SparkSession.builder.appName("EMRServerlessExample").getOrCreate()

    print(f"Reading data from: {args.input}")
    
    # Define schema for abalone dataset (no headers in CSV)
    schema = StructType([
        StructField("sex", StringType(), True),
        StructField("length", DoubleType(), True),
        StructField("diameter", DoubleType(), True),
        StructField("height", DoubleType(), True),
        StructField("whole_weight", DoubleType(), True),
        StructField("shucked_weight", DoubleType(), True),
        StructField("viscera_weight", DoubleType(), True),
        StructField("shell_weight", DoubleType(), True),
        StructField("rings", DoubleType(), True),
    ])
    
    # Read input data (no header in abalone dataset)
    df = spark.read.csv(args.input, header=False, schema=schema)
    
    # Inspect the data: print the schema and the row count
    print("Schema:")
    df.printSchema()
    print(f"Row count: {df.count()}")
    
    # Show sample data
    print("Sample data:")
    df.show(5)
    
    # Example transformation: compute summary statistics (count, mean, stddev, min, max)
    result_df = df.describe()
    
    # Write output
    print(f"Writing results to: {args.output}")
    result_df.write.mode("overwrite").parquet(args.output)
    
    print("Job completed successfully!")
    spark.stop()


if __name__ == "__main__":
    main()

In [None]:
import boto3

# Upload the Spark script to S3
s3_client = boto3.client("s3")
script_s3_key = f"{s3_prefix}/scripts/spark_job.py"

s3_client.upload_file(
    "code/spark_job.py",
    default_bucket,
    script_s3_key
)

script_s3_uri = f"s3://{default_bucket}/{script_s3_key}"
print(f"Spark script uploaded to: {script_s3_uri}")

## Copy Sample Data to Your Bucket

We copy the sample data from the public AWS sample bucket into your bucket so that it is in the same region as the EMR Serverless application.

In [None]:
import boto3

# Copy sample data to your bucket (same region as EMR Serverless)
s3_resource = boto3.resource('s3')
copy_source = {
    'Bucket': 'sagemaker-sample-files',
    'Key': 'datasets/tabular/uci_abalone/abalone.csv'
}

dest_key = f"{s3_prefix}/input/abalone.csv"
s3_resource.meta.client.copy(copy_source, default_bucket, dest_key)

input_data_uri = f"s3://{default_bucket}/{dest_key}"
print(f"Sample data copied to: {input_data_uri}")

## Create EMR Serverless Step

The `EMRServerlessStep` supports two modes:
1. **Existing Application**: Use an existing EMR Serverless application ID
2. **New Application**: Create a new EMR Serverless application as part of the step

This notebook uses Option 2 (New Application) so it works out of the box.
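
The choice between the two modes can be sketched as a small helper that returns the keyword arguments for the step. `application_id` and `application_config` are the argument names used in the cells of this notebook; the helper `application_kwargs` itself, and the rule that exactly one of the two must be given, are assumptions made for illustration rather than documented behavior of `EMRServerlessStep`.

```python
from typing import Optional


def application_kwargs(
    application_id: Optional[str] = None,
    application_config: Optional[dict] = None,
) -> dict:
    """Return step kwargs for exactly one of the two application modes."""
    # Reject the call when both or neither of the two options are supplied
    if (application_id is None) == (application_config is None):
        raise ValueError("Provide exactly one of application_id or application_config")
    if application_id is not None:
        return {"application_id": application_id}
    return {"application_config": application_config}


# Mode 2: describe a new application
new_app = application_kwargs(
    application_config={
        "name": "sagemaker-pipeline-spark-app",
        "releaseLabel": "emr-6.15.0",
        "type": "SPARK",
    }
)
print(new_app)
```

The result could then be unpacked into the step constructor, for example `EMRServerlessStep(name=..., job_config=job_config, **new_app)`.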

In [None]:
# Define the EMR Serverless job configuration
job_config = EMRServerlessJobConfig(
    job_driver={
        "sparkSubmit": {
            "entryPoint": script_s3_uri,
            "entryPointArguments": [
                "--input", input_data_uri,
                "--output", f"s3://{default_bucket}/{s3_prefix}/output/"
            ],
        }
    },
    execution_role_arn=f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole",
    configuration_overrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": f"s3://{default_bucket}/{s3_prefix}/logs/"
            }
        }
    }
)

print("EMR Serverless Job Configuration created")

### Option 1: Use Existing EMR Serverless Application (Optional)

If you have an existing EMR Serverless application, you can use it instead. Uncomment the code below and replace `your-application-id` with your actual application ID.

In [None]:
# Option 1: Use an existing EMR Serverless application
# Uncomment below if you have an existing application

# step_emr_serverless_existing = EMRServerlessStep(
#     name="EMRServerlessSparkJob",
#     display_name="EMR Serverless Spark Job",
#     description="Run a PySpark job on EMR Serverless",
#     job_config=job_config,
#     application_id="your-application-id",  # Replace with your application ID
# )

print("Option 1 skipped - Using Option 2 (new application) below")

### Option 2: Create New EMR Serverless Application (Default)

This option creates a new EMR Serverless application as part of the pipeline step. The application will auto-start when needed and auto-stop after 15 minutes of idle time.

In [None]:
# Option 2: Create a new EMR Serverless application as part of the step
# This is the default option that works out of the box

step_emr_serverless = EMRServerlessStep(
    name="EMRServerlessSparkJob",
    display_name="EMR Serverless Spark Job",
    description="Run a PySpark job with a newly created EMR Serverless application",
    job_config=job_config,
    application_config={
        "name": "sagemaker-pipeline-spark-app",
        "releaseLabel": "emr-6.15.0",
        "type": "SPARK",
        "autoStartConfiguration": {
            "enabled": True
        },
        "autoStopConfiguration": {
            "enabled": True,
            "idleTimeoutMinutes": 15
        },
    },
)

print(f"Step Name: {step_emr_serverless.name}")
print(f"Step Type: {step_emr_serverless.step_type}")

In [None]:
# Create the pipeline

pipeline = Pipeline(
    name="EMRServerlessPipeline",
    parameters=[
        emr_execution_role,
        spark_script_uri,
    ],
    steps=[step_emr_serverless],
    sagemaker_session=pipeline_session,
)

print("Pipeline created successfully!")

In [None]:
import json

definition = json.loads(pipeline.definition())
print(json.dumps(definition, indent=2))

## Execute Pipeline

The cells below will:
1. Create/update the pipeline in SageMaker
2. Start the pipeline execution
3. Wait for completion
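
The wait step can also be sketched as a polling loop with a timeout, so that a stuck execution does not block the notebook indefinitely. `wait_for_status` and its parameters are hypothetical names; the terminal states are the ones this notebook's own wait loop checks for, and the status source is passed in as a callable so that the sketch does not depend on any particular SDK call.

```python
import time
from typing import Callable

TERMINAL_STATES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll get_status() until it returns a terminal state or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        # Give up if the next poll would land after the deadline
        if time.monotonic() + poll_seconds > deadline:
            raise TimeoutError(f"Still '{status}' after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# Demo with a fake status source instead of a real pipeline execution
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
final = wait_for_status(lambda: next(fake_statuses), poll_seconds=0.01)
print(final)
```

With the execution object from this notebook, the callable would be `lambda: execution.describe()['PipelineExecutionStatus']`.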

In [None]:
# Create/update the pipeline
pipeline.upsert(role_arn=role)
print("Pipeline upserted successfully!")

In [None]:
# Start the pipeline execution
execution = pipeline.start()
print(f"Pipeline execution started: {execution.arn}")

In [None]:
import time

# Wait for pipeline execution to complete
while True:
    status = execution.describe()['PipelineExecutionStatus']
    print(f"Status: {status}")
    
    if status in ['Succeeded', 'Failed', 'Stopped']:
        print(f"Pipeline finished with status: {status}")
        break
    
    print("Still running... waiting 30 seconds")
    time.sleep(30)

In [None]:
# Check step execution details
steps = execution.list_steps()
for step in steps:
    print(f"Step: {step['StepName']}")
    print(f"  Status: {step['StepStatus']}")
    if 'FailureReason' in step:
        print(f"  Failure Reason: {step['FailureReason']}")
    print()

## Cleanup (Optional)

Uncomment the cell below to delete the pipeline when you're done.
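
The EMR Serverless application created under Option 2 may remain after the pipeline is deleted. As a hedged sketch, the helper below finds applications by name through a client object passed in by the caller, deletes the ones that are already idle, and requests a stop for running ones. It assumes the client exposes `list_applications`, `stop_application`, and `delete_application` in the shape of the boto3 `emr-serverless` client, and that the application kept the name given in `application_config`; neither assumption is confirmed by this notebook. `cleanup_applications` and `FakeEmrClient` are hypothetical names.

```python
def cleanup_applications(emr_client, name: str) -> dict:
    """Delete idle applications named `name`; request a stop for running ones.

    Returns {"deleted": [...], "stopping": [...]} with application IDs.
    """
    deleted, stopping = [], []
    kwargs = {}
    while True:
        page = emr_client.list_applications(**kwargs)
        for app in page.get("applications", []):
            if app["name"] != name:
                continue
            if app["state"] in ("CREATED", "STOPPED"):
                emr_client.delete_application(applicationId=app["id"])
                deleted.append(app["id"])
            elif app["state"] == "STARTED":
                # Stopping is asynchronous; rerun the cleanup once it has stopped
                emr_client.stop_application(applicationId=app["id"])
                stopping.append(app["id"])
        token = page.get("nextToken")
        if not token:
            break
        kwargs = {"nextToken": token}
    return {"deleted": deleted, "stopping": stopping}


class FakeEmrClient:
    """In-memory stand-in used only to demonstrate the helper without AWS calls."""

    def __init__(self, apps, page_size=2):
        self.apps, self.page_size, self.calls = apps, page_size, []

    def list_applications(self, nextToken=None):
        start = int(nextToken or 0)
        end = start + self.page_size
        page = {"applications": self.apps[start:end]}
        if end < len(self.apps):
            page["nextToken"] = str(end)
        return page

    def delete_application(self, applicationId):
        self.calls.append(("delete", applicationId))

    def stop_application(self, applicationId):
        self.calls.append(("stop", applicationId))


client = FakeEmrClient([
    {"id": "app-1", "name": "sagemaker-pipeline-spark-app", "state": "STOPPED"},
    {"id": "app-2", "name": "sagemaker-pipeline-spark-app", "state": "STARTED"},
    {"id": "app-3", "name": "other-app", "state": "STOPPED"},
])
result = cleanup_applications(client, "sagemaker-pipeline-spark-app")
print(result)
```

Against a real account, the client argument would be `boto3.client("emr-serverless")`; review the matched application IDs before deleting anything.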

In [None]:
# Uncomment to delete the pipeline
# sm_client = sagemaker_session.sagemaker_client
# sm_client.delete_pipeline(PipelineName="EMRServerlessPipeline")
# print("Pipeline deleted")