In [4]:
import boto3
from botocore.exceptions import ClientError
import json
import configparser
import time
import sys
from datetime import datetime

# 確認權限及建立APP

In [5]:
def user_auth_detail(iam_client, user_name):
    """
    Return all managed policy names directly attached to the given user name.

    ``get_account_authorization_details`` is paginated: while a response has
    ``IsTruncated`` set, the remaining users are only returned by repeating
    the call with the ``Marker`` from the previous page. Every page is walked,
    so a user that lands on a later page is not silently missed.

    Args:
        iam_client: An iam client from boto3.
        user_name : The user name to be checked.

    Returns:
        A list of managed policy names attached to the user. The list is
        empty if the user is not found or has no attached managed policies.
    """
    output_list = []
    request_kwargs = {'Filter': ['User']}

    while True:
        response = iam_client.get_account_authorization_details(**request_kwargs)

        for account in response.get('UserDetailList', []):
            if account['UserName'] == user_name:
                for policy in account.get('AttachedManagedPolicies', []):
                    output_list.append(policy['PolicyName'])

        # Stop once IAM reports there are no further pages.
        if not response.get('IsTruncated'):
            break
        request_kwargs['Marker'] = response['Marker']

    return output_list

def create_application(emr_serverless_client, purpose_value, idle_timeout_minutes=10):
    """
    Create a Spark application in EMR Serverless and return its info.

    The application name is ``APP_NAME`` suffixed with the current local time
    (``YYYY-MM-DD_HHMM``). The release label and the driver/executor/maximum
    capacity come from the notebook-level config constants.

    Args:
        emr_serverless_client : An EMR Serverless client from boto3. The
                                application is created through this client.
        purpose_value         : The value for tag named "purpose", for tracking costs.
        idle_timeout_minutes  : Idle minutes before the app auto-stops (default 10).

    Returns:
        The ``create_application`` response dict, which includes
        {
            'applicationId': 'string',
            'name': 'string',
            'arn': 'string'
        }
    """
    dt = datetime.now().strftime('%Y-%m-%d_%H%M')
    # Use the client that was passed in, not a notebook global, so the caller
    # controls which account/region the application is created in.
    create_app_res = emr_serverless_client.create_application(
        name=f"{APP_NAME}_{dt}",
        releaseLabel=RELEASE_LABEL,
        type='Spark',
        initialCapacity={
            'DRIVER': {
                'workerCount': DRIVER_CNT,
                'workerConfiguration': {
                    'cpu': DRIVER_CPU,
                    'memory': DRIVER_MEM,
                    'disk': DRIVER_DISK
                }
            },
            'EXECUTOR': {
                'workerCount': EXECUTOR_CNT,
                'workerConfiguration': {
                    'cpu': EXECUTOR_CPU,
                    'memory': EXECUTOR_MEM,
                    'disk': EXECUTOR_DISK
                }
            }
        },
        maximumCapacity={
            'cpu': MAX_CPU,
            'memory': MAX_MEM,
            'disk': MAX_DISK
        },
        tags={'generated_method': 'boto3',
              # NOTE(review): fixed label, not derived from the DRIVER_*/
              # EXECUTOR_*/MAX_* settings above; it may drift from them.
              'resource': '4vCPU-16gMem-20gDisk',
              'purpose': purpose_value},
        autoStopConfiguration={
            'enabled': True,
            'idleTimeoutMinutes': idle_timeout_minutes
        }
    )
    return create_app_res




# 建立 Role

In [6]:
def create_role(iam_client):
    """
    Create the EMR Serverless job execution role named ``ROLE_NAME``.

    The trust policy allows the ``emr-serverless.amazonaws.com`` service
    principal to ``sts:AssumeRole``, so the role can be used as a job run's
    ``executionRoleArn``.

    Args:
        iam_client: An iam client from boto3.

    Returns:
        The IAM response dict containing ``'Role'`` (with ``'Arn'`` etc.).
        If the role already exists, it is looked up with ``get_role``, whose
        response has the same ``{'Role': {...}}`` shape. Callers can therefore
        always read ``res['Role']['Arn']``.

    Raises:
        botocore.exceptions.ClientError: for any failure other than the role
            already existing.
    """
    trust_relationship_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "emr-serverless.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    try:
        return iam_client.create_role(
            RoleName=ROLE_NAME,
            AssumeRolePolicyDocument=json.dumps(trust_relationship_policy),
            Description='This is a role for emr-serverless project',
            Tags=[
                {
                    'Key': 'user-spark-mrt',
                    'Value': 'submit-job-to-emr-serverless'
                }
            ]
        )
    except ClientError as error:
        if error.response['Error']['Code'] == 'EntityAlreadyExists':
            # Reuse the existing role instead of returning a message string,
            # which callers would then fail to index.
            print(f"Role '{ROLE_NAME}' already exists... reusing it")
            return iam_client.get_role(RoleName=ROLE_NAME)
        # Surface unexpected failures instead of hiding them behind a string.
        raise


# 建立 Policy

In [7]:
def create_policy(iam_client, bucket_name='tpe-mrt-data'):
    """
    Create the S3 access policy ``ROLE_NAME + '_policy'`` for the job role.

    The policy grants Put/Get/List/Delete on ``bucket_name`` and its objects.

    Args:
        iam_client : An iam client from boto3.
        bucket_name: S3 bucket the job role may access (default 'tpe-mrt-data').

    Returns:
        The IAM response dict containing ``'Policy'`` (with ``'Arn'`` etc.).
        If a policy with the same name already exists, it is looked up with
        ``get_policy``, whose response has the same ``{'Policy': {...}}`` shape.
        Its existing document is NOT updated to match this one.

    Raises:
        botocore.exceptions.ClientError: for any failure other than the policy
            already existing.
    """
    role_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "FullAccessToOutputBucket",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}",
                    f"arn:aws:s3:::{bucket_name}/*"
                ]
            }
        ]
    }

    policy_name = ROLE_NAME + '_policy'

    try:
        return iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(role_policy)
        )
    except ClientError as error:
        if error.response['Error']['Code'] == 'EntityAlreadyExists':
            print('Policy already exists... hence using the same policy')
            # Customer-managed policy ARNs follow
            # arn:aws:iam::<account-id>:policy/<name>.
            policy_arn = f'arn:aws:iam::{USER_ACCOUNT_ID}:policy/{policy_name}'
            return iam_client.get_policy(PolicyArn=policy_arn)
        # Surface unexpected failures instead of returning a tuple of strings.
        raise

# 替Role掛上Policy

In [8]:
def attach_policy_to_role(iam_client, policy_arn):
    """
    Attach the managed policy ``policy_arn`` to the role ``ROLE_NAME``.

    Args:
        iam_client: An iam client from boto3.
        policy_arn: ARN of the managed policy to attach.

    Returns:
        A confirmation message naming the policy and role.

    Raises:
        botocore.exceptions.ClientError: if IAM rejects the attachment.
            The error is not swallowed: a job submitted with a role that is
            missing this policy would fail later, far from the real cause.
    """
    iam_client.attach_role_policy(
        RoleName=ROLE_NAME,
        PolicyArn=policy_arn
    )
    return f'policy: {policy_arn} attached to Role: {ROLE_NAME}'

# 提交Job

In [9]:
def submit_job(emr_serverless_client, app_id, execution_role, job_name, job_script):
    """
    Start a Spark job run on an EMR Serverless application.

    The run is named ``job_name`` suffixed with the current local time
    (``YYYY-MM-DD_HHMM``). Its entry point is
    ``s3://{BUCKET_NAME}/job-scripts/{job_script}``.

    Args:
        emr_serverless_client : An EMR Serverless client from boto3.
        app_id                : App id to run the job.
        execution_role        : Role ARN the job runs as.
        job_name              : Name shown on the job list in the app.
        job_script            : File name of the job script.

    Returns:
        The ``start_job_run`` response dict, which includes
        {
            'applicationId': 'string',
            'jobRunId': 'string',
            'arn': 'string'
        }
    """
    run_suffix = datetime.now().strftime('%Y-%m-%d_%H%M')
    entry_point = f's3://{BUCKET_NAME}/job-scripts/{job_script}'

    # Fixed Spark sizing; each setting becomes "--conf <k>=<v> " (trailing space kept).
    spark_settings = [
        'spark.executor.cores=1',
        'spark.executor.memory=4g',
        'spark.driver.cores=1',
        'spark.driver.memory=4g',
        'spark.executor.instances=1',
    ]
    submit_params = ''.join(f'--conf {setting} ' for setting in spark_settings)

    driver = {
        'sparkSubmit': {
            'entryPoint': entry_point,
            'sparkSubmitParameters': submit_params,
        }
    }
    run_tags = {
        'period': '202201_demo',
        'property': 'getting-start-default',
    }

    return emr_serverless_client.start_job_run(
        applicationId=app_id,
        executionRoleArn=execution_role,
        name=f"{job_name}_{run_suffix}",
        jobDriver=driver,
        tags=run_tags,
    )


# 追蹤JOB

In [10]:
def job_tracking(app_id, job_id, emr_serverless_client=None, poll_interval=5):
    """
    Track the given job run until it reaches a terminal state.

    The state is written to stdout whenever it changes, followed by one dot
    per poll. A normal run looks like PENDING->SCHEDULED->RUNNING->SUCCESS/FAILED.

    Args:
        app_id                : App id that runs the job.
        job_id                : Tracking job id.
        emr_serverless_client : EMR Serverless client to poll with. If None,
                                the notebook-level ``emr_serverless`` client
                                is used, so existing calls keep working.
        poll_interval         : Seconds to wait between polls (default 5).

    Returns:
        True if the job ended in SUCCESS, False if it ended in FAILED or CANCELLED.
    """
    if emr_serverless_client is None:
        emr_serverless_client = emr_serverless

    # CANCELLED is terminal as well; without it a cancelled run is polled forever.
    terminal_states = {'SUCCESS', 'FAILED', 'CANCELLED'}
    state = ''

    while True:
        current_state = emr_serverless_client.get_job_run(
            applicationId=app_id,
            jobRunId=job_id
        )['jobRun']['state']

        if current_state != state:
            sys.stdout.write(f"\n{current_state}")
            state = current_state

        sys.stdout.write('.')
        if state in terminal_states:
            break
        time.sleep(poll_interval)

    return state == 'SUCCESS'


# 啟動APP、提交JOB、停止APP

In [11]:
def submit_and_track_job(emr_serverless_client, app_id, role, job_name, job_script):
    """
    Start the app if needed, submit a job, wait for it to finish, then stop the app.

    Args:
        emr_serverless_client : An EMR Serverless client from boto3.
        app_id                : App id to run the job on.
        role                  : Execution role ARN for the job.
        job_name              : Job name prefix (a timestamp is appended).
        job_script            : File name of the job script.

    Returns:
        True if the job ended in SUCCESS, otherwise False.
    """
    # get_application returns the full response; the state lives under
    # ['application']['state']. Comparing the whole dict to a string is
    # always unequal.
    app_state = emr_serverless_client.get_application(
        applicationId=app_id)['application']['state']

    if app_state not in ('STARTING', 'STARTED'):
        emr_serverless_client.start_application(applicationId=app_id)

    try:
        submit_job_res = submit_job(emr_serverless_client=emr_serverless_client,
                                    app_id=app_id,
                                    execution_role=role,
                                    job_name=job_name,
                                    job_script=job_script)
        succeeded = job_tracking(app_id=app_id, job_id=submit_job_res['jobRunId'])
    finally:
        # Stop the app even if submission or tracking raises, so it does not
        # stay running until the idle auto-stop kicks in.
        emr_serverless_client.stop_application(applicationId=app_id)

    print('\nJob Successed, stop app' if succeeded else '\nJob Failed, stop app')
    return succeeded
    
    

06. submit job...


# 卸下Policy、刪除Policy、刪除Role、刪除APP

In [12]:
def delete_role(iam_client):
    """
    Delete the IAM role named ``ROLE_NAME``.

    Attached managed policies should be detached first; clean_up does this
    before calling here.

    Args:
        iam_client: An iam client from boto3.

    Returns:
        The ``delete_role`` response, or None if the role does not exist.
    """
    try:
        return iam_client.delete_role(
            RoleName=ROLE_NAME
        )
    # Take the exception class from the client that was passed in, not from
    # the notebook-level ``iam`` global.
    except iam_client.exceptions.NoSuchEntityException:
        print(f"Role Name '{ROLE_NAME}' does not exists.")
        return None


def clean_up(iam_client, emr_serverless_client, app_id, policy_arn, delete_app=True,
             poll_interval=5):
    """
    Tear down the resources created by the pipeline.

    Detaches ``policy_arn`` from ``ROLE_NAME``, deletes the role, and deletes
    the policy. If requested, it then deletes the EMR Serverless application.

    Args:
        iam_client            : An iam client from boto3.
        emr_serverless_client : An EMR Serverless client from boto3.
        app_id                : Application to delete when ``delete_app`` is True.
        policy_arn            : Managed policy to detach and delete.
        delete_app            : If True, wait until the app can be deleted, then delete it.
        poll_interval         : Seconds between app state polls (default 5).
    """
    print('detach policy...')
    iam_client.detach_role_policy(RoleName=ROLE_NAME,
                                  PolicyArn=policy_arn)

    print('delete role...')
    delete_role(iam_client)

    print('delete policy...')
    iam_client.delete_policy(PolicyArn=policy_arn)

    if delete_app:
        print('delete app...')
        # delete_application only accepts an app in CREATED or STOPPED state.
        # Waiting for STOPPED alone would hang forever on an app that was
        # created but never started.
        deletable_states = {'CREATED', 'STOPPED'}
        app_state = emr_serverless_client.get_application(
            applicationId=app_id)['application']['state']

        # A running app never reaches a deletable state by itself, so ask it to stop.
        if app_state == 'STARTED':
            emr_serverless_client.stop_application(applicationId=app_id)

        while app_state not in deletable_states:
            sys.stdout.write('Waiting.')
            time.sleep(poll_interval)
            app_state = emr_serverless_client.get_application(
                applicationId=app_id)['application']['state']

        emr_serverless_client.delete_application(applicationId=app_id)

    print('clean up done')


    
---





# 總流程

## 參數設定

In [13]:
config = configparser.ConfigParser()
# The context manager closes the credentials file once it is parsed.
# read_file raises if the file is missing; config.read would silently skip it.
with open('credentials.cfg') as cfg_file:
    config.read_file(cfg_file)

# user credential (kept in credentials.cfg -- never commit that file)
KEY    = config.get('AWS_USER','ACCESS_KEY')
SECRET = config.get('AWS_USER','SECRET_ACCESS_KEY')
USER_NAME = config.get('AWS_USER','USER_NAME') # for checking permission
USER_ACCOUNT_ID = config.get('AWS_USER', 'USER_ACCOUNT_ID') # element of arn
REGION = config.get("AWS_USER", 'AWS_DEFAULT_REGION')

BUCKET_NAME = config.get('OTHERS','BUCKET_NAME')
ROLE_NAME = config.get("OTHERS","ROLE_NAME")
# NOTE(review): JOB_SCRIPT is read here, but the run cell passes a literal job_script.
JOB_SCRIPT = config.get("OTHERS", "JOB_SCRIPT")


# app setting
APP_NAME = config.get("APP_SETTING","APP_NAME")
RELEASE_LABEL = config.get("APP_SETTING", "RELEASE_LABEL")


DRIVER_CNT = config.getint("APP_SETTING", "DRIVER_CNT")
DRIVER_CPU = config.get("APP_SETTING", "DRIVER_CPU")
DRIVER_MEM = config.get("APP_SETTING", "DRIVER_MEM")
DRIVER_DISK = config.get("APP_SETTING", "DRIVER_DISK")

EXECUTOR_CNT = config.getint("APP_SETTING", "EXECUTOR_CNT")
EXECUTOR_CPU = config.get("APP_SETTING", "EXECUTOR_CPU")
EXECUTOR_MEM = config.get("APP_SETTING", "EXECUTOR_MEM")
EXECUTOR_DISK = config.get("APP_SETTING", "EXECUTOR_DISK")

MAX_CPU = config.get("APP_SETTING", "MAX_CPU")
MAX_MEM = config.get("APP_SETTING", "MAX_MEM")
MAX_DISK = config.get("APP_SETTING", "MAX_DISK")



# Static credentials shared by every AWS client created in this cell.
_aws_credentials = {
    'aws_access_key_id': KEY,
    'aws_secret_access_key': SECRET,
}

# EMR Serverless client, bound to the configured region.
emr_serverless = boto3.client(service_name='emr-serverless',
                              region_name=REGION,
                              **_aws_credentials)

# IAM client; no region_name is passed (IAM is a global service).
iam = boto3.client(service_name='iam',
                   **_aws_credentials)

# S3 resource, bound to the configured region.
s3 = boto3.resource('s3',
                    region_name=REGION,
                    **_aws_credentials)

# Drop the helper dict so only the clients remain in the notebook namespace.
del _aws_credentials

## 執行

In [16]:
# Before each run, review "purpose_value" in step 02 and "job_script" in step 06.
print('01. check permissions...')
print(user_auth_detail(iam, USER_NAME))

print('02. create APP...')
create_app = create_application(emr_serverless, purpose_value='process201701-202208-weekly-take2')
print(f"app id: {create_app['applicationId']}")


print('03. create role...')
create_role_res = create_role(iam)
print(f"role arn: {create_role_res['Role']['Arn']}")


print('04. create policy...')
policy_res = create_policy(iam)
print(f"policy arn: {policy_res['Policy']['Arn']}")

print('05. attach policy...')
attach_policy_to_role(iam, policy_res['Policy']['Arn'])

# Once the policy is attached, always clean up, even if submission or
# tracking raises. Otherwise the role and policy are leaked and the next run
# collides with them.
try:
    print('06. submit job...')
    submit_and_track_job(emr_serverless_client=emr_serverless,
                         app_id=create_app['applicationId'],
                         role=create_role_res['Role']['Arn'],
                         job_name='boto_submit_job',
                         job_script='mrt-weekly-sumup.py')
finally:
    print('07. clean up...')
    clean_up(iam_client=iam,
             emr_serverless_client=emr_serverless,
             app_id=create_app['applicationId'],
             policy_arn=policy_res['Policy']['Arn'],
             delete_app=False)
    

01. check permissions...
['EmrServerlessForMrtProject']
02. create APP...
app id: 00f43dq70indsk25
03. create role...
role arn: arn:aws:iam::250172214346:role/emr-serverless-iac
04. create policy...
policy arn: arn:aws:iam::250172214346:policy/emr-serverless-iac_policy
05. attach policy...
06. submit job...

PENDING..................
SCHEDULED.
RUNNING...............................................................................
SUCCESS.
Job Successed, stop app
07. clean up...
detach policy...
delete role...
delete policy...
clean up done
