In [None]:
import sys

!{sys.executable} -m pip install "sagemaker>=2.99.0"

In [1]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import Session
import boto3
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.pipeline_context import PipelineSession

In [6]:
sagemaker_session=sagemaker.Session()
pipeline_session = PipelineSession()
bucket = sagemaker_session.default_bucket()

role = get_execution_role()
prefix = 'chapter9/data'

print('Training input/output will be stored in {}/{}'.format(bucket, prefix))
print('\nIAM Role: {}'.format(role))

Training input/output will be stored in sagemaker-us-east-1-485822383573/chapter9/data

IAM Role: arn:aws:iam::485822383573:role/service-role/AmazonSageMaker-ExecutionRole-20220426T122295


In [7]:
%%writefile scripts/preprocessing.py
import csv
import wget
import zipfile
import os
import pandas as pd
import boto3
import time
import json
import argparse
from sklearn.preprocessing import Normalizer
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

parser = argparse.ArgumentParser()
parser.add_argument('--bucket', type=str)
parser.add_argument('--region',type=str)
args = parser.parse_args()

bucket=args.bucket
cm = boto3.client('comprehendmedical',region_name=args.region)
s3_client = boto3.client('s3',region_name=args.region)

if os.path.exists('data')==False:
    os.mkdir('data')

file_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00462/drugsCom_raw.zip'
dest_file = 'data/drugsCom_raw.zip'

print("Downloading source files...")

wget.download(file_url, dest_file)

with zipfile.ZipFile('data/drugsCom_raw.zip', 'r') as zip_ref:
    zip_ref.extractall('data')

os.remove('data/drugsCom_raw.zip')

orig_list = list()
for filename in os.listdir('data'):
    with open('data/'+filename) as csvfile:
        myreader = csv.reader(csvfile, delimiter='\t')
        for row in myreader:
            if row[0] == '':
                continue
            else:
                orig_list.append({
                    'id': row[0],
                    'drugName': row[1],
                    'condition': row[2],
                    'review': row[3]
                })

    
if os.path.exists('processed_data')==False:
    os.mkdir('processed_data')
    
raw_df=pd.DataFrame.from_records(orig_list)
raw_df.to_csv('processed_data/raw_df.csv', index=False)

print("\nRaw data processed from input files")
print("\nRamdomly sampling 100 rows for topic extraction")

df_sample=raw_df.sample(n=100)
sample_list = list()


for index,row in df_sample.iterrows():
    entities = cm.detect_entities(Text=row['review'])
    topic_list = []
    for entity in entities['Entities']:
        if entity['Category'] == 'MEDICAL_CONDITION':
            topic_list.append(entity['Text'])

    sample_list.append({
            'id': row['id'],
            'drugName': row['drugName'],
            'condition': row['condition'],
            'review': row['review'],
            'topics': topic_list[:5]
        })
        
sample_df=pd.DataFrame.from_records(sample_list)

sample_df.to_csv('processed_data/sample_df.csv', index=False) 


sampled_topics=pd.read_csv('processed_data/sample_df.csv')['topics'].tolist()
print(sampled_topics)
vectorizer = TfidfVectorizer()
vecs = vectorizer.fit_transform(sampled_topics)
normalizer = Normalizer(copy=False)
normalized_data = normalizer.fit_transform(vecs).toarray()
normalized_data.shape
np.savetxt("processed_data/prediction_data.csv", normalized_data, delimiter=",")




s3_client.upload_file('processed_data/sample_df.csv', bucket, 'chapter9/data/sample_df.csv')
s3_client.upload_file('processed_data/raw_df.csv', bucket, 'chapter9/data/raw_df.csv')
s3_client.upload_file('processed_data/prediction_data.csv', bucket, 'chapter9/data/prediction_data.csv')


print("\nprocessed files uploaded to s3")

Overwriting scripts/preprocessing.py


In [8]:
#!python scripts/preprocessing.py --bucket $bucket --region $sagemaker_session.boto_session.region_name 

In [9]:
%%sh

docker_name=sagemaker-preprocessing
account=$(aws sts get-caller-identity --query Account --output text)
echo $account
region=$(aws configure get region)

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${docker_name}:latest"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${docker_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${docker_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
$(aws ecr get-login --region ${region} --no-include-email)
docker build -t $docker_name -f scripts/Dockerfile .
docker tag ${docker_name} ${fullname}
docker push ${fullname}

485822383573
Login Succeeded

Step 1/8 : FROM python:3.7-slim-buster
 ---> 8fe6e55c0412
Step 2/8 : RUN pip install pandas
 ---> Using cache
 ---> ed3c2aadaa6e
Step 3/8 : RUN pip install wget
 ---> Using cache
 ---> 93dc76d1c100
Step 4/8 : RUN pip install boto3
 ---> Using cache
 ---> 43acfff1ec93
Step 5/8 : RUN pip install sagemaker
 ---> Using cache
 ---> 0a0768240618
Step 6/8 : RUN pip install scikit-learn
 ---> Using cache
 ---> 2ce6c8fc1e49
Step 7/8 : ENV PYTHONUNBUFFERED=TRUE
 ---> Using cache
 ---> f68470613295
Step 8/8 : ENTRYPOINT ["python3"]
 ---> Using cache
 ---> 41fbb5b0e27c
Successfully built 41fbb5b0e27c
Successfully tagged sagemaker-preprocessing:latest
The push refers to repository [485822383573.dkr.ecr.us-east-1.amazonaws.com/sagemaker-preprocessing]
20e4ef78b000: Preparing
1cd08d11abf2: Preparing
073fe9ab5fca: Preparing
873a3963f49c: Preparing
32682a294d34: Preparing
cd77cebc5d3e: Preparing
c899963fae46: Preparing
353cc9dc1c96: Preparing
c89d0deb3e29: Preparing
73595

https://docs.docker.com/engine/reference/commandline/login/#credentials-store



In [10]:
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep

docker_name = "sagemaker-preprocessing"
account = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]
region = sagemaker_session.boto_session.region_name
image = "{}.dkr.ecr.{}.amazonaws.com/{}:latest".format(account, region, docker_name)
print(image)
script_processor = ScriptProcessor(image_uri=image,
                role=role,
                instance_count=1,
                instance_type='ml.m5.xlarge',
                command=['python3'],
                sagemaker_session=pipeline_session)


processor_args=script_processor.run(code='scripts/preprocessing.py',
                    arguments = ["--bucket",bucket,'--region',region])



step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)


485822383573.dkr.ecr.us-east-1.amazonaws.com/sagemaker-preprocessing:latest





Job Name:  sagemaker-preprocessing-2022-08-16-01-24-34-435
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-485822383573/sagemaker-preprocessing-2022-08-16-01-24-34-435/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []


In [11]:
%%writefile scripts/train.py

import argparse
import os
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.externals import joblib
from sklearn.preprocessing import Normalizer
from sklearn.feature_extraction.text import TfidfVectorizer
from io import StringIO

def model_fn(model_dir):
    
    kmeans = joblib.load(os.path.join(model_dir, "kmeansmodel.joblib"))
    return kmeans

def input_fn(input_data, content_type):
    
    if content_type == "text/csv":
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # Hyperparameters are described here. In this simple example we are just including one hyperparameter.
    parser.add_argument('--n_clusters', type=int, default=2)
    parser.add_argument('--random_state', type=int, default=0)
    
    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])

    args = parser.parse_args()
    
    input_files = [ os.path.join(args.training, file) for file in os.listdir(args.training) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.training, "train"))
    
    raw_data = [ pd.read_csv(file) for file in input_files ]
    train_data = pd.concat(raw_data)
    print(train_data.shape)
    kmeans = KMeans(n_clusters=2,random_state=0).fit(train_data)
    joblib.dump(kmeans, os.path.join(args.model_dir, "kmeansmodel.joblib"))


Overwriting scripts/train.py


In [12]:
from sagemaker.workflow.steps import TrainingStep

model_path= f"s3://{bucket}/{prefix}/model/"


sklearn = SKLearn(
    source_dir='scripts',
    entry_point='train.py',
    instance_type="ml.m4.xlarge",
    role = role,
    sagemaker_session=pipeline_session,
    framework_version='0.20.0',
    output_path=model_path,
    hyperparameters={'n_clusters': 2, 'random_state':0})

train_args=sklearn.fit({'training': 's3://{}/{}/prediction_data.csv'.format(bucket,prefix)})
step_train_model = TrainingStep(name="TrainModel", step_args=train_args)
step_train_model.add_depends_on([step_process])

In [13]:
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.model_step import ModelStep



clustering_model = SKLearnModel(
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
    entry_point="scripts/train.py",
    framework_version='0.20.0',
    
)



register_model_step_args = clustering_model.register(
    content_types=["text/csv"],
   response_types=["text/csv"],
   inference_instances=["ml.t2.medium"],
   model_package_group_name='adverse-event-clustering'
)

step_register=ModelStep(name='adverse-event-clustering-model', step_args=register_model_step_args)

In [14]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="adverse-drug-reaction",
    steps=[step_process, step_train_model, step_register]
)

In [15]:
import json
definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '485822383573.dkr.ecr.us-east-1.amazonaws.com/sagemaker-preprocessing:latest',
     'ContainerArguments': ['--bucket',
      'sagemaker-us-east-1-485822383573',
      '--region',
      'us-east-1'],
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocessing.py']},
    'RoleArn': 'arn:aws:iam::485822383573:role/service-role/AmazonSageMaker-ExecutionRole-20220426T122295',
    'ProcessingInputs': [{'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-485822383573/sagem

In [16]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:485822383573:pipeline/adverse-drug-reaction',
 'ResponseMetadata': {'RequestId': '21caa611-b96e-4e37-af34-b6c47e70e945',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '21caa611-b96e-4e37-af34-b6c47e70e945',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Tue, 16 Aug 2022 01:25:25 GMT'},
  'RetryAttempts': 0}}

In [17]:
execution = pipeline.start()

In [18]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:485822383573:pipeline/adverse-drug-reaction',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:485822383573:pipeline/adverse-drug-reaction/execution/7hzfj8g9p4o7',
 'PipelineExecutionDisplayName': 'execution-1660613127172',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'adverse-drug-reaction',
  'TrialName': '7hzfj8g9p4o7'},
 'CreationTime': datetime.datetime(2022, 8, 16, 1, 25, 27, 94000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 8, 16, 1, 25, 27, 94000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '4451c759-bd77-48fc-be65-cf2a383ec9d7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4451c759-bd77-48fc-be65-cf2a383ec9d7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '504',
   'date': 'Tue, 16 Aug 2022 01:25:28 GMT'},
  'RetryAttempts': 0}}

In [19]:
execution.wait()