Skip to content

Commit

Permalink
Merge 0c4bb2f into d9bcd6c
Browse files Browse the repository at this point in the history
  • Loading branch information
cliu587 committed Nov 18, 2015
2 parents d9bcd6c + 0c4bb2f commit 7780165
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
4 changes: 3 additions & 1 deletion dataduct/pipeline/shell_command_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ def __init__(self,
depends_on = []
if max_retries is None:
max_retries = MAX_RETRIES
# Set stage to true if we use either input or output node
stage = 'true' if input_node or output_node else 'false'

super(ShellCommandActivity, self).__init__(
id=id,
retryDelay=RETRY_DELAY,
type='ShellCommandActivity',
maximumRetries=max_retries,
dependsOn=depends_on,
stage='true',
stage=stage,
input=input_node,
output=output_node,
runsOn=resource,
Expand Down
6 changes: 3 additions & 3 deletions dataduct/steps/create_load_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class CreateAndLoadStep(TransformStep):
"""CreateAndLoad Step class that creates table if needed and loads data
"""

def __init__(self, id, table_definition, input_node=None,
def __init__(self, id, table_definition, input_node,
script_arguments=None, **kwargs):
"""Constructor for the CreateAndLoadStep class
Expand Down Expand Up @@ -47,8 +47,8 @@ def __init__(self, id, table_definition, input_node=None,
script = os.path.join(steps_path, const.CREATE_LOAD_SCRIPT_PATH)

super(CreateAndLoadStep, self).__init__(
id=id, script=script, input_node=input_node,
script_arguments=script_arguments, **kwargs)
id=id, script=script, script_arguments=script_arguments,
no_input=True, no_output=True, **kwargs)

@classmethod
def arguments_processor(cls, etl, input_args):
Expand Down
16 changes: 6 additions & 10 deletions dataduct/steps/load_reload_pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def __init__(self, id, input_node, staging_table_definition,
log_to_s3=log_to_s3
)


def primary_key_check(self, table_definition, pipeline_name, depends_on,
log_to_s3):
table = self.get_table_from_def(table_definition)
Expand All @@ -83,7 +82,7 @@ def primary_key_check(self, table_definition, pipeline_name, depends_on,
primary_key_check_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=step_name,
input_node=[],
input_node=None,
output_node=None,
resource=self.resource,
schedule=self.schedule,
Expand All @@ -94,7 +93,6 @@ def primary_key_check(self, table_definition, pipeline_name, depends_on,
)
return primary_key_check_pipeline_object


def reload(self, source, destination, depends_on,
analyze_table, non_transactional):
source_table = parse_path(source)
Expand Down Expand Up @@ -128,7 +126,7 @@ def reload(self, source, destination, depends_on,
reload_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=self.get_name("reload"),
input_node=[],
input_node=None,
output_node=None,
resource=self.resource,
schedule=self.schedule,
Expand Down Expand Up @@ -161,14 +159,11 @@ def create_and_load_redshift(self, table_definition,
script = os.path.join(steps_path, const.CREATE_LOAD_SCRIPT_PATH)
script = self.create_script(S3File(path=script))

# Create output_node based on output_path
base_output_node = self.create_s3_data_node()

create_and_load_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=self.get_name("create_and_load"),
input_node=[input_node],
output_node=base_output_node,
input_node=None,
output_node=None,
resource=self.resource,
schedule=self.schedule,
script_uri=script,
Expand All @@ -179,7 +174,8 @@ def create_and_load_redshift(self, table_definition,
return create_and_load_pipeline_object


def get_table_from_def(self, table_definition):
@classmethod
def get_table_from_def(cls, table_definition):
with open(parse_path(table_definition)) as f:
table_def_string = f.read()

Expand Down
28 changes: 16 additions & 12 deletions dataduct/steps/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@


class TransformStep(ETLStep):
"""Transform Step class that helps run scripts on resouces
"""Transform Step class that helps run scripts on resources
"""

def __init__(self,
command=None,
script=None,
script_directory=None,
script_name=None,
output_node=None,
script_arguments=None,
additional_s3_files=None,
output_node=None,
output_path=None,
no_output=False,
no_input=False,
**kwargs):
"""Constructor for the TransformStep class
Expand All @@ -42,9 +43,12 @@ def __init__(self,
script(path): local path to the script that should executed
script_directory(path): local path to the script directory
script_name(str): script to be executed in the directory
output_node(dict): output data nodes from the transform
script_arguments(list of str): list of arguments to the script
additional_s3_files(list of S3File): additional files used
output_node(dict): output data nodes from the transform
output_path(str): the S3 path to output data
no_output(bool): whether the script outputs anything to s3
no_input(bool): whether the script takes any inputs
**kwargs(optional): Keyword arguments directly passed to base class
"""
super(TransformStep, self).__init__(**kwargs)
Expand All @@ -53,20 +57,18 @@ def __init__(self,
raise ETLInputError(
'Only one of script, command and directory allowed')

base_output_node = None
if not no_output:
# Create output_node based on output_path
base_output_node = self.create_s3_data_node(
self.get_output_s3_path(get_modified_s3_path(output_path)))
# Create output_node based on output_path
base_output_node = self.create_s3_data_node(
self.get_output_s3_path(get_modified_s3_path(output_path)))

script_arguments = self.translate_arguments(script_arguments)
if script_arguments is None:
script_arguments = []

if self.input:
if self.input and not no_input:
input_nodes = [self.input]
else:
input_nodes = list()
input_nodes = []

if script_directory:
# The script to be run with the directory
Expand Down Expand Up @@ -98,7 +100,7 @@ def __init__(self,
if script:
script = self.create_script(S3File(path=script))

# Translate output nodes if output map provided
# Translate output nodes if output path is provided
if output_node:
self._output = self.create_output_nodes(
base_output_node, output_node)
Expand All @@ -108,10 +110,12 @@ def __init__(self,
logger.debug('Script Arguments:')
logger.debug(script_arguments)

output_node = None if no_output else base_output_node

self.create_pipeline_object(
object_class=ShellCommandActivity,
input_node=input_nodes,
output_node=base_output_node,
output_node=output_node,
resource=self.resource,
schedule=self.schedule,
script_uri=script,
Expand Down

0 comments on commit 7780165

Please sign in to comment.