In [2]:
# Install the pinned library versions used by this notebook: the SageMaker SDK
# and transformers through pip, PyTorch through conda.
# Warning messages printed during the installation can be ignored.
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!conda install -q -y pytorch==1.6.0 -c pytorch
!pip install --disable-pip-version-check -q transformers==3.5.1

[0mCollecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - pytorch==1.6.0


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.01.10 |       h06a4308_0         120 KB
    certifi-2022.12.7          |   py37h06a4308_0         150 KB
    cudatoolkit-10.2.89        |       hfd86e86_1       365.1 MB
    ninja-1.10.2               |       h06a4308_5           8 KB
    ninja-base-1.10.2          |       hd09550d_5         109 KB
    openssl-1.1.1t             |       h7f8727e_0         3.7 MB
    pytorch-1.6.0              |py3.7_cuda10.2.89_cudnn7.6.5_0       537.7 MB  pyt

In [3]:
import boto3
import sagemaker
import botocore

# Shared botocore configuration: adds an extra user-agent suffix to the clients below
config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w1')

# Low-level boto3 service clients: SageMaker and the Feature Store runtime
sm = boto3.client('sagemaker', config=config)
featurestore_runtime = boto3.client('sagemaker-featurestore-runtime', config=config)

# SageMaker SDK session that wraps the two clients created above
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

# Values reused by the cells below
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

### 1. Configure the SageMaker Feature Store

In [35]:
# The source of the raw data
raw_input_data_s3_uri = 's3://{}/dlai-practical-data-science/data/raw/'.format(bucket)
!aws s3 ls $raw_input_data_s3_uri

2023-03-27 11:18:58          0 
2023-03-27 11:23:01    8457214 womens_clothing_ecommerce_reviews.csv


### 1.2 Configure the SageMaker feature store

In [6]:
import time

# Current time in whole seconds; appended to the two names below
timestamp = int(time.time())

feature_group_name = f'reviews-feature-group-{timestamp}'
feature_store_offline_prefix = f'review-feature-offline-store{timestamp}'

print(f'Feature group name: {feature_group_name}')
print(f'Feature store offline prefix in S3: {feature_store_offline_prefix}')

Feature group name: reviews-feature-group-1679908151
Feature store offline prefix in S3: review-feature-offline-store1679908151


In [7]:
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum
)

# One definition per feature, in the order listed; every feature uses the STRING type.
feature_definitions = [
    FeatureDefinition(feature_name=name, feature_type=FeatureTypeEnum.STRING)
    for name in (
        'review_id',    # unique ID of the review
        'date',         # ingestion timestamp
        'sentiment',    # -1 (negative), 0 (neutral) or 1 (positive); determined from the Rating values (1, 2, 3, 4, 5)
        'label_id',     # label ID of the target class (sentiment)
        'input_ids',    # reviews encoded with the BERT tokenizer
        'review_body',  # original Review Text
        'split_type',   # train/validation/test label
    )
]

In [9]:
from sagemaker.feature_store.feature_group import FeatureGroup
# Feature group object built from the name and feature definitions prepared above,
# bound to this notebook's SageMaker session
feature_group = FeatureGroup(
    sagemaker_session=sess,
    name=feature_group_name,
    feature_definitions=feature_definitions,
)

print(feature_group)

FeatureGroup(name='reviews-feature-group-1679908151', sagemaker_session=<sagemaker.session.Session object at 0x7f1fa7539390>, feature_definitions=[FeatureDefinition(feature_name='review_id', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='date', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='sentiment', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='label_id', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='input_ids', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='review_body', feature_type=<FeatureTypeEnum.STRING: 'String'>), FeatureDefinition(feature_name='split_type', feature_type=<FeatureTypeEnum.STRING: 'String'>)])


### 2. Transform the dataset

In [20]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Processing job configuration

# Instance type and count handed to the SKLearnProcessor below
processing_instance_type='ml.t3.medium'
processing_instance_count=1
# The remaining values are passed to src/prepare_data.py as command-line
# arguments when the processing job is launched
train_split_percentage=0.90
validation_split_percentage=0.05
test_split_percentage=0.05
balance_dataset=True
max_seq_length=128


# SKLearnProcessor lets you run scripts inside of processing jobs using the scikit-learn image provided.
sklearn_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200
)

# Print the processor defined above. The previous `print(processor)` referred to
# a name this notebook never defines, so it failed on a fresh kernel.
print(sklearn_processor)

<sagemaker.sklearn.processing.SKLearnProcessor object at 0x7f1fa5a98c10>


### Exercise 2

Complete the function `convert_to_bert_input_ids` in the file `src/prepare_data.py`, then run the cell below to check it.

In [18]:
import importlib
import sys

# Make the modules under src/ importable
sys.path.append('src/')

# Import the `prepare_data.py` module, then reload it so that edits made to the
# file after an earlier import in this kernel are picked up. Right after the
# import the module is always present in sys.modules, so the reload always runs.
import prepare_data
importlib.reload(prepare_data)

input_ids = prepare_data.convert_to_bert_input_ids("this product is great!", max_seq_length)

incomplete_message = 'Please check that the function \'convert_to_bert_input_ids\' in the file src/prepare_data.py is complete.'

# Stays False unless the length check below passes
updated_correctly = False

if len(input_ids) == max_seq_length:
    print('##################')
    print('Updated correctly!')
    print('##################')

    updated_correctly = True
else:
    # Wrong number of ids: show a banner and stop the cell with an exception
    print('#######################################################################################################')
    print(incomplete_message)
    print('#######################################################################################################')
    raise Exception(incomplete_message)

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=898823.0, style=ProgressStyle(descripti…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=456318.0, style=ProgressStyle(descripti…


##################
Updated correctly!
##################


In [16]:
# Show the ids produced for the sample sentence, followed by how many there are
print(input_ids)
print(f'Length of the sequence: {len(input_ids)}')

False


In [26]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Launch the processing job only when the exercise check above has passed.
if not updated_correctly:
    print('#######################################')
    print('Please update the code correctly above.')
    print('#######################################')
else:
    sklearn_processor.run(
        code='src/prepare_data.py',
        inputs=[
            ProcessingInput(
                source=raw_input_data_s3_uri,
                destination='/opt/ml/processing/input/data/',
                s3_data_distribution_type='ShardedByS3Key',
            ),
        ],
        outputs=[
            # One output per split. The source paths are expected to match the
            # output directories used by prepare_data.py (not visible here).
            ProcessingOutput(
                output_name='sentiment-train',
                source='/opt/ml/processing/output/sentiment/train',
                s3_upload_mode='EndOfJob',
            ),
            ProcessingOutput(
                output_name='sentiment-validation',
                source='/opt/ml/processing/output/sentiment/validation',
                s3_upload_mode='EndOfJob',
            ),
            ProcessingOutput(
                output_name='sentiment-test',
                source='/opt/ml/processing/output/sentiment/test',
                s3_upload_mode='EndOfJob',
            ),
        ],
        # Flags and values alternate; every element is passed to the script as a string
        arguments=[str(token) for token in (
            '--train-split-percentage', train_split_percentage,
            '--validation-split-percentage', validation_split_percentage,
            '--test-split-percentage', test_split_percentage,
            '--balance-dataset', balance_dataset,
            '--max-seq-length', max_seq_length,
            '--feature-store-offline-prefix', feature_store_offline_prefix,
            '--feature-group-name', feature_group_name,
        )],
        logs=True,
        wait=False,
    )


Job Name:  sagemaker-scikit-learn-2023-03-27-10-46-31-801
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://dlai-practical-data-science/data/raw/', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-southeast-2-288344227581/sagemaker-scikit-learn-2023-03-27-10-46-31-801/input/code/prepare_data.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'sentiment-train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-ap-southeast-2-288344227581/sagemaker-scikit-learn-2023-03-27-10-46-31-801/output/sentiment-train', 'LocalPath': '/opt/ml/processing/output/sentiment/train', 'S3UploadMode': 'EndOfJob'}}, 

ClientError: An error occurred (ValidationException) when calling the CreateProcessingJob operation: No S3 objects found under S3 URL "s3://dlai-practical-data-science/data/raw/" given in input data source. Please ensure that the bucket exists in the selected region (ap-southeast-2), that objects exist under that S3 prefix, and that the role "arn:aws:iam::288344227581:role/service-role/AmazonSageMaker-ExecutionRole-20221130T205935" has "s3:ListBucket" permissions on bucket "dlai-practical-data-science". Error message from S3: The bucket is in this region: us-east-1. Please use this region to retry the request

In [None]:
# Describe the most recently launched processing job once and reuse the result.
# The original mixed `sklearn_processor` with `processor`, a name this notebook
# never defines, and called describe() three times.
latest_job_description = sklearn_processor.jobs[-1].describe()

scikit_processing_job_name = latest_job_description['ProcessingJobName']
print('Processing job name: {}'.format(scikit_processing_job_name))
print(latest_job_description.keys())
scikit_processing_job_status = latest_job_description['ProcessingJobStatus']
print('Processing job status: {}'.format(scikit_processing_job_status))

In [22]:
# check processing job, cloudwatch

In [None]:
%%time

scikit_processing_job_name = ''

running_processor_job = sagemaker.processing.ProcessingJob.from_processing_name(
    processing_job_name = scikit_processing_job_name,
    sagemaker_session=session 
)
running_processor_job.wait(logs=True)

### Check transformed data

In [None]:
# Look up where the finished job wrote each split.
# The job object created earlier is `running_processor_job`; the original called
# `running_processor`, which is not defined anywhere in this notebook.
processing_job_description = running_processor_job.describe()

output_config = processing_job_description['ProcessingOutputConfig']

# Map each output name to its S3 URI so that a missing output fails right here
# with a KeyError naming it, rather than later as an undefined variable.
s3_uri_by_output_name = {
    output['OutputName']: output['S3Output']['S3Uri']
    for output in output_config['Outputs']
}

processed_train_data_s3_uri = s3_uri_by_output_name['sentiment-train']
processed_validation_data_s3_uri = s3_uri_by_output_name['sentiment-validation']
processed_test_data_s3_uri = s3_uri_by_output_name['sentiment-test']

print(processed_train_data_s3_uri)
print(processed_validation_data_s3_uri)
print(processed_test_data_s3_uri)

In [None]:
# Copy the data into folder balanced

!aws s3 cp $processed_train_data_s3_uri/'' ./balanced/sentiment-train/
!aws s3 cp $processed_validation_data_s3_uri/'' ./balanced/sentiment-validation/
!aws s3 cp $processed_test_data_s3_uri/'' ./balanced/sentiment-test/

# review

!head -n 5 ./balanced/sentiment-train/''