## Fake News Detection


### Building a SageMaker Pipeline to train and deploy a RoBERTa Model for Fake News Detection

#### Introduction

I designed and implemented an end-to-end, fully automated workflow on AWS that takes in raw news articles, cleans and balances the data, and trains a RoBERTa “fake news” classifier. Once the model is trained, the pipeline evaluates its accuracy on a held-out test set and, if it meets a minimum accuracy threshold, automatically packages and registers it in a SageMaker model package group. The registered model is deployed to a SageMaker endpoint after human approval. Because data preparation, training, evaluation, and registration are chained in one pipeline with no manual hand-offs, updates happen faster, more reliably, and with consistent quality.

In [None]:
import os
import boto3
import sagemaker
import pandas as pd
import botocore.config
import time
import json
from pprint import pprint
from IPython.display import display, HTML  # IPython.core.display is deprecated

%matplotlib inline
%config InlineBackend.figure_format='retina'

config = botocore.config.Config(user_agent_extra='fake-news-detection-pipeline')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker', 
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name


## Ingest and transform the public dataset

In [3]:
raw_input_data_s3_uri = 's3://datasets-123/fakenews'
print(raw_input_data_s3_uri)

s3://datasets-123/fakenews


In [4]:
!aws s3 ls $raw_input_data_s3_uri

                           PRE fakenews/


In [5]:
!aws s3 cp $raw_input_data_s3_uri'/train.csv' ./

download: s3://datasets-123/fakenews/train.csv to ./train.csv     


In [6]:
df = pd.read_csv('train.csv')
df

| | id | title | author | text | label |
|---|---|---|---|---|---|
| 0 | 0 | House Dem Aide: We Didn’t Even See Comey’s Let... | Darrell Lucus | House Dem Aide: We Didn’t Even See Comey’s Let... | 1 |
| 1 | 1 | FLYNN: Hillary Clinton, Big Woman on Campus - ... | Daniel J. Flynn | Ever get the feeling your life circles the rou... | 0 |
| 2 | 2 | Why the Truth Might Get You Fired | Consortiumnews.com | Why the Truth Might Get You Fired October 29, ... | 1 |
| 3 | 3 | 15 Civilians Killed In Single US Airstrike Hav... | Jessica Purkiss | Videos 15 Civilians Killed In Single US Airstr... | 1 |
| 4 | 4 | Iranian woman jailed for fictional unpublished... | Howard Portnoy | Print \nAn Iranian woman has been sentenced to... | 1 |
| ... | ... | ... | ... | ... | ... |
| 20795 | 20795 | Rapper T.I.: Trump a ’Poster Child For White S... | Jerome Hudson | Rapper T. I. unloaded on black celebrities who... | 0 |
| 20796 | 20796 | N.F.L. Playoffs: Schedule, Matchups and Odds -... | Benjamin Hoffman | When the Green Bay Packers lost to the Washing... | 0 |
| 20797 | 20797 | Macy’s Is Said to Receive Takeover Approach by... | Michael J. de la Merced and Rachel Abrams | The Macy’s of today grew from the union of sev... | 0 |
| 20798 | 20798 | NATO, Russia To Hold Parallel Exercises In Bal... | Alex Ansary | NATO, Russia To Hold Parallel Exercises In Bal... | 1 |


In [7]:
df.astype(object).describe()

| | id | title | author | text | label |
|---|---|---|---|---|---|
| count | 20800 | 20242 | 18843 | 20761 | 20800 |
| unique | 20800 | 19803 | 4201 | 20386 | 2 |
| top | 20760 | Get Ready For Civil Unrest: Survey Finds That ... | Pam Key | | 1 |
| freq | 1 | 5 | 243 | 75 | 10413 |


In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20800 entries, 0 to 20799
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   id      20800 non-null  int64 
 1   title   20242 non-null  object
 2   author  18843 non-null  object
 3   text    20761 non-null  object
 4   label   20800 non-null  int64 
dtypes: int64(2), object(3)
memory usage: 812.6+ KB


In [9]:
df.isna().sum()

id           0
title      558
author    1957
text        39
label        0
dtype: int64

In [10]:
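# Drop rows without article text; .copy() makes later column assignments act on an
# independent frame (avoids pandas' SettingWithCopyWarning)
df = df[df.text.notna()].copy()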

In [11]:
df['label'].value_counts()

label
0    10387
1    10374
Name: count, dtype: int64

**0: reliable news, 1: unreliable news**
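The two classes are already close to 50/50. The pipeline's `BalanceDataset` parameter (defined below) asks `src/prepare_data.py` to balance them, but that script is not shown here. The following is only a minimal sketch assuming balancing means downsampling every class to the size of the smallest one; `downsample_to_minority` is a hypothetical helper, not the script's code.

```python
import random
from collections import defaultdict

def downsample_to_minority(records, label_key="label", seed=42):
    """Return a class-balanced list by downsampling each class to the minority size."""
    by_label = defaultdict(list)
    for record in records:
        by_label[record[label_key]].append(record)
    minority = min(len(group) for group in by_label.values())
    rng = random.Random(seed)  # fixed seed so the sample is reproducible
    balanced = []
    for label in sorted(by_label):
        balanced.extend(rng.sample(by_label[label], minority))
    rng.shuffle(balanced)  # avoid all rows of one class being grouped together
    return balanced

# Toy records shaped like the rows above: 5 reliable, 3 unreliable
rows = [{"label": 0}] * 5 + [{"label": 1}] * 3
balanced = downsample_to_minority(rows)
print(len(balanced))  # 6
```

Applied to the counts above (10,387 vs. 10,374), this strategy would drop only 13 reliable articles.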

In [90]:
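# fillna('') keeps a missing author or title from turning the whole field into NaN
df['news_body'] = ("AUTHOR: " + df['author'].fillna('') + "NEWS_TITLE: " + df['title'].fillna('')
                   + "NEWS_TEXT: " + df['text'])
df['news_body'].isna().sum()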

0

In [92]:
print(df['news_body'].iloc[0])

AUTHOR: Darrell LucusNEWS_TITLE: House Dem Aide: We Didn’t Even See Comey’s Letter Until Jason Chaffetz Tweeted ItNEWS_TEXT: House Dem Aide: We Didn’t Even See Comey’s Letter Until Jason Chaffetz Tweeted It By Darrell Lucus on October 30, 2016 Subscribe Jason Chaffetz on the stump in American Fork, Utah ( image courtesy Michael Jolley, available under a Creative Commons-BY license) 
With apologies to Keith Olbermann, there is no doubt who the Worst Person in The World is this week–FBI Director James Comey. But according to a House Democratic aide, it looks like we also know who the second-worst person is as well. It turns out that when Comey sent his now-infamous letter announcing that the FBI was looking into emails that may be related to Hillary Clinton’s email server, the ranking Democrats on the relevant committees didn’t hear about it from Comey. They found out via a tweet from one of the Republican committee chairmen. 
As we now know, Comey notified the Republican chairmen and De
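The printed field shows the author name running straight into `NEWS_TITLE:`, because no separator is inserted between the fields; in addition, pandas' `+` turns the whole field into NaN when any part is missing. A minimal, framework-free sketch of a NaN-safe builder that joins the fields with a space; `build_news_body` and the separator choice are illustrative assumptions, not what the pipeline's preprocessing script does.

```python
import math

def _clean(value):
    """Map None/NaN to an empty string; otherwise return the value as str."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return ""
    return str(value)

def build_news_body(author, title, text, sep=" "):
    """Join author, title and text behind explicit field markers (hypothetical helper)."""
    return sep.join([
        "AUTHOR: " + _clean(author),
        "NEWS_TITLE: " + _clean(title),
        "NEWS_TEXT: " + _clean(text),
    ])

print(build_news_body(float("nan"), "Why the Truth Might Get You Fired", "October 29"))
```

In pandas the same effect comes from calling `fillna('')` on each column before concatenating.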

In [13]:
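def cast_object_to_string(data_frame):
    """Cast every object-dtype column to pandas' nullable string dtype."""
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            # astype("str") first turns missing values into the literal string "nan"
            data_frame[label] = data_frame[label].astype("str").astype("string")
    return data_frame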

df = cast_object_to_string(df)

df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 20761 entries, 0 to 20799
Data columns (total 6 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   id         20761 non-null  int64 
 1   title      20761 non-null  string
 2   author     20761 non-null  string
 3   text       20761 non-null  string
 4   label      20761 non-null  int64 
 5   news_body  20761 non-null  string
dtypes: int64(2), string(4)
memory usage: 1.1 MB



In [14]:
import re

def clean_text(text):
    # Remove every character outside a conservative ASCII whitelist
    return re.sub(r'[^a-zA-Z0-9 .,!?\'"]', '', text) if isinstance(text, str) else text


problematic_rows = df.iloc[12450:18673].copy()  # explicit copy; df itself is not modified
print(problematic_rows['news_body'].head())

# Clean the problematic rows
problematic_rows['news_body'] = problematic_rows['news_body'].apply(clean_text)

# Retry ingestion for the cleaned rows
# feature_group.ingest(data_frame=problematic_rows)


12477    AUTHOR: 0                                    D...
12478    AUTHOR: 0                                    D...
12479    AUTHOR: 0                                    D...
12480    AUTHOR: 0                                    D...
12481    AUTHOR: 0                                    D...
Name: news_body, dtype: string
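To see what `clean_text` actually removes, here is a standalone run on a made-up string. Anything outside ASCII letters, digits, space and `.,!?'"` is dropped, including colons, underscores, newlines and typographic apostrophes such as `’`.

```python
import re

def clean_text(text):
    # Same whitelist as above: keep ASCII letters, digits, space and . , ! ? ' "
    return re.sub(r'[^a-zA-Z0-9 .,!?\'"]', '', text) if isinstance(text, str) else text

sample = "AUTHOR: Darrell Lucus\nNEWS_TITLE: Comey’s Letter"
print(clean_text(sample))  # AUTHOR Darrell LucusNEWSTITLE Comeys Letter
print(clean_text(None))    # None
```

The field markers become `AUTHOR` and `NEWSTITLE`, and the newline between fields disappears, so the cleaned text no longer contains the exact `NEWS_TITLE:` marker.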



In [15]:
df.groupby('author')['label'].count().sort_values(ascending=False)[:20]

author
nan                    1918
Pam Key                 243
admin                   193
Jerome Hudson           166
Charlie Spiering        141
John Hayward            140
Katherine Rodriguez     124
Warner Todd Huston      122
Ian Hanchett            119
Breitbart News          118
Daniel Nussbaum         112
AWR Hawkins             107
Jeff Poor               107
Joel B. Pollak          106
Trent Baker             102
Breitbart London         97
Bob Price                93
Ben Kew                  90
Charlie Nash             88
Pakalert                 86
Name: label, dtype: int64

# Configure the dataset and processing step

In [16]:
import time
timestamp = int(time.time())

pipeline_name = 'fakenews-detection-pipeline-{}'.format(timestamp)

### Configure processing step


In [17]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

In [18]:
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge"
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=128,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="news-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="news-feature-group-" + str(timestamp)
)

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)
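The three split fractions are meant to partition the data, so they should sum to 1.0. A minimal sketch of turning such fractions into row counts, assuming the test split absorbs the rounding remainder (a common choice; `src/prepare_data.py` is not shown, so its actual rule may differ). `split_counts` is a hypothetical helper.

```python
import math

def split_counts(n_rows, train_frac, val_frac, test_frac):
    """Convert split fractions into integer row counts that sum to n_rows."""
    if not math.isclose(train_frac + val_frac + test_frac, 1.0):
        raise ValueError("split fractions must sum to 1.0")
    n_train = int(n_rows * train_frac)
    n_val = int(n_rows * val_frac)
    n_test = n_rows - n_train - n_val  # remainder goes to the test split
    return n_train, n_val, n_test

# Default parameter values applied to the 20,761 rows with text (ignoring balancing)
print(split_counts(20761, 0.90, 0.05, 0.05))  # (18684, 1038, 1039)
```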

In [19]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},                             
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


### Set up pipeline step caching
Step caching lets SageMaker Pipelines skip work that has already been done. Before running a step, the pipeline looks for a previous successful execution of the same step that was called with the same arguments. If it finds one (a cache hit), it does not recompute the step; instead, it propagates the outputs of that earlier execution to the downstream steps. The `expire_after` setting limits how old a cached execution may be.
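Conceptually, the cache is a lookup keyed on the step's arguments, with an expiry window written as an ISO 8601 duration. The toy in-memory version below only illustrates that idea; it is not how SageMaker Pipelines implements caching internally. `StepCache` and `parse_simple_duration` are hypothetical names, and the parser handles only the time-only `PT…H…M…S` subset of ISO 8601.

```python
import hashlib
import json
import re
import time
from datetime import timedelta

def parse_simple_duration(value):
    """Parse a time-only ISO 8601 duration such as 'PT12H' or 'PT1H30M'."""
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", value)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {value}")
    hours, minutes, seconds = (int(g or 0) for g in match.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)

class StepCache:
    """Toy cache: reuse a step's result if identical arguments ran recently."""

    def __init__(self, expire_after):
        self.ttl = parse_simple_duration(expire_after).total_seconds()
        self._entries = {}

    @staticmethod
    def _key(step_name, arguments):
        # Stable hash of the step name plus its (sorted) arguments
        payload = json.dumps({"step": step_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def run(self, step_name, arguments, compute, now=None):
        now = time.time() if now is None else now
        key = self._key(step_name, arguments)
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1], True  # cache hit: propagate the stored result
        result = compute(arguments)
        self._entries[key] = (now, result)
        return result, False

cache = StepCache("PT12H")
args = {"--train-split-percentage": "0.9"}
print(cache.run("Processing", args, lambda a: "train-output", now=0))     # ('train-output', False)
print(cache.run("Processing", args, lambda a: "train-output", now=3600))  # ('train-output', True)
```

Changing any argument changes the key, which is why a pipeline parameter that actually reaches a step's arguments invalidates its cache.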

In [20]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")  # ISO 8601 duration: PT12H = 12 hours

In [21]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs=[
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key'
    )
]

processing_outputs=[
    ProcessingOutput(output_name='sentiment-train',
                     source='/opt/ml/processing/output/sentiment/train',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-validation',
                     source='/opt/ml/processing/output/sentiment/validation',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-test',
                     source='/opt/ml/processing/output/sentiment/test',
                     s3_upload_mode='EndOfJob')
]        

processing_step = ProcessingStep(
    name='Processing', 
    code='src/prepare_data.py',
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    # Pass the pipeline parameters themselves (not their default_value) so that values
    # supplied to pipeline.start() reach the script at run time; numeric parameters
    # are converted to strings at execution time with to_string()
    job_arguments=['--train-split-percentage', train_split_percentage.to_string(),
                   '--validation-split-percentage', validation_split_percentage.to_string(),
                   '--test-split-percentage', test_split_percentage.to_string(),
                   '--balance-dataset', balance_dataset,
                   '--max-seq-length', max_seq_length.to_string(),
                   '--feature-store-offline-prefix', feature_store_offline_prefix,
                   '--feature-group-name', feature_group_name
                  ],
    cache_config=cache_config
)        

print(processing_step)

<sagemaker.workflow.steps.ProcessingStep object at 0x7efe356a3110>
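Each `ProcessingOutput` above points at a local directory inside the processing container; whatever the script writes there is uploaded to S3 when the job ends (`s3_upload_mode='EndOfJob'`). Since `src/prepare_data.py` is not reproduced in this notebook, the following only sketches that contract. It assumes CSV files (matching the `text/csv` content type used by the training step), a hypothetical file name `part-0.csv`, and hypothetical column names. The base directory is a parameter so the sketch runs outside the container; inside the job it would be `/opt/ml/processing/output`.

```python
import csv
import os
import tempfile

# Sub-directories matching the ProcessingOutput sources defined above
SPLIT_DIRS = {
    "train": "sentiment/train",
    "validation": "sentiment/validation",
    "test": "sentiment/test",
}

def write_splits(splits, base_dir):
    """Write each split's rows to <base_dir>/sentiment/<split>/part-0.csv."""
    paths = {}
    for split_name, rows in splits.items():
        out_dir = os.path.join(base_dir, SPLIT_DIRS[split_name])
        os.makedirs(out_dir, exist_ok=True)
        path = os.path.join(out_dir, "part-0.csv")  # hypothetical file name
        with open(path, "w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=["label", "news_body"])
            writer.writeheader()
            writer.writerows(rows)
        paths[split_name] = path
    return paths

with tempfile.TemporaryDirectory() as base:
    written = write_splits(
        {"train": [{"label": 0, "news_body": "AUTHOR: Jane Doe NEWS_TITLE: Example"}],
         "validation": [], "test": []},
        base,
    )
    print(sorted(written))  # ['test', 'train', 'validation']
```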


In [22]:
processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].__dict__, 

({'_step': <sagemaker.workflow.steps.ProcessingStep at 0x7efe356a3110>,
  'step_name': 'Processing',
  'path': "ProcessingOutputConfig.Outputs['sentiment-train']",
  '_shape_names': ['ProcessingOutput'],
  'OutputName': {'_step': <sagemaker.workflow.steps.ProcessingStep object at 0x7efe356a3110>, 'step_name': 'Processing', 'path': "ProcessingOutputConfig.Outputs['sentiment-train'].OutputName", '_shape_names': ['String'], '__str__': 'String'},
  'S3Output': {'_step': <sagemaker.workflow.steps.ProcessingStep object at 0x7efe356a3110>, 'step_name': 'Processing', 'path': "ProcessingOutputConfig.Outputs['sentiment-train'].S3Output", '_shape_names': ['ProcessingS3Output'], 'S3Uri': {'_step': <sagemaker.workflow.steps.ProcessingStep object at 0x7efe356a3110>, 'step_name': 'Processing', 'path': "ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri", '_shape_names': ['S3Uri'], '__str__': 'S3Uri'}, 'LocalPath': {'_step': <sagemaker.workflow.steps.ProcessingStep object at 0x7efe356a31

In [23]:
processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri.__dict__,

({'_step': <sagemaker.workflow.steps.ProcessingStep at 0x7efe356a3110>,
  'step_name': 'Processing',
  'path': "ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
  '_shape_names': ['S3Uri'],
  '__str__': 'S3Uri'},)

In [24]:
processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri.__dict__,

({'_step': <sagemaker.workflow.steps.ProcessingStep at 0x7efe356a3110>,
  'step_name': 'Processing',
  'path': "ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
  '_shape_names': ['S3Uri'],
  '__str__': 'S3Uri'},)

In [25]:
processing_step.arguments.keys()

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])

In [26]:
processing_step.arguments['ProcessingInputs']


[{'InputName': 'raw-input-data',
  'AppManaged': False,
  'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://datasets-123/fakenews'),
   'LocalPath': '/opt/ml/processing/input/data/',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'ShardedByS3Key',
   'S3CompressionType': 'None'}},
 {'InputName': 'code',
  'AppManaged': False,
  'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/input/code/prepare_data.py',
   'LocalPath': '/opt/ml/processing/input/code',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'FullyReplicated',
   'S3CompressionType': 'None'}}]

In [27]:
processing_step.arguments['ProcessingOutputConfig']


{'Outputs': [{'OutputName': 'sentiment-train',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-train',
    'LocalPath': '/opt/ml/processing/output/sentiment/train',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-validation',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-validation',
    'LocalPath': '/opt/ml/processing/output/sentiment/validation',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-test',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-test',
    'LocalPath': '/opt/ml/processing/output/sentiment/test',
    'S3UploadMode': 'EndOfJob'}}]}

# Configure training step

### Define parameters

In [28]:
freeze_bert_layer = ParameterString(
    name="FreezeBertLayer",
    default_value="False",
)

epochs = ParameterInteger(
    name="Epochs",
    default_value=3
)
    
learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.00001
) 
    
train_batch_size = ParameterInteger(
    name="TrainBatchSize",
    default_value=64
)

train_steps_per_epoch = ParameterInteger(
    name="TrainStepsPerEpoch",
    default_value=50
)

validation_batch_size = ParameterInteger(
    name="ValidationBatchSize",
    default_value=64
)

validation_steps_per_epoch = ParameterInteger(
    name="ValidationStepsPerEpoch",
    default_value=50
)

seed = ParameterInteger(
    name="Seed",
    default_value=42
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

train_instance_count = ParameterInteger(
    name="TrainInstanceCount",
    default_value=1
)

train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.c5.9xlarge"
)

train_volume_size = ParameterInteger(
    name="TrainVolumeSize",
    default_value=256
) 

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)

### Configure hyper-parameters

In [29]:
hyperparameters={
    'max_seq_length': max_seq_length,
    'freeze_bert_layer': freeze_bert_layer,
    'epochs': epochs,
    'learning_rate': learning_rate,
    'train_batch_size': train_batch_size,
    'train_steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps_per_epoch': validation_steps_per_epoch,
    'seed': seed,
    'run_validation': run_validation
}
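With the SageMaker PyTorch estimator in script mode, each entry in `hyperparameters` reaches the entry point (`train.py`) as a command-line argument of the form `--name value`. `train.py` is not shown here, so this is a hypothetical sketch of how it might parse a few of these values. Boolean-like parameters such as `freeze_bert_layer` and `run_validation` arrive as the strings `"True"`/`"False"`, and `bool("False")` is `True` in Python, so they need an explicit converter (`str2bool` below is a hypothetical helper).

```python
import argparse

def str2bool(value):
    """Convert 'True'/'False'-style strings (as sent by pipeline parameters) to bool."""
    if isinstance(value, bool):
        return value
    if value.lower() in ("true", "1", "yes"):
        return True
    if value.lower() in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean string, got {value!r}")

def parse_hyperparameters(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_seq_length", type=int, default=128)
    parser.add_argument("--freeze_bert_layer", type=str2bool, default=False)
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--learning_rate", type=float, default=1e-5)
    parser.add_argument("--train_batch_size", type=int, default=64)
    parser.add_argument("--run_validation", type=str2bool, default=True)
    # parse_known_args ignores hyperparameters this sketch does not declare
    args, _unknown = parser.parse_known_args(argv)
    return args

args = parse_hyperparameters(["--freeze_bert_layer", "False", "--epochs", "3",
                              "--learning_rate", "1e-05", "--seed", "42"])
print(args.freeze_bert_layer, args.epochs, args.learning_rate)  # False 3 1e-05
```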

### Configure model-evaluation metrics

In [30]:
metric_definitions = [
     {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
     {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]
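SageMaker applies each `Regex` to the training job's log output and records the captured group as the metric value. The sketch below runs the same two patterns over a made-up log line (the exact format `train.py` prints is an assumption) to check that they capture what is expected.

```python
import re

metric_definitions = [
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

def extract_metrics(log_line, definitions):
    """Return {metric name: float} for every definition whose regex matches."""
    found = {}
    for definition in definitions:
        match = re.search(definition['Regex'], log_line)
        if match:
            found[definition['Name']] = float(match.group(1))
    return found

# Hypothetical log line; the real format depends on train.py
line = "[step: 50] val_loss: 0.312 val_acc: 0.87"
print(extract_metrics(line, metric_definitions))
```

One caveat of `[0-9.]+`: it does not match scientific notation, so a logged `val_loss: 3e-05` would be silently recorded as `3.0`. The training script should print these values in plain decimal form.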

### Configure the `PyTorchEstimator`

In [31]:
from sagemaker.pytorch import PyTorch as PyTorchEstimator

estimator = PyTorchEstimator(
    entry_point='train.py',
    source_dir='src',
    role=role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version='py311',
    framework_version='2.3.0',
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    input_mode=input_mode
)

### Configure the `TrainingStep`

In [32]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-train'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-validation'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
    cache_config=cache_config
)

print(training_step)

<sagemaker.workflow.steps.TrainingStep object at 0x7efe358c1090>




# Configure model-evaluation step

The evaluation script performs the following steps:
* loads the trained model
* reads the test data
* runs predictions on the test data
* builds a classification report, including accuracy
* saves the evaluation report (`evaluation.json`) to the metrics output directory
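The condition step reads `metrics.accuracy.value` from `evaluation.json` through the `EvaluationReport` property file, so the report must use exactly that nesting. `src/evaluate_model_metrics.py` is not shown; this is a minimal sketch of the last two steps only, with plain label lists standing in for real model predictions and the output directory parameterized (inside the job it would be `/opt/ml/processing/output/metrics/`). A real script would more likely use scikit-learn's `accuracy_score` and `classification_report`.

```python
import json
import os
import tempfile

def write_evaluation_report(y_true, y_pred, output_dir):
    """Compute accuracy and save it in the nesting the pipeline's JsonGet expects."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and of equal length")
    correct = sum(int(t == p) for t, p in zip(y_true, y_pred))
    accuracy = correct / len(y_true)
    report = {"metrics": {"accuracy": {"value": accuracy}}}
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "evaluation.json"), "w") as handle:
        json.dump(report, handle)
    return report

with tempfile.TemporaryDirectory() as metrics_dir:
    report = write_evaluation_report([0, 1, 1, 0], [0, 1, 0, 0], metrics_dir)
    with open(os.path.join(metrics_dir, "evaluation.json")) as handle:
        on_disk = json.load(handle)
    print(report)  # {'metrics': {'accuracy': {'value': 0.75}}}
```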

In [33]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200
)


In [34]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='metrics',
    path='evaluation.json'
)

In [35]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    code='src/evaluate_model_metrics.py',
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/input/model'
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
            destination='/opt/ml/processing/input/data'
        )
    ],
    outputs=[
        ProcessingOutput(output_name='metrics', 
                         s3_upload_mode='EndOfJob',
                         source='/opt/ml/processing/output/metrics/'),
    ],
    job_arguments=[
        '--max-seq-length', max_seq_length.to_string(),  # resolved at execution time
    ],
    property_files=[evaluation_report],
    cache_config=cache_config
)

# Configure and register model step

### Configure the model for deployment

In [36]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

deploy_instance_type = ParameterString(
    name="DeployInstanceType",
    default_value="ml.m5.large"
)

deploy_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=1
)

In [37]:
model_package_group_name = f"BERT-News-Detection-{timestamp}"

print(model_package_group_name)

BERT-News-Detection-1732989616


In [38]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

print(model_metrics)


<sagemaker.model_metrics.ModelMetrics object at 0x7efe354d53d0>


In [39]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="2.3.0",
    py_version="py311",
    instance_type=deploy_instance_type,
    image_scope="inference"
)
print(inference_image_uri)


763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.3.0-cpu-py311


### Register the model for deployment

In [40]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri,  
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type], # batch transform is not used in this pipeline
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics           
)
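The registered model declares `application/jsonlines` for both requests and responses, meaning one JSON object per line. The inference handler is not shown, so the `features` key below is a hypothetical field name; the sketch only shows how such a payload can be encoded and decoded with the standard library.

```python
import json

def to_jsonlines(records):
    """Serialize an iterable of dicts as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)

def from_jsonlines(payload):
    """Parse a JSON Lines string back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in payload.splitlines() if line.strip()]

# Hypothetical request: one article per line under an assumed "features" key
articles = [
    {"features": ["AUTHOR: Jane Doe NEWS_TITLE: Example headline NEWS_TEXT: Example body"]},
    {"features": ["AUTHOR: John Roe NEWS_TITLE: Second headline NEWS_TEXT: Second body"]},
]
request_body = to_jsonlines(articles)
print(len(request_body.splitlines()))  # 2
```

Once an endpoint exists, a body like this could be sent with `sm_runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/jsonlines', Accept='application/jsonlines', Body=request_body)`, where `endpoint_name` is a placeholder for the deployed endpoint's name.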

# Create model for deployment step

In [41]:
from sagemaker.model import Model

model_name = 'bert-model-{}'.format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

In [42]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type, 
)

In [43]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=create_inputs, 
)

# Check accuracy condition step

In [44]:
min_accuracy_value = ParameterFloat(
    name="MinAccuracyValue",
    default_value=0.50  # chance level for this balanced binary (reliable vs. unreliable) task
)
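The label counts earlier show two roughly balanced classes, so a model that always predicts one class already scores about 0.5; a threshold at or below that level lets through a model no better than a constant guess. The plain-Python sketch below mimics what the accuracy gate does with the evaluation report: follow a dotted JSON path such as `metrics.accuracy.value` and compare the value against the threshold. It is an illustration only; `resolve_json_path` and `accuracy_gate` are hypothetical, and the resolver supports only dotted keys, not full JsonPath syntax.

```python
import json

def resolve_json_path(document, json_path):
    """Follow a dotted path such as 'metrics.accuracy.value' through nested dicts."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node

def accuracy_gate(report_json, threshold, json_path="metrics.accuracy.value"):
    """Return which branch would run: 'if_steps' (register/create) or 'else_steps'."""
    accuracy = resolve_json_path(json.loads(report_json), json_path)
    return "if_steps" if accuracy >= threshold else "else_steps"

report = json.dumps({"metrics": {"accuracy": {"value": 0.51}}})
print(accuracy_gate(report, threshold=0.50))  # if_steps
print(accuracy_gate(report, threshold=0.90))  # else_steps
```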

In [45]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet  # JsonGet's current location in sagemaker>=2

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value # minimum accuracy threshold
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step], # successfully exceeded or equaled the minimum accuracy, continue with model registration
    else_steps=[], # did not exceed the minimum accuracy, the model will not be registered
)



# Create pipeline

### Define a pipeline of parameters, steps, and conditions

In [46]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        balance_dataset,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        epochs,
        learning_rate,
        train_batch_size,
        train_steps_per_epoch,
        validation_batch_size,
        validation_steps_per_epoch,
        freeze_bert_layer,
        seed,
        train_instance_count,
        train_instance_type,
        train_volume_size,        
        input_mode,
        run_validation,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)

In [47]:
import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://datasets-123/fakenews',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 128,
                 'Name': 'MaxSeqLength',
                 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'BalanceDataset',
                 'Type': 'String'},
                {'DefaultValue': 0.9,
                 'Name': 'TrainSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'ValidationSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'TestSplitPercentage',
           

In [48]:
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


arn:aws:sagemaker:us-east-1:904233098050:pipeline/fakenews-detection-pipeline-1732989616


### Start Pipeline

In [49]:
execution = pipeline.start(
    parameters=dict(
        InputData=raw_input_data_s3_uri,
        ProcessingInstanceCount=1,
        ProcessingInstanceType='ml.c5.2xlarge',
        MaxSeqLength=128,
        BalanceDataset='True',
        TrainSplitPercentage=0.9,
        ValidationSplitPercentage=0.05,
        TestSplitPercentage=0.05,
        FeatureStoreOfflinePrefix='news-feature-store-'+str(timestamp),
        FeatureGroupName='news-feature-group-'+str(timestamp),
        Epochs=3,
        LearningRate=0.000012,
        TrainBatchSize=64,
        TrainStepsPerEpoch=50,
        ValidationBatchSize=64,
        ValidationStepsPerEpoch=64,
        FreezeBertLayer='False',
        Seed=42,         
        TrainInstanceCount=1,
        TrainInstanceType='ml.c5.9xlarge',
        TrainVolumeSize=256,
        InputMode='File',
        RunValidation='True',
        MinAccuracyValue=0.01,  # effectively disables the accuracy gate for this run
        ModelApprovalStatus='PendingManualApproval', 
        DeployInstanceType='ml.m5.large',
        DeployInstanceCount=1 
    )
)

print(execution.arn)

arn:aws:sagemaker:us-east-1:904233098050:pipeline/fakenews-detection-pipeline-1732989616/execution/ydpsdkuxnphw


### Wait for pipeline execution

In [50]:
from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {'DomainId': 'd-6sjqn4j0cmrw',
               'IamIdentity': {'Arn': 'arn:aws:sts::904233098050:assumed-role/AmazonSageMaker-ExecutionRole-20240820T002537/SageMaker',
                               'PrincipalId': 'AROA5FCD6DNBJR5FULEGA:SageMaker'},
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:904233098050:user-profile/d-6sjqn4j0cmrw/default-20240820T002536',
               'UserProfileName': 'default-20240820T002536'},
 'CreationTime': datetime.datetime(2024, 11, 30, 18, 0, 19, 862000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-6sjqn4j0cmrw',
                    'IamIdentity': {'Arn': 'arn:aws:sts::904233098050:assumed-role/AmazonSageMaker-ExecutionRole-20240820T002537/SageMaker',
                                    'PrincipalId': 'AROA5FCD6DNBJR5FULEGA:SageMaker'},
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:904233098050:user-profile/d-6sjqn4j0cmrw/default-20240820T002536',
                    'UserProfileName': 'default

In [51]:
execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)

execution-1732989619917


In [52]:
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:904233098050:pipeline/fakenews-detection-pipeline-1732989616/execution/ydpsdkuxnphw


### List pipeline execution steps

In [53]:
import time

time.sleep(30)

execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2024, 11, 30, 18, 0, 20, 798000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:processing-job/pipelines-ydpsdkuxnphw-Processing-2LmKkCPsTQ'}},
  'AttemptCount': 1}]
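
`execution.list_steps()` returns a list of dictionaries with `StepName` and `StepStatus` keys, as in the output above. The hypothetical helper `summarize_steps()` below is a small sketch that condenses that list into a name-to-status map and also returns the steps that have not succeeded yet. It runs here on a sample copied from the output above instead of a live API call.

```python
from datetime import datetime


def summarize_steps(steps):
    """Return ({step name: status}, sorted names of steps that have not succeeded yet)."""
    statuses = {step['StepName']: step['StepStatus'] for step in steps}
    unfinished = sorted(name for name, status in statuses.items() if status != 'Succeeded')
    return statuses, unfinished


# Sample copied from the list_steps() output above (timezone dropped for simplicity).
sample_steps = [{
    'StepName': 'Processing',
    'StartTime': datetime(2024, 11, 30, 18, 0, 20, 798000),
    'StepStatus': 'Executing',
    'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:processing-job/pipelines-ydpsdkuxnphw-Processing-2LmKkCPsTQ'}},
    'AttemptCount': 1,
}]
statuses, unfinished = summarize_steps(sample_steps)
print(statuses)    # → {'Processing': 'Executing'}
print(unfinished)  # → ['Processing']
```

In the notebook, you would call it as `summarize_steps(execution.list_steps())`.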

### Wait for the pipeline to complete

### _This cell will take approximately 30-45 minutes to run._

In [66]:
%%time

import time
from pprint import pprint

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

# Poll every 30 seconds until the most recent execution is no longer 'Executing'
while pipeline_execution_status == 'Executing':
    time.sleep(30)
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
    except Exception as e:
        print('Please wait... ({})'.format(e))
    
pprint(executions_response)

Succeeded
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:904233098050:pipeline/fakenews-detection-pipeline-1732989616/execution/ydpsdkuxnphw',
  'PipelineExecutionDisplayName': 'execution-1732989619917',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2024, 11, 30, 18, 0, 19, 862000, tzinfo=tzlocal())}]
CPU times: user 298 ms, sys: 46.1 ms, total: 345 ms
Wall time: 633 ms
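
The polling cell above has no upper bound, so it would spin forever if an execution got stuck. The function below is a minimal sketch of the same loop with a timeout. `poll_until_terminal()` is a hypothetical helper, and the terminal-state set is an assumption based on the pipeline execution statuses `Succeeded`, `Failed` and `Stopped`. Both the status source and the `sleep` function are passed in, so the loop can be exercised without AWS.

```python
import time

# Pipeline-execution states after which polling can stop (assumed set).
TERMINAL_STATES = {'Succeeded', 'Failed', 'Stopped'}


def poll_until_terminal(get_status, interval=30.0, timeout=3600.0, sleep=time.sleep):
    """Call get_status() every `interval` seconds until it returns a terminal state.

    Raises TimeoutError once `timeout` seconds of waiting have elapsed without one.
    `sleep` is injectable so the loop can be tested without actually waiting.
    """
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout:
            raise TimeoutError('still {!r} after {:.0f}s'.format(status, waited))
        sleep(interval)
        waited += interval


# Usage with a fake status source instead of the SageMaker API:
fake_statuses = iter(['Executing', 'Executing', 'Succeeded'])
final_status = poll_until_terminal(lambda: next(fake_statuses), sleep=lambda s: None)
print(final_status)  # → Succeeded
```

Against the real pipeline, `get_status` could be `lambda: execution.describe()['PipelineExecutionStatus']`. The execution object returned by `pipeline.start()` also exposes a `wait()` method in the SageMaker Python SDK, which may be simpler if it is available in your SDK version.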


In [67]:
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

Succeeded


In [68]:
pipeline_execution_arn = executions_response[0]['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:904233098050:pipeline/fakenews-detection-pipeline-1732989616/execution/ydpsdkuxnphw


# Evaluate the model

### Describe evaluation metrics

In [69]:
processing_job_name = None

# find the name of the processing job run by the 'Processing' step
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# get the description of the processing job
describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# get the output S3 path
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))

Transform output s3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-train


In [70]:
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri

2024-11-30 18:10:42   38986490 Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-train/part-algo-1-train.tsv
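
The cell above shells out to the AWS CLI. To list the same objects from Python, for example with boto3's `list_objects_v2(Bucket=..., Prefix=...)`, you first need the bucket and key prefix separately. The hypothetical `split_s3_uri()` below is a minimal sketch that uses only the standard library.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3':
        raise ValueError('not an S3 URI: {}'.format(uri))
    return parsed.netloc, parsed.path.lstrip('/')


bucket_name, prefix = split_s3_uri(
    's3://sagemaker-us-east-1-904233098050/Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-train')
print(bucket_name)  # → sagemaker-us-east-1-904233098050
print(prefix)       # → Processing-b53ac3acd70df657cff90233216b5e35/output/sentiment-train
```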


In [71]:
processing_job_name = None

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'EvaluateModel': 
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))

Evaluation output s3://sagemaker-us-east-1-904233098050/EvaluateModel-a0cde60807b41becf7f4fb418d766047/output/metrics
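
Several cells in this notebook repeat the same loop: scan `execution.list_steps()` for a step name, read the ARN under a step-type key in `Metadata`, and take the last `/`-separated segment. The helpers `find_step_arn()` and `resource_name()` below are hypothetical and factor that pattern out. The sample entries are trimmed copies of the step output shown later in this notebook. One caveat: for a `RegisterModel` step, the last ARN segment is the package version (for example `1`), not a name, so keep the full ARN in that case.

```python
def find_step_arn(steps, step_name, metadata_key):
    """Return the ARN under Metadata[metadata_key] for the first step named step_name.

    metadata_key is the step-type key seen in list_steps() output, such as
    'ProcessingJob', 'TrainingJob', 'Model' or 'RegisterModel'. Returns None if absent.
    """
    for step in steps:
        if step['StepName'] == step_name:
            return step['Metadata'][metadata_key]['Arn']
    return None


def resource_name(arn):
    """Last '/'-separated segment of an ARN (the job or model name for job/model ARNs)."""
    return arn.split('/')[-1]


# Trimmed sample entries copied from the list_steps() output in this notebook.
steps = [
    {'StepName': 'EvaluateModel',
     'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:processing-job/pipelines-ydpsdkuxnphw-EvaluateModel-90B5wAinfn'}}},
    {'StepName': 'Train',
     'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:training-job/pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f'}}},
]
print(resource_name(find_step_arn(steps, 'EvaluateModel', 'ProcessingJob')))
# → pipelines-ydpsdkuxnphw-EvaluateModel-90B5wAinfn
print(find_step_arn(steps, 'CreateModel', 'Model'))  # → None
```

In the notebook, `steps` would be `execution.list_steps()`, and the result could be passed to `sm.describe_processing_job(ProcessingJobName=...)`.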


### Review the evaluation report

In [72]:
from pprint import pprint

evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    evaluation_metrics_s3_uri
))

pprint(json.loads(evaluation_json))

{'metrics': {'accuracy': {'value': 0.5006305170239597}}}
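
The `AccuracyCondition` step gates model registration on this metric. The pipeline definition is not shown in this section, so the sketch below assumes a greater-than-or-equal-to comparison against `MinAccuracyValue`. `accuracy_gate()` is a hypothetical helper that reproduces that check locally on the report above. Note that the reported accuracy of about 0.50 is roughly what random guessing would score on a balanced two-class test set. It still clears the threshold because `MinAccuracyValue=0.01` was passed to `pipeline.start()`.

```python
import json


def accuracy_gate(evaluation_json, min_accuracy):
    """Return (passed, accuracy) for a report shaped like evaluation.json above.

    Assumes a >= comparison; the actual ConditionStep definition is not shown here.
    """
    accuracy = json.loads(evaluation_json)['metrics']['accuracy']['value']
    return accuracy >= min_accuracy, accuracy


report = '{"metrics": {"accuracy": {"value": 0.5006305170239597}}}'
print(accuracy_gate(report, 0.01))  # → (True, 0.5006305170239597)
print(accuracy_gate(report, 0.8))   # → (False, 0.5006305170239597)
```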


### List pipeline artifacts

In [73]:
training_job_arn=None

for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'Train':
        training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']        
        pprint(execution_step)
        break
print('Training job ARN: {}'.format(training_job_arn))
        
training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))

{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 23, 44, 539000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:training-job/pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 13, 5, 317000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:904233098050:training-job/pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f
Training job Name: pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f
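
Each step record includes `StartTime` and `EndTime`, so per-step durations can be computed directly from the output above. This is useful for checking where the pipeline's runtime actually goes. The sketch below uses naive datetimes copied from the `Train` step. The live values are timezone-aware (`tzlocal()`), and they subtract the same way as long as both carry the same zone. `step_duration_seconds()` is a hypothetical helper.

```python
from datetime import datetime


def step_duration_seconds(step):
    """Elapsed seconds of a finished pipeline step, from its StartTime and EndTime."""
    return (step['EndTime'] - step['StartTime']).total_seconds()


train_step = {
    'StepName': 'Train',
    'StartTime': datetime(2024, 11, 30, 18, 13, 5, 317000),
    'EndTime': datetime(2024, 11, 30, 18, 23, 44, 539000),
}
print(step_duration_seconds(train_step))  # → 639.222
```

For this run, the training step took about 10 minutes 39 seconds.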


In [74]:
processing_job_name=None
training_job_name=None

In [75]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    pprint(execution_step)
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
        print('Processing job name: {}'.format(processing_job_name))
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step['StepName'] == 'Train':
        training_job_name=execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
        print('Training job name: {}'.format(training_job_name))
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 13, 4, 765000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:processing-job/pipelines-ydpsdkuxnphw-Processing-2LmKkCPsTQ'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 0, 20, 798000, tzinfo=tzlocal()),
 'StepName': 'Processing',
 'StepStatus': 'Succeeded'}
Processing job name: pipelines-ydpsdkuxnphw-Processing-2LmKkCPsTQ


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...90233216b5e35/input/code/prepare_data.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://datasets-123/fakenews | Input | DataSet | ContributedTo | artifact |
| 2 | 68331...com/sagemaker-scikit-learn:1.2-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...57cff90233216b5e35/output/sentiment-test | Output | DataSet | Produced | artifact |
| 4 | s3://...0233216b5e35/output/sentiment-validation | Output | DataSet | Produced | artifact |
| 5 | s3://...7cff90233216b5e35/output/sentiment-train | Output | DataSet | Produced | artifact |


{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 23, 44, 539000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:training-job/pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 13, 5, 317000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job name: pipelines-ydpsdkuxnphw-Train-4Np6kDuf4f


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...0233216b5e35/output/sentiment-validation | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...7cff90233216b5e35/output/sentiment-train | Input | DataSet | ContributedTo | artifact |
| 2 | 76310...aws.com/pytorch-training:2.3.0-cpu-py311 | Input | Image | ContributedTo | artifact |
| 3 | s3://...phw-Train-4Np6kDuf4f/output/model.tar.gz | Output | Model | Produced | artifact |


{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 28, 49, 505000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:processing-job/pipelines-ydpsdkuxnphw-EvaluateModel-90B5wAinfn'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 23, 45, 251000, tzinfo=tzlocal()),
 'StepName': 'EvaluateModel',
 'StepStatus': 'Succeeded'}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...047/input/code/evaluate_model_metrics.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...57cff90233216b5e35/output/sentiment-test | Input | DataSet | ContributedTo | artifact |
| 2 | s3://...phw-Train-4Np6kDuf4f/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 3 | 68331...com/sagemaker-scikit-learn:1.2-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 4 | s3://...807b41becf7f4fb418d766047/output/metrics | Output | DataSet | Produced | artifact |


{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 28, 51, 5000, tzinfo=tzlocal()),
 'Metadata': {'Condition': {'Outcome': 'True'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 28, 50, 419000, tzinfo=tzlocal()),
 'StepName': 'AccuracyCondition',
 'StepStatus': 'Succeeded'}


None

{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 28, 52, 989000, tzinfo=tzlocal()),
 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:model-package/BERT-News-Detection-1732989616/1'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 28, 51, 459000, tzinfo=tzlocal()),
 'StepName': 'RegisterModel-RegisterModel',
 'StepStatus': 'Succeeded'}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...phw-Train-4Np6kDuf4f/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 76310...ws.com/pytorch-inference:2.3.0-cpu-py311 | Input | Image | ContributedTo | artifact |
| 2 | BERT-News-Detection-1732989616-1-PendingManual... | Input | Approval | ContributedTo | action |
| 3 | BERT-News-Detection-1732989616-1732991332-aws-... | Output | ModelGroup | AssociatedWith | context |


{'AttemptCount': 1,
 'EndTime': datetime.datetime(2024, 11, 30, 18, 28, 53, 267000, tzinfo=tzlocal()),
 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:904233098050:model/pipelines-ydpsdkuxnphw-CreateModel-7hwTrKYyUv'}},
 'StartTime': datetime.datetime(2024, 11, 30, 18, 28, 51, 459000, tzinfo=tzlocal()),
 'StepName': 'CreateModel',
 'StepStatus': 'Succeeded'}


None

# Deploy and test the model

### Approve trained model

In [81]:
for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'RegisterModel-RegisterModel':
        model_package_arn = execution_step['Metadata']['RegisterModel']['Arn']
        break
print(model_package_arn)

arn:aws:sagemaker:us-east-1:904233098050:model-package/BERT-News-Detection-1732989616/1


In [82]:
model_package_update_response = sm.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",
)

pprint(model_package_update_response)

{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:904233098050:model-package/BERT-News-Detection-1732989616/1',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '109',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Sun, 01 Dec 2024 05:20:38 GMT',
                                      'x-amzn-requestid': 'da22734e-4ba7-4373-a373-2ed8e675adf7'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'da22734e-4ba7-4373-a373-2ed8e675adf7',
                      'RetryAttempts': 0}}
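
The introduction says the registered model is deployed only after human approval. However, the endpoint below is built from the `CreateModel` step's model, so nothing in the notebook actually checks the package's approval status before deploying. The hypothetical `require_approved()` below is a minimal sketch of such a guard. It takes a dictionary shaped like the response of `sm.describe_model_package(ModelPackageName=model_package_arn)`, which includes `ModelApprovalStatus` and `ModelPackageArn`, and refuses anything not `Approved`.

```python
# The three approval states a model package can be in.
VALID_APPROVAL_STATUSES = {'Approved', 'Rejected', 'PendingManualApproval'}


def require_approved(package_description):
    """Return the package ARN if it is Approved; raise otherwise."""
    status = package_description['ModelApprovalStatus']
    if status not in VALID_APPROVAL_STATUSES:
        raise ValueError('unexpected approval status: {}'.format(status))
    if status != 'Approved':
        raise PermissionError('model package is {}, not Approved'.format(status))
    return package_description['ModelPackageArn']


approved = {
    'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:904233098050:model-package/BERT-News-Detection-1732989616/1',
    'ModelApprovalStatus': 'Approved',
}
print(require_approved(approved))

pending = dict(approved, ModelApprovalStatus='PendingManualApproval')
try:
    require_approved(pending)
except PermissionError as e:
    print(e)  # → model package is PendingManualApproval, not Approved
```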


### Get the model created by the pipeline

In [83]:
for execution_step in execution.list_steps():
    print(execution_step['StepName'])
    if execution_step['StepName'] == 'CreateModel':
        model_arn = execution_step['Metadata']['Model']['Arn']
        break
print(model_arn)

model_name = model_arn.split('/')[-1]
print(model_name)

CreateModel
arn:aws:sagemaker:us-east-1:904233098050:model/pipelines-ydpsdkuxnphw-CreateModel-7hwTrKYyUv
pipelines-ydpsdkuxnphw-CreateModel-7hwTrKYyUv


### Create endpoint from the pipeline-created model

In [84]:
endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName': model_name,
        'VariantName':'AllTraffic'}])

bert-model-epc-1732989616
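
The endpoint configuration above hard-codes `ml.m5.xlarge`, while the `DeployInstanceType` parameter passed to `pipeline.start()` was `ml.m5.large`. The hypothetical `production_variant()` below is a sketch, not an SDK function. It builds one `ProductionVariants` entry in a single place, so the instance settings can come from the same values used to start the pipeline.

```python
def production_variant(model_name, instance_type, instance_count=1,
                       variant_name='AllTraffic', weight=1.0):
    """Build one ProductionVariants entry for sm.create_endpoint_config()."""
    if instance_count < 1:
        raise ValueError('instance_count must be at least 1')
    return {
        'VariantName': variant_name,
        'ModelName': model_name,
        'InstanceType': instance_type,
        'InitialInstanceCount': instance_count,
        'InitialVariantWeight': weight,
    }


# Hypothetical: reuse the deploy settings that were passed to pipeline.start().
deploy_params = {'DeployInstanceType': 'ml.m5.large', 'DeployInstanceCount': 1}
variant = production_variant('pipelines-ydpsdkuxnphw-CreateModel-7hwTrKYyUv',
                             deploy_params['DeployInstanceType'],
                             deploy_params['DeployInstanceCount'])
print(variant['InstanceType'])  # → ml.m5.large
```

In the notebook, this would be passed as `ProductionVariants=[variant]` to `sm.create_endpoint_config()`.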


In [85]:
pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))

create_endpoint_response = sm.create_endpoint(
    EndpointName=pipeline_endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

EndpointName=bert-model-ep-1732989616
arn:aws:sagemaker:us-east-1:904233098050:endpoint/bert-model-ep-1732989616


In [86]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pipeline_endpoint_name)))


In [87]:
%%time

from botocore.exceptions import WaiterError

waiter = sm.get_waiter('endpoint_in_service')

while True:
    try:
        print('Waiting for endpoint to be in `InService`...')
        waiter.wait(EndpointName=pipeline_endpoint_name)
        print('Endpoint deployed.')
        break
    except WaiterError:
        # The waiter gave up (timeout or failure); check the actual endpoint status
        endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
        print('Endpoint status: {}'.format(endpoint_status))
        if endpoint_status == 'Failed':
            print('Endpoint deployment failed.')
            break
        time.sleep(30)

Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 26.6 ms, sys: 18.6 ms, total: 45.2 ms
Wall time: 3min 30s


### Test model

In [94]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

inputs = [
    {"features": ["AUTHOR: Darrell LucusNEWS_TITLE: House Dem Aide: We Didn’t Even See Comey’s Letter Until Jason Chaffetz Tweeted ItNEWS_TEXT: House Dem Aide: We Didn’t Even See Comey’s Letter Until Jason Chaffetz Tweeted It By Darrell Lucus on October 30, 2016 Subscribe Jason Chaffetz on the stump in American Fork, Utah ( image courtesy Michael Jolley, available under a Creative Commons-BY license) "]},
    # {"features": ["OK, but not great."]},
    # {"features": ["This is not the right product."]},
]

predictor = Predictor(
    endpoint_name=pipeline_endpoint_name,
    serializer=JSONLinesSerializer(),
    deserializer=JSONLinesDeserializer(),
    sagemaker_session=sess
)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class {}".format(predicted_class['prediction']))

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (0) from primary with message "Your invocation timed out while waiting for a response from container primary. Review the latency metrics for each container in Amazon CloudWatch, resolve the issue, and try again.". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/bert-model-ep-1732989616 in account 904233098050 for more information.
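
SageMaker real-time endpoints must return a response within 60 seconds. If this timeout is transient, for example on the first request after deployment, a retry with backoff may be enough. If the container is consistently too slow, retrying will not help, and the CloudWatch logs linked in the error are the place to look. The hypothetical `invoke_with_backoff()` below is a minimal sketch of jittered exponential backoff. It is tested with a fake invocation, and the exception types to retry on are passed in.

```python
import random
import time


def invoke_with_backoff(invoke, max_attempts=4, base_delay=2.0,
                        retry_on=(Exception,), sleep=time.sleep):
    """Call invoke(), retrying on `retry_on` exceptions with jittered exponential backoff.

    The last exception is re-raised once max_attempts calls have failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return invoke()
        except retry_on:
            if attempt == max_attempts:
                raise
            # Full jitter: wait a random time up to base_delay * 2**(attempt - 1) seconds.
            sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))


# Usage with a fake invocation that times out twice before succeeding.
attempts = []


def flaky_invoke():
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError('simulated invocation timeout')
    return 'ok'


result = invoke_with_backoff(flaky_invoke, retry_on=(TimeoutError,), sleep=lambda s: None)
print(result, len(attempts))  # → ok 3
```

Against the endpoint, this might be called as `invoke_with_backoff(lambda: predictor.predict(inputs), retry_on=(sm_runtime.exceptions.ModelError,))`. This assumes the runtime client exposes `ModelError` through its `exceptions` attribute, which matches the error shown above.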