Skip to content

Commit

Permalink
Merge pull request #177 from coursera/combine_multi_step_v4
Browse files Browse the repository at this point in the history
Allow new step to use script_arguments
  • Loading branch information
darinyu-coursera committed Nov 17, 2015
2 parents d1ea1c7 + b456b13 commit a06b471
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
16 changes: 10 additions & 6 deletions dataduct/steps/load_reload_pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class LoadReloadAndPrimaryKeyStep(ETLStep):

def __init__(self, id, input_node, staging_table_definition,
production_table_definition, pipeline_name,
analyze_table=True, non_transactional=False,
log_to_s3=False, **kwargs):
script_arguments=None, analyze_table=True,
non_transactional=False, log_to_s3=False, **kwargs):
"""Constructor for the LoadReloadAndPrimaryKeyStep class
Args:
Expand All @@ -39,7 +39,8 @@ def __init__(self, id, input_node, staging_table_definition,
# they support all the parameters
create_and_load_pipeline_object = self.create_and_load_redshift(
table_definition=staging_table_definition,
input_node=input_node
input_node=input_node,
script_arguments=script_arguments
)

reload_pipeline_object = self.reload(
Expand Down Expand Up @@ -139,18 +140,21 @@ def reload(self, source, destination, depends_on,
return reload_pipeline_object


def create_and_load_redshift(self, table_definition, input_node):
def create_and_load_redshift(self, table_definition,
input_node, script_arguments):
if not script_arguments:
script_arguments = list()
table = self.get_table_from_def(table_definition)

if isinstance(input_node, dict):
input_paths = [i.path().uri for i in input_node.values()]
else:
input_paths = [input_node.path().uri]

script_arguments = [
script_arguments.extend([
'--table_definition=%s' % table.sql().sql(),
'--s3_input_paths'
]
])
script_arguments.extend(input_paths)

steps_path = os.path.abspath(os.path.dirname(__file__))
Expand Down
40 changes: 40 additions & 0 deletions docs/steps.rst
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,46 @@ Example
- step_type: create-load-redshift
table_definition: tables/dev.example_table.sql

Load, Reload, Primary Key Check
----------------------------------

Combines ``create-load-redshift``, ``reload`` and ``primary-key-check`` into a single step.

Properties
^^^^^^^^^^

- ``staging_table_definition``: Intermediate staging schema file for the table to be loaded into.
(Required)
- ``production_table_definition``: Production schema file for the table to be reloaded into.
(Required)
- ``script_arguments``: Arguments for the runner. (Optional)

- ``--max_error``: The maximum number of errors to be ignored during
the load. Usage: ``--max_error=5``
- ``--replace_invalid_char``: Character to replace non-UTF-8
characters with. Usage: ``--replace_invalid_char='?'``
- ``--no_escape``: If passed, does not escape special characters.
Usage: ``--no_escape``
- ``--gzip``: If passed, compresses the output with gzip. Usage:
``--gzip``
- ``--command_options``: A custom SQL string as the options for the
copy command. Usage: ``--command_options="DELIMITER '\t'"``

- Note: If ``--command_options`` is passed, script arguments
``--max_error``, ``--replace_invalid_char``, ``--no_escape``,
and ``--gzip`` have no effect.

Example
^^^^^^^

::

- step_type: load-reload-pk
staging_table_definition: tables/staging.example_table.sql
production_table_definition: tables/dev.example_table.sql
script_arguments:
- "--max_error=5"

Upsert
-------------------------

Expand Down

0 comments on commit a06b471

Please sign in to comment.