Skip to content

Commit

Permalink
Merge pull request #63 from remind101/simplify_action_execute
Browse files Browse the repository at this point in the history
Simplify steps in plan/build action
  • Loading branch information
phobologic committed Aug 10, 2015
2 parents 9fd8286 + 8f22883 commit bbd14ea
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 196 deletions.
40 changes: 10 additions & 30 deletions stacker/actions/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Action(BaseAction):
"""

def _resolve_parameters(self, outputs, parameters, blueprint):
def _resolve_parameters(self, parameters, blueprint):
"""Resolves parameters for a given blueprint.
Given a list of parameters, first discard any parameters that the
Expand All @@ -33,7 +33,6 @@ def _resolve_parameters(self, outputs, parameters, blueprint):
stack.
Args:
outputs (dict): any outputs that can be referenced by other stacks
parameters (dict): A dictionary of parameters provided by the
stack definition
blueprint (`stacker.blueprint.base.Blueprint`): A Blueprint object
Expand All @@ -55,16 +54,10 @@ def _resolve_parameters(self, outputs, parameters, blueprint):
# Get from the Output of another stack in the stack_map
stack_name, output = value.split('::')
stack_fqn = self.context.get_fqn(stack_name)
# XXX check out this logic to see if this is what we really
# want to do
try:
stack_outputs = outputs[stack_fqn]
value = self.provider.get_output(stack_fqn, output)
except KeyError:
raise exceptions.StackDoesNotExist(stack_fqn)
try:
value = stack_outputs[output]
except KeyError:
raise exceptions.ParameterDoesNotExist(value)
raise exceptions.OutputDoesNotExist(stack_fqn, value)
params[k] = value
return params

Expand All @@ -81,14 +74,18 @@ def _build_stack_tags(self, stack, template_url):
tags['required_stacks'] = ':'.join(requires)
return tags

def _launch_stack(self, results, stack, **kwargs):
def _launch_stack(self, stack, **kwargs):
"""Handles the creating or updating of a stack in CloudFormation.
Also makes sure that we don't try to create or update a stack while
it is already updating or creating.
"""
provider_stack = self.provider.get_stack(stack.fqn)
try:
provider_stack = self.provider.get_stack(stack.fqn)
except exceptions.StackDoesNotExist:
provider_stack = None

if provider_stack and kwargs.get('status') is SUBMITTED:
logger.debug(
"Stack %s provider status: %s",
Expand All @@ -104,7 +101,7 @@ def _launch_stack(self, results, stack, **kwargs):
logger.info("Launching stack %s now.", stack.fqn)
template_url = self.s3_stack_push(stack.blueprint)
tags = self._build_stack_tags(stack, template_url)
parameters = self._resolve_parameters(results, stack.parameters,
parameters = self._resolve_parameters(stack.parameters,
stack.blueprint)
required_params = [k for k, v in stack.blueprint.required_parameters]
parameters = self._handle_missing_parameters(parameters,
Expand All @@ -123,21 +120,6 @@ def _launch_stack(self, results, stack, **kwargs):

return SUBMITTED

def _get_outputs(self, stack):
"""Gets all the outputs from a given stack in CloudFormation.
Updates the local output cache with the values it finds.
"""
provider_stack = self.provider.get_stack(stack.fqn)
if not provider_stack:
raise ValueError("Stack %s does not exist." % (stack.fqn,))
stack_outputs = {}
for output in provider_stack.outputs:
logger.debug(" %s: %s", output.key, output.value)
stack_outputs[output.key] = output.value
return stack_outputs

def _handle_missing_parameters(self, params, required_params,
existing_stack=None):
"""Handles any missing parameters.
Expand Down Expand Up @@ -183,8 +165,6 @@ def _generate_plan(self):
plan.add(
stacks[stack_name],
run_func=self._launch_stack,
completion_func=self._get_outputs,
skip_func=self._get_outputs,
requires=dependencies.get(stack_name),
)
return plan
Expand Down
9 changes: 5 additions & 4 deletions stacker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ def __init__(self, parameters, *args, **kwargs):
**kwargs)


class ParameterDoesNotExist(Exception):
class OutputDoesNotExist(Exception):

def __init__(self, parameter, *args, **kwargs):
message = 'Parameter: "%s" does not exist in output' % (parameter,)
super(ParameterDoesNotExist, self).__init__(message, *args, **kwargs)
def __init__(self, stack_name, output, *args, **kwargs):
message = 'Output %s does not exist on stack %s' % (output,
stack_name)
super(OutputDoesNotExist, self).__init__(message, *args, **kwargs)


class MissingEnvironment(Exception):
Expand Down
97 changes: 35 additions & 62 deletions stacker/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,17 @@ class Step(object):
stack (`stacker.stack.Stack`): the `Stack` object associated with this
step
run_func (func): the function to be run for the given stack
completion_func (Optional[func]): Optional function that should be run
when the step is complete. This should be used to return some
result of the step.
skip_func (Optional[func]): Optional function that should be run when
the step is skipped. This can be used to return results from the
step, without risking calling a completion function agian.
requires (Optional[list]): List of stacks this step depends on being
completed before running. This step will not be executed unless the
required stacks have either completed or skipped.
"""

def __init__(self, stack, run_func, completion_func=None,
skip_func=None, requires=None):
def __init__(self, stack, run_func, requires=None):
self.stack = stack
self.status = PENDING
self.requires = requires or []
self._run_func = run_func
self._completion_func = completion_func
self._skip_func = skip_func

def __repr__(self):
return '<stacker.plan.Step:%s>' % (self.stack.fqn,)
Expand All @@ -58,11 +49,8 @@ def skipped(self):
def submitted(self):
return self.status.code >= SUBMITTED.code

def submit(self):
self.set_status(SUBMITTED)

def run(self, results):
return self._run_func(results, self.stack, status=self.status)
def run(self):
return self._run_func(self.stack, status=self.status)

def set_status(self, status):
if status is not self.status:
Expand All @@ -72,13 +60,12 @@ def set_status(self, status):

def complete(self):
self.set_status(COMPLETE)
if self._completion_func and callable(self._completion_func):
return self._completion_func(self.stack)

def skip(self):
self.set_status(SKIPPED)
if self._skip_func and callable(self._skip_func):
return self._skip_func(self.stack)

def submit(self):
self.set_status(SUBMITTED)


class Plan(OrderedDict):
Expand Down Expand Up @@ -113,14 +100,11 @@ def __init__(self, description, sleep_time=5, wait_func=None, *args,
self._wait_func = time.sleep
super(Plan, self).__init__(*args, **kwargs)

def add(self, stack, run_func, completion_func=None, skip_func=None,
requires=None):
def add(self, stack, run_func, requires=None):
"""Add a new step to the plan."""
self[stack.fqn] = Step(
stack=stack,
run_func=run_func,
completion_func=completion_func,
skip_func=skip_func,
requires=requires,
)

Expand Down Expand Up @@ -149,59 +133,48 @@ def completed(self):
return False
return True

def execute(self, results=None):
def _single_run(self):
"""Executes a single run through the plan, touching each step."""
for step_name, step in self.list_pending():
waiting_on = []
for required_stack in step.requires:
if not self[required_stack].completed and \
not self[required_stack].skipped:
waiting_on.append(required_stack)

if waiting_on:
logger.debug(
'Stack: "%s" waiting on required stacks: %s',
step.stack.name,
', '.join(waiting_on),
)
continue

status = step.run()
if not isinstance(status, Status):
raise ValueError('Step run_func must return a valid '
'Status')

step.set_status(status)
return self.completed

def execute(self):
"""Execute the plan.
This will run through all of the steps registered with the plan and
submit them in parallel based on their dependencies. The results will
be returned once all steps have either been skipped or completed.
Args:
results (Optional[dict]): The dictionary that should be used to
store the results
submit them in parallel based on their dependencies.
"""

if results is None:
results = {}

attempts = 0
while not self.completed:
attempts += 1
if not attempts % 10:
self._check_point()

for step_name, step in self.list_pending():
waiting_on = []
for required_stack in step.requires:
if required_stack not in results:
waiting_on.append(required_stack)

if waiting_on:
logger.debug(
'Stack: "%s" waiting on required stacks: %s',
step.stack.name,
', '.join(waiting_on),
)
continue

status = step.run(results)
if not isinstance(status, Status):
raise ValueError('Step run_func must return a valid '
'Status')

if status is COMPLETE:
results[step_name] = step.complete()
elif status is SKIPPED:
results[step_name] = step.skip()
else:
step.set_status(status)

if not self.completed:
if not self._single_run():
self._wait_func(self.sleep_time)

self._check_point()
return results

def outline(self, level=logging.INFO, execute_helper=False):
"""Print an outline of the actions the plan is going to take.
Expand Down
43 changes: 36 additions & 7 deletions stacker/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@
logger = logging.getLogger(__name__)


def get_output_dict(stack):
"""Returns a dict of key/values for the outputs for a given CF stack.
Args:
stack (boto.cloudformation.stack.Stack): The stack object to get
outputs from.
Returns:
dict: A dictionary with key/values for each output on the stack.
"""
outputs = {}
for output in stack.outputs:
logger.debug(" %s %s: %s", stack.name, output.key,
output.value)
outputs[output.key] = output.value
return outputs


def retry_on_throttling(fn, attempts=3, args=None, kwargs=None):
"""Wrap retry_with_backoff to handle AWS Cloudformation Throttling.
Expand Down Expand Up @@ -55,6 +74,8 @@ class Provider(BaseProvider):

def __init__(self, region, **kwargs):
self.region = region
self._stacks = {}
self._outputs = {}

@property
def cloudformation(self):
Expand All @@ -64,14 +85,16 @@ def cloudformation(self):
return self._cloudformation

def get_stack(self, stack_name, **kwargs):
stack = None
try:
stack = retry_on_throttling(self.cloudformation.describe_stacks,
if stack_name not in self._stacks:
try:
self._stacks[stack_name] = \
retry_on_throttling(self.cloudformation.describe_stacks,
args=[stack_name])[0]
except boto.exception.BotoServerError as e:
if 'does not exist' not in e.message:
raise
return stack
except boto.exception.BotoServerError as e:
if 'does not exist' not in e.message:
raise
raise exceptions.StackDoesNotExist(stack_name)
return self._stacks[stack_name]

def get_stack_status(self, stack, **kwargs):
return stack.stack_status
Expand Down Expand Up @@ -133,3 +156,9 @@ def get_required_stacks(self, stack, **kwargs):

def get_stack_name(self, stack, **kwargs):
return stack.stack_name

def get_outputs(self, stack_name, *args, **kwargs):
if stack_name not in self._outputs:
stack = self.get_stack(stack_name)
self._outputs[stack_name] = get_output_dict(stack)
return self._outputs[stack_name]
32 changes: 14 additions & 18 deletions stacker/providers/base.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
from .. import exceptions
def not_implemented(method):
raise NotImplementedError("Provider does not support '%s' "
"method." % method)


class BaseProvider(object):

name = None

def __init__(self, *args, **kwargs):
if not self.name:
raise exceptions.ImproperlyConfigured('Provider must have a '
'"name"')

def _not_implemented_erorr(self, method):
raise NotImplementedError('Provider "%s" does not support "%s"' % (
self.name, method))

def get_stack(self, stack_name, *args, **kwargs):
self._not_implemented_erorr('get_stack')
not_implemented("get_stack")

def create_stack(self, *args, **kwargs):
self._not_implemented_erorr('create_stack')
not_implemented("create_stack")

def update_stack(self, *args, **kwargs):
self._not_implemented_erorr('update_stack')
not_implemented("update_stack")

def destroy_stack(self, *args, **kwargs):
self._not_implemented_erorr('destroy_stack')
not_implemented("destroy_stack")

def get_stack_status(self, stack_name, *args, **kwargs):
self._not_implemented_erorr('get_stack_status')
not_implemented("get_stack_status")

def get_outputs(self, stack_name, *args, **kwargs):
not_implemented("get_outputs")

def get_output(self, stack_name, output):
return self.get_outputs(stack_name)[output]

0 comments on commit bbd14ea

Please sign in to comment.