From 912bba3cf2c157b3a4886e73bb799e5e675dae4a Mon Sep 17 00:00:00 2001
From: Sourabh Bajaj
Date: Thu, 16 Jul 2015 14:11:42 -0700
Subject: [PATCH] Add teardown action for sns alerts

---
 dataduct/etl/etl_actions.py         |  5 ++--
 dataduct/etl/etl_pipeline.py        | 40 ++++++++++++++++++++++++-----
 dataduct/pipeline/default_object.py |  4 +--
 dataduct/steps/etl_step.py          | 17 +++++++-----
 4 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/dataduct/etl/etl_actions.py b/dataduct/etl/etl_actions.py
index 84ebf79..f6ae2ac 100644
--- a/dataduct/etl/etl_actions.py
+++ b/dataduct/etl/etl_actions.py
@@ -2,14 +2,14 @@
 """
 import yaml
 
-from .etl_pipeline import ETLPipeline
 from ..pipeline import Activity
 from ..pipeline import MysqlNode
 from ..pipeline import RedshiftNode
 from ..pipeline import S3Node
 from ..utils.exceptions import ETLInputError
-from ..utils.hook import hook
 from ..utils.helpers import make_pipeline_url
+from ..utils.hook import hook
+from .etl_pipeline import ETLPipeline
 
 import logging
 logger = logging.getLogger(__name__)
@@ -53,6 +53,7 @@ def create_pipeline(definition):
 
     # Add the steps to the pipeline object
     etl.create_steps(steps)
+    etl.create_teardown_step()
 
     logger.info('Created pipeline. Name: %s', etl.name)
     return etl
diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 227daed..dd00874 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -6,6 +6,7 @@
 import yaml
 
 from StringIO import StringIO
+from copy import deepcopy
 from datetime import datetime
 from datetime import timedelta
 
@@ -44,6 +45,12 @@
 DP_INSTANCE_LOG_PATH = config.etl.get('DP_INSTANCE_LOG_PATH', const.NONE)
 DP_PIPELINE_LOG_PATH = config.etl.get('DP_PIPELINE_LOG_PATH', const.NONE)
 
+DEFAULT_TEARDOWN = {
+    'step_type': 'transform',
+    'command': 'echo Finished Pipeline',
+    'no_output': True
+}
+
 
 class ETLPipeline(object):
     """DataPipeline class with steps and metadata.
@@ -54,7 +61,7 @@ class ETLPipeline(object):
     """
     def __init__(self, name, frequency='one-time', ec2_resource_config=None,
                  time_delta=None, emr_cluster_config=None, load_time=None,
-                 topic_arn=None, max_retries=MAX_RETRIES,
+                 topic_arn=None, max_retries=MAX_RETRIES, teardown=None,
                  bootstrap=None, description=None):
         """Constructor for the pipeline class
 
@@ -95,6 +102,13 @@ def __init__(self, name, frequency='one-time', ec2_resource_config=None,
         else:
             self.bootstrap_definitions = dict()
 
+        if teardown is not None:
+            self.teardown_definition = teardown
+        elif getattr(config, 'teardown', None):
+            self.teardown_definition = config.teardown
+        else:
+            self.teardown_definition = DEFAULT_TEARDOWN
+
         if emr_cluster_config:
             self.emr_cluster_config = emr_cluster_config
         else:
@@ -183,7 +197,6 @@ def create_base_objects(self):
         )
         self.default = self.create_pipeline_object(
             object_class=DefaultObject,
-            sns=self.sns,
             pipeline_log_uri=self.s3_log_dir,
         )
 
@@ -392,7 +405,7 @@ def translate_input_nodes(self, input_node):
                 output[value] = self.intermediate_nodes[key]
         return output
 
-    def add_step(self, step, is_bootstrap=False):
+    def add_step(self, step, is_bootstrap=False, is_teardown=False):
         """Add a step to the pipeline
 
         Args:
@@ -403,16 +416,22 @@ def add_step(self, step, is_bootstrap=False):
             raise ETLInputError('Step name %s already taken' % step.id)
         self._steps[step.id] = step
 
-        if self.bootstrap_steps and not is_bootstrap:
+        if self.bootstrap_steps and not is_bootstrap and not is_teardown:
             step.add_required_steps(self.bootstrap_steps)
 
+        if is_teardown:
+            teardown_dependencies = deepcopy(self._steps)
+            teardown_dependencies.pop(step.id)
+            step.add_required_steps(teardown_dependencies.values())
+
         # Update intermediate_nodes dict
         if isinstance(step.output, dict):
             self.intermediate_nodes.update(step.output)
         elif step.output and step.id:
             self.intermediate_nodes[step.id] = step.output
 
-    def create_steps(self, steps_params, is_bootstrap=False):
+    def create_steps(self, steps_params, is_bootstrap=False,
+                     is_teardown=False):
         """Create pipeline steps and add appropriate dependencies
 
         Note:
@@ -422,6 +441,7 @@ def create_steps(self, steps_params, is_bootstrap=False):
         Args:
             steps_params(list of dict): List of dictionary of step params
             is_bootstrap(bool): flag indicating bootstrap steps
+            is_teardown(bool): flag indicating teardown steps
 
         Returns:
             steps(list of ETLStep): list of etl step objects
@@ -437,6 +457,9 @@ def create_steps(self, steps_params, is_bootstrap=False):
                     'input_path' not in step_param:
                 step_param['input_node'] = input_node
 
+            if is_teardown:
+                step_param['sns_object'] = self.sns
+
             try:
                 step_class = step_param.pop('step_class')
                 step_args = step_class.arguments_processor(self, step_param)
@@ -452,11 +475,16 @@ def create_steps(self, steps_params, is_bootstrap=False):
                 raise
 
             # Add the step to the pipeline
-            self.add_step(step, is_bootstrap)
+            self.add_step(step, is_bootstrap, is_teardown)
             input_node = step.output
             steps.append(step)
         return steps
 
+    def create_teardown_step(self):
+        """Create teardown steps for the pipeline
+        """
+        return self.create_steps([self.teardown_definition], is_teardown=True)
+
     def create_bootstrap_steps(self, resource_type):
         """Create the boostrap steps for installation on all machines
 
diff --git a/dataduct/pipeline/default_object.py b/dataduct/pipeline/default_object.py
index 53d10e8..78d7251 100644
--- a/dataduct/pipeline/default_object.py
+++ b/dataduct/pipeline/default_object.py
@@ -2,9 +2,9 @@
 Pipeline object class for default metadata
 """
 
-from .pipeline_object import PipelineObject
 from ..config import Config
 from ..utils import constants as const
+from .pipeline_object import PipelineObject
 
 config = Config()
 ROLE = config.etl['ROLE']
@@ -22,7 +22,7 @@ def __init__(self, id, pipeline_log_uri, sns=None, scheduleType='cron',
 
         Args:
             id(str): must be 'Default' for this class
-            sns(int): notify on failure
+            sns(sns): notify on failure
             scheduleType(str): frequency type for the pipeline
             failureAndRerunMode(str): aws input argument for failure mode
             **kwargs(optional): Keyword arguments directly passed to base class
diff --git a/dataduct/steps/etl_step.py b/dataduct/steps/etl_step.py
index f14ce29..4375a6b 100644
--- a/dataduct/steps/etl_step.py
+++ b/dataduct/steps/etl_step.py
@@ -5,9 +5,9 @@
 from ..pipeline import Activity
 from ..pipeline import CopyActivity
 from ..pipeline import S3Node
-from ..s3 import S3Path
 from ..s3 import S3File
 from ..s3 import S3LogPath
+from ..s3 import S3Path
 from ..utils import constants as const
 from ..utils.exceptions import ETLInputError
 
@@ -33,7 +33,7 @@ class ETLStep(object):
     def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
                  s3_source_dir=None, schedule=None, resource=None,
                  input_node=None, input_path=None, required_steps=None,
-                 max_retries=MAX_RETRIES):
+                 max_retries=MAX_RETRIES, sns_object=None):
         """Constructor for the ETLStep object
 
         Args:
@@ -59,6 +59,7 @@ def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
         self._required_steps = list()
         self._required_activities = list()
         self._input_node = input_node
+        self._sns_object = sns_object
 
         if input_path is not None and input_node is not None:
             raise ETLInputError('Both input_path and input_node specified')
@@ -79,7 +80,8 @@ def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
 
         if isinstance(input_node, dict):
             # Merge the s3 nodes if there are multiple inputs
-            self._input_node, self._depends_on = self.merge_s3_nodes(input_node)
+            self._input_node, self._depends_on = self.merge_s3_nodes(
+                input_node)
 
         if required_steps:
             self.add_required_steps(required_steps)
@@ -126,6 +128,9 @@ def create_pipeline_object(self, object_class, **kwargs):
         if isinstance(new_object, Activity):
             new_object['dependsOn'] = self._required_activities
 
+        if self._sns_object:
+            new_object['onFail'] = self._sns_object
+
         self._objects[object_id] = new_object
         return new_object
 
@@ -143,7 +148,6 @@ def create_s3_data_node(self, s3_object=None, **kwargs):
                               isinstance(s3_object, S3Path)):
             raise ETLInputError('s3_object must of type S3File or S3Path')
 
-        create_s3_path = False
 
         if s3_object is None or (isinstance(s3_object, S3File) and
                                  s3_object.s3_path is None):
@@ -219,7 +223,8 @@ def copy_s3(self, input_node, dest_uri):
         Returns:
             activity(CopyActivity): copy activity object
         """
-        if not(isinstance(input_node, S3Node) and isinstance(dest_uri, S3Path)):
+        if not(isinstance(input_node, S3Node) and
+               isinstance(dest_uri, S3Path)):
             raise ETLInputError('input_node and uri have type mismatch')
 
         # create s3 node for output
@@ -291,7 +296,7 @@ def output(self):
             result: output node for this etl step
 
         Note:
-            An output S3 node, or multiple s3 nodes. If this step produces no s3
+            An output S3 node, or multiple s3 nodes. If step produces no s3
             nodes, there will be no output. For steps producing s3 output, note
             that they may produce multiple output nodes. These nodes will be
             defined in a list of output directories (specified in the
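
Usage sketch (illustrative note, not part of the patch): a minimal example of the new teardown hook, assuming an installed and configured dataduct. The pipeline name and the echo command are placeholders; ETLPipeline, the teardown argument, the DEFAULT_TEARDOWN shape, and create_teardown_step() are the ones introduced above.

    from dataduct.etl.etl_pipeline import ETLPipeline

    # Custom teardown definition; same shape as DEFAULT_TEARDOWN above.
    # When omitted, the pipeline falls back to config.teardown and then
    # DEFAULT_TEARDOWN.
    teardown = {
        'step_type': 'transform',
        'command': 'echo Custom teardown finished',  # placeholder command
        'no_output': True,
    }

    etl = ETLPipeline('example_pipeline', teardown=teardown)
    # etl.create_steps(...) would normally run first; the teardown step then
    # depends on every step added so far and carries the pipeline's SNS
    # object as its onFail alert.
    etl.create_teardown_step()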