
Commit

Merge 39895d3 into f82387b
idralyuk committed Nov 22, 2016
2 parents f82387b + 39895d3 commit 5bfcdd0
Showing 5 changed files with 53 additions and 25 deletions.
15 changes: 12 additions & 3 deletions dataduct/etl/etl_pipeline.py
@@ -193,12 +193,17 @@ def create_base_objects(self):
         else:
             self.sns = self.create_pipeline_object(
                 object_class=SNSAlarm,
-                topic_arn=self.topic_arn,
+                topic_arn=self.topic_arn.replace('all:', ''),
                 pipeline_name=self.name,
             )
+        if self.frequency == 'on-demand':
+            scheduleType = 'ONDEMAND'
+        else:
+            scheduleType = 'cron'
         self.default = self.create_pipeline_object(
             object_class=DefaultObject,
             pipeline_log_uri=self.s3_log_dir,
+            scheduleType=scheduleType
         )
 
     @property
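
This hunk introduces two conventions: a topic_arn prefixed with 'all:' opts every step into SNS failure alerts (the next hunk applies that opt-in per step), and an 'on-demand' frequency switches the pipeline to AWS Data Pipeline's on-demand schedule type. A minimal sketch of the mapping, with a hypothetical ARN and outside any dataduct class:

# Sketch only; the ARN is hypothetical.
topic_arn = 'all:arn:aws:sns:us-east-1:123456789012:etl-alerts'
frequency = 'on-demand'

# 'all:' is a dataduct-side marker, not part of the real ARN, so it is
# stripped before the value reaches the SNSAlarm object.
clean_arn = topic_arn.replace('all:', '')

# On-demand pipelines carry scheduleType 'ONDEMAND' on the Default object;
# every other frequency keeps the cron-style schedule.
scheduleType = 'ONDEMAND' if frequency == 'on-demand' else 'cron'

assert clean_arn == 'arn:aws:sns:us-east-1:123456789012:etl-alerts'
assert scheduleType == 'ONDEMAND'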
@@ -476,8 +481,12 @@ def create_steps(self, steps_params, is_bootstrap=False,
                     'input_path' not in step_param:
                 step_param['input_node'] = input_node
 
-            if is_teardown:
-                step_param['sns_object'] = self.sns
+            if hasattr(self.sns, 'fields'):
+                if self.topic_arn.startswith('all:'):
+                    # Set sns on every step, not just teardown, so each failure alerts with its error stack trace
+                    step_param['sns_object'] = self.sns
+                elif is_teardown:
+                    step_param['sns_object'] = self.sns
 
             try:
                 step_class = step_param.pop('step_class')
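
The hunk above generalizes per-step alert wiring. A condensed sketch of the decision, with self elided and a toy stand-in for the SNSAlarm object:

# Sketch of the per-step alert decision; FakeSNS is a toy stand-in.
def should_attach_sns(sns, topic_arn, is_teardown):
    """Return True if this step should carry an sns_object."""
    if not hasattr(sns, 'fields'):
        return False  # no usable SNSAlarm object was created
    if topic_arn.startswith('all:'):
        return True  # opt-in: every step alerts on failure
    return is_teardown  # default: only the teardown step alerts

class FakeSNS(object):
    fields = {}

arn = 'all:arn:aws:sns:us-east-1:123456789012:etl-alerts'  # hypothetical
assert should_attach_sns(FakeSNS(), arn, is_teardown=False)
assert not should_attach_sns(object(), arn, is_teardown=True)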
2 changes: 1 addition & 1 deletion dataduct/pipeline/data_pipeline.py
@@ -65,7 +65,7 @@ def aws_format(self):
         Returns:
             result(list of dict): list of AWS-readable dict of all objects
         """
-        return [x.aws_format() for x in self.objects]
+        return [x.aws_format() for x in self.objects if hasattr(x, 'fields')]
 
     def add_object(self, pipeline_object):
         """Add an object to the datapipeline
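
The hasattr guard pairs with the schedule.py change below: an on-demand pipeline's Schedule returns from __init__ before the base class ever sets a fields attribute, and such shells must be kept out of the AWS payload. A toy illustration, not dataduct code:

# Toy classes; Shell mimics a Schedule whose __init__ bailed out early.
class Shell(object):
    pass

class Complete(object):
    fields = {}

    def aws_format(self):
        return {'id': 'Default', 'name': 'Default', 'fields': []}

objects = [Shell(), Complete()]
payload = [x.aws_format() for x in objects if hasattr(x, 'fields')]
assert len(payload) == 1  # the shell is silently dropped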
52 changes: 32 additions & 20 deletions dataduct/pipeline/pipeline_object.py
@@ -8,7 +8,7 @@
 from ..s3 import S3Path
 from ..utils.exceptions import ETLInputError
 
-
+scheduleType = ''
 class PipelineObject(object):
     """DataPipeline class with steps and metadata.
@@ -56,12 +56,15 @@ def s3_files(self):
         Returns:
             result(list of S3Files): List of files to be uploaded to s3
         """
-        result = self.additional_s3_files
-        for _, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, S3File) or isinstance(value, S3Directory):
-                    result.append(value)
-        return result
+        if hasattr(self, 'additional_s3_files'):
+            result = self.additional_s3_files
+            for _, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, S3File) or isinstance(value, S3Directory):
+                        result.append(value)
+            return result
+        else:
+            return []
 
     def __getitem__(self, key):
         """Fetch the items associated with a key
@@ -130,16 +133,25 @@ def aws_format(self):
             result: The AWS-readable dict format of the object
         """
         fields = []
-        for key, values in self.fields.iteritems():
-            for value in values:
-                if isinstance(value, PipelineObject):
-                    fields.append({'key': key, 'refValue': value.id})
-                elif isinstance(value, S3Path):
-                    fields.append({'key': key, 'stringValue': value.uri})
-                elif isinstance(value, S3File) or \
-                        isinstance(value, S3Directory):
-                    fields.append({'key': key,
-                                   'stringValue': value.s3_path.uri})
-                else:
-                    fields.append({'key': key, 'stringValue': str(value)})
-        return {'id': self._id, 'name': self._id, 'fields': fields}
+        global scheduleType
+        if hasattr(self, 'fields'):
+            for key, values in self.fields.iteritems():
+                for value in values:
+                    if isinstance(value, PipelineObject):
+                        if scheduleType == 'ONDEMAND' and key == 'schedule':
+                            pass  # ONDEMAND pipelines must not reference a schedule
+                        else:
+                            fields.append({'key': key, 'refValue': value.id})
+                    elif isinstance(value, S3Path):
+                        fields.append({'key': key, 'stringValue': value.uri})
+                    elif isinstance(value, S3File) or \
+                            isinstance(value, S3Directory):
+                        fields.append({'key': key,
+                                       'stringValue': value.s3_path.uri})
+                    else:
+                        if key == 'scheduleType' and str(value) == 'ONDEMAND':
+                            scheduleType = 'ONDEMAND'
+                        fields.append({'key': key, 'stringValue': str(value)})
+            return {'id': self._id, 'name': self._id, 'fields': fields}
+        else:
+            return None
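
The dict built here follows the pipeline-object shape AWS Data Pipeline expects: an id, a name, and a list of key/stringValue or key/refValue fields. A hypothetical result for the Default object of an on-demand pipeline (values invented for illustration):

# Hypothetical aws_format() output; values invented for illustration.
default_object = {
    'id': 'Default',
    'name': 'Default',
    'fields': [
        {'key': 'scheduleType', 'stringValue': 'ONDEMAND'},
        {'key': 'pipelineLogUri', 'stringValue': 's3://example-bucket/logs/'},
        # No {'key': 'schedule', 'refValue': ...} entry: the ONDEMAND
        # special case above suppresses schedule references.
    ],
}

Note the serialization-order dependence this design introduces: the module-level scheduleType flag is only set once a scheduleType field has been emitted, so the Default object has to be formatted before any object that still carries a schedule reference.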
7 changes: 7 additions & 0 deletions dataduct/pipeline/schedule.py
@@ -37,6 +37,9 @@
     '8-hours': ('8 hours', None),
     '12-hours': ('12 hours', None),
     'one-time': ('15 minutes', 1),
+    'on-demand': ('ondemand', None),
+    '30-min': ('30 minutes', None),
+    '15-min': ('15 minutes', None),
 }
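
Each entry appears to be a (period, occurrences) pair in Data Pipeline terms; 'one-time', for instance, runs a 15-minute period exactly once. A sketch of a lookup; the dict name is assumed, since the hunk does not show it:

# Dict name assumed for illustration; entries copied from the hunk above.
FREQUENCY_MAP = {
    'one-time': ('15 minutes', 1),
    'on-demand': ('ondemand', None),
    '30-min': ('30 minutes', None),
    '15-min': ('15 minutes', None),
}

period, occurrences = FREQUENCY_MAP['30-min']
assert period == '30 minutes' and occurrences is None
# 'on-demand' is only a placeholder: __init__ (next hunk) returns before
# a Schedule is ever built from it.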


@@ -62,6 +65,10 @@ def __init__(self,
             load_minutes(int): Minutes at which the pipeline should be run
             **kwargs(optional): Keyword arguments directly passed to base class
         """
+        if frequency == 'on-demand':
+            logger.debug('On-demand schedule requested; not creating a Schedule object')
+            return None
+
         current_time = datetime.utcnow()
         # Set the defaults for load hour and minutes
         if load_minutes is None:
2 changes: 1 addition & 1 deletion dataduct/pipeline/sns_alarm.py
@@ -42,7 +42,7 @@ def __init__(self,
             'Error Stack Trace: #{node.errorStackTrace}'
         ])
 
-        subject = 'Data Pipeline %s failed' % pipeline_name
+        subject = 'Data Pipeline %s #{node.@status}' % pipeline_name
 
         if topic_arn is None:
             topic_arn = SNS_TOPIC_ARN_FAILURE
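
The subject line now reports the node's actual status instead of a hard-coded 'failed'; '#{node.@status}' is an AWS Data Pipeline expression expanded when the notification fires. A sketch with a hypothetical pipeline name:

# Hypothetical pipeline name; AWS expands '#{node.@status}' at run time.
subject = 'Data Pipeline %s #{node.@status}' % 'daily_load'
assert subject == 'Data Pipeline daily_load #{node.@status}'
# e.g. delivered as: 'Data Pipeline daily_load FAILED'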
