Merge 6e95349 into 5d0dfe6
cliu587 committed Nov 19, 2015
2 parents 5d0dfe6 + 6e95349 commit 416007e
Showing 21 changed files with 118 additions and 36 deletions.
13 changes: 12 additions & 1 deletion dataduct/pipeline/activity.py
@@ -3,13 +3,16 @@
 """
 
 from .pipeline_object import PipelineObject
+from ..utils.helpers import exactly_one
+from ..utils.exceptions import ETLInputError
 
 
 class Activity(PipelineObject):
     """Base class for pipeline activities
     """
 
-    def __init__(self, dependsOn, maximumRetries, **kwargs):
+    def __init__(self, dependsOn, maximumRetries, runsOn,
+                 workerGroup, **kwargs):
         """Constructor for the activity class
 
         Args:
@@ -20,6 +23,14 @@ def __init__(self, dependsOn, maximumRetries, **kwargs):
         Note:
             dependsOn and maximum retries are required fields for any activity
         """
+        if not exactly_one(runsOn, workerGroup):
+            raise ETLInputError(
+                'Exactly one of runsOn or workerGroup allowed!')
+
+        if runsOn:
+            kwargs['runsOn'] = runsOn
+        else:
+            kwargs['workerGroup'] = workerGroup
         super(Activity, self).__init__(
             dependsOn=dependsOn,
             maximumRetries=maximumRetries,
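The validation above makes runsOn and workerGroup mutually exclusive on every activity. A minimal, runnable sketch of that contract (the ETLInputError stub and the exactly_one body here are illustrative stand-ins inferred from the diff, not dataduct's actual implementations):

class ETLInputError(Exception):
    """Stand-in for dataduct.utils.exceptions.ETLInputError."""

def exactly_one(*args):
    # Contract implied by the check above: exactly one argument is set.
    return sum(1 for arg in args if arg is not None) == 1

def runner_kwargs(runsOn=None, workerGroup=None):
    # Mirrors the branching added to Activity.__init__.
    if not exactly_one(runsOn, workerGroup):
        raise ETLInputError('Exactly one of runsOn or workerGroup allowed!')
    return {'runsOn': runsOn} if runsOn else {'workerGroup': workerGroup}

print(runner_kwargs(runsOn='Ec2Resource-1'))   # {'runsOn': 'Ec2Resource-1'}
print(runner_kwargs(workerGroup='wg-etl'))     # {'workerGroup': 'wg-etl'}
# runner_kwargs() and runner_kwargs('r', 'wg') both raise ETLInputError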
7 changes: 5 additions & 2 deletions dataduct/pipeline/copy_activity.py
@@ -22,8 +22,9 @@ def __init__(self,
                  id,
                  input_node,
                  output_node,
-                 resource,
                  schedule,
+                 resource=None,
+                 worker_group=None,
                  max_retries=None,
                  depends_on=None,
                  **kwargs):
@@ -33,8 +34,9 @@
             id(str): id of the object
             input_node(S3Node / list of S3Nodes): input nodes for the activity
             output_node(S3Node / list of S3Nodes): output nodes for activity
-            resource(Ec2Resource / EmrResource): resource to run the activity on
             schedule(Schedule): schedule of the pipeline
+            resource(Ec2Resource / EmrResource): resource to run the activity on
+            worker_group(str): the worker group to run the activity on
             max_retries(int): number of retries for the activity
             depends_on(list of activities): dependent pipeline steps
             **kwargs(optional): Keyword arguments directly passed to base class
@@ -60,5 +62,6 @@ def __init__(self,
             input=input_node,
             output=output_node,
             runsOn=resource,
+            workerGroup=worker_group,
             schedule=schedule,
         )
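With the new keyword, a caller picks either a pipeline-managed resource or a Task Runner worker group, and the chosen value flows into the emitted pipeline object as the runsOn or workerGroup field (both are AWS Data Pipeline activity fields). A hedged, self-contained sketch of that plumbing (the dict builder below is illustrative, not dataduct code):

def activity_fields(input_node, output_node, schedule,
                    resource=None, worker_group=None):
    fields = {
        'type': 'CopyActivity',
        'input': input_node,
        'output': output_node,
        'schedule': schedule,
    }
    # Exactly one of the two is set (enforced by the Activity base class).
    if resource is not None:
        fields['runsOn'] = resource
    else:
        fields['workerGroup'] = worker_group
    return fields

print(activity_fields('S3Node-in', 'S3Node-out', 'Schedule-1',
                      worker_group='analytics-workers'))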
7 changes: 5 additions & 2 deletions dataduct/pipeline/emr_activity.py
@@ -18,10 +18,11 @@ class EmrActivity(Activity):
 
     def __init__(self,
                  id,
-                 resource,
                  schedule,
                  input_node,
                  emr_step_string,
+                 resource=None,
+                 worker_group=None,
                  output_node=None,
                  additional_files=None,
                  max_retries=None,
@@ -30,9 +31,10 @@
 
         Args:
             id(str): id of the object
-            resource(Ec2Resource / EMRResource): resource to run the activity on
             schedule(Schedule): schedule of the pipeline
             emr_step_string(list of str): command string to be executed
+            resource(Ec2Resource / EMRResource): resource to run the activity on
+            worker_group(str): the worker group to run the activity on
             output_node(S3Node): output_node for the emr job
             additional_files(list of S3File): Additional files required for emr
             max_retries(int): number of retries for the activity
@@ -56,6 +58,7 @@ def __init__(self,
             maximumRetries=max_retries,
             dependsOn=depends_on,
             runsOn=resource,
+            workerGroup=worker_group,
             schedule=schedule,
             step=emr_step_string,
             output=output_node,
7 changes: 5 additions & 2 deletions dataduct/pipeline/redshift_copy_activity.py
@@ -20,22 +20,24 @@ class RedshiftCopyActivity(Activity):
 
     def __init__(self,
                  id,
-                 resource,
                  schedule,
                  input_node,
                  output_node,
                  insert_mode,
+                 resource=None,
+                 worker_group=None,
                  command_options=None,
                  max_retries=None,
                  depends_on=None):
         """Constructor for the RedshiftCopyActivity class
 
         Args:
             id(str): id of the object
-            resource(Ec2Resource / EMRResource): resource to run the activity on
             schedule(Schedule): schedule of the pipeline
             input_node(S3Node / RedshiftNode): input data node
             output_node(S3Node / RedshiftNode): output data node
+            resource(Ec2Resource / EMRResource): resource to run the activity on
+            worker_group(str): the worker group to run the activity on
             command_options(list of str): command options for the activity
             max_retries(int): number of retries for the activity
             depends_on(list of activities): dependent pipeline steps
@@ -60,6 +62,7 @@ def __init__(self,
             'input': input_node,
             'output': output_node,
             'runsOn': resource,
+            'workerGroup': worker_group,
             'insertMode': insert_mode,
             'schedule': schedule,
             'dependsOn': depends_on,
7 changes: 5 additions & 2 deletions dataduct/pipeline/shell_command_activity.py
@@ -21,8 +21,9 @@ def __init__(self,
                  id,
                  input_node,
                  output_node,
-                 resource,
                  schedule,
+                 resource=None,
+                 worker_group=None,
                  script_uri=None,
                  script_arguments=None,
                  command=None,
@@ -35,8 +36,9 @@
             id(str): id of the object
             input_node(S3Node / list of S3Nodes): input nodes for the activity
             output_node(S3Node / list of S3Nodes): output nodes for activity
-            resource(Ec2Resource / EMRResource): resource to run the activity on
             schedule(Schedule): schedule of the pipeline
+            resource(Ec2Resource / EMRResource): resource to run the activity on
+            worker_group(str): the worker group to run the activity on
             script_uri(S3File): s3 uri of the script
             script_arguments(list of str): command line arguments to the script
             command(str): command to be run as shell activity
@@ -71,6 +73,7 @@ def __init__(self,
             input=input_node,
             output=output_node,
             runsOn=resource,
+            workerGroup=worker_group,
             schedule=schedule,
             scriptUri=script_uri,
             scriptArgument=script_arguments,
8 changes: 5 additions & 3 deletions dataduct/pipeline/sql_activity.py
@@ -20,10 +20,11 @@ class SqlActivity(Activity):
 
     def __init__(self,
                  id,
-                 resource,
                  schedule,
                  script,
                  database,
+                 resource=None,
+                 worker_group=None,
                  script_arguments=None,
                  queue=None,
                  max_retries=None,
@@ -32,11 +33,11 @@
 
         Args:
             id(str): id of the object
-            resource(Ec2Resource / EMRResource): resource to run the activity on
             schedule(Schedule): schedule of the pipeline
             script(S3File): s3 uri of the script
-            script_arguments(list of str): command line arguments to the script
             database(RedshiftDatabase): database to execute commands on
+            resource(Ec2Resource / EMRResource): resource to run the activity on
+            worker_group(str): the worker group to run the activity on
             queue(str): queue in which the query should be executed
             max_retries(int): number of retries for the activity
             depends_on(list of activities): dependent pipeline steps
@@ -63,6 +64,7 @@ def __init__(self,
             maximumRetries=max_retries,
             dependsOn=depends_on,
             runsOn=resource,
+            workerGroup=worker_group,
             schedule=schedule,
             scriptUri=script,
             scriptArgument=script_arguments,
2 changes: 1 addition & 1 deletion dataduct/steps/create_load_redshift.py
@@ -59,5 +59,5 @@ def arguments_processor(cls, etl, input_args):
             step_args(dict): Dictionary of the step arguments for the class
         """
         step_args = cls.base_arguments_processor(etl, input_args)
-        step_args['resource'] = etl.ec2_resource
+
         return step_args
2 changes: 1 addition & 1 deletion dataduct/steps/create_update_sql.py
@@ -78,5 +78,5 @@ def arguments_processor(cls, etl, input_args):
         """
         step_args = cls.base_arguments_processor(etl, input_args)
         cls.pop_inputs(step_args)
-        step_args['resource'] = etl.ec2_resource
+
         return step_args
6 changes: 4 additions & 2 deletions dataduct/steps/emr_job.py
@@ -3,6 +3,7 @@
 """
 from .etl_step import ETLStep
 from ..pipeline import EmrActivity
+from ..utils import constants as const
 
 
 class EMRJobStep(ETLStep):
@@ -27,6 +28,7 @@ def __init__(self,
         self.activity = self.create_pipeline_object(
             object_class=EmrActivity,
             resource=self.resource,
+            worker_group=self.worker_group,
             input_node=self.input,
             schedule=self.schedule,
             emr_step_string=step_string,
@@ -43,7 +45,7 @@ def arguments_processor(cls, etl, input_args):
             etl(ETLPipeline): Pipeline object containing resources and steps
             step_args(dict): Dictionary of the step arguments for the class
         """
-        step_args = cls.base_arguments_processor(etl, input_args)
-        step_args['resource'] = etl.emr_cluster
+        step_args = cls.base_arguments_processor(
+            etl, input_args, resource_type=const.EMR_CLUSTER_STR)
 
         return step_args
6 changes: 4 additions & 2 deletions dataduct/steps/emr_streaming.py
@@ -4,6 +4,7 @@
 from .etl_step import ETLStep
 from ..pipeline import EmrActivity
 from ..s3 import S3File
+from ..utils import constants as const
 
 HADOOP_1_SERIES = ['1', '2']
 
@@ -111,6 +112,7 @@ def __init__(self,
         self.activity = self.create_pipeline_object(
             object_class=EmrActivity,
             resource=self.resource,
+            worker_group=self.worker_group,
             input_node=self.input,
             schedule=self.schedule,
             emr_step_string=step_string,
@@ -128,7 +130,7 @@ def arguments_processor(cls, etl, input_args):
             etl(ETLPipeline): Pipeline object containing resources and steps
             step_args(dict): Dictionary of the step arguments for the class
         """
-        step_args = cls.base_arguments_processor(etl, input_args)
-        step_args['resource'] = etl.emr_cluster
+        step_args = cls.base_arguments_processor(
+            etl, input_args, resource_type=const.EMR_CLUSTER_STR)
 
         return step_args
37 changes: 30 additions & 7 deletions dataduct/steps/etl_step.py
@@ -18,7 +18,7 @@
 class ETLStep(object):
     """ETL step class with activities and metadata.
 
-    An ETL Step is an abstraction over the set of each database object. It
+    An ETL Step is an abstraction over a unit of work. It
     represents a chunk of objects having the following attributes:
 
     - input
@@ -32,8 +32,8 @@ class ETLStep(object):
 
     def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
                  s3_source_dir=None, schedule=None, resource=None,
-                 input_node=None, input_path=None, required_steps=None,
-                 max_retries=MAX_RETRIES, sns_object=None):
+                 worker_group=None, input_node=None, input_path=None,
+                 required_steps=None, max_retries=MAX_RETRIES, sns_object=None):
         """Constructor for the ETLStep object
 
         Args:
@@ -52,6 +52,7 @@ def __init__(self, id, s3_data_dir=None, s3_log_dir=None,
         self.s3_source_dir = s3_source_dir
         self.schedule = schedule
         self.resource = resource
+        self.worker_group = worker_group
         self.max_retries = max_retries
         self._depends_on = list()
         self._output = None
@@ -101,7 +102,7 @@ def add_required_steps(self, required_steps):
         for step in required_steps:
             self._required_activities.extend(step.activities)
 
-        # Set required_acitivites as depend_on variable of all activities
+        # Set required_activities as the depend_on variable of all activities
         for activity in self.activities:
             activity['dependsOn'] = self._find_actual_required_activities()
 
@@ -268,6 +269,7 @@ def copy_s3(self, input_node, dest_uri):
             CopyActivity,
             schedule=self.schedule,
             resource=self.resource,
+            worker_group=self.worker_group,
             input_node=new_input_node,
             output_node=output_node,
             max_retries=self.max_retries
@@ -390,20 +392,41 @@ def activities(self):
         return [x for x in self._objects.values() if isinstance(x, Activity)]
 
     @classmethod
-    def base_arguments_processor(cls, etl, input_args):
+    def base_arguments_processor(cls, etl, input_args,
+                                 resource_type=const.EC2_RESOURCE_STR):
         """Process the step arguments according to the ETL pipeline
 
         Args:
             etl(ETLPipeline): Pipeline object containing resources and steps
             input_args(dict): Dictionary of the step arguments from the YAML
+            resource_type(str): either const.EMR_CLUSTER_STR
+                or const.EC2_RESOURCE_STR
         """
+        assert resource_type in [const.EMR_CLUSTER_STR,
+                                 const.EC2_RESOURCE_STR], \
+            'resource type must be one of %s or %s' % (
+                const.EC2_RESOURCE_STR, const.EMR_CLUSTER_STR)
+
+        resource_or_worker_group = {}
+        if resource_type == const.EMR_CLUSTER_STR:
+            worker_group = config.emr.get('WORKER_GROUP', None)
+            if worker_group:
+                resource_or_worker_group['worker_group'] = worker_group
+            else:
+                resource_or_worker_group['resource'] = etl.emr
+        else:
+            worker_group = config.ec2.get('WORKER_GROUP', None)
+            if worker_group:
+                resource_or_worker_group['worker_group'] = worker_group
+            else:
+                resource_or_worker_group['resource'] = etl.ec2_resource
 
         # Base dictionary for every step
         step_args = {
-            'resource': None,
             'schedule': etl.schedule,
             'max_retries': etl.max_retries,
-            'required_steps': list()
+            'required_steps': list(),
         }
+        step_args.update(resource_or_worker_group)
         step_args.update(input_args)
 
         # Description is optional and should not be passed
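The new base_arguments_processor resolves the runner in one place: if the matching config section (ec2 or emr) defines WORKER_GROUP, steps default to that worker group; otherwise they fall back to the pipeline's managed resource. A runnable sketch of that precedence (plain dicts stand in for dataduct's Config object and ETL pipeline; the constant values are placeholders):

EC2_RESOURCE_STR = 'ec2'
EMR_CLUSTER_STR = 'emr'

def pick_runner(config, etl_resources, resource_type=EC2_RESOURCE_STR):
    assert resource_type in (EC2_RESOURCE_STR, EMR_CLUSTER_STR), \
        'resource type must be one of %s or %s' % (
            EC2_RESOURCE_STR, EMR_CLUSTER_STR)
    # A WORKER_GROUP entry in the matching config section wins ...
    worker_group = config.get(resource_type, {}).get('WORKER_GROUP')
    if worker_group:
        return {'worker_group': worker_group}
    # ... otherwise fall back to the pipeline-managed resource.
    return {'resource': etl_resources[resource_type]}

config = {'ec2': {'WORKER_GROUP': 'etl-workers'}, 'emr': {}}
resources = {'ec2': 'Ec2Resource-1', 'emr': 'EmrCluster-1'}
print(pick_runner(config, resources))                   # {'worker_group': 'etl-workers'}
print(pick_runner(config, resources, EMR_CLUSTER_STR))  # {'resource': 'EmrCluster-1'}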
3 changes: 2 additions & 1 deletion dataduct/steps/extract_rds.py
@@ -76,6 +76,7 @@ def __init__(self,
             object_class=CopyActivity,
             schedule=self.schedule,
             resource=self.resource,
+            worker_group=self.worker_group,
             input_node=input_node,
             output_node=intermediate_node,
             depends_on=self.depends_on,
@@ -101,6 +102,7 @@ def __init__(self,
             command=command,
             max_retries=self.max_retries,
             resource=self.resource,
+            worker_group=self.worker_group,
             schedule=self.schedule,
         )
 
@@ -114,6 +116,5 @@ def arguments_processor(cls, etl, input_args):
         """
         input_args = cls.pop_inputs(input_args)
         step_args = cls.base_arguments_processor(etl, input_args)
-        step_args['resource'] = etl.ec2_resource
 
         return step_args
2 changes: 1 addition & 1 deletion dataduct/steps/extract_redshift.py
@@ -47,6 +47,7 @@ def __init__(self,
             output_node=self.output,
             insert_mode=insert_mode,
             resource=self.resource,
+            worker_group=self.worker_group,
             schedule=self.schedule,
             depends_on=self.depends_on,
             command_options=["DELIMITER '\t' ESCAPE"],
@@ -63,6 +64,5 @@ def arguments_processor(cls, etl, input_args):
         input_args = cls.pop_inputs(input_args)
         step_args = cls.base_arguments_processor(etl, input_args)
         step_args['redshift_database'] = etl.redshift_database
-        step_args['resource'] = etl.ec2_resource
 
         return step_args
1 change: 1 addition & 0 deletions dataduct/steps/extract_s3.py
@@ -44,5 +44,6 @@ def arguments_processor(cls, etl, input_args):
         input_args = cls.pop_inputs(input_args)
         step_args = cls.base_arguments_processor(etl, input_args)
         step_args.pop('resource')
+        step_args.pop('worker_group')
 
         return step_args
