Skip to content
This repository has been archived by the owner on Jan 6, 2022. It is now read-only.

Commit

Permalink
Adding back channel evaluation on app stack
Browse files Browse the repository at this point in the history
  • Loading branch information
karolyi committed Dec 14, 2015
1 parent 2ef0837 commit e13b869
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/integrationtest/python/assume_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ def format_(credentials):


if __name__ == "__main__":
format_(json.loads(sys.stdin.read())['Credentials'])
input_str = sys.stdin.read()
format_(json.loads(input_str)['Credentials'])
116 changes: 89 additions & 27 deletions src/integrationtest/python/crassus_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def setUp(self):
fqdn = socket.gethostname()
hostname = fqdn[:fqdn.find('.')] if '.' in fqdn else fqdn
self.test_id = 'crassus-it-{0}-{1}' \
.format(hostname,
datetime.utcnow().strftime('%Y%m%d%H%M%S'))
.format(
hostname, datetime.utcnow().strftime('%Y%m%d%H%M%S'))
self.invoker_role_name = 'crassus-invoker-it-{0}'.format(self.test_id)
self.crassus_stack_name = self.test_id
self.app_stack_name = 'app-{0}'.format(self.test_id)
Expand All @@ -58,8 +58,8 @@ def setUp(self):

def tearDown(self):
self.delete_invoker_role()
# self.delete_stack(self.crassus_stack_name)
# self.delete_stack_when_update_finished(self.app_stack_name)
self.delete_stack(self.crassus_stack_name)
self.delete_stack_when_update_finished(self.app_stack_name)

def test_create_stacks_and_update(self):
invoker_role = self.create_invoker_role()
Expand All @@ -71,6 +71,7 @@ def test_create_stacks_and_update(self):

self.send_update_message(invoker_role)

self.wait_success_from_backchannel(invoker_role)
self.assert_update_successful()

def create_invoker_role(self):
Expand All @@ -90,8 +91,8 @@ def create_invoker_role(self):
RoleName=self.invoker_role_name,
AssumeRolePolicyDocument=assume_role_policy)

self.iam_client.attach_role_policy(RoleName=self.invoker_role_name,
PolicyArn=SNS_FULL_ACCESS)
self.iam_client.attach_role_policy(
RoleName=self.invoker_role_name, PolicyArn=SNS_FULL_ACCESS)

return invoker_role

Expand All @@ -114,9 +115,66 @@ def assume_role(self, invoker_role):

return credentials

def contains_final_message(self, messages):
"""
Check if the passes SQS messages contain the final 'UPDATE_COMPLETE'
from Cloudformation for the demo app.
Return True if yes, delete the messages and return False-ish if
not.
"""
for message in messages:
json_body = json.loads(message.body)
message.delete()
stack_name = json_body.get('stackName')
res_type = json_body.get('resourceType')
status = json_body.get('status')
if (stack_name == self.app_stack_name and
res_type == 'AWS::CloudFormation::Stack' and
status == 'UPDATE_COMPLETE'):
# Eureka!
return True

def wait_success_from_backchannel(self, invoker_role):
"""
Waits for the cloudformation success message arriving from the
cloudformation back channel converter, meaning the stack is
updated and ready to run.
This means, we read the backchannel (SQS Queue) and wait for a
'UPDATE_COMPLETE' message from cloudformation.
Read for 15 minutes, raise error if not found, return if found.
"""
credentials = self.assume_role(invoker_role)
reading_start = datetime.now()
sqs_resource = boto3.resource(
service_name='sqs', region_name=REGION_NAME,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
back_channel_url = self.get_stack_output(
self.crassus_stack_name, 'outputSqsQueue')
queue = sqs_resource.Queue(url=back_channel_url)
wait_seconds = 15 * 60 # Wait until this many seconds
delta_seconds = None
logger.info('Start polling queue \'{0}\' for {1} seconds'.format(
back_channel_url, wait_seconds))
while delta_seconds < wait_seconds:
messages = queue.receive_messages(MaxNumberOfMessages=10)
if self.contains_final_message(messages):
# Final message caught, exit gracefully
logger.info('Final CFN message received from SQS queue.')
return
# No related final message
sleep(5)
delta_seconds = (datetime.now() - reading_start).seconds
# This is reached on a timeout after the while statement
self.fail('Final message not received from Cloudformation.')

def assert_update_successful(self):
hello_world_url = self.get_stack_output(self.app_stack_name,
'WebsiteURL')
hello_world_url = self.get_stack_output(
self.app_stack_name, 'WebsiteURL')
update_successful = False
for i in range(0, 30):
try:
Expand Down Expand Up @@ -160,15 +218,20 @@ def send_update_message(self, invoker_role):
crassus_input_topic_arn = self.get_stack_output(
self.crassus_stack_name, 'inputSnsTopicARN')

message_id = sns_client.publish(TopicArn=crassus_input_topic_arn,
Message=json.dumps(message))
message_id = sns_client.publish(
TopicArn=crassus_input_topic_arn, Message=json.dumps(message))
logger.info(
'published update message to topic: {0}, message: {1}, got '
'message_id: {2}'.format(
crassus_input_topic_arn, message, message_id
))

def get_stack_output(self, stack_name, output_name):
"""
Get an output value from a stack input parameter.
Return the parameter value, o raise Exception if not found.
"""
stack_outputs = self.cfn_client.describe_stacks(
StackName=stack_name)['Stacks'][0]['Outputs']

Expand All @@ -179,8 +242,9 @@ def get_stack_output(self, stack_name, output_name):
output_value = output_item['OutputValue']

if output_value is None:
raise Exception('Stack with name: {0} does not have output: {1}'
.format(stack_name, output_name))
self.fail(
'Stack with name: {0} does not have output: {1}'
.format(stack_name, output_name))

return output_value

Expand All @@ -204,7 +268,6 @@ def create_app_stack(self):
}
app_config = Config(config_dict=config_dict)
app_stack_creation = CreateStack('AppCreationThread', app_config)
print ('MY APP CONFIG: ', config_dict)
app_stack_creation.start()
return app_stack_creation

Expand All @@ -223,8 +286,8 @@ def create_crassus_stack(self, invoker_role_arn):
}
}
})
crassus_stack_creation = CreateStack('CrassusCreationThread',
crassus_config)
crassus_stack_creation = CreateStack(
'CrassusCreationThread', crassus_config)
crassus_stack_creation.start()
return crassus_stack_creation

Expand All @@ -239,25 +302,24 @@ def get_first_vpc_and_subnets(self):
return subnet_ids_paramater, vpc_id

def delete_invoker_role(self):
self.ignore_client_error(lambda:
self.iam_client.detach_role_policy(
RoleName=self.invoker_role_name,
PolicyArn=SNS_FULL_ACCESS))
self.ignore_client_error(lambda:
self.iam_client.delete_role(
RoleName=self.invoker_role_name))
self.ignore_client_error(
lambda: self.iam_client.detach_role_policy(
RoleName=self.invoker_role_name,
PolicyArn=SNS_FULL_ACCESS))
self.ignore_client_error(
lambda: self.iam_client.delete_role(
RoleName=self.invoker_role_name))

def delete_stack(self, stack_name):
self.ignore_client_error(lambda:
self.cfn_client.delete_stack(
StackName=stack_name))
self.ignore_client_error(
lambda: self.cfn_client.delete_stack(StackName=stack_name))

def delete_stack_when_update_finished(self, stack_name):
try:
self.cfn_client.delete_stack(StackName=stack_name)
except ClientError as client_error:
if client_error.message.endswith('cannot be deleted while in '
'status UPDATE_IN_PROGRESS'):
if client_error.message.endswith(
'cannot be deleted while in status UPDATE_IN_PROGRESS'):
logger.info(client_error.message)
sleep(60)
self.delete_stack_when_update_finished(stack_name)
Expand Down

0 comments on commit e13b869

Please sign in to comment.