## Spark Data Preparation & SageMaker Pipeline Integration

In this example we look at how to connect an Amazon EMR cluster to SageMaker Studio using a Service Catalog template. If you already have an EMR cluster with the required network access, you can instead pass its ID to the `--cluster-id` argument of the connect command. The use case is NLP text classification: we use BERT with the public SST-2 dataset for binary classification of text. We first explore the data through Studio's Spark connection to EMR, then use the same dataset in an ML workflow built with SageMaker Pipelines.

### Setup

We first connect to our EMR cluster. Our notebook uses the following settings:

- Studio Kernel: SparkMagic PySpark
- Instance Type: ml.t3.medium

When you launch the EMR cluster from the Service Catalog template, you can choose the instance type and instance count. The cluster takes about 10 minutes to launch.

In [None]:
# replace j-JMOFYJLFSXA8 with the ID of your own EMR cluster
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --verify-certificate False --cluster-id j-JMOFYJLFSXA8 --auth-type None --language python 

### Spark Preprocessing

Let's run some sample Spark commands to understand our dataset better. Upload `train.csv` to an S3 bucket of your choice and replace the S3 path below with your own bucket and prefix.

In [None]:
import os
import string
import json
from io import BytesIO
import boto3
import numpy as np
from pyspark.sql import Row
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from matplotlib.ticker import NullLocator
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

In [None]:
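# these settings apply to s3a:// paths; on EMR, s3:// paths are served by EMRFS
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.path.style.access", "true")
# keys set directly on the Hadoop configuration take no "spark.hadoop." prefix
hadoop_conf.set("fs.s3a.committer.name", "directory")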
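Spark copies every property prefixed with `spark.hadoop.` into the Hadoop configuration and drops the prefix on the way. That is why a key set directly on `hadoopConfiguration()` should be written without the prefix. The plain-Python sketch below only mimics that mapping for illustration; `to_hadoop_keys` is a hypothetical helper, not a Spark API.

```python
SPARK_HADOOP_PREFIX = "spark.hadoop."


def to_hadoop_keys(spark_conf: dict) -> dict:
    """Hypothetical helper: the Hadoop properties implied by a Spark conf dict.

    Only keys starting with "spark.hadoop." are forwarded, with the prefix removed.
    """
    hadoop_conf = {}
    for key, value in spark_conf.items():
        if key.startswith(SPARK_HADOOP_PREFIX):
            hadoop_conf[key[len(SPARK_HADOOP_PREFIX):]] = value
    return hadoop_conf


spark_conf = {
    "spark.hadoop.fs.s3a.committer.name": "directory",
    "spark.executor.memory": "4g",  # a Spark setting, not forwarded to Hadoop
}
print(to_hadoop_keys(spark_conf))  # {'fs.s3a.committer.name': 'directory'}
```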

In [None]:
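# replace with the S3 path of your train.csv; the file is read without a header,
# so Spark assigns default column names _c0, _c1, ...
df = spark.read.format("csv").load(
    "s3://bert-training-set/train.csv"
)
df.show()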

In [None]:
df.count()

In [None]:
df.columns

In [None]:
# let's see distribution of 0 and 1 values in our dataset
label_counts = df.groupBy("_c0").count()
label_counts.show()
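For intuition, the `groupBy("_c0").count()` above does the same thing as counting the first field of each row. The sketch below reproduces it with the standard library on a tiny in-memory sample. It assumes, as the cell above implies, that `train.csv` has no header and that its first column holds the 0/1 label; the sample rows are made-up placeholders, not SST-2 data, and `label_distribution` is a hypothetical helper.

```python
import csv
import io
from collections import Counter

# Made-up rows in an assumed headerless "label,sentence" layout
SAMPLE_CSV = """1,a gripping and heartfelt film
0,the plot never comes together
1,sharp writing and strong performances
"""


def label_distribution(csv_text: str) -> dict:
    """Map each label (first column) to (row count, fraction of all rows)."""
    reader = csv.reader(io.StringIO(csv_text))
    counts = Counter(row[0] for row in reader if row)
    total = sum(counts.values())
    return {label: (n, n / total) for label, n in sorted(counts.items())}


for label, (n, frac) in label_distribution(SAMPLE_CSV).items():
    print(f"label {label}: {n} rows ({frac:.0%})")
```

On the real dataset the Spark version is the one to use; the local version is only a way to check what the grouped counts mean.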

### SageMaker Pipelines Setup

This notebook creates and executes a pipeline with three Notebook Job steps and four notebooks in its workflow:

- <b>Preprocess Step</b>: Pulls down the NLP dataset and preprocesses it.
- <b>Train Step</b>: Fine-tunes a BERT model with Transformers.
    - Additional Dependencies: a notebook that prepares a test dataset for sample inference with the trained model.
- <b>Batch Inference and Model Monitor Step</b>: Runs batch inference and sets up Model Monitor for data quality to generate a baseline.

![workflow](workflow.png)
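The three steps form a linear chain. SageMaker Pipelines schedules steps from their declared dependencies (set with `add_depends_on` later in this notebook), not from their position in the `steps` list. As an illustration only, the standard library's `graphlib` derives the same order from a dependency map; the dict below is hand-written to mirror those dependencies and is not something the SDK produces.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each step maps to the set of steps it must wait for (hand-written mirror
# of the add_depends_on() calls in the pipeline definition).
dependencies = {
    "preprocess-bert": set(),
    "training-bert": {"preprocess-bert"},
    "batch-monitor": {"training-bert"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['preprocess-bert', 'training-bert', 'batch-monitor']
```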

In [None]:
%%bash
pip install -U sagemaker

In [None]:
%%local
import sagemaker
from sagemaker import session
from sagemaker.s3 import S3Downloader
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import name_from_base
from sagemaker.workflow.notebook_job_step import NotebookJobStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterBoolean, ParameterString
from sagemaker.workflow.pipeline import Pipeline

sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
subfolder_name = "notebook-step-artifacts-pipelines/"
# SageMaker Distribution image in us-west-2; use the matching URI for your own region
image_uri = "542918446943.dkr.ecr.us-west-2.amazonaws.com/sagemaker-distribution-prod:0-cpu"
kernel_name = "python3"
role = sagemaker.get_execution_role()
notebook_artifacts = f"s3://{default_bucket}/{subfolder_name}"
print(notebook_artifacts)
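The `image_uri` above is pinned to `us-west-2`, so running in another region needs a different URI. ECR image URIs follow the `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>` pattern, which makes a quick sanity check possible. The sketch below assumes commercial-partition URIs ending in `amazonaws.com`; `parse_ecr_uri` and `check_image_region` are hypothetical helpers. In the notebook you could pass `sagemaker_session.boto_region_name` as the session region.

```python
import re

# <12-digit account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
ECR_URI = re.compile(
    r"^(?P<account>\d{12})\.dkr\.ecr\.(?P<region>[a-z0-9-]+)\.amazonaws\.com/"
    r"(?P<repo>[^:]+):(?P<tag>.+)$"
)


def parse_ecr_uri(uri: str) -> dict:
    """Split an ECR image URI into account, region, repository and tag."""
    match = ECR_URI.match(uri)
    if match is None:
        raise ValueError(f"not an ECR image URI: {uri}")
    return match.groupdict()


def check_image_region(uri: str, session_region: str) -> None:
    """Raise ValueError if the image is hosted in a different region than the session."""
    image_region = parse_ecr_uri(uri)["region"]
    if image_region != session_region:
        raise ValueError(f"image is in {image_region} but the session runs in {session_region}")


uri = "542918446943.dkr.ecr.us-west-2.amazonaws.com/sagemaker-distribution-prod:0-cpu"
print(parse_ecr_uri(uri))
check_image_region(uri, "us-west-2")  # same region: no error
```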

In [None]:
%%local
pipeline_name = "nb-job-steps-pipelines"
display_name = "MyNotebookSteps"
preprocess_notebook = "preprocess.ipynb"
preprocess_job_name = "nb-preprocess"
preprocess_description = "This step downloads an NLP dataset and creates a CSV file out of it"
preprocess_step_name = "preprocess-bert"

# notebook job parameters
nb_job_params = {"default_s3_bucket": notebook_artifacts}

preprocess_nb_step = NotebookJobStep(
    name=preprocess_step_name,
    description=preprocess_description,
    notebook_job_name=preprocess_job_name,
    image_uri=image_uri,
    kernel_name=kernel_name,
    display_name=display_name,
    role=role,
    input_notebook=preprocess_notebook,
    instance_type="ml.m5.4xlarge",
    parameters=nb_job_params,
)
# notebook two configuration
training_notebook = "training.ipynb"
test_data_prep_notebook = "prepare-test-set.ipynb"
training_job_name = "nb-training"
training_description = "This step takes the prepared S3 dataset and runs fine-tuning"
training_step_name = "training-bert"

train_nb_step = NotebookJobStep(
    name=training_step_name,
    description=training_description,
    notebook_job_name=training_job_name,
    input_notebook=training_notebook,
    additional_dependencies=[test_data_prep_notebook],
    image_uri=image_uri,
    kernel_name=kernel_name,
    display_name=display_name,
    instance_type="ml.m5.12xlarge",
    role=role,
    parameters=nb_job_params,
)
train_nb_step.add_depends_on([preprocess_nb_step])


# notebook three configuration
batch_monitor_notebook = "transform-monitor.ipynb"
batch_monitor_job_name = "nb-batch-monitor"
batch_monitor_description = "This step runs batch inference and sets up Model Monitor"
batch_monitor_step_name = "batch-monitor"

batch_monitor_step = NotebookJobStep(
    name=batch_monitor_step_name,
    description=batch_monitor_description,
    notebook_job_name=batch_monitor_job_name,
    input_notebook=batch_monitor_notebook,
    image_uri=image_uri,
    kernel_name=kernel_name,
    display_name=display_name,
    instance_type="ml.m5.12xlarge",
    role=role,
    parameters=nb_job_params,
)
batch_monitor_step.add_depends_on([train_nb_step])
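Each step receives `nb_job_params` through its `parameters` argument. SageMaker notebook jobs support parameterized notebooks in which a cell tagged `parameters` holds default values. The sketch below assumes a papermill-style mechanism, where a new cell carrying the passed values is inserted right after that tagged cell (or at the top if none is tagged). It is an illustration of that convention, not the service's implementation; `inject_parameters` is a hypothetical helper and `my-bucket` is a placeholder.

```python
import copy


def inject_parameters(notebook: dict, params: dict) -> dict:
    """Hypothetical papermill-style injection; the input notebook is not modified."""
    nb = copy.deepcopy(notebook)
    injected = {
        "cell_type": "code",
        "metadata": {"tags": ["injected-parameters"]},
        "source": [f"{name} = {value!r}\n" for name, value in params.items()],
        "outputs": [],
        "execution_count": None,
    }
    for i, cell in enumerate(nb["cells"]):
        if "parameters" in cell.get("metadata", {}).get("tags", []):
            nb["cells"].insert(i + 1, injected)  # override right after the defaults
            break
    else:
        nb["cells"].insert(0, injected)  # no tagged cell: put values at the top
    return nb


# Toy notebook: a tagged defaults cell, then a cell that uses the value
notebook = {
    "cells": [
        {"cell_type": "code", "metadata": {"tags": ["parameters"]},
         "source": ['default_s3_bucket = "s3://placeholder/"\n'],
         "outputs": [], "execution_count": None},
        {"cell_type": "code", "metadata": {},
         "source": ["print(default_s3_bucket)\n"],
         "outputs": [], "execution_count": None},
    ]
}

run_nb = inject_parameters(
    notebook, {"default_s3_bucket": "s3://my-bucket/notebook-step-artifacts-pipelines/"}
)
print("".join(run_nb["cells"][1]["source"]))
```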

In [None]:
%%local
# create pipeline
pipeline = Pipeline(
    name=pipeline_name,
    steps=[preprocess_nb_step, train_nb_step, batch_monitor_step],
)

#### Pipeline Execution
This pipeline will take approximately 45 minutes to execute.
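`execution.wait(delay, max_attempts)` polls the execution every `delay` seconds and fails after `max_attempts` polls, so it waits at most about `delay * max_attempts` seconds. With `delay=30` and `max_attempts=60` that is only 30 minutes, short of the roughly 45-minute runtime, so the wait would fail while the pipeline is still running. The hypothetical helper below sizes `max_attempts` from an expected runtime; the 1.5× safety margin is an arbitrary assumption.

```python
import math


def max_attempts_for(expected_minutes: float, delay_seconds: int, margin: float = 1.5) -> int:
    """Number of polls so that delay_seconds * attempts covers the runtime plus a margin."""
    budget_seconds = expected_minutes * 60 * margin
    return math.ceil(budget_seconds / delay_seconds)


# delay=30 s with max_attempts=60 covers 30 * 60 / 60 = 30 minutes
print(30 * 60 / 60)
print(max_attempts_for(45, delay_seconds=30))  # 135 polls, about 67 minutes
```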

In [None]:
%%local
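# create the pipeline, or update it if it already exists
pipeline.upsert(role_arn=role)
execution = pipeline.start(parameters={})
# poll every 30 s, up to 120 times (~60 min), to cover the ~45-minute runtime
execution.wait(delay=30, max_attempts=120)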
execution_steps = execution.list_steps()
print(execution_steps)
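Each entry returned by `execution.list_steps()` is a dict with fields such as `StepName`, `StepStatus` and, for these notebook job steps, `Metadata["TrainingJob"]["Arn"]` (the field the parsing cell below reads). Picking a step by list position ties the code to the API's sort order; looking it up by name is sturdier. The sample entries and ARNs below are made-up placeholders, and `find_step` and `training_job_name` are hypothetical helpers.

```python
# Made-up, trimmed-down entries shaped like execution.list_steps() output
sample_steps = [
    {"StepName": "batch-monitor", "StepStatus": "Succeeded",
     "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:111122223333:training-job/nb-batch-monitor-abc"}}},
    {"StepName": "training-bert", "StepStatus": "Succeeded",
     "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:111122223333:training-job/nb-training-def"}}},
]


def find_step(steps: list, step_name: str) -> dict:
    """Return the step entry whose StepName matches, or raise KeyError."""
    for step in steps:
        if step["StepName"] == step_name:
            return step
    raise KeyError(f"step {step_name!r} not found")


def training_job_name(step: dict) -> str:
    """Extract the training job name from the step's training job ARN."""
    arn = step["Metadata"]["TrainingJob"]["Arn"]
    return arn.split("/")[-1]


print(training_job_name(find_step(sample_steps, "training-bert")))  # nb-training-def
```

In the notebook, `find_step(execution_steps, training_step_name)` would replace a positional lookup such as `execution_steps[0]`.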

#### Output Notebook Parsing

In [None]:
%%local

# download the output notebook
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import _tmpdir
from sagemaker.s3 import S3Downloader
import tarfile
import os


# get job details
def _get_training_job_details(notebook_job_step):
    training_job_arn = notebook_job_step["Metadata"]["TrainingJob"]["Arn"]

    return sagemaker_session.sagemaker_client.describe_training_job(
        TrainingJobName=training_job_arn.split("/")[1]
    )


def _download_notebook(output_s3_uri, output_notebook_name, kms_key=None):
    download_folder = "outputs"

    if not os.path.exists(download_folder):
        os.makedirs(download_folder)

    with _tmpdir() as temp_output_folder:
        S3Downloader.download(
            output_s3_uri,
            temp_output_folder,
            sagemaker_session=sagemaker_session,
            kms_key=kms_key,
        )

        with tarfile.open(os.path.join(temp_output_folder, "output.tar.gz"), "r:gz") as tar:
            tar.extract(output_notebook_name, download_folder)
            print(f"Downloaded to {download_folder}/{output_notebook_name}")


# describe the training job that ran the first step returned by list_steps()
job_description = _get_training_job_details(execution_steps[0])

output_s3_uri = s3_path_join(
    job_description["OutputDataConfig"]["S3OutputPath"],
    job_description["TrainingJobName"],
    "output",
    "output.tar.gz",
)
output_notebook_name = job_description["Environment"]["SM_OUTPUT_NOTEBOOK_NAME"]

print(f"  - Output S3 Location: {output_s3_uri}")
print(f"  - Output Notebook Name: {output_notebook_name}")

_download_notebook(output_s3_uri, output_notebook_name)
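Once the output notebook is downloaded, it is a plain JSON `.ipynb` file, so its printed output and any errors can be pulled out with the standard library. The sketch below assumes the nbformat v4 layout (`cells`, `cell_type`, `outputs`, `output_type`), where stream text may be a string or a list of lines. `collect_outputs` and `load_notebook` are hypothetical helpers, and the sample notebook is a made-up placeholder.

```python
import json


def load_notebook(path: str) -> dict:
    """Read an .ipynb file, e.g. load_notebook(f"outputs/{output_notebook_name}")."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def collect_outputs(notebook: dict) -> list:
    """Return (cell index, kind, text) for stream and error outputs of code cells."""
    results = []
    for idx, cell in enumerate(notebook.get("cells", [])):
        if cell.get("cell_type") != "code":
            continue
        for out in cell.get("outputs", []):
            if out.get("output_type") == "stream":
                text = out.get("text", "")
                if isinstance(text, list):  # nbformat allows a list of lines
                    text = "".join(text)
                results.append((idx, out.get("name", "stream"), text))
            elif out.get("output_type") == "error":
                results.append((idx, "error", f"{out.get('ename')}: {out.get('evalue')}"))
    return results


# Made-up notebook standing in for the downloaded output notebook
sample_nb = {
    "cells": [
        {"cell_type": "markdown", "source": ["# title"]},
        {"cell_type": "code", "outputs": [
            {"output_type": "stream", "name": "stdout", "text": ["step finished\n"]}]},
        {"cell_type": "code", "outputs": [
            {"output_type": "error", "ename": "ValueError", "evalue": "bad input", "traceback": []}]},
    ]
}

for idx, kind, text in collect_outputs(sample_nb):
    print(idx, kind, text.strip())
```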

### Cleanup

In [None]:
%%local
# delete the pipeline (it was defined in the local kernel, not on the Spark cluster)
pipeline.delete()