In [4]:
import sagemaker
from sagemaker import get_execution_role
import json
import boto3
import pandas as pd
import numpy as np
sess = sagemaker.Session()

role = get_execution_role()
print(role) # This is the role that SageMaker would use to leverage AWS resources (S3, CloudWatch) on your behalf

bucket = 'bucket-name' # Replace with your own bucket name if needed
print(bucket)
prefix = 'blazing/supervised/newlabels' #Replace with the prefix under which you want to store the data if needed

In [42]:
from random import shuffle
import multiprocessing
from multiprocessing import Pool
import csv
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/ec2-user/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [6]:
data_key = 'blazing/supervised/training_original/filename.csv'
data_location = 's3://{}/{}'.format(bucket, data_key)
df = pd.read_csv(data_location, header=None) # df columns - label, state, covidguideline

In [5]:
df.columns = ['label', 'state', 'covidguideline']
df.head()

In [6]:
print('unique labels: \n', list(df.label.unique())), 
print('\n')
print('no. of unique labels: ', len(df.label.unique()))

In [7]:
sort1 = ['L1', 'L2', 'L3', 'L4', 'L5', 'L6', 'L7', 'L8', 'L9', 'L10', 'L11']

sort1.sort()
sort1

In [27]:
df['label'] = df['label'].map({'L1':1,
 'L2':2,
 'L3':3,
 'L4':4,
 'L5':5,
 'L6':6,
 'L7':7,
 'L8':8,
 'L9':9,
 'L10':10,
 'L11':11})

In [28]:
df.label.unique()

array([ 1,  2,  3,  7,  8,  4,  5,  6,  9, 11, 10])

In [8]:
df.head()

In [9]:
split = np.random.rand(len(df)) < 0.8
train = df[split]
test = df[~split]

print('train samples: \n', len(train))
print('\n')
print('test samples: \n', len(test))

In [34]:
object_prefix = 'blazing/supervised/training_original/'
data_path = 's3://{}/{}'.format(bucket, object_prefix)

#train.to_csv('./data_new_labels/train.csv', header=None, index=False)
#test.to_csv('./data_new_labels/test.csv', header=None, index=False)

train.to_csv(data_path+'train.csv', header=None, index=False)
test.to_csv(data_path+'test.csv', header=None, index=False)

In [35]:
!head data_new_labels/train.csv -n 3

1,businesses_reopening,"'May 1: reopening all closed businesses. Restaurants that remained open for takeout and delivery may not reopen for dine-in until May 1.  May 15: large event venues, xa0sports and recreation venues, entertainment venues  May 29: state enters Green Phase  September xa04: state begins county-by-county risk level assessment. Requirements for businesses with respect to mitigation protocols varies by risk level.'"
2,operation_restrictions,"'Food establishment occupancy limits: 50% of capacity in Yellow Phase; increases to 75% in Green Phase and to 100% in Blue Phase.  Additional occupancy guidelines and other operating requirements apply to personal care services, movie theaters, and fitness centers. Refer to Smart Restart industry specific protocols.'"
3,ppe_requirement,'Recommended generally. Encourage use of cloth face coverings by employees whose duties require close contact with other employees and/or the public.  Personal care services: required.'


In [10]:
!cat data_new_labels/classes.txt

In [11]:
index_to_label = {} 
with open("data_new_labels/classes.txt") as f:
    for i,label in enumerate(f.readlines()):
        index_to_label[str(i+1)] = label.strip()
print(index_to_label)

In [43]:
def transform_instance(row):
    cur_row = []
    label = "__label__" + index_to_label[row[0]]  #Prefix the index-ed label with __label__
    cur_row.append(label)
    cur_row.extend(nltk.word_tokenize(row[1].lower()))
    cur_row.extend(nltk.word_tokenize(row[2].lower()))
    return cur_row

In [44]:
def preprocess(input_file, output_file, keep=1):
    all_rows = []
    with open(input_file, 'r') as csvinfile:
        csv_reader = csv.reader(csvinfile, delimiter=',')
        for row in csv_reader:
            all_rows.append(row)
    shuffle(all_rows)
    all_rows = all_rows[:int(keep*len(all_rows))]
    pool = Pool(processes=multiprocessing.cpu_count())
    transformed_rows = pool.map(transform_instance, all_rows)
    pool.close() 
    pool.join()
    
    with open(output_file, 'w') as csvoutfile:
        csv_writer = csv.writer(csvoutfile, delimiter=' ', lineterminator='\n')
        csv_writer.writerows(transformed_rows)

In [45]:
%%time

preprocess('data_new_labels/train.csv', 'guidelines.train', keep=1)
        
# Preparing the validation dataset        
preprocess('data_new_labels/test.csv', 'guidelines.validation')

CPU times: user 22.1 ms, sys: 20.5 ms, total: 42.6 ms
Wall time: 344 ms


In [46]:
%%time

train_channel = prefix + '/train'
validation_channel = prefix + '/validation'

sess.upload_data(path='guidelines.train', bucket=bucket, key_prefix=train_channel)
sess.upload_data(path='guidelines.validation', bucket=bucket, key_prefix=validation_channel)

s3_train_data = 's3://{}/{}'.format(bucket, train_channel)
s3_validation_data = 's3://{}/{}'.format(bucket, validation_channel)

CPU times: user 72.5 ms, sys: 15.6 ms, total: 88.1 ms
Wall time: 1.46 s


Next we need to setup an output location at S3, where the model artifact will be dumped. These artifacts are also the output of the algorithm's traning job.

In [47]:
s3_output_location = 's3://{}/{}/output'.format(bucket, prefix)

## Training
Now that we are done with all the setup that is needed, we are ready to train our object detector. To begin, let us create a ``sageMaker.estimator.Estimator`` object. This estimator will launch the training job.

In [48]:
region_name = boto3.Session().region_name
print(region_name)

us-east-2


In [49]:
container = sagemaker.amazon.amazon_estimator.get_image_uri(region_name, "blazingtext", "latest")
print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.


Using SageMaker BlazingText container: 825641698319.dkr.ecr.us-east-2.amazonaws.com/blazingtext:latest (us-east-2)


## Training the BlazingText model for supervised text classification

Similar to the original implementation of [Word2Vec](https://arxiv.org/pdf/1301.3781.pdf), SageMaker BlazingText provides an efficient implementation of the continuous bag-of-words (CBOW) and skip-gram architectures using Negative Sampling, on CPUs and additionally on GPU[s]. The GPU implementation uses highly optimized CUDA kernels. To learn more, please refer to [*BlazingText: Scaling and Accelerating Word2Vec using Multiple GPUs*](https://dl.acm.org/citation.cfm?doid=3146347.3146354).




Besides skip-gram and CBOW, SageMaker BlazingText also supports the "Batch Skipgram" mode, which uses efficient mini-batching and matrix-matrix operations ([BLAS Level 3 routines](https://software.intel.com/en-us/mkl-developer-reference-fortran-blas-level-3-routines)). This mode enables distributed word2vec training across multiple CPU nodes, allowing almost linear scale up of word2vec computation to process hundreds of millions of words per second. Please refer to [*Parallelizing Word2Vec in Shared and Distributed Memory*](https://arxiv.org/pdf/1604.04661.pdf) to learn more.

BlazingText also supports a *supervised* mode for text classification. It extends the FastText text classifier to leverage GPU acceleration using custom CUDA kernels. The model can be trained on more than a billion words in a couple of minutes using a multi-core CPU or a GPU, while achieving performance on par with the state-of-the-art deep learning text classification algorithms. For more information, please refer to the [algorithm documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html).

To summarize, the following modes are supported by BlazingText on different types instances:

|          Modes         	| cbow (supports subwords training) 	| skipgram (supports subwords training) 	| batch_skipgram 	| supervised |
|:----------------------:	|:----:	|:--------:	|:--------------:	| :--------------:	|
|   Single CPU instance  	|   ✔  	|     ✔    	|        ✔       	|  ✔  |
|   Single GPU instance  	|   ✔  	|     ✔    	|                	|  ✔ (Instance with 1 GPU only)  |
| Multiple CPU instances 	|      	|          	|        ✔       	|     | |

Now, let's define the SageMaker `Estimator` with resource configurations and hyperparameters to train Text Classification on *DBPedia* dataset, using "supervised" mode on a `c4.4xlarge` instance.


In [50]:
guidelines_model = sagemaker.estimator.Estimator(container,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.c4.4xlarge',
                                         train_volume_size = 30,
                                         train_max_run = 360000,
                                         input_mode= 'File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.


Please refer to [algorithm documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext_hyperparameters.html) for the complete list of hyperparameters.

In [51]:
guidelines_model.set_hyperparameters(mode="supervised",
                            epochs=100,
                            min_count=2,
                            learning_rate=0.05,
                            vector_dim=20,
                            early_stopping=True,
                            patience=6,
                            min_epochs=5,
                            word_ngrams=1)

Now that the hyper-parameters are setup, let us prepare the handshake between our data channels and the algorithm. To do this, we need to create the `sagemaker.session.s3_input` objects from our data channels. These objects are then put in a simple dictionary, which the algorithm consumes.

In [52]:
train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/plain', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/plain', s3_data_type='S3Prefix')
data_channels = {'train': train_data, 'validation': validation_data}

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


In [53]:
data_channels

{'train': <sagemaker.inputs.s3_input at 0x7f48dd1d5d68>,
 'validation': <sagemaker.inputs.s3_input at 0x7f48dd1d57f0>}

In [54]:
guidelines_model.fit(inputs=data_channels, logs=True)

2020-11-06 06:16:17 Starting - Starting the training job...
2020-11-06 06:16:19 Starting - Launching requested ML instances......
2020-11-06 06:17:25 Starting - Preparing the instances for training......
2020-11-06 06:18:48 Downloading - Downloading input data...
2020-11-06 06:19:10 Training - Downloading the training image..[34mArguments: train[0m
[34m[11/06/2020 06:19:24 INFO 140080725144960] nvidia-smi took: 0.02536940574645996 secs to identify 0 gpus[0m
[34m[11/06/2020 06:19:24 INFO 140080725144960] Running single machine CPU BlazingText training using supervised mode.[0m
[34mNumber of CPU sockets found in instance is  1[0m
[34m[11/06/2020 06:19:24 INFO 140080725144960] Processing /opt/ml/input/data/train/guidelines.train . File size: 0.16891956329345703 MB[0m
[34m[11/06/2020 06:19:24 INFO 140080725144960] Processing /opt/ml/input/data/validation/guidelines.validation . File size: 0.04774284362792969 MB[0m
[34mRead 0M words[0m
[34mNumber of words:  1221[0m
[34mLoad

## Hosting / Inference
Once the training is done, we can deploy the trained model as an Amazon SageMaker real-time hosted endpoint. This will allow us to make predictions (or inference) from the model. Note that we don't have to host on the same type of instance that we used to train. Because instance endpoints will be up and running for long, it's advisable to choose a cheaper instance for inference.

In [56]:
text_classifier = guidelines_model.deploy(initial_instance_count = 1,instance_type = 'ml.m4.xlarge', endpoint_name = 'end-point-name')

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Using already existing model: blazingtext-2020-11-06-06-16-17-570


-------------!

### You can use the following deployment script in a separt notebook

#### Use JSON format for inference
BlazingText supports `application/json` as the content-type for inference. The payload should contain a list of sentences with the key as "**instances**" while being passed to the endpoint.

In [13]:
import sagemaker
from sagemaker import get_execution_role
#import json
#import boto3

sess = sagemaker.Session()

#role = get_execution_role()

In [14]:
# Get RealTimePredictor using SageMaker SDK
# Specify Your Endpoint Name
endpoint_name = 'end-point-name'

predictor = sagemaker.predictor.RealTimePredictor(endpoint=endpoint_name,
                                                 sagemaker_session=sess)

In [15]:
import json
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/ec2-user/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [16]:
import re
def clean(text):
    text = re.sub('[^a-zA-Z0-9]', ' ', text)
    text = text.split()
    text = " ".join(text)
    return text

In [17]:
#single inference #ppe
sentences = ["Recommended generally. Encourage use of cloth face coverings by employees whose duties require close contact with other employees and/or the public.  Personal care services: required."]

# using the same nltk tokenizer that we used during data preparation for training
tokenized_sentences = [' '.join(nltk.word_tokenize(sent)) for sent in sentences]
#print(tokenized_sentences)
payload = {"instances" : tokenized_sentences}

response = predictor.predict(json.dumps(payload))

predictions = json.loads(response)
print(json.dumps(predictions, indent=2))

[
  {
    "label": [
      "__label__GD-102"
    ],
    "prob": [
      0.663482666015625
    ]
  }
]


In [18]:
#single inference #operations restrictions
sentences = ["Food establishment occupancy limits: 50% of capacity in Yellow Phase; increases to 75% in Green Phase and to 100% in Blue Phase.  Additional occupancy guidelines and other operating requirements apply to personal care services, movie theaters, and fitness centers. Refer to Smart Restart industry specific protocols."]

# using the same nltk tokenizer that we used during data preparation for training
tokenized_sentences = [' '.join(nltk.word_tokenize(sent)) for sent in sentences]
#print(tokenized_sentences)
payload = {"instances" : tokenized_sentences}

response = predictor.predict(json.dumps(payload))

predictions = json.loads(response)
print(json.dumps(predictions, indent=2))

[
  {
    "label": [
      "__label__GD-101"
    ],
    "prob": [
      0.9612232446670532
    ]
  }
]


In [19]:
#single inference #operations restrictions #business reopen
sentences = ["May 1: reopening all closed businesses. Restaurants that remained open for takeout and delivery may not reopen for dine-in until May 1.  May 15: large event venues, xa0sports and recreation venues, entertainment venues  May 29: state enters Green Phase  September xa04: state begins county-by-county risk level assessment. Requirements for businesses with respect to mitigation protocols varies by risk level."]

# using the same nltk tokenizer that we used during data preparation for training
tokenized_sentences = [' '.join(nltk.word_tokenize(sent)) for sent in sentences]
#print(tokenized_sentences)
payload = {"instances" : tokenized_sentences}

response = predictor.predict(json.dumps(payload))

predictions = json.loads(response)
print(json.dumps(predictions, indent=2))              
                

[
  {
    "label": [
      "__label__GD-100"
    ],
    "prob": [
      0.5057657957077026
    ]
  }
]


In [13]:
import pandas as pd
guidelines = pd.read_csv('testfile_new2.csv', header=None)
guidelines.columns = ['id','l_id','datasource_id','source_day_id','News','Links','s_upsertTimestamp','w_upsertTimestamp']
guidelines['News'] = guidelines['News'].astype(str)
guidelines['News'] = guidelines['News'].apply(lambda x: x.lower())
guidelines['News'] = guidelines['News'].apply(lambda x: clean(x))
guidelines.head(1)

In [21]:
guide_lines = guidelines[['id', 'News']]

In [23]:
sentences = list(guide_lines.News)
tokenized_sentences = sentences
payload = {"instances" : tokenized_sentences}
print('payload created')
response = predictor.predict(json.dumps(payload))
predictions = json.loads(response)
label_list = []
for i in range(len(sentences)):
       label_list.append(predictions[i]['label'][0].split('__label__')[1])
final_df = pd.DataFrame(label_list, columns=['label'])

payload created


In [14]:
final_df.head() #only predicted column

In [15]:
import datetime as dt
guide_lines['label'] = final_df['label']
final_df = guide_lines[['id', 'label']]
final_df.columns = ['table_id', 'label']

final_df['table_name'] = 'covid_data.newstable'
final_df['algorithm'] = 'blazing text'
final_df['label_type'] = 'categorization'
final_df['w_upsert_timestamp'] = pd.Series([dt.datetime.now()] * len(final_df))
    
final_df = final_df[['table_name', 'algorithm', 'table_w_id', 'label_type', 'label', 'w_upsert_timestamp']]
final_df.head()

In [16]:
final_df.label.unique()

In [27]:
#write to local
final_df.to_csv('predicted_Guidelinesfile_push_to_redshift_new_labels.csv', index=False)

In [None]:
#write S3 bucket
final_df.to_csv('s3://bucket/blazing/outputfile/predicted_file_push_to_redshift.csv', index=False)

In [64]:
#sess.delete_endpoint(text_classifier.endpoint)