In [None]:
# please ignore warning messages during the installation
# Pinned versions: SageMaker Python SDK 2.35.0, PyTorch 1.6.0 (pytorch conda
# channel) and Hugging Face transformers 3.5.1.
# NOTE(review): `%pip` / `%conda` install into the running kernel's
# environment more reliably than `!pip` / `!conda` — consider switching.
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!conda install -q -y pytorch==1.6.0 -c pytorch
!pip install --disable-pip-version-check -q transformers==3.5.1

In [None]:
import importlib
import sys
import time

import boto3
from IPython.core.display import display, HTML
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition, FeatureTypeEnum)
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
import seaborn as sns

# Make the learner's helper module in src/ importable. The same file is used for
# the local convert_to_bert_input_ids check and as the processing-job script.
sys.path.append('src/')
import prepare_data

In [None]:
# SageMaker session, IAM execution role, the session's default S3 bucket and the
# AWS region; all four are reused by the cells below.
sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()
region = boto3.Session().region_name

# 1. Configure the SageMaker Feature Store

In [None]:
# S3 prefix of the raw review data; it is the input of the processing job.
raw_input_data_s3_uri = 's3://dlai-practical-data-science/data/raw/'
print(raw_input_data_s3_uri)

In [None]:
# List the raw files stored under that prefix.
!aws s3 ls $raw_input_data_s3_uri

In [None]:
# One epoch-seconds timestamp suffixes both names, so each run gets a fresh
# feature group and offline-store prefix (unless two runs start in the same second).
timestamp = int(time.time())
feature_group_name = f'reviews-feature-group-{timestamp}'
feature_store_offline_prefix = f'reviews-feature-store-{timestamp}'
print(f'Feature group name: {feature_group_name}')
print(f'Feature store offline prefix in S3: {feature_store_offline_prefix}')

In [None]:
# Schema of the feature group. Every feature is stored as a STRING:
#   review_id   - unique ID of the review
#   date        - ingestion timestamp
#   sentiment   - -1 (negative), 0 (neutral) or 1 (positive), derived from
#                 the Rating values (1, 2, 3, 4, 5)
#   label_id    - label ID of the target class (sentiment)
#   input_ids   - reviews encoded with the BERT tokenizer
#   review_body - original review text
#   split_type  - train/validation/test label
feature_definitions = [
    FeatureDefinition(feature_name=feature_name,
                      feature_type=FeatureTypeEnum.STRING)
    for feature_name in ('review_id', 'date', 'sentiment', 'label_id',
                         'input_ids', 'review_body', 'split_type')
]

In [None]:
# Local FeatureGroup handle built from the name and schema defined earlier.
# This cell does not call create(); presumably src/prepare_data.py creates the
# group and ingests into it (it receives --feature-group-name) — TODO confirm.
feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=sess)
print(feature_group)

# 2. Transform the dataset

In [None]:
# Compute resources for the processing job.
processing_instance_type = 'ml.c5.xlarge'
processing_instance_count = 1
# Fractions of the dataset assigned to each split (expected to add up to 1).
train_split_percentage = 0.90
validation_split_percentage = 0.05
test_split_percentage = 0.05
# Forwarded to src/prepare_data.py as --balance-dataset.
balance_dataset = True
# Number of BERT input IDs produced per review.
max_seq_length = 128

# Catch an inconsistent split configuration here, before paying for a
# processing job; the tolerance absorbs floating-point rounding.
assert abs(train_split_percentage + validation_split_percentage
           + test_split_percentage - 1.0) < 1e-9, (
    'train/validation/test split percentages must sum to 1')

In [None]:
# Managed scikit-learn (0.23-1) container that later runs src/prepare_data.py.
# Jobs are capped at 7200 s (2 hours); the region is exported so AWS clients
# inside the container resolve the same region as this notebook.
processor = SKLearnProcessor(
    role=role,
    framework_version='0.23-1',
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    max_runtime_in_seconds=7200,
    env={'AWS_DEFAULT_REGION': region})

In [None]:
# Pick up edits made to src/prepare_data.py since it was first imported.
if 'prepare_data' in sys.modules:
    importlib.reload(prepare_data)

# Sanity-check the learner's implementation: encoding a short review must yield
# exactly max_seq_length IDs. The later processing-job cell only launches the
# job when updated_correctly is True.
input_ids = prepare_data.convert_to_bert_input_ids(
    "this product is great!", max_seq_length)
updated_correctly = False
if len(input_ids) == max_seq_length:
    print('##################')
    print('Updated correctly!')
    print('##################')
    updated_correctly = True
else:
    incomplete_message = (
        'Please check that the function "convert_to_bert_input_ids" in '
        'the file src/prepare_data.py is complete.')
    print('#' * 40)
    print(incomplete_message)
    print('#' * 40)
    raise Exception(incomplete_message)

In [None]:
# Show the IDs produced for the sample review, followed by how many there are
# (it should equal max_seq_length).
input_ids = prepare_data.convert_to_bert_input_ids(
    "this product is great!", max_seq_length
)
print(input_ids)
print('Length of the sequence: {}'.format(len(input_ids)))

In [None]:
# Launch the processing job only when the convert_to_bert_input_ids check
# succeeded. wait=False returns immediately; the job is polled in later cells.
if not updated_correctly:
    print('#######################################')
    print('Please update the code correctly above.')
    print('#######################################')
else:
    # One output per dataset split, uploaded to S3 when the job ends.
    split_outputs = [
        ProcessingOutput(
            output_name=f'sentiment-{split}',
            source=f'/opt/ml/processing/output/sentiment/{split}',
            s3_upload_mode='EndOfJob')
        for split in ('train', 'validation', 'test')]
    # Command-line flags forwarded to src/prepare_data.py.
    script_arguments = [
        '--train-split-percentage', str(train_split_percentage),
        '--validation-split-percentage', str(validation_split_percentage),
        '--test-split-percentage', str(test_split_percentage),
        '--balance-dataset', str(balance_dataset),
        '--max-seq-length', str(max_seq_length),
        '--feature-store-offline-prefix', str(feature_store_offline_prefix),
        '--feature-group-name', str(feature_group_name)]
    # The raw data is sharded across instances by S3 key.
    raw_data_input = ProcessingInput(
        source=raw_input_data_s3_uri,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key')
    processor.run(
        code='src/prepare_data.py',
        inputs=[raw_data_input],
        outputs=split_outputs,
        arguments=script_arguments,
        logs=True,
        wait=False)

In [None]:
# Name of the most recent job in processor.jobs, read from its description.
scikit_processing_job_name = processor.jobs[-1].describe()[
    'ProcessingJobName']
print(f'Processing job name: {scikit_processing_job_name}')

In [None]:
# Debug aid: list the fields available in the latest job's describe() result.
print(processor.jobs[-1].describe().keys())

In [None]:
# Current status of the most recent job in processor.jobs (a snapshot; the job
# was launched with wait=False and may still be running).
scikit_processing_job_status = processor.jobs[-1].describe()[
    'ProcessingJobStatus']
print(f'Processing job status: {scikit_processing_job_status}')

In [None]:
# Link to the processing job in the SageMaker console.
# target="_blank" (not "blank") opens a new tab on every click instead of
# reusing one browser window that happens to be named "blank".
display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/'
        f'sagemaker/home?region={region}#/processing-jobs/'
        f'{scikit_processing_job_name}">processing job</a></b>'))

In [None]:
# Link to the job's CloudWatch log streams (filtered by the job-name prefix).
# target="_blank" (not "blank") opens a new tab on every click instead of
# reusing one browser window that happens to be named "blank".
display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/'
        f'cloudwatch/home?region={region}#logStream:group=/aws/sagemaker/'
        f'ProcessingJobs;prefix={scikit_processing_job_name};'
        'streamFilter=typeLogStreamPrefix">CloudWatch logs</a> after '
        'about 5 minutes</b>'))

In [None]:
# Link to the job's output folder in the S3 console.
# target="_blank" (not "blank") opens a new tab on every click instead of
# reusing one browser window that happens to be named "blank".
display(
    HTML(
        '<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/'
        f's3/buckets/{bucket}/{scikit_processing_job_name}/'
        f'?region={region}&tab=overview">S3 output data</a> after the '
        'processing job has completed</b>'))

In [None]:
%%time

# Re-attach to the job by name and block until it finishes without streaming
# logs. This is needed because it was launched with wait=False.
running_processor = (
    sagemaker.processing.ProcessingJob.from_processing_name(
        processing_job_name=scikit_processing_job_name,
        sagemaker_session=sess))
running_processor.wait(logs=False)

In [None]:
# Resolve the S3 URI of each split's output from the finished job's description.
processing_job_description = running_processor.describe()
output_config = processing_job_description['ProcessingOutputConfig']
processed_s3_uris = {
    output['OutputName']: output['S3Output']['S3Uri']
    for output in output_config['Outputs']}

# Fail loudly when an expected output is absent. Otherwise the variable below
# would be undefined, or would silently keep a stale URI from an earlier run.
expected_output_names = (
    'sentiment-train', 'sentiment-validation', 'sentiment-test')
missing_output_names = [
    name for name in expected_output_names if name not in processed_s3_uris]
if missing_output_names:
    raise KeyError(
        f'Processing job outputs not found: {missing_output_names}')

processed_train_data_s3_uri = processed_s3_uris['sentiment-train']
processed_validation_data_s3_uri = processed_s3_uris['sentiment-validation']
processed_test_data_s3_uri = processed_s3_uris['sentiment-test']
print(processed_train_data_s3_uri)
print(processed_validation_data_s3_uri)
print(processed_test_data_s3_uri)

In [None]:
# List the processed training-split files in S3.
!aws s3 ls $processed_train_data_s3_uri/

In [None]:
# List the processed validation-split files in S3.
!aws s3 ls $processed_validation_data_s3_uri/

In [None]:
# List the processed test-split files in S3.
!aws s3 ls $processed_test_data_s3_uri/

In [None]:
# Download each split's part file into ./balanced/<split>/ for local inspection.
# NOTE(review): assumes the job wrote exactly this file name per split
# (presumably one per instance, "algo-1" for a single instance) — confirm if
# processing_instance_count or the raw input files change.
!aws s3 cp $processed_train_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./balanced/sentiment-train/
!aws s3 cp $processed_validation_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./balanced/sentiment-validation/
!aws s3 cp $processed_test_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./balanced/sentiment-test/

In [None]:
# Preview the first 5 lines of the downloaded training split.
!head -n 5 ./balanced/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv

In [None]:
# Preview the first 5 lines of the downloaded validation split.
!head -n 5 ./balanced/sentiment-validation/part-algo-1-womens_clothing_ecommerce_reviews.tsv

In [None]:
# Preview the first 5 lines of the downloaded test split.
!head -n 5 ./balanced/sentiment-test/part-algo-1-womens_clothing_ecommerce_reviews.tsv

# 3. Query the Feature Store

In [None]:
# Athena query over the feature group's table: the first 5 rows of the
# training split. The table name is the Glue Catalog table backing the group.
feature_store_query = feature_group.athena_query()
feature_store_table = feature_store_query.table_name
query_string = f"""
    SELECT date,
        review_id,
        sentiment, 
        label_id,
        input_ids,
        review_body
    FROM "{feature_store_table}" 
    WHERE split_type='train' 
    LIMIT 5
"""
print(f'Glue Catalog table name: {feature_store_table}')
print(f'Running query: {query_string}')

In [None]:
# S3 prefix where Athena writes query results, namespaced by this run's
# offline-store prefix.
output_s3_uri = 's3://{}/query_results/{}/'.format(
    bucket, feature_store_offline_prefix)
print(output_s3_uri)

In [None]:
# Execute the query, writing results under output_s3_uri, and block until done.
feature_store_query.run(query_string=query_string,
                        output_location=output_s3_uri)
feature_store_query.wait()

In [None]:
# Show up to 100 characters per cell so long text columns stay readable, then
# render the query result as a DataFrame.
pd.set_option('display.max_colwidth', 100)
df_feature_store = feature_store_query.as_dataframe()
df_feature_store

In [None]:
# Save the query result locally as a tab-separated file with a header row and
# no index column.
df_feature_store.to_csv('./feature_store_export.tsv', sep='\t',
                        header=True, index=False)

In [None]:
# Preview the exported file.
!head -n 5 ./feature_store_export.tsv

In [None]:
# Upload the export to the session's default bucket.
!aws s3 cp ./feature_store_export.tsv s3://$bucket/feature_store/feature_store_export.tsv

In [None]:
# Confirm the upload landed.
!aws s3 ls --recursive s3://$bucket/feature_store/feature_store_export.tsv

In [None]:
# Second Athena query: number of reviews per sentiment across the whole table
# (no split_type filter).
feature_store_query_2 = feature_group.athena_query()
query_string_count_by_sentiment = """
    SELECT sentiment, COUNT(sentiment) AS count_reviews
    FROM "{}"
    GROUP BY sentiment
""".format(feature_store_table)

In [None]:
# Run the count query (results under output_s3_uri), wait for it, and display
# the per-sentiment counts.
feature_store_query_2.run(output_location=output_s3_uri,
                          query_string=query_string_count_by_sentiment)
feature_store_query_2.wait()
df_count_by_sentiment = feature_store_query_2.as_dataframe()
df_count_by_sentiment

In [None]:
# Bar chart of review counts per sentiment, titled and labelled so it stands
# on its own when the notebook is skimmed.
ax = sns.barplot(data=df_count_by_sentiment,
                 x='sentiment',
                 y='count_reviews',
                 color="blue")
ax.set(title='Number of reviews per sentiment in the feature store',
       xlabel='sentiment',
       ylabel='count_reviews');

In [None]:
# Save copies of this notebook and the learner's src/prepare_data.py to the
# session's default bucket.
!aws s3 cp ./C2_W1_Assignment.ipynb s3://$bucket/C2_W1_Assignment_Learner.ipynb
!aws s3 cp ./src/prepare_data.py s3://$bucket/src/C2_W1_prepare_data_Learner.py