## SageMaker Processing Job

haimtran 05/09/2023

It takes about 10 minutes to process 100 files of about 140 MB each (roughly 14 GB in total) with 8 ml.m4.xlarge instances.
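The timing above converts into rough throughput figures. This sketch uses only the numbers quoted in this note. The 10-minute wall-clock time probably also includes instance startup and image download, so actual processing throughput is likely higher than these averages.

```python
# Back-of-the-envelope throughput for the run described above.
# All inputs come from the note: 100 files, ~140 MB each, 8 instances, ~10 minutes.
num_files = 100
file_size_mb = 140
instance_count = 8
elapsed_s = 10 * 60

total_mb = num_files * file_size_mb                  # 14,000 MB, about 14 GB
aggregate_mb_per_s = total_mb / elapsed_s            # whole cluster
per_instance_mb_per_s = aggregate_mb_per_s / instance_count

print(f"total: {total_mb} MB")
print(f"aggregate: {aggregate_mb_per_s:.1f} MB/s")
print(f"per instance: {per_instance_mb_per_s:.2f} MB/s")
```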

- [SageMaker Session](https://sagemaker.readthedocs.io/en/stable/api/utility/session.html)
- [SageMaker process data docs](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html)
- [SageMaker processing job SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html)
- [SageMaker pipe mode](https://aws.amazon.com/blogs/machine-learning/using-pipe-input-mode-for-amazon-sagemaker-algorithms/)
- [SageMaker Spark job](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_processing/spark_distributed_data_processing/sagemaker-spark-processing.html)
- [SageMaker distributed processing](https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/use-sagemaker-processing-for-distributed-feature-engineering-of-terabyte-scale-ml-datasets.html)
- [S3DataDistributionType](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ProcessingS3Input.html)
- [Stack Overflow: SageMaker distributed processing](https://stackoverflow.com/questions/68624368/distributed-processing-aws-sagemaker)
- [s3fs docs](https://s3fs.readthedocs.io/en/latest/)
- [Processor timeout (default 24 hours)](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.Processor)
- [SageMaker timeout limits](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)
- [SageMaker service quotas](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)



## Processing Job S3 Data Flow 
<img src="./assets/sagemaker-processing-job-1.png" width="70%"/>

## Processing Job Input and Output
<img src="./assets/sagemaker-processing-job-2.png" width="70%">
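The diagram maps S3 prefixes to directories inside the container: inputs are downloaded from S3 into a local path, and whatever the script writes to the output path is uploaded back to S3. The helper below is a hypothetical illustration of that mapping. It assumes the object key relative to the input prefix is kept under the destination directory, and the file name `raw_0.csv` is only an example.

```python
def remap(path: str, src_root: str, dst_root: str) -> str:
    """Rewrite a path under src_root to the same relative path under dst_root.

    Plain string handling is used on purpose: pathlib would collapse the
    double slash in "s3://".
    """
    src = src_root.rstrip("/") + "/"
    if not path.startswith(src):
        raise ValueError(f"{path} is not under {src}")
    return dst_root.rstrip("/") + "/" + path[len(src):]


# Input direction: S3 object -> file in the container (assumed layout).
print(remap("s3://my-bucket/data-ecg/raw_0.csv",
            "s3://my-bucket/data-ecg/", "/opt/ml/processing/data/"))
# Output direction: file written by the script -> S3 object.
print(remap("/opt/ml/processing/output/raw_0.csv",
            "/opt/ml/processing/output/", "s3://my-bucket/data-pca/"))
```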

In [10]:
import glob
from time import strftime

from sagemaker import get_execution_role, image_uris
from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor
from sagemaker.session import Session
import boto3



## SageMaker Session

In [11]:
role = get_execution_role()

In [12]:
print(role)

arn:aws:iam::005681628734:role/RoleForDataScientistUserProfile


In [13]:
session = Session()

In [14]:
bucket = session.default_bucket()

In [15]:
bucket

'sagemaker-us-east-1-005681628734'
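`Session.default_bucket()` names the bucket from the region and the AWS account ID, which is why the output above embeds both. The sketch below only reproduces that naming pattern for illustration. In practice, the region and account ID would come from the session or from STS rather than being hard-coded.

```python
def default_bucket_name(region: str, account_id: str) -> str:
    # Naming pattern of the SageMaker default bucket: sagemaker-{region}-{account_id}
    return f"sagemaker-{region}-{account_id}"


print(default_bucket_name("us-east-1", "005681628734"))
# sagemaker-us-east-1-005681628734
```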

## Path Variables

In [16]:
container_base_path = "/opt/ml/processing"

In [17]:
data_input_path = f"s3://{bucket}/data-ecg/"

In [18]:
code_input_path = f"s3://{bucket}/script/pca-ecg.py"

In [19]:
data_output_path = f"s3://{bucket}/data-pca/"
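All three S3 paths share one bucket and differ only by key prefix. This is a small, optional sanity check that splits each URI into bucket and key so a typo in a prefix shows up before the job is launched. The helper name `split_s3_uri` is hypothetical, and the bucket value is copied from the output above.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Return (bucket, key) for an s3:// URI."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


example_bucket = "sagemaker-us-east-1-005681628734"
for uri in (
    f"s3://{example_bucket}/data-ecg/",
    f"s3://{example_bucket}/script/pca-ecg.py",
    f"s3://{example_bucket}/data-pca/",
):
    print(split_s3_uri(uri))
```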

## Upload Data 

In [20]:
# upload local CSVs under the prefix the job reads from (data_input_path = s3://{bucket}/data-ecg/)
for file in glob.glob("./data/*.csv"):
    session.upload_data(path=file, bucket=bucket, key_prefix="data-ecg")
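The processing job only reads objects under `data_input_path`, so the upload prefix has to match it. For a single file, `upload_data` stores it as `s3://{bucket}/{key_prefix}/{file name}`, as the script upload further down shows. The hypothetical helper below predicts that URI so you can check it before launching the job.

```python
import os


def expected_s3_uri(local_path: str, bucket: str, key_prefix: str) -> str:
    # Key layout seen in this notebook's upload_data output: {key_prefix}/{file name}
    return f"s3://{bucket}/{key_prefix}/{os.path.basename(local_path)}"


example_bucket = "sagemaker-us-east-1-005681628734"
input_prefix = f"s3://{example_bucket}/data-ecg/"

for prefix in ("data-ecg", "data-house"):
    uri = expected_s3_uri("./data/171A_raw.csv", example_bucket, prefix)
    print(uri, "read by job:", uri.startswith(input_prefix))
```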

In [22]:
# optional: build the 100-file test set by uploading copies of one ECG file
# client = boto3.client("s3")
# for k in range(100):
#     print(f"upload file raw_{k}.csv")
#     client.upload_file("data/171A_raw.csv", Bucket=bucket, Key=f"data-ecg/raw_{k}.csv")

## Upload Script 

In [23]:
session.upload_data(path="./script/pca-ecg.py", bucket=bucket, key_prefix="script")

's3://sagemaker-us-east-1-005681628734/script/pca-ecg.py'

## Processing Job 

In [24]:
image_uri = image_uris.retrieve(
    region="us-east-1", framework="sklearn", version="0.23-1"
)

In [25]:
image_uri

'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'
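The returned image URI points to an ECR repository in an AWS-owned account (683313688378), not the notebook's own account. The region is part of the URI, and the tag encodes framework version, CPU build, and Python version. This hypothetical parser splits the URI into its parts. It assumes the standard `*.amazonaws.com` host form, so other partitions would need a different pattern.

```python
import re

# Assumed form: <12-digit account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
ECR_URI = re.compile(r"^(\d{12})\.dkr\.ecr\.([a-z0-9-]+)\.amazonaws\.com/([^:]+):(.+)$")


def parse_ecr_image_uri(uri: str) -> dict:
    match = ECR_URI.match(uri)
    if match is None:
        raise ValueError(f"unrecognized ECR image URI: {uri}")
    account, region, repository, tag = match.groups()
    return {"account": account, "region": region, "repository": repository, "tag": tag}


print(parse_ecr_image_uri(
    "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3"
))
```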

In [26]:
processor = Processor(
    role=role,
    image_uri=image_uri,
    # instance_count is capped by the account's service quota for this instance type;
    # check the quota in the console before raising it
    instance_count=8,
    instance_type="ml.m4.xlarge",
    entrypoint=[
        "python",
        f"{container_base_path}/script/pca-ecg.py",
        f"--bucket={bucket}",
    ],
)
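The entrypoint runs `pca-ecg.py` with a `--bucket` argument, but the script itself is not shown here. The skeleton below is a hypothetical sketch of how such a script could read its arguments and lay out files. It reads every CSV in the local input directory and writes results to the output directory under the same file name. Keeping input names is a design choice for this sketch: with `ShardedByS3Key` each instance sees a different subset of files, so per-file output names stay unique when all 8 instances upload to the same S3 prefix. The actual PCA step is left out.

```python
import argparse
import glob
import os
import posixpath

BASE = "/opt/ml/processing"  # container_base_path in the notebook


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", required=True)
    return parser.parse_args(argv)


def output_path_for(input_path: str, output_dir: str = f"{BASE}/output") -> str:
    # Hypothetical naming: reuse the input file name so outputs from different shards do not collide.
    return posixpath.join(output_dir, posixpath.basename(input_path))


def main(argv=None):
    args = parse_args(argv)
    os.makedirs(f"{BASE}/output", exist_ok=True)
    for path in sorted(glob.glob(f"{BASE}/data/*.csv")):
        # The PCA transformation from pca-ecg.py would go here (not shown in this note).
        print(f"[{args.bucket}] {path} -> {output_path_for(path)}")
```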

In [None]:
# it takes about 5 minutes
processor.run(
    inputs=[
        ProcessingInput(
            # data in s3://bucket/data-ecg/
            source=data_input_path,
            # data in container /opt/ml/processing/data/
            destination=f"{container_base_path}/data/",
            # split the objects across instances instead of copying all of them to each one
            s3_data_distribution_type="ShardedByS3Key",
        ),
        ProcessingInput(
            # code in s3://bucket/script/
            source=code_input_path,
            # code in container /opt/ml/processing/script/
            destination=f"{container_base_path}/script/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            # data in container /opt/ml/processing/output/
            source=f"{container_base_path}/output/",
            # data in s3 s3://bucket/data-pca/
            destination=data_output_path,
            output_name="data-pca",
        )
    ],
    job_name=f'demo-{strftime("%Y-%m-%d-%H-%M-%S")}',
)

INFO:sagemaker:Creating processing-job with name demo-2023-09-11-02-41-32
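With `ShardedByS3Key`, each instance receives a subset of the objects under the input prefix instead of a full copy (the default `FullyReplicated`). The sketch below approximates that split with a simple round-robin. The round-robin is an assumption for illustration, not SageMaker's actual assignment. It shows that 100 files do not divide evenly across 8 instances: some instances get one more file, and the slowest instance determines when the job finishes.

```python
def shard_keys(keys, instance_count):
    # Round-robin assignment: an illustrative stand-in for SageMaker's S3-key sharding.
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


keys = [f"data-ecg/raw_{k}.csv" for k in range(100)]
shards = shard_keys(keys, 8)
print([len(shard) for shard in shards])
# [13, 13, 13, 13, 12, 12, 12, 12]
```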
