diff --git a/README.md b/README.md index 68b13e9..2cacc13 100644 --- a/README.md +++ b/README.md @@ -6,15 +6,10 @@ There are times when EC2 instances need to be removed from the cluster, for exam # Overview of steps -1. Download index.zip from this repository +1. Download the CloudFormation template -2. Upload the downloaded index.zip containing Lambda code index.py to [Your_AWS_Account_S3_Bucket] +2. Launch the CloudFormation template that creates the following AWS resources: -3. Download the CloudFormation template - -4. Launch the CloudFormation template that creates the following AWS resources: - -* CloudFormation will require S3 bucket name as one of the parameters you created in Step 2 above. * The VPC and associated network elements (subnets, security groups, route table, etc) * ECS Cluster, ECS service, a sample ECS task definition * Auto scaling group with two EC2 instances and a termination lifecycle hook @@ -26,9 +21,6 @@ For the full solution overview visit [Blog link](https://aws.amazon.com/blogs/co ## CloudFormation template - cform/ecs.yaml -## Solution code - - code/index.py - *** Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. @@ -38,7 +30,3 @@ Licensed under the Apache License, Version 2.0 (the "License"). You may not use http://aws.amazon.com/apache2.0/ or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - -## PLEASE NOTE - -Lambda version of boto3 DOES NOT have the latest version of boto3 that (as of February 10th). This leads to user experiencing issue similar to one reported in https://github.com/awslabs/ecs-cid-sample/issues/2 . Please use the index.zip package provided, or if you are customizing code, package up the latest boto3 in the Lambda zip file. diff --git a/cform/ecs.yaml b/cform/ecs.yaml index 31148fc..cabc97d 100644 --- a/cform/ecs.yaml +++ b/cform/ecs.yaml @@ -15,10 +15,6 @@ Parameters: Description: > REQUIRED - Specifies the name of an existing Amazon EC2 key pair to enable SSH access to the EC2 instances in your cluster. - S3BucketName: - Type: String - Description: > - REQUIRED - Specifies the name of your AWS account S3 bucket in which the index.zip file is stored. index.zip contains the Python Lambda code index.py; Please download this from https://github.com/awslabs/ecs-cid-sample/tree/master/code and upload to your S3 bucket. EcsAmiId: Type: String Description: REQUIRED - Default ECS Optimized AMI for us-west-2 region. Please change it to reflect your regions' latest ECS AMI-ID @@ -261,10 +257,7 @@ Resources: NotificationConfigurations: - TopicARN: !Ref ASGSNSTopic NotificationTypes: - - autoscaling:EC2_INSTANCE_LAUNCH - - autoscaling:EC2_INSTANCE_LAUNCH_ERROR - autoscaling:EC2_INSTANCE_TERMINATE - - autoscaling:EC2_INSTANCE_TERMINATE_ERROR Tags: - Key: Name @@ -378,26 +371,16 @@ Resources: PolicyDocument: Version: "2012-10-17" Statement: - - - Effect: "Allow" + - Effect: "Allow" Action: - autoscaling:CompleteLifecycleAction - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - - ec2:DescribeInstances - - ec2:DescribeInstanceAttribute - - ec2:DescribeInstanceStatus - - ec2:DescribeHosts - ecs:ListContainerInstances - - ecs:SubmitContainerStateChange - - ecs:SubmitTaskStateChange - ecs:DescribeContainerInstances - ecs:UpdateContainerInstancesState - - ecs:ListTasks - - ecs:DescribeTasks - sns:Publish - - sns:ListSubscriptions Resource: "*" AssumeRolePolicyDocument: Version: "2012-10-17" @@ -426,17 +409,85 @@ Resources: LambdaFunctionForASG: Type: "AWS::Lambda::Function" Properties: + Description: Gracefully drain ECS tasks from EC2 instances before the instances are + terminated by autoscaling. + Handler: index.lambda_handler + Role: !GetAtt LambdaExecutionRole.Arn + Runtime: python3.6 + MemorySize: 128 + Timeout: 60 Code: - S3Bucket: !Ref S3BucketName - S3Key: "index.zip" - Description: Lambda code for the autoscaling hook triggers invoked when autoscaling events of launching and terminating instance occur - Handler: "index.lambda_handler" - Role: - Fn::GetAtt: - - "LambdaExecutionRole" - - "Arn" - Runtime: "python2.7" - Timeout: "300" + ZipFile: !Sub | + import json + import time + import boto3 + + CLUSTER = '${EcsClusterName}' + REGION = '${AWS::Region}' + + ECS = boto3.client('ecs', region_name=REGION) + ASG = boto3.client('autoscaling', region_name=REGION) + SNS = boto3.client('sns', region_name=REGION) + + def find_ecs_instance_info(instance_id): + paginator = ECS.get_paginator('list_container_instances') + for list_resp in paginator.paginate(cluster=CLUSTER): + arns = list_resp['containerInstanceArns'] + desc_resp = ECS.describe_container_instances(cluster=CLUSTER, + containerInstances=arns) + for container_instance in desc_resp['containerInstances']: + if container_instance['ec2InstanceId'] != instance_id: + continue + + print('Found instance: id=%s, arn=%s, status=%s, runningTasksCount=%s' % + (instance_id, container_instance['containerInstanceArn'], + container_instance['status'], container_instance['runningTasksCount'])) + + return (container_instance['containerInstanceArn'], + container_instance['status'], container_instance['runningTasksCount']) + + return None, None, 0 + + def instance_has_running_tasks(instance_id): + (instance_arn, container_status, running_tasks) = find_ecs_instance_info(instance_id) + if instance_arn is None: + print('Could not find instance ID %s. Letting autoscaling kill the instance.' % + (instance_id)) + return False + + if container_status != 'DRAINING': + print('Setting container instance %s (%s) to DRAINING' % + (instance_id, instance_arn)) + ECS.update_container_instances_state(cluster=CLUSTER, + containerInstances=[instance_arn], + status='DRAINING') + + return running_tasks > 0 + + def lambda_handler(event, context): + msg = json.loads(event['Records'][0]['Sns']['Message']) + + if 'LifecycleTransition' not in msg.keys() or \ + msg['LifecycleTransition'].find('autoscaling:EC2_INSTANCE_TERMINATING') == -1: + print('Exiting since the lifecycle transition is not EC2_INSTANCE_TERMINATING.') + return + + if instance_has_running_tasks(msg['EC2InstanceId']): + print('Tasks are still running on instance %s; posting msg to SNS topic %s' % + (msg['EC2InstanceId'], event['Records'][0]['Sns']['TopicArn'])) + time.sleep(5) + sns_resp = SNS.publish(TopicArn=event['Records'][0]['Sns']['TopicArn'], + Message=json.dumps(msg), + Subject='Publishing SNS msg to invoke Lambda again.') + print('Posted msg %s to SNS topic.' % (sns_resp['MessageId'])) + else: + print('No tasks are running on instance %s; setting lifecycle to complete' % + (msg['EC2InstanceId'])) + + ASG.complete_lifecycle_action(LifecycleHookName=msg['LifecycleHookName'], + AutoScalingGroupName=msg['AutoScalingGroupName'], + LifecycleActionResult='CONTINUE', + InstanceId=msg['EC2InstanceId']) LambdaInvokePermission: Type: "AWS::Lambda::Permission" Properties: diff --git a/code/index.py b/code/index.py deleted file mode 100644 index ed5df30..0000000 --- a/code/index.py +++ /dev/null @@ -1,195 +0,0 @@ -from __future__ import print_function -import boto3 -from urlparse import urlparse -import base64 -import json -import datetime -import time -import logging - - - -logging.basicConfig() -logger = logging.getLogger() -logger.setLevel(logging.DEBUG) - -# Establish boto3 session -session = boto3.session.Session() -logger.debug("Session is in region %s ", session.region_name) - -ec2Client = session.client(service_name='ec2') -ecsClient = session.client(service_name='ecs') -asgClient = session.client('autoscaling') -snsClient = session.client('sns') -lambdaClient = session.client('lambda') - - -"""Publish SNS message to trigger lambda again. - :param message: To repost the complete original message received when ASG terminating event was received. - :param topicARN: SNS topic to publish the message to. -""" -def publishToSNS(message, topicARN): - logger.info("Publish to SNS topic %s",topicARN) - snsResponse = snsClient.publish( - TopicArn=topicARN, - Message=json.dumps(message), - Subject='Publishing SNS message to invoke lambda again..' - ) - return "published" - - -"""Check task status on the ECS container instance ID. - :param Ec2InstanceId: The EC2 instance ID is used to identify the cluster, container instances in cluster -""" -def checkContainerInstanceTaskStatus(Ec2InstanceId): - containerInstanceId = None - clusterName = None - tmpMsgAppend = None - - # Describe instance attributes and get the Clustername from userdata section which would have set ECS_CLUSTER name - ec2Resp = ec2Client.describe_instance_attribute(InstanceId=Ec2InstanceId, Attribute='userData') - userdataEncoded = ec2Resp['UserData'] - userdataDecoded = base64.b64decode(userdataEncoded['Value']) - logger.debug("Describe instance attributes response %s", ec2Resp) - - tmpList = userdataDecoded.split() - for token in tmpList: - if token.find("ECS_CLUSTER") > -1: - # Split and get the cluster name - clusterName = token.split('=')[1] - logger.info("Cluster name %s",clusterName) - - # Get list of container instance IDs from the clusterName - paginator = ecsClient.get_paginator('list_container_instances') - clusterListPages = paginator.paginate(cluster=clusterName) - for containerListResp in clusterListPages: - containerDetResp = ecsClient.describe_container_instances(cluster=clusterName, containerInstances=clusterListResp[ - 'containerInstanceArns']) - logger.debug("describe container instances response %s",containerDetResp) - - for containerInstances in containerDetResp['containerInstances']: - logger.debug("Container Instance ARN: %s and ec2 Instance ID %s",containerInstances['containerInstanceArn'], - containerInstances['ec2InstanceId']) - if containerInstances['ec2InstanceId'] == Ec2InstanceId: - logger.info("Container instance ID of interest : %s",containerInstances['containerInstanceArn']) - containerInstanceId = containerInstances['containerInstanceArn'] - - # Check if the instance state is set to DRAINING. If not, set it, so the ECS Cluster will handle de-registering instance, draining tasks and draining them - containerStatus = containerInstances['status'] - if containerStatus == 'DRAINING': - logger.info("Container ID %s with EC2 instance-id %s is draining tasks",containerInstanceId, - Ec2InstanceId) - tmpMsgAppend = {"containerInstanceId": containerInstanceId} - else: - # Make ECS API call to set the container status to DRAINING - logger.info("Make ECS API call to set the container status to DRAINING...") - ecsResponse = ecsClient.update_container_instances_state(cluster=clusterName,containerInstances=[containerInstanceId],status='DRAINING') - # When you set instance state to draining, append the containerInstanceID to the message as well - tmpMsgAppend = {"containerInstanceId": containerInstanceId} - break - if containerInstanceId is not None: - break - - # Using container Instance ID, get the task list, and task running on that instance. - if containerInstanceId != None: - # List tasks on the container instance ID, to get task Arns - listTaskResp = ecsClient.list_tasks(cluster=clusterName, containerInstance=containerInstanceId) - logger.debug("Container instance task list %s",listTaskResp['taskArns']) - - # If the chosen instance has tasks - if len(listTaskResp['taskArns']) > 0: - logger.info("Tasks are on this instance...%s",Ec2InstanceId) - return 1, tmpMsgAppend - else: - logger.info("NO tasks are on this instance...%s",Ec2InstanceId) - return 0, tmpMsgAppend - else: - logger.info("NO tasks are on this instance....%s",Ec2InstanceId) - return 0, tmpMsgAppend - - -def lambda_handler(event, context): - - line = event['Records'][0]['Sns']['Message'] - message = json.loads(line) - Ec2InstanceId = message['EC2InstanceId'] - asgGroupName = message['AutoScalingGroupName'] - snsArn = event['Records'][0]['EventSubscriptionArn'] - TopicArn = event['Records'][0]['Sns']['TopicArn'] - - lifecyclehookname = None - clusterName = None - tmpMsgAppend = None - completeHook = 0 - - logger.info("Lambda received the event %s",event) - logger.debug("records: %s",event['Records'][0]) - logger.debug("sns: %s",event['Records'][0]['Sns']) - logger.debug("Message: %s",message) - logger.debug("Ec2 Instance Id %s ,%s",Ec2InstanceId, asgGroupName) - logger.debug("SNS ARN %s",snsArn) - - # Describe instance attributes and get the Clustername from userdata section which would have set ECS_CLUSTER name - ec2Resp = ec2Client.describe_instance_attribute(InstanceId=Ec2InstanceId, Attribute='userData') - logger.debug("Describe instance attributes response %s",ec2Resp) - userdataEncoded = ec2Resp['UserData'] - userdataDecoded = base64.b64decode(userdataEncoded['Value']) - - tmpList = userdataDecoded.split() - for token in tmpList: - if token.find("ECS_CLUSTER") > -1: - # Split and get the cluster name - clusterName = token.split('=')[1] - logger.debug("Cluster name %s",clusterName) - - # Get list of container instance IDs from the clusterName - clusterListResp = ecsClient.list_container_instances(cluster=clusterName) - logger.debug("list container instances response %s",clusterListResp) - - # If the event received is instance terminating... - if 'LifecycleTransition' in message.keys(): - logger.debug("message autoscaling %s",message['LifecycleTransition']) - if message['LifecycleTransition'].find('autoscaling:EC2_INSTANCE_TERMINATING') > -1: - - # Get lifecycle hook name - lifecycleHookName = message['LifecycleHookName'] - logger.debug("Setting lifecycle hook name %s ",lifecycleHookName) - - # Check if there are any tasks running on the instance - tasksRunning, tmpMsgAppend = checkContainerInstanceTaskStatus(Ec2InstanceId) - logger.debug("Returned values received: %s ",tasksRunning) - if tmpMsgAppend != None: - message.update(tmpMsgAppend) - - # If tasks are still running... - if tasksRunning == 1: - response = snsClient.list_subscriptions() - for key in response['Subscriptions']: - logger.info("Endpoint %s AND TopicArn %s and protocol %s ",key['Endpoint'], key['TopicArn'], - key['Protocol']) - if TopicArn == key['TopicArn'] and key['Protocol'] == 'lambda': - logger.info("TopicArn match, publishToSNS function...") - msgResponse = publishToSNS(message, key['TopicArn']) - logger.debug("msgResponse %s and time is %s",msgResponse, datetime.datetime) - # If tasks are NOT running... - elif tasksRunning == 0: - completeHook = 1 - logger.debug("Setting lifecycle to complete;No tasks are running on instance, completing lifecycle action....") - - try: - response = asgClient.complete_lifecycle_action( - LifecycleHookName=lifecycleHookName, - AutoScalingGroupName=asgGroupName, - LifecycleActionResult='CONTINUE', - InstanceId=Ec2InstanceId) - logger.info("Response received from complete_lifecycle_action %s",response) - logger.info("Completedlifecycle hook action") - except Exception, e: - print(str(e)) - - - - - - - diff --git a/code/index.zip b/code/index.zip deleted file mode 100644 index 72a1075..0000000 Binary files a/code/index.zip and /dev/null differ