As a pre-step, the kaggle dataset for the competition 'Feedback Prize - Evaluating Student Writing' has been uploaded to a S3 bucket. This will be the union of the different steps: Preprocessing, Training, Deployment.

In [1]:
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd

bucket = 'kaggle-writing-student'
key_dataset = 'dataset'
output_processed_data = 'processed_dataset'


sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

# Experiment and trial creation

First, we create an experiment and trial to track and compare our pipeline runs

In [2]:
%pip install sagemaker-experiments

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
You should consider upgrading via the '/opt/conda/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [5]:
import time

timestamp = int(time.time())

pipeline_name = "Student-Writing-pipeline-{}".format(timestamp)
pipeline_name

'Student-Writing-pipeline-1645642129'

In [6]:
%store pipeline_name

Stored 'pipeline_name' (str)


In [7]:
from smexperiments.experiment import Experiment

pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="Student Writing Bayes Pipeline Experiment",
    sagemaker_boto_client=sm,
)

pipeline_experiment_name = pipeline_experiment.experiment_name
print("Pipeline experiment name: {}".format(pipeline_experiment_name))

Pipeline experiment name: Student-Writing-pipeline-1645642129


In [8]:
%store pipeline_experiment_name

Stored 'pipeline_experiment_name' (str)


In [9]:
from smexperiments.trial import Trial

pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sm
)

pipeline_trial_name = pipeline_trial.trial_name
print("Trial name: {}".format(pipeline_trial_name))

Trial name: trial-1645642129


In [10]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


# Pipeline building

In [11]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

## Experiment parameters

In [12]:
exp_name = ParameterString(
    name="ExperimentName",
    default_value=pipeline_experiment_name,
)
exp_name

ParameterString(name='ExperimentName', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='Student-Writing-pipeline-1645642129')

## Processing Step Parameters

In this step, the dataset is split between training/validation/test sets at a percentage of 80/10/10

In [13]:
raw_input_data_s3_uri = "s3://{}/{}/".format(bucket, key_dataset)

student_writing_output_data_s3_uri = "s3://{}/{}/".format(bucket, output_processed_data)

print(raw_input_data_s3_uri, student_writing_output_data_s3_uri)

s3://kaggle-writing-student/dataset/ s3://kaggle-writing-student/processed_dataset/


In [14]:
!aws s3 ls $raw_input_data_s3_uri

                           PRE test/
                           PRE train/
2022-02-22 10:34:10        101 sample_submission.csv
2022-02-22 10:34:10   67465475 train.csv


In [15]:
input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.8,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.1,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.1,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="students-discourse-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(name="FeatureGroupName", default_value="students-discourse-feature-group-" + str(timestamp))

In [16]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [17]:
from sagemaker.sklearn.processing import SKLearnProcessor

In [18]:
processing_inputs = [
    ProcessingInput(
        source=raw_input_data_s3_uri,
        destination='/opt/ml/processing/input_data/')
]

processing_outputs = [
    ProcessingOutput(
        output_name='processed_data',
        source='/opt/ml/processing/processed_data/',
        destination=student_writing_output_data_s3_uri,
        s3_upload_mode='EndOfJob',
        )
]

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=1800,
)

processing_step = ProcessingStep(
    name="Processing",
    code="processing_script.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "train-split-percentage",
        str(train_split_percentage),
        "validation-split-percentage",
        str(validation_split_percentage),
        "test-split-percentage",
        str(test_split_percentage),
    ],
)

print(processing_step)


INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


ProcessingStep(name='Processing', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


Also, take note the `"processed-data"` named channels specified in the output configuration for the processing job. Such step `Properties` can be used in subsequent steps and will resolve to their runtime values at execution. In particular, we'll call out this usage when we define our training step.

# Training step Parameters

In [19]:
%store -r student_writing_output_data_s3_uri
student_writing_output_data_s3_uri

's3://kaggle-writing-student/processed_dataset/'

In [20]:
train_uri = os.path.join(student_writing_output_data_s3_uri, 'train.csv')

In [21]:
train_uri

's3://kaggle-writing-student/processed_dataset/train.csv'

In [22]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.m5.large")

train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

In [23]:
alpha = ParameterFloat(name="Alpha", default_value=1.0)

fit_prior = ParameterString(name="FitPrior", default_value="True")

In [27]:
from sagemaker.sklearn import SKLearn

estimator = SKLearn(
    entry_point="train_script.py",
    framework_version="0.23-1",
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    role=role,
#     hyperparameters={"alpha": alpha, 
#                      "fit_prior": fit_prior},
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.


In [28]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=train_uri,
            content_type="text/csv",
        ),
    },
)

print(training_step)

TrainingStep(name='Train', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


# Define a Pipeline

In [29]:
%store -r pipeline_name

In [30]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
#         feature_store_offline_prefix,
#         feature_group_name,
        train_instance_type,
        train_instance_count,
#         alpha,
#         fit_prior,
        
    ],
    steps=[processing_step, training_step],
    sagemaker_session=sess,
)

In [None]:
import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)

In [32]:
print(pipeline_experiment_name)

Student-Writing-pipeline-1645642129


# Create pipeline

In [None]:
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

# Execute

In [None]:
execution = pipeline.start()
print(execution.arn)

In [None]:
from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)

In [None]:
import time

# Giving the first step time to start up
time.sleep(30)

execution.list_steps()

In [None]:
%store -r pipeline_name

In [None]:
%%time

import time
from pprint import pprint

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    #        print('Executions for our pipeline...')
    #        print(pipeline_execution_status)
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

In [39]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Succeeded


In [None]:
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]
print(pipeline_execution_arn)

In [None]:
from pprint import pprint

steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

pprint(steps)

# Generated artifacts

In [43]:
processing_job_name = None
training_job_name = None

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    # We are doing this because there appears to be a bug of this LineageTableVisualizer handling the Processing Step
    if execution_step["StepName"] == "Processing":
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step["StepName"] == "Train":
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)