### EMR Manager

The EMR manager is a wrapper that speeds up spinning up an EMR cluster and running a specified Python file on it.

#### Utilities

In [None]:
import configparser
import os

import boto3

#### Load the AWS Configuration

In [None]:
# Load AWS credentials from dl.cfg and export them for boto3 / the AWS CLI.
config = configparser.ConfigParser()
# Context manager so the config file handle is closed once parsed.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# fallback=None makes a missing section/option yield None instead of raising
# NoSectionError/NoOptionError, so the "credentials not found" branch below is
# actually reachable.
AWS_KEY = config.get('AWS_DATA', 'AWS_ACCESS_KEY_ID', fallback=None)
AWS_SECRET = config.get('AWS_DATA', 'AWS_SECRET_ACCESS_KEY', fallback=None)
AWS_REGION_NAME = "us-west-2"

if AWS_KEY is None or AWS_SECRET is None:
    print('AWS Credentials not found!')
else:
    # Exported to the process environment so child processes such as the
    # `!aws` CLI command also pick up the credentials and region.
    os.environ["AWS_ACCESS_KEY_ID"] = AWS_KEY
    os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET
    os.environ["AWS_DEFAULT_REGION"] = AWS_REGION_NAME

#### Spin up an EMR Cluster using the CLI

In [None]:
# Launch an EMR 6.2.0 cluster of 5 m5.xlarge instances with Spark, Hadoop,
# Hive and JupyterHub through the AWS CLI (IPython `!` shell escape).
# --no-auto-terminate keeps the cluster running after its steps finish, so it
# has to be terminated explicitly (the cleanup cells at the end do this).
# KeyName must name an EC2 key pair that already exists in the region.
!aws emr create-cluster --name udacity-project\
    --release-label emr-6.2.0 \
    --applications Name=Spark Name=Hadoop Name=Hive Name=JupyterHub \
    --instance-count 5 \
    --use-default-roles \
    --no-auto-terminate \
    --instance-type m5.xlarge \
    --no-enable-debugging \
    --ec2-attributes KeyName="emr-udacity-main"

#### Validate Cluster has Started

In [None]:
# EMR client used by the remaining cells to inspect clusters and submit steps.
# AWS_KEY / AWS_SECRET / AWS_REGION_NAME come from the configuration cell.
emr = boto3.client('emr',
                   aws_access_key_id=AWS_KEY,
                   aws_secret_access_key=AWS_SECRET,
                   region_name=AWS_REGION_NAME)

In [None]:
# Show clusters that are starting up, running, or idle and ready for steps.
active_states = ['STARTING', 'RUNNING', 'WAITING']
emr.list_clusters(ClusterStates=active_states)

#### Upload the required files into an S3 Bucket

In [None]:
# S3 client sharing the same credentials and region as the EMR client.
s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_KEY,
    aws_secret_access_key=AWS_SECRET,
    region_name=AWS_REGION_NAME,
)

# Every bucket whose name contains 'udacity'; later cells use the first one.
all_buckets = s3.list_buckets()['Buckets']
emr_bucket = [bucket['Name'] for bucket in all_buckets if 'udacity' in bucket['Name']]
emr_bucket

In [None]:
# Perform the actual uploading: each local file is stored under its own name
# as the S3 key in the first matching bucket.
# NOTE(review): dl.cfg holds AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY; copying
# it to S3 exposes the credentials to anyone who can read this bucket —
# consider relying on the EMR instance role instead.
target_bucket = emr_bucket[0]
for local_file in ('etl.py', 'dl.cfg'):
    s3.upload_file(local_file, target_bucket, local_file)

In [None]:
# Check that both uploaded files are in the bucket.
# .get('Contents', []) because S3 omits the 'Contents' key entirely when the
# bucket has no objects, which previously raised KeyError.
uploaded_names = ('etl.py', 'dl.cfg')
[entry for entry in s3.list_objects(Bucket=emr_bucket[0]).get('Contents', [])
 if any(name in entry['Key'] for name in uploaded_names)]

#### Add the steps to the ETL process

In [None]:
# Ensure there is at least one waiting cluster.
# Filter server-side with ClusterStates: an unfiltered list_clusters call also
# returns terminated clusters and is paginated, so a WAITING cluster could be
# missing from the first page that was being filtered client-side.
waiting_clusters = [
    cluster['Id']
    for cluster in emr.list_clusters(ClusterStates=['WAITING'])['Clusters']
]

if waiting_clusters:
    print('Good to go.')
else:
    print('Cluster is spinning')

In [None]:
# Submit etl.py from the bucket as a spark-submit step (YARN, cluster deploy
# mode) on the first waiting cluster.
# NOTE(review): EMR's documented scheme for S3 paths is s3:// (EMRFS); the
# s3a:// URI is kept unchanged — confirm it resolves on this release.
etl_script_uri = 's3a://' + emr_bucket[0] + '/etl.py'
spark_submit_args = [
    'spark-submit',
    '--deploy-mode', 'cluster',
    '--master', 'yarn',
    etl_script_uri,
]
etl_step = {
    'Name': 'Run ETL.py',
    'ActionOnFailure': 'CANCEL_AND_WAIT',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': spark_submit_args,
    },
}
response = emr.add_job_flow_steps(
    JobFlowId=waiting_clusters[0],
    Steps=[etl_step],
)

In [None]:
# Display the raw add_job_flow_steps response returned by the previous cell.
response

In [None]:
# Check for steps that are still queued or executing on the cluster.
active_step_states = ['RUNNING', 'PENDING']
cluster_steps = emr.list_steps(ClusterId=waiting_clusters[0])['Steps']
[step for step in cluster_steps if step['Status']['State'] in active_step_states]

In [None]:
# Every step returned by list_steps for the cluster (no state filter).
all_steps = emr.list_steps(ClusterId=waiting_clusters[0])
all_steps['Steps']

#### Finally, delete unused resources

In [None]:
# Collect every cluster that is not yet terminating or terminated. Besides
# RUNNING and WAITING, include STARTING and BOOTSTRAPPING so a cluster that is
# still coming up is not left running after cleanup.
standby_clusters = emr.list_clusters(
    ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
)

clusters = [cluster["Id"] for cluster in standby_clusters["Clusters"]]

print(f"Nr clusters: {len(clusters)}")

In [None]:
# Terminate every collected cluster, or report that there was nothing to do.
if not clusters:
    print("No clusters found.")
else:
    emr.terminate_job_flows(JobFlowIds=clusters)
    print("Terminating Clusters")