Add teardown action for sns alerts
sb2nov committed Jul 16, 2015 · 1 parent 82c2055 · commit 912bba3
Showing 4 changed files with 50 additions and 16 deletions.
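In short: every pipeline now ends with a dedicated teardown step that depends on all other steps, and the SNS failure alert moves from the pipeline's Default object onto that step's activities. A minimal sketch of a definition-level override, not part of this commit (it assumes the parsed YAML keys map onto ETLPipeline's constructor arguments; the teardown fields mirror DEFAULT_TEARDOWN introduced below):

# Hypothetical usage sketch; 'name', 'frequency', and 'steps' are the usual
# dataduct definition fields, and 'teardown' feeds the new constructor arg.
from dataduct.etl.etl_actions import create_pipeline

definition = {
    'name': 'example_pipeline',
    'frequency': 'one-time',
    'teardown': {
        'step_type': 'transform',
        'command': 'echo Finished Pipeline',
        'no_output': True,
    },
    'steps': [
        {'step_type': 'transform', 'command': 'echo hello'},
    ],
}

etl = create_pipeline(definition)  # now also appends the teardown step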
5 changes: 3 additions & 2 deletions dataduct/etl/etl_actions.py
@@ -2,14 +2,14 @@
 """
 import yaml
 
-from .etl_pipeline import ETLPipeline
 from ..pipeline import Activity
 from ..pipeline import MysqlNode
 from ..pipeline import RedshiftNode
 from ..pipeline import S3Node
 from ..utils.exceptions import ETLInputError
-from ..utils.hook import hook
 from ..utils.helpers import make_pipeline_url
+from ..utils.hook import hook
+from .etl_pipeline import ETLPipeline
 
 import logging
 logger = logging.getLogger(__name__)
@@ -53,6 +53,7 @@ def create_pipeline(definition):
 
     # Add the steps to the pipeline object
     etl.create_steps(steps)
+    etl.create_teardown_step()
     logger.info('Created pipeline. Name: %s', etl.name)
     return etl

40 changes: 34 additions & 6 deletions dataduct/etl/etl_pipeline.py
@@ -6,6 +6,7 @@
 import yaml
 
 from StringIO import StringIO
+from copy import deepcopy
 from datetime import datetime
 from datetime import timedelta
@@ -44,6 +45,12 @@
 DP_INSTANCE_LOG_PATH = config.etl.get('DP_INSTANCE_LOG_PATH', const.NONE)
 DP_PIPELINE_LOG_PATH = config.etl.get('DP_PIPELINE_LOG_PATH', const.NONE)
 
+DEFAULT_TEARDOWN = {
+    'step_type': 'transform',
+    'command': 'echo Finished Pipeline',
+    'no_output': True
+}
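The default teardown is deliberately a no-op: a transform step that only echoes, and with no_output set it produces no S3 node, so it exists purely as a final anchor for the failure alert. Resolution order in the constructor below is: explicit teardown argument, then config.teardown, then this default. A hypothetical config-level override, assuming a top-level teardown block in the dataduct config surfaces as an attribute (the attribute name comes from the getattr call below):

# Sketch only; the shape mirrors DEFAULT_TEARDOWN and the command is invented.
from dataduct.config import Config

config = Config()
config.teardown = {
    'step_type': 'transform',
    'command': 'scripts/cleanup_temp_tables.sh',
    'no_output': True,
}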


class ETLPipeline(object):
    """DataPipeline class with steps and metadata.
@@ -54,7 +61,7 @@ class ETLPipeline(object):
     """
     def __init__(self, name, frequency='one-time', ec2_resource_config=None,
                  time_delta=None, emr_cluster_config=None, load_time=None,
-                 topic_arn=None, max_retries=MAX_RETRIES,
+                 topic_arn=None, max_retries=MAX_RETRIES, teardown=None,
                  bootstrap=None, description=None):
         """Constructor for the pipeline class
@@ -95,6 +102,13 @@ def __init__(self, name, frequency='one-time', ec2_resource_config=None,
         else:
             self.bootstrap_definitions = dict()
 
+        if teardown is not None:
+            self.teardown_definition = teardown
+        elif getattr(config, 'teardown', None):
+            self.teardown_definition = config.teardown
+        else:
+            self.teardown_definition = DEFAULT_TEARDOWN
+
         if emr_cluster_config:
             self.emr_cluster_config = emr_cluster_config
         else:
@@ -183,7 +197,6 @@ def create_base_objects(self):
         )
         self.default = self.create_pipeline_object(
             object_class=DefaultObject,
-            sns=self.sns,
             pipeline_log_uri=self.s3_log_dir,
         )
@@ -392,7 +405,7 @@ def translate_input_nodes(self, input_node):
             output[value] = self.intermediate_nodes[key]
         return output
 
-    def add_step(self, step, is_bootstrap=False):
+    def add_step(self, step, is_bootstrap=False, is_teardown=False):
         """Add a step to the pipeline
 
         Args:
@@ -403,16 +416,22 @@ def add_step(self, step, is_bootstrap=False):
             raise ETLInputError('Step name %s already taken' % step.id)
         self._steps[step.id] = step
 
-        if self.bootstrap_steps and not is_bootstrap:
+        if self.bootstrap_steps and not is_bootstrap and not is_teardown:
             step.add_required_steps(self.bootstrap_steps)
 
+        if is_teardown:
+            teardown_dependencies = deepcopy(self._steps)
+            teardown_dependencies.pop(step.id)
+            step.add_required_steps(teardown_dependencies.values())
+
         # Update intermediate_nodes dict
         if isinstance(step.output, dict):
            self.intermediate_nodes.update(step.output)
         elif step.output and step.id:
             self.intermediate_nodes[step.id] = step.output
 
-    def create_steps(self, steps_params, is_bootstrap=False):
+    def create_steps(self, steps_params, is_bootstrap=False,
+                     is_teardown=False):
         """Create pipeline steps and add appropriate dependencies
 
         Note:
@@ -422,6 +441,7 @@ def create_steps(self, steps_params, is_bootstrap=False):
         Args:
             steps_params(list of dict): List of dictionary of step params
             is_bootstrap(bool): flag indicating bootstrap steps
+            is_teardown(bool): flag indicating teardown steps
 
         Returns:
             steps(list of ETLStep): list of etl step objects
@@ -437,6 +457,9 @@ def create_steps(self, steps_params, is_bootstrap=False):
                     'input_path' not in step_param:
                 step_param['input_node'] = input_node
 
+            if is_teardown:
+                step_param['sns_object'] = self.sns
+
             try:
                 step_class = step_param.pop('step_class')
                 step_args = step_class.arguments_processor(self, step_param)
@@ -452,11 +475,16 @@ def create_steps(self, steps_params, is_bootstrap=False):
                 raise
 
             # Add the step to the pipeline
-            self.add_step(step, is_bootstrap)
+            self.add_step(step, is_bootstrap, is_teardown)
             input_node = step.output
             steps.append(step)
         return steps
 
+    def create_teardown_step(self):
+        """Create teardown steps for the pipeline
+        """
+        return self.create_steps([self.teardown_definition], is_teardown=True)
+
     def create_bootstrap_steps(self, resource_type):
         """Create the bootstrap steps for installation on all machines
4 changes: 2 additions & 2 deletions dataduct/pipeline/default_object.py
@@ -2,9 +2,9 @@
 Pipeline object class for default metadata
 """
 
-from .pipeline_object import PipelineObject
 from ..config import Config
 from ..utils import constants as const
+from .pipeline_object import PipelineObject
 
 config = Config()
 ROLE = config.etl['ROLE']
@@ -22,7 +22,7 @@ def __init__(self, id, pipeline_log_uri, sns=None, scheduleType='cron',
         Args:
             id(str): must be 'Default' for this class
-            sns(int): notify on failure
+            sns(sns): notify on failure
             scheduleType(str): frequency type for the pipeline
             failureAndRerunMode(str): aws input argument for failure mode
             **kwargs(optional): Keyword arguments directly passed to base class
17 changes: 11 additions & 6 deletions dataduct/steps/etl_step.py
@@ -5,9 +5,9 @@
 from ..pipeline import Activity
 from ..pipeline import CopyActivity
 from ..pipeline import S3Node
-from ..s3 import S3Path
 from ..s3 import S3File
 from ..s3 import S3LogPath
+from ..s3 import S3Path
 from ..utils import constants as const
 from ..utils.exceptions import ETLInputError
@@ -33,7 +33,7 @@ class ETLStep(object):
     def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
                  s3_source_dir=None, schedule=None, resource=None,
                  input_node=None, input_path=None, required_steps=None,
-                 max_retries=MAX_RETRIES):
+                 max_retries=MAX_RETRIES, sns_object=None):
         """Constructor for the ETLStep object
 
         Args:
@@ -59,6 +59,7 @@ def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
         self._required_steps = list()
         self._required_activities = list()
         self._input_node = input_node
+        self._sns_object = sns_object
 
         if input_path is not None and input_node is not None:
             raise ETLInputError('Both input_path and input_node specified')
@@ -79,7 +80,8 @@ def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
 
         if isinstance(input_node, dict):
             # Merge the s3 nodes if there are multiple inputs
-            self._input_node, self._depends_on = self.merge_s3_nodes(input_node)
+            self._input_node, self._depends_on = self.merge_s3_nodes(
+                input_node)
 
         if required_steps:
             self.add_required_steps(required_steps)
@@ -126,6 +128,9 @@ def create_pipeline_object(self, object_class, **kwargs):
         if isinstance(new_object, Activity):
             new_object['dependsOn'] = self._required_activities
 
+        if self._sns_object:
+            new_object['onFail'] = self._sns_object
+
         self._objects[object_id] = new_object
         return new_object
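With _sns_object set, every activity a teardown step creates carries an onFail action; this is what replaces the pipeline-wide sns removed from the DefaultObject in etl_pipeline.py, scoping alerts to one step instead of every object. Roughly, the serialized AWS Data Pipeline object would carry a reference along these lines (ids are hypothetical):

# Illustrative shape of the resulting pipeline object, not actual output:
# {
#     "id": "TeardownTransformActivity",
#     "type": "ShellCommandActivity",
#     "dependsOn": [{"ref": "Step1Activity"}, {"ref": "Step2Activity"}],
#     "onFail": {"ref": "SnsAlarmObj"}
# }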

@@ -143,7 +148,6 @@ def create_s3_data_node(self, s3_object=None, **kwargs):
                 isinstance(s3_object, S3Path)):
             raise ETLInputError('s3_object must of type S3File or S3Path')
 
-
         create_s3_path = False
         if s3_object is None or (isinstance(s3_object, S3File) and
                                  s3_object.s3_path is None):
@@ -219,7 +223,8 @@ def copy_s3(self, input_node, dest_uri):
         Returns:
             activity(CopyActivity): copy activity object
         """
-        if not(isinstance(input_node, S3Node) and isinstance(dest_uri, S3Path)):
+        if not(isinstance(input_node, S3Node) and
+               isinstance(dest_uri, S3Path)):
             raise ETLInputError('input_node and uri have type mismatch')
 
         # create s3 node for output
@@ -291,7 +296,7 @@ def output(self):
             result: output node for this etl step
 
         Note:
-            An output S3 node, or multiple s3 nodes. If this step produces no s3
+            An output S3 node, or multiple s3 nodes. If step produces no s3
             nodes, there will be no output. For steps producing s3 output, note
             that they may produce multiple output nodes. These nodes will be
             defined in a list of output directories (specified in the
