# Transform Raw Text into BERT Embeddings with Tensorflow


In [9]:
%store -r

In [2]:
import sagemaker
from time import gmtime, strftime
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

## Make Preprocess Script

In [3]:
%%writefile preprocess-scikit-text-to-bert.py


from sklearn.model_selection import train_test_split
from sklearn.utils import resample
import functools
import multiprocessing

import pandas as pd
from datetime import datetime
import subprocess
import sys

subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'tensorflow==2.1.0'])
import tensorflow as tf
print(tf.__version__)

subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'transformers==2.8.0'])
from transformers import DistilBertTokenizer

from tensorflow import keras
import os
import re
import collections
import argparse
import json
import os
import pandas as pd
import csv
import glob
from pathlib import Path

tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

DATA_COLUMN = 'TWEET'
LABEL_COLUMN = 'LABEL'
LABEL_VALUES = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
label_map = {}
for (i, label) in enumerate(LABEL_VALUES):
    label_map[label] = i

class InputFeatures(object):
  """BERT feature vectors."""

  def __init__(self,
               input_ids,
               input_mask,
               segment_ids,
               label_id):
    self.input_ids = input_ids
    self.input_mask = input_mask
    self.segment_ids = segment_ids
    self.label_id = label_id
    
    
class Input(object):
  """A single training/test input for sequence classification."""

  def __init__(self, text, label=None):
    """Constructs an Input.
    Args:
      text: string. The untokenized text of the first sequence. For single
        sequence tasks, only this sequence must be specified.
      label: (Optional) string. The label of the example. This should be
        specified for train and dev examples, but not for test examples.
    """
    self.text = text
    self.label = label

def convert_input(text_input, max_seq_length):
    # First, we need to preprocess our data so that it matches the data BERT was trained on:
    #
    # 1. Lowercase our text (if we're using a BERT lowercase model)
    # 2. Tokenize it (i.e. "sally says hi" -> ["sally", "says", "hi"])
    # 3. Break words into WordPieces (i.e. "calling" -> ["call", "##ing"])
    # 
    # Fortunately, the Transformers tokenizer does this for us!
    #
    tokens = tokenizer.tokenize(text_input.text)    

    encode_plus_tokens = tokenizer.encode_plus(text_input.text,
                                               pad_to_max_length=True,
                                               max_length=max_seq_length)

    # Convert the text-based tokens to ids from the pre-trained BERT vocabulary
    input_ids = encode_plus_tokens['input_ids']
    # Specifies which tokens BERT should pay attention to (0 or 1)
    input_mask = encode_plus_tokens['attention_mask']
    # Segment Ids are always 0 for single-sequence tasks (or 1 if two-sequence tasks)
    segment_ids = [0] * max_seq_length

    # Label for our training data (star_rating 1 through 5)
    label_id = label_map[text_input.label]

    features = InputFeatures(
        input_ids=input_ids,
        input_mask=input_mask,
        segment_ids=segment_ids,
        label_id=label_id)

    return features

def convert_features_to_tfrecord(inputs,
                                 output_file,
                                 max_seq_length):
    """Convert a set of `Input`s to a TFRecord file."""

    tfrecord_writer = tf.io.TFRecordWriter(output_file)

    for (input_idx, text_input) in enumerate(inputs):
        if input_idx % 10000 == 0:
            print("Writing example %d of %d" % (input_idx, len(inputs)))

            bert_features = convert_input(text_input, max_seq_length)
        
            tfrecord_features = collections.OrderedDict()
            
            tfrecord_features['input_ids'] = tf.train.Feature(int64_list=tf.train.Int64List(value=bert_features.input_ids))
            tfrecord_features['input_mask'] = tf.train.Feature(int64_list=tf.train.Int64List(value=bert_features.input_mask))
            tfrecord_features['segment_ids'] = tf.train.Feature(int64_list=tf.train.Int64List(value=bert_features.segment_ids))
            tfrecord_features['label_ids'] = tf.train.Feature(int64_list=tf.train.Int64List(value=[bert_features.label_id]))

            tfrecord = tf.train.Example(features=tf.train.Features(feature=tfrecord_features))
            
            tfrecord_writer.write(tfrecord.SerializeToString())

    tfrecord_writer.close()

    
    
def list_arg(raw_value):
    """argparse type for a list of strings"""
    return str(raw_value).split(',')



    
def _transform_tsv_to_tfrecord(file, 
                               max_seq_length, 
                               balance_dataset):
    print('file {}'.format(file))
    print('max_seq_length {}'.format(max_seq_length))
    print('balance_dataset {}'.format(balance_dataset))

    filename_without_extension = Path(Path(file).stem).stem

    df = pd.read_csv(file, 
                     compression='gzip')

    df.isna().values.any()
    df = df.dropna()
    df = df.reset_index(drop=True)

    print('Shape of dataframe {}'.format(df.shape))

        
    print('Shape of dataframe before splitting {}'.format(df.shape))
    
    print('train split percentage {}'.format(args.train_split_percentage))
    print('validation split percentage {}'.format(args.validation_split_percentage))
    print('test split percentage {}'.format(args.test_split_percentage))    
    
    holdout_percentage = 1.00 - args.train_split_percentage
    print('holdout percentage {}'.format(holdout_percentage))
    df_train, df_holdout = train_test_split(df, 
                                            test_size=holdout_percentage, 
                                            stratify=df[LABEL_COLUMN])

    test_holdout_percentage = args.test_split_percentage / holdout_percentage
    print('test holdout percentage {}'.format(test_holdout_percentage))
    df_validation, df_test = train_test_split(df_holdout, 
                                              test_size=test_holdout_percentage,
                                              stratify=df_holdout[LABEL_COLUMN])
    
    df_train = df_train.reset_index(drop=True)
    df_validation = df_validation.reset_index(drop=True)
    df_test = df_test.reset_index(drop=True)

    print('Shape of train dataframe {}'.format(df_train.shape))
    print('Shape of validation dataframe {}'.format(df_validation.shape))
    print('Shape of test dataframe {}'.format(df_test.shape))

    train_inputs = df_train.apply(lambda x: Input(text = x[DATA_COLUMN], 
                                                         label = x[LABEL_COLUMN]), axis = 1)

    validation_inputs = df_validation.apply(lambda x: Input(text = x[DATA_COLUMN], 
                                                            label = x[LABEL_COLUMN]), axis = 1)

    test_inputs = df_test.apply(lambda x: Input(text = x[DATA_COLUMN], 
                                                label = x[LABEL_COLUMN]), axis = 1)

    # Next, we need to preprocess our data so that it matches the data BERT was trained on. For this, we'll need to do a couple of things (but don't worry--this is also included in the Python library):
    # 
    # 
    # 1. Lowercase our text (if we're using a BERT lowercase model)
    # 2. Tokenize it (i.e. "sally says hi" -> ["sally", "says", "hi"])
    # 3. Break words into WordPieces (i.e. "calling" -> ["call", "##ing"])
    # 4. Map our words to indexes using a vocab file that BERT provides
    # 5. Add special "CLS" and "SEP" tokens (see the [readme](https://github.com/google-research/bert))
    # 6. Append "index" and "segment" tokens to each input (see the [BERT paper](https://arxiv.org/pdf/1810.04805.pdf))
    # 
    # We don't have to worry about these details.  The Transformers tokenizer does this for us.
    # 
    train_data = '{}/bert/train'.format(args.output_data)
    validation_data = '{}/bert/validation'.format(args.output_data)
    test_data = '{}/bert/test'.format(args.output_data)

    # Convert our train and validation features to InputFeatures (.tfrecord protobuf) that works with BERT and TensorFlow.
    df_train_embeddings = convert_features_to_tfrecord(train_inputs, 
                                                       '{}/part-{}-{}.tfrecord'.format(train_data, args.current_host, filename_without_extension), 
                                                       max_seq_length)

    df_validation_embeddings = convert_features_to_tfrecord(validation_inputs, '{}/part-{}-{}.tfrecord'.format(validation_data, args.current_host, filename_without_extension), max_seq_length)

    df_test_embeddings = convert_features_to_tfrecord(test_inputs, '{}/part-{}-{}.tfrecord'.format(test_data, args.current_host, filename_without_extension), max_seq_length)
        
def parse_args():
    # Unlike SageMaker training jobs (which have `SM_HOSTS` and `SM_CURRENT_HOST` env vars), processing jobs to need to parse the resource config file directly
    resconfig = {}
    try:
        with open('/opt/ml/config/resourceconfig.json', 'r') as cfgfile:
            resconfig = json.load(cfgfile)
    except FileNotFoundError:
        print('/opt/ml/config/resourceconfig.json not found.  current_host is unknown.')
        pass # Ignore

    # Local testing with CLI args
    parser = argparse.ArgumentParser(description='Process')

    parser.add_argument('--hosts', type=list_arg,
        default=resconfig.get('hosts', ['unknown']),
        help='Comma-separated list of host names running the job'
    )
    parser.add_argument('--current-host', type=str,
        default=resconfig.get('current_host', 'unknown'),
        help='Name of this host running the job'
    )
    parser.add_argument('--input-data', type=str,
        default='/opt/ml/processing/input/data',
    )
    parser.add_argument('--output-data', type=str,
        default='/opt/ml/processing/output',
    )
    parser.add_argument('--train-split-percentage', type=float,
        default=0.90,
    )
    parser.add_argument('--validation-split-percentage', type=float,
        default=0.05,
    )    
    parser.add_argument('--test-split-percentage', type=float,
        default=0.05,
    )
    parser.add_argument('--balance-dataset', type=eval,
        default=False
    )
    parser.add_argument('--max-seq-length', type=int,
        default=128,
    )  
    
    return parser.parse_args()
        
    
def process(args):
    print('Current host: {}'.format(args.current_host))
    
    train_data = None
    validation_data = None
    test_data = None

    transform_tsv_to_tfrecord = functools.partial(_transform_tsv_to_tfrecord, 
                                                 max_seq_length=args.max_seq_length,
                                                 balance_dataset=args.balance_dataset

    )
    input_files = glob.glob('{}/*'.format(args.input_data))
    print("********** input files ***************")    
    print("args.input_data: ", args.input_data)
    print("input_files: ", input_files)

    num_cpus = multiprocessing.cpu_count()
    print('num_cpus {}'.format(num_cpus))

    p = multiprocessing.Pool(num_cpus)
    p.map(transform_tsv_to_tfrecord, input_files)

    print("********** Listing tf-record files ***************")        
    print('Listing contents of {}'.format(args.output_data))
    dirs_output = os.listdir(args.output_data)
    for file in dirs_output:
        print(file)

    print('Listing contents of {}'.format(train_data))
    dirs_output = os.listdir(train_data)
    for file in dirs_output:
        print(file)

    print('Listing contents of {}'.format(validation_data))
    dirs_output = os.listdir(validation_data)
    for file in dirs_output:
        print(file)

    print('Listing contents of {}'.format(test_data))
    dirs_output = os.listdir(test_data)
    for file in dirs_output:
        print(file)

    print('Complete')
    
    
if __name__ == "__main__":
    args = parse_args()
    print('################ START #######################')    
    print('Loaded arguments:')
    print(args)
    
    print('Environment variables:')
#     print(os.environ)

    process(args)    



Overwriting preprocess-scikit-text-to-bert.py


## Test on Local Notebook

### Parameters

In [4]:
save_split_data_dir = 'data/split'
output_data_dir = 'data/output'


In [5]:
! mkdir -p data/output/bert/train
! mkdir -p data/output/bert/validation
! mkdir -p data/output/bert/test

In [6]:
!python preprocess-scikit-text-to-bert.py \
    --input-data {save_split_data_dir} \
    --output-data {output_data_dir} 

Collecting tensorflow==2.1.0
  Downloading tensorflow-2.1.0-cp36-cp36m-manylinux2010_x86_64.whl (421.8 MB)
[K     |██████████████████████▎         | 293.3 MB 118.1 MB/s eta 0:00:02    |██████▏                         | 81.7 MB 117.6 MB/s eta 0:00:03     |████████████▎                   | 161.6 MB 117.6 MB/s eta 0:00:03

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



[K     |████████████████████████████████| 421.8 MB 20 kB/s 
[?25hCollecting grpcio>=1.8.6
  Downloading grpcio-1.30.0-cp36-cp36m-manylinux2010_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 110.9 MB/s eta 0:00:01
[?25hCollecting keras-preprocessing>=1.1.0
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl (42 kB)
[K     |████████████████████████████████| 42 kB 2.5 MB/s  eta 0:00:01
Collecting absl-py>=0.7.0
  Downloading absl-py-0.9.0.tar.gz (104 kB)
[K     |████████████████████████████████| 104 kB 112.7 MB/s eta 0:00:01
Collecting opt-einsum>=2.3.2
  Downloading opt_einsum-3.2.1-py3-none-any.whl (63 kB)
[K     |████████████████████████████████| 63 kB 4.6 MB/s  eta 0:00:01
[?25hCollecting keras-applications>=1.0.8
  Downloading Keras_Applications-1.0.8-py3-none-any.whl (50 kB)
[K     |████████████████████████████████| 50 kB 14.6 MB/s  eta 0:00:01
[?25hCollecting gast==0.2.2
  Downloading gast-0.2.2.tar.gz (10 kB)
Collecting google-pasta>=0.1.6


## Run the Processing Job using Amazon SageMaker

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

processor = SKLearnProcessor(framework_version = '0.20.0',
                             role = role,
                             instance_type = 'ml.c5.2xlarge',
                             instance_count = 2
                            )

In [11]:
s3_raw_input_data = s3_destination_path_csv

In [15]:
train_split_percentage = 0.90
validation_split_percentage = 0.05
test_split_percentage = 0.05
max_seq_length = 128
balance_dataset = False

In [16]:
processor.run(code='preprocess-scikit-text-to-bert.py',
              inputs=[ProcessingInput(source=s3_raw_input_data,
                                      destination='/opt/ml/processing/input/data/',
                                      s3_data_distribution_type='ShardedByS3Key')
              ],
              outputs=[
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='bert-train',
                                        source='/opt/ml/processing/output/bert/train'),
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='bert-validation',
                                        source='/opt/ml/processing/output/bert/validation'),
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='bert-test',
                                        source='/opt/ml/processing/output/bert/test'),
              ],
              arguments=['--train-split-percentage', str(train_split_percentage),
                         '--validation-split-percentage', str(validation_split_percentage),
                         '--test-split-percentage', str(test_split_percentage),
                         '--max-seq-length', str(max_seq_length),
                         '--balance-dataset', str(balance_dataset)
              ],
              logs=True,
              wait=False)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  sagemaker-scikit-learn-2020-06-27-06-07-25-298
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/tweet_emoticon/csv', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'bert-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/output/bert-train', 'LocalPath': '/opt/ml/processing/output/bert/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'bert-validation', 'S3Output': {'S3Uri': 

In [17]:
scikit_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']
print(scikit_processing_job_name)

sagemaker-scikit-learn-2020-06-27-06-07-25-298


## Link to CloudWatch 

In [18]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, scikit_processing_job_name)))



In [19]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, scikit_processing_job_name, region)))



### Look at processor jobs

In [20]:
sm.list_processing_jobs()

{'ProcessingJobSummaries': [{'ProcessingJobName': 'sagemaker-scikit-learn-2020-06-27-06-07-25-298',
   'ProcessingJobArn': 'arn:aws:sagemaker:us-east-2:057716757052:processing-job/sagemaker-scikit-learn-2020-06-27-06-07-25-298',
   'CreationTime': datetime.datetime(2020, 6, 27, 6, 7, 25, 696000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2020, 6, 27, 6, 7, 25, 899000, tzinfo=tzlocal()),
   'ProcessingJobStatus': 'InProgress'},
  {'ProcessingJobName': 'sagemaker-scikit-learn-2020-06-23-05-20-02-678',
   'ProcessingJobArn': 'arn:aws:sagemaker:us-east-2:057716757052:processing-job/sagemaker-scikit-learn-2020-06-23-05-20-02-678',
   'CreationTime': datetime.datetime(2020, 6, 23, 5, 20, 3, 128000, tzinfo=tzlocal()),
   'ProcessingEndTime': datetime.datetime(2020, 6, 23, 5, 24, 15, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2020, 6, 23, 5, 24, 15, 523000, tzinfo=tzlocal()),
   'ProcessingJobStatus': 'Completed'},
  {'ProcessingJobName': 'sagemaker-scikit-lea

### Status of the Processor Job

In [21]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=scikit_processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)



InProgress


{'ProcessingInputs': [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/tweet_emoticon/csv', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'bert-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/output/bert-train', 'LocalPath': '/opt/ml/processing/output/bert/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'bert-validation', 'S3Output': {'S3Uri': 's3

In [22]:
running_processor.wait(logs=False)

..............................................!

## Inspect the Processed Output Data

In [23]:
output_config = processing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'bert-train':
        processed_train_data_s3_uri = output['S3Output']['S3Uri']
    if output['OutputName'] == 'bert-validation':
        processed_validation_data_s3_uri = output['S3Output']['S3Uri']
    if output['OutputName'] == 'bert-test':
        processed_test_data_s3_uri = output['S3Output']['S3Uri']
        
print(processed_train_data_s3_uri)
print(processed_validation_data_s3_uri)
print(processed_test_data_s3_uri)

s3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/output/bert-train
s3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/output/bert-validation
s3://sagemaker-us-east-2-057716757052/sagemaker-scikit-learn-2020-06-27-06-07-25-298/output/bert-test


In [24]:
!aws s3 ls $processed_train_data_s3_uri/

2020-06-27 06:11:16       5584 part-algo-1-tweet_file_01.tfrecord
2020-06-27 06:11:23       5658 part-algo-2-tweet_file_02.tfrecord


In [25]:
!aws s3 ls $processed_validation_data_s3_uri/

2020-06-27 06:11:16        510 part-algo-1-tweet_file_01.tfrecord
2020-06-27 06:11:24        523 part-algo-2-tweet_file_02.tfrecord


In [26]:
!aws s3 ls $processed_test_data_s3_uri/

2020-06-27 06:11:16        505 part-algo-1-tweet_file_01.tfrecord
2020-06-27 06:11:24        509 part-algo-2-tweet_file_02.tfrecord


In [27]:
%store s3_raw_input_data
%store max_seq_length
%store train_split_percentage
%store validation_split_percentage
%store test_split_percentage
%store processed_train_data_s3_uri
%store processed_validation_data_s3_uri
%store processed_test_data_s3_uri

Stored 's3_raw_input_data' (str)
Stored 'max_seq_length' (int)
Stored 'train_split_percentage' (float)
Stored 'validation_split_percentage' (float)
Stored 'test_split_percentage' (float)
Stored 'processed_train_data_s3_uri' (str)
Stored 'processed_validation_data_s3_uri' (str)
Stored 'processed_test_data_s3_uri' (str)


In [28]:
%store

Stored variables and their in-db values:
max_seq_length                               -> 128
processed_test_data_s3_uri                   -> 's3://sagemaker-us-east-2-057716757052/sagemaker-s
processed_train_data_s3_uri                  -> 's3://sagemaker-us-east-2-057716757052/sagemaker-s
processed_validation_data_s3_uri             -> 's3://sagemaker-us-east-2-057716757052/sagemaker-s
s3_destination_path_csv                      -> 's3://sagemaker-us-east-2-057716757052/tweet_emoti
s3_raw_input_data                            -> 's3://sagemaker-us-east-2-057716757052/tweet_emoti
save_split_data_dir                          -> 'data/split'
test_split_percentage                        -> 0.05
train_split_percentage                       -> 0.9
validation_split_percentage                  -> 0.05
