In [2]:
import uuid
import json
from time import gmtime, strftime
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)

account_id = boto3.client('sts').get_caller_identity().get('Account')

suffix=uuid.uuid1().hex[:6] # to be used in resource names

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [6]:
pwd

'/root/machine-learning-pipelines-for-multimodal-health-data/imaging/src'

In [4]:
cd src

/root/machine-learning-pipelines-for-multimodal-health-data/imaging/src


In [7]:
!sed -i "s|##REGION##|{region}|g" Dockerfile

In [8]:
!cat Dockerfile

FROM python:3.7-slim-buster

COPY ./requirements.txt /opt/
RUN pip3 install --no-cache-dir -r /opt/requirements.txt
ENV PYTHONUNBUFFERED=TRUE
ENV AWS_DEFAULT_REGION=us-east-1

COPY ./dcm2nifti_processing.py /opt/
COPY ./radiomics_utils.py /opt/

ENTRYPOINT ["python3", "/opt/dcm2nifti_processing.py"]


Build a container image from the Dockerfile

In [9]:
!pip install -q sagemaker-studio-image-build

[0m

In [15]:
!sm-docker build . --repository medical-image-processing-smstudio:1.0

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
................[Container] 2024/06/26 01:25:26.477851 Running on CodeBuild On-demand

[Container] 2024/06/26 01:25:26.477862 Waiting for agent ping
[Container] 2024/06/26 01:25:28.684790 Waiting for DOWNLOAD_SOURCE
[Container] 2024/06/26 01:25:28.923543 Phase is DOWNLOAD_SOURCE
[Container] 2024/06/26 01:25:28.924362 CODEBUILD_SRC_DIR=/codebuild/output/src563659493/src
[Container] 2024/06/26 01:25:28.924816 YAML location is /codebuild/output/src563659493/src/buildspec.yml
[Container] 2024/06/26 01:25:28.926446 Setting HTTP client timeout to higher timeout for S3 source
[Container] 2024/06/26 01:25:28.926583 Processing environment variables
[Container] 2024/06/26 01:25:28.970443 No runtime version selected in buildspec.
[Container] 2024/06/26 01:25:28.984760 Moving to directory /codebuild/outp

Define the input and output data location. Please insert your bucket names to `input_data_bucket` and `output_data_bucket`.

In [11]:
input_data_bucket='sagemaker-solutions-prod-us-east-1'
input_data_prefix='sagemaker-lung-cancer-survival-prediction/1.1.0/data/nsclc_radiogenomics'
input_data_uri='s3://%s/%s' % (input_data_bucket, input_data_prefix)
print(input_data_uri)

s3://sagemaker-solutions-prod-us-east-1/sagemaker-lung-cancer-survival-prediction/1.1.0/data/nsclc_radiogenomics


In [12]:
output_data_bucket=sagemaker_session.default_bucket()
output_data_prefix='nsclc_radiogenomics'
output_data_uri='s3://%s/%s' % (output_data_bucket, output_data_prefix)
print(output_data_uri)

s3://sagemaker-us-east-1-942514891246/nsclc_radiogenomics


Be sure to use the image and tag name defined in `!sm-docker build` command. We will be replacing the placeholders in the Stepfunctions state machine definition json file with your bucket and image uri.

In [16]:
ecr_image_uri='%s.dkr.ecr.%s.amazonaws.com/medical-image-processing-smstudio:1.0' % (account_id, region)

In [17]:
!sed -i "s|##INPUT_DATA_S3URI##|{input_data_uri}|g" nsclc-radiogenomics-imaging-workflow.json
!sed -i "s|##OUTPUT_DATA_S3URI##|{output_data_uri}|g" nsclc-radiogenomics-imaging-workflow.json
!sed -i "s|##ECR_IMAGE_URI##|{ecr_image_uri}|g" nsclc-radiogenomics-imaging-workflow.json
!sed -i "s|##IAM_ROLE_ARN##|{role}|g" nsclc-radiogenomics-imaging-workflow.json

In [18]:
with open('nsclc-radiogenomics-imaging-workflow.json') as f:
    state_machine_json = json.load(f)

We need to create an IAM execution role for the Stepfunctions workflow.

In [19]:
iam = boto3.client('iam')

my_managed_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": role,
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateProcessingJob",
                "sagemaker:DescribeProcessingJob",
                "sagemaker:ListProcessingJobs",
                "sagemaker:ListTags",
                "sagemaker:StopProcessingJob",
                "sagemaker:AddTags"
            ],
            "Resource": "*"
        }
    ]
}

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
            "Service": ["states.amazonaws.com", "sagemaker.amazonaws.com"]},
            "Action": "sts:AssumeRole"
        }
    ]
}

In [21]:
role_name

'StepFunctionsWorkflowExecutionRole-dcd4788e327911efbca9be616a3e66ff'

In [22]:
policy_name = 'StepFunctionsWorkflowExecutionPolicy-%s' % suffix[:5]
role_name = 'StepFunctionsWorkflowExecutionRole-%s' % suffix[:5]
policy_response = iam.create_policy(
  PolicyName=policy_name,
  PolicyDocument=json.dumps(my_managed_policy)
)

role_response = iam.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description='Role to execute StepFunctions workflow which submits SageMaker jobs',
    MaxSessionDuration=3600,
)

# Attach a policy to role
iam.attach_role_policy(
    PolicyArn=policy_response['Policy']['Arn'],
    RoleName=role_name
)
iam.attach_role_policy(
    PolicyArn='arn:aws:iam::aws:policy/CloudWatchEventsFullAccess',
    RoleName=role_name
)

{'ResponseMetadata': {'RequestId': 'ce8ccd5b-0e23-412a-acb8-0138eced5ef2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 26 Jun 2024 01:29:09 GMT',
   'x-amzn-requestid': 'ce8ccd5b-0e23-412a-acb8-0138eced5ef2',
   'content-type': 'text/xml',
   'content-length': '212'},
  'RetryAttempts': 0}}

Create a Stepfunctions workflow, i.e. a state machine.

In [23]:
sfn = boto3.client('stepfunctions')
sfn_execution_role = role_response['Role']['Arn']
state_machine_name = 'nsclc-radiogenomics-imaging-workflow-%s' % suffix
sfn_response = sfn.create_state_machine(name = state_machine_name,
                                        definition = json.dumps(state_machine_json),
                                        roleArn = sfn_execution_role,
                                        type = 'STANDARD')

In [24]:
state_machine_name

'nsclc-radiogenomics-imaging-workflow-dcd4788e327911efbca9be616a3e66ff'

In [26]:
stateMachineArn=sfn_response['stateMachineArn']
stateMachineArn

'arn:aws:states:us-east-1:942514891246:stateMachine:nsclc-radiogenomics-imaging-workflow-dcd4788e327911efbca9be616a3e66ff'

The following is how you would run this workflow for all the `RO1` subjects.

```python
subject_list = ['R01-%03d'%i for i in range(1,163)]

stateMachineArn=sfn_response['stateMachineArn']

feature_store_name = 'imaging-feature-group-%s' % suffix
processing_job_name = 'dcm-nifti-conversion-%s' % suffix
offline_store_s3uri = '%s/multimodal-imaging-featurestore' % output_data_uri
payload = {
  "PreprocessingJobName": processing_job_name,
  "FeatureStoreName": feature_store_name,
  "OfflineStoreS3Uri": offline_store_s3uri,
  "Subject": subject_list
}
exeution_response = sfn.start_execution(stateMachineArn=stateMachineArn,
                                        name=suffix,
                                        input=json.dumps(payload))

print(exeution_response)
```