This notebook demonstrates how to use SageMaker Processing to split a dataset into train, validation, and test sets. The processing job creates these datasets and writes them back to S3.

Source:
https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker_processing/basic_sagemaker_data_processing/basic_sagemaker_processing.ipynb

#### Prepare Resources

In [16]:
import re
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sagemaker.sklearn.processing import SKLearnProcessor


from sagemaker.session import Session
from sagemaker import get_execution_role
import sagemaker
import boto3

# nltk is not among the imports at the top of this cell, so import it here;
# without this a fresh kernel fails with NameError on the first download call.
import nltk

# Fetch the tokenizer models and the stop-word lists (no-op if already cached).
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /home/studio-lab-
[nltk_data]     user/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/studio-lab-
[nltk_data]     user/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [17]:
import os

# Credentials are read from the environment instead of being stored in the
# notebook. Any key pair that was ever committed in this cell must be treated
# as compromised and rotated in IAM.
# When a variable is unset the value is None, and boto3 falls back to its
# default credential chain (shared config files, instance/role credentials, ...).
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET = os.environ.get("AWS_SECRET_ACCESS_KEY")

region_name='us-east-2'

# Explicit boto3 session so every AWS call made through it targets the same
# account and region.
boto_session = boto3.session.Session(
   aws_access_key_id=AWS_ACCESS_KEY,
   aws_secret_access_key=AWS_SECRET,
   region_name=region_name
)

# SageMaker session wrapping the boto3 session above.
sagemaker_session = Session(boto_session=boto_session)

model_package_group_name = "AmazonModelPackageGroupName"

# IAM role passed to the SKLearnProcessor below.
role = 'arn:aws:iam::013747046745:role/sagemaker-role-amazon'

In [18]:
# Processor that runs a scikit-learn script on a single ml.m5.xlarge instance.
# The explicitly configured sagemaker_session is passed in so the job uses the
# same credentials and region as the rest of the notebook instead of an
# implicitly created default session.
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=sagemaker_session,
)

#### Download Data

In [19]:
# Read the local CSV using its first column as the index, then keep only the
# rows that have no missing values.
reviews_path = 'train_10k.csv'
df = pd.read_csv(reviews_path, index_col=0).dropna(axis=0, how='any')

# Preview the first rows.
df.head()

Unnamed: 0,label,text
0,1,Stuning even for the non-gamerThis sound track...
1,1,The best soundtrack ever to anything.I'm readi...
2,1,Amazing!This soundtrack is my favorite music o...
3,1,Excellent SoundtrackI truly like this soundtra...
4,1,"Remember, Pull Your Jaw Off The Floor After He..."


#### Prepare Processing Script

In [22]:
%%writefile preprocessing.py

import pandas as pd
import os
from sklearn.model_selection import train_test_split

input_data_path = os.path.join("/opt/ml/processing/input", "train_10k.csv")
df = pd.read_csv(input_data_path)
print("Shape of data is:", df.shape)
train, test = train_test_split(df, test_size=0.2)
train, validation = train_test_split(train, test_size=0.2)

try:
    os.makedirs("/opt/ml/processing/output/train")
    os.makedirs("/opt/ml/processing/output/validation")
    os.makedirs("/opt/ml/processing/output/test")
    print("Successfully created directories")
except Exception as e:
    # if the Processing call already creates these directories (or directory otherwise cannot be created)
    print(e)
    print("Could not make directories")
    pass

try:
    train.to_csv("/opt/ml/processing/output/train/train.csv")
    validation.to_csv("/opt/ml/processing/output/validation/validation.csv")
    test.to_csv("/opt/ml/processing/output/test/test.csv")
    print("Wrote files successfully")
except Exception as e:
    print("Failed to write the files")
    print(e)
    pass

print("Completed running the processing job")

Overwriting preprocessing.py


#### Run Processing Job

In [23]:
%%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(
    code="preprocessing.py",
    # arguments = ["arg1", "arg2"], # Arguments can optionally be specified here
    inputs=[ProcessingInput(source="train_10k.csv", destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/train"),
        ProcessingOutput(source="/opt/ml/processing/output/validation"),
        ProcessingOutput(source="/opt/ml/processing/output/test"),
    ],
)

In [24]:
# get processing job logs
print(output)
# Take the job name from the processor's record of the job it just launched
# rather than parsing it out of the captured log text, which breaks as soon as
# the log layout changes.
job_name = sklearn_processor.latest_job.job_name


Job Name:  sagemaker-scikit-learn-2022-06-12-23-47-26-846
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/input/input-1/train_10k.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/output/output-1', 'LocalPath': '/opt/ml/processing/output/train'

In [26]:
# Confirm that the output dataset files were written to S3.

# Reuse the session the processor actually ran with, so the bucket and the
# credentials match the job instead of coming from a freshly created default
# session.
job_session = sklearn_processor.sagemaker_session
s3_client = job_session.boto_session.client("s3")
default_bucket = job_session.default_bucket()

# The job declares three outputs, stored under output-1 .. output-3.
for i in range(1, 4):
    output_prefix = job_name + "/output/output-" + str(i) + "/"
    response = s3_client.list_objects_v2(Bucket=default_bucket, Prefix=output_prefix)
    # "Contents" is absent when nothing matches the prefix; report that
    # instead of failing with a KeyError.
    contents = response.get("Contents", [])
    if not contents:
        print("No objects found under s3://" + default_bucket + "/" + output_prefix)
        continue
    prefix = contents[0]["Key"]
    print("s3://" + default_bucket + "/" + prefix)


s3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/output/output-1/train.csv
s3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/output/output-2/validation.csv
s3://sagemaker-us-east-2-013747046745/sagemaker-scikit-learn-2022-06-12-23-47-26-846/output/output-3/test.csv
