### Assigment 2.2. Data Parallelism: Distributed Preprocessing

* <span style="padding:4px; background-color: #f2a68a; color: #000;"><strong>Assignment 2.2</strong></span> For this assignment we want to run a distributed Processing Job across multiple instances to capitalize the `island` column of the a dataset. Your dataset will consist of 10 different files stored in S3. Set up a Processing Job using 2 instances. When specifying the input to the Processing Job, you need to set the `ProcessingInput.s3_data_distribution_type` attribute to `ShardedByS3Key`. By doing this, SageMaker will run a cluster with two instances simultaneously, each with access to half the files.

**Steps**:

1. Split the data in 10 files
2. Creating the processing script
3. Create the Processor & ProcessingStep
4. Run it


In [10]:
# Assignment 2.2. - Distributed Processing Job

import os
import sys
import tempfile

from io import StringIO
from pathlib import Path

import pandas as pd
import boto3
import sagemaker
from sagemaker.s3 import S3Uploader
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig


BUCKET = "mlschool-davidorti"

pipeline_session = PipelineSession(default_bucket=BUCKET)
role = sagemaker.get_execution_role()
pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

S3_LOCATION = f"s3://{BUCKET}/penguins/assignments/assignment_22"
BASE_DIRECTORY = Path().resolve()
LOCAL_INPUT_DATA_FILEPATH = BASE_DIRECTORY.parent.parent / "data.csv"


# Split data in 10 files
def split_csv(input_filepath, output_directory):
    data = pd.read_csv(input_filepath)
    output_directory.mkdir(parents=True, exist_ok=True)
    
    i = 0
    while i < len(data):
        subset = data[i:i+35]
        subset.to_csv(f"{output_directory}/data_part_{i//35 + 1 :02}.csv", index=False)
        print(f"Saved {output_directory}/data_part_{i//35 + 1 :02}.csv")
        i += 35


if __name__ == "__main__":
    split_csv(LOCAL_INPUT_DATA_FILEPATH, BASE_DIRECTORY)
    S3Uploader.upload(
        local_path=BASE_DIRECTORY,
        desired_s3_uri=S3_LOCATION
    )

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_01.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_02.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_03.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_04.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_05.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_06.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_07.csv
Saved /root/ml.school/penguins/assignments/assignment_22/data_part_08.csv
Saved /root/ml

In [67]:
# Check that contents are indeed uploaded to S3
s3 = boto3.client("s3")
response = s3.list_objects_v2(
    Bucket=BUCKET,
    Prefix="penguins/assignments/assignment_22/"
)
for obj in response.get("Contents", []):
    if obj["Key"].endswith(".csv") and "code" not in obj["Key"] and "output" not in obj["Key"]:
        print(obj["Key"])

penguins/assignments/assignment_22/./data_part_01.csv
penguins/assignments/assignment_22/./data_part_02.csv
penguins/assignments/assignment_22/./data_part_03.csv
penguins/assignments/assignment_22/./data_part_04.csv
penguins/assignments/assignment_22/./data_part_05.csv
penguins/assignments/assignment_22/./data_part_06.csv
penguins/assignments/assignment_22/./data_part_07.csv
penguins/assignments/assignment_22/./data_part_08.csv
penguins/assignments/assignment_22/./data_part_09.csv
penguins/assignments/assignment_22/./data_part_10.csv


In [68]:
# Step 2 - processing script

ASSIGNMENT_CODE_DIR = BASE_DIRECTORY / "code"
ASSIGNMENT_CODE_DIR.mkdir(parents=True, exist_ok=True)

if str(ASSIGNMENT_CODE_DIR) not in sys.path:
    sys.path.append(f"{ASSIGNMENT_CODE_DIR}")

In [70]:
%%writefile {ASSIGNMENT_CODE_DIR}/preprocessor_assignment_22.py

import pandas as pd
from pathlib import Path


def capitalize_island(dataframe):
    """Island column values to uppercase"""
    dataframe["island"] = dataframe["island"].str.upper()  # so it's different from default
    return dataframe


def preprocess(input_filepath, output_filepath):
    """Process a single CSV file and save to the specified output filepath"""
    data = pd.read_csv(input_filepath)
    processed_data = capitalize_island(data)
    processed_data.to_csv(output_filepath, index=False)
    print(f"Processed {input_filepath} and saved to {output_filepath}")


def process_all_files(input_dir, output_dir):
    """Process all CSV files in the input directory and save to the output directory"""
    input_dir_path = Path(input_dir)
    for filename in input_dir_path.iterdir():
        if filename.suffix == '.csv':  # only process .csv files
            input_filepath = input_dir_path / filename.name
            output_filepath = Path(output_dir) / filename.name
            preprocess(input_filepath, output_filepath)


if __name__ == "__main__":

    # default paths for SageMaker Processing
    BASE_DIRECTORY = Path("/opt/ml/processing")
    DATA_DIRECTORY = BASE_DIRECTORY / "input"
    OUTPUT_DIRECTORY = BASE_DIRECTORY / "output"
    
    process_all_files(DATA_DIRECTORY, OUTPUT_DIRECTORY)

Overwriting /root/ml.school/penguins/assignments/assignment_22/code/preprocessor_assignment_22.py


In [15]:
!ls code

preprocessor_assignment_22.py


In [71]:
# test the script
from preprocessor_assignment_22 import preprocess, process_all_files

with tempfile.TemporaryDirectory() as directory:
    process_all_files(
        input_dir=BASE_DIRECTORY,
        output_dir=directory
    )
    
    print(f"\nFolders: {os.listdir(directory)}\n")
    
    # check the last file
    data = pd.read_csv(os.path.join(directory, 'data_part_10.csv'))
    print(data.island[-5:])

Processed /root/ml.school/penguins/assignments/assignment_22/data_part_07.csv and saved to /tmp/tmpqnxb170d/data_part_07.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_03.csv and saved to /tmp/tmpqnxb170d/data_part_03.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_09.csv and saved to /tmp/tmpqnxb170d/data_part_09.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_10.csv and saved to /tmp/tmpqnxb170d/data_part_10.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_01.csv and saved to /tmp/tmpqnxb170d/data_part_01.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_05.csv and saved to /tmp/tmpqnxb170d/data_part_05.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_08.csv and saved to /tmp/tmpqnxb170d/data_part_08.csv
Processed /root/ml.school/penguins/assignments/assignment_22/data_part_04.csv and saved to /tmp/tmpqnxb170d/data_part_04.csv


In [84]:
# Step 3 - Define parameters, Processors, Processing Step and Pipeline

# parameter definition
data_location = ParameterString(
    name="data_location",
    default_value=f"{S3_LOCATION}"
)
preprocessed_data_location = ParameterString(
    name="preprocessed_data_location",
    default_value=f"{S3_LOCATION}/output"
)

cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

# processor
processor = SKLearnProcessor(
    base_job_name="penguins-assignment_22-capitalize",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=2,
    role=role,
    sagemaker_session=pipeline_session
)

# inputs & outputs
inputs = [
    ProcessingInput(
        source=data_location,  # parameterized
        destination=f"/opt/ml/processing/input",
        s3_data_distribution_type="ShardedByS3Key"
    )
]
outputs = [
    ProcessingOutput(
        output_name=f"output",
        source=f"/opt/ml/processing/output",
        destination=preprocessed_data_location # parameterized
    )
]

# processing step
preprocess_data_step = ProcessingStep(
    name="assignment-22-preprocess-data",
    step_args=processor.run(
        code=f"{ASSIGNMENT_CODE_DIR}/preprocessor_assignment_22.py",
        inputs=inputs,
        outputs=outputs
    ),
    cache_config=cache_config
)

# pipeline
assignment_22_pipeline = Pipeline(
    name="assignment-22-pipeline",
    parameters=[data_location, preprocessed_data_location],
    steps=[preprocess_data_step],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session
)

assignment_22_pipeline.upsert(role_arn=role)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:833724363691:pipeline/assignment-22-pipeline',
 'ResponseMetadata': {'RequestId': '459e951c-fee3-48aa-b5a1-b8be64eb8a0d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '459e951c-fee3-48aa-b5a1-b8be64eb8a0d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Tue, 26 Sep 2023 17:42:48 GMT'},
  'RetryAttempts': 0}}

In [85]:
# run the pipeline
#assignment_22_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:eu-west-1:833724363691:pipeline/assignment-22-pipeline/execution/06pk4e9mmil1', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x7fa96c8060d0>)

In [72]:
# Check that outputs are indeed uploaded to S3
s3 = boto3.client("s3")
contents = s3.list_objects_v2(Bucket=BUCKET, Prefix="penguins/assignments/assignment_22/output").get("Contents", [])
for obj in contents:
    print(obj["Key"])

# check how the last one looks
obj = s3.get_object(Bucket=BUCKET, Key=obj["Key"])
csv_data = obj['Body'].read().decode('utf-8')
data = pd.read_csv(StringIO(csv_data))

data.tail(2)

penguins/assignments/assignment_22/output/data_part_01.csv
penguins/assignments/assignment_22/output/data_part_02.csv
penguins/assignments/assignment_22/output/data_part_03.csv
penguins/assignments/assignment_22/output/data_part_04.csv
penguins/assignments/assignment_22/output/data_part_05.csv
penguins/assignments/assignment_22/output/data_part_06.csv
penguins/assignments/assignment_22/output/data_part_07.csv
penguins/assignments/assignment_22/output/data_part_08.csv
penguins/assignments/assignment_22/output/data_part_09.csv
penguins/assignments/assignment_22/output/data_part_10.csv


Unnamed: 0,species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
27,Gentoo,BISCOE,45.2,14.8,212.0,5200.0,FEMALE
28,Gentoo,BISCOE,49.9,16.1,213.0,5400.0,MALE
