Skip to content

Commit

Permalink
Merge 8c1584e into 05b81ea
Browse files Browse the repository at this point in the history
  • Loading branch information
p5k6 committed Apr 27, 2016
2 parents 05b81ea + 8c1584e commit 4d5fdd7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
13 changes: 12 additions & 1 deletion dataduct/etl/etl_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class ETLPipeline(object):
and has functionality to add steps to the pipeline
"""

# put this here so as not to pollute global namespace. Also makes mocking
# easier
DEFAULT_TOPIC_ARN = config.etl.get('DEFAULT_TOPIC_ARN', const.NONE)

def __init__(self, name, frequency='one-time', ec2_resource_config=None,
time_delta=None, emr_cluster_config=None, load_time=None,
topic_arn=None, max_retries=MAX_RETRIES, teardown=None,
Expand Down Expand Up @@ -93,7 +98,13 @@ def __init__(self, name, frequency='one-time', ec2_resource_config=None,
self.time_delta = time_delta
self.description = description
self.max_retries = max_retries
self.topic_arn = topic_arn

if topic_arn is not None:
self.topic_arn = topic_arn
elif self.DEFAULT_TOPIC_ARN:
self.topic_arn = self.DEFAULT_TOPIC_ARN
else:
self.topic_arn = None

if bootstrap is not None:
self.bootstrap_definitions = bootstrap
Expand Down
16 changes: 15 additions & 1 deletion dataduct/etl/tests/test_etl_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Tests for the ETL Pipeline object
"""
import unittest
import mock
from nose.tools import raises
from nose.tools import eq_
from nose.tools import assert_not_equal

from datetime import timedelta
from ..etl_pipeline import ETLPipeline
from ...utils.exceptions import ETLInputError


class EtlPipelineTests(unittest.TestCase):
"""Tests for the ETL Pipeline object
"""
Expand Down Expand Up @@ -59,3 +60,16 @@ def test_bad_data_type_throws(self):
_s3_uri is bad
"""
self.default_pipeline._s3_uri('TEST_DATA_TYPE')

@staticmethod
def test_default_arn_loaded_if_not_in_etl_yaml():
with mock.patch('dataduct.etl.etl_pipeline.ETLPipeline.DEFAULT_TOPIC_ARN', 'blah'):
result = ETLPipeline('test_pipeline')
eq_(result.topic_arn, 'blah')

@staticmethod
def test_arn_loads_if_provided_in_etl_yaml():
with mock.patch('dataduct.etl.etl_pipeline.ETLPipeline.DEFAULT_TOPIC_ARN', 'blah'):
result = ETLPipeline('test_pipeline', topic_arn="not_blah")
eq_(result.topic_arn, "not_blah")
assert_not_equal(result.topic_arn, ETLPipeline.DEFAULT_TOPIC_ARN)
3 changes: 3 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ ETL
ROLE: FILL_ME_IN
S3_BASE_PATH: dev
S3_ETL_BUCKET: FILL_ME_IN
DEFAULT_TOPIC_ARN: 'arn:aws:sns:example_arn'
SNS_TOPIC_ARN_FAILURE: null
SNS_TOPIC_ARN_WARNING: null
FREQUENCY_OVERRIDE: one-time
Expand Down Expand Up @@ -222,6 +223,8 @@ level. The parameters are explained below:
or across production and dev
- ``S3_ETL_BUCKET``: S3 bucket to use for DP data, logs, source code
etc.
- ``DEFAULT_TOPIC_ARN``: default ARN to use for pipelines. Overridden if
specified in pipeline yaml definition.
- ``SNS_TOPIC_ARN_FAILURE``: SNS to trigger for failed steps or
pipelines
- ``SNS_TOPIC_ARN_WARNING``: SNS to trigger for failed QA checks
Expand Down

0 comments on commit 4d5fdd7

Please sign in to comment.