Merge pull request #173 from coursera/combine_multi_step_v3
Combine create-load-redshift, reload, primary-key-check into one
darinyu-coursera committed Nov 16, 2015
2 parents 66a0a0d + d957228 commit c9b984b
Showing 4 changed files with 212 additions and 0 deletions.
1 change: 1 addition & 0 deletions dataduct/etl/utils.py
@@ -18,6 +18,7 @@
'extract-redshift': ExtractRedshiftStep,
'extract-s3': ExtractS3Step,
'load-redshift': LoadRedshiftStep,
'load-reload-pk': LoadReloadAndPrimaryKeyStep,
'pipeline-dependencies': PipelineDependenciesStep,
'primary-key-check': PrimaryKeyCheckStep,
'qa-transform': QATransformStep,
1 change: 1 addition & 0 deletions dataduct/steps/__init__.py
@@ -10,6 +10,7 @@
from .extract_redshift import ExtractRedshiftStep
from .extract_s3 import ExtractS3Step
from .load_redshift import LoadRedshiftStep
from .load_reload_pk import LoadReloadAndPrimaryKeyStep
from .pipeline_dependencies import PipelineDependenciesStep
from .primary_key_check import PrimaryKeyCheckStep
from .qa_transform import QATransformStep
197 changes: 197 additions & 0 deletions dataduct/steps/load_reload_pk.py
@@ -0,0 +1,197 @@
"""ETL step wrapper for loading into redshift with the COPY command
"""
import os

from ..config import Config
from ..database import Table
from ..database import SqlStatement
from ..database import SqlScript
from .etl_step import ETLStep
from ..pipeline import ShellCommandActivity
from ..s3 import S3File
from ..utils import constants as const
from ..utils.helpers import parse_path

config = Config()


class LoadReloadAndPrimaryKeyStep(ETLStep):
"""LoadReloadAndPrimaryKeyStep Step class that creates table if needed and loads data
"""

def __init__(self, id, input_node, staging_table_definition,
production_table_definition, pipeline_name,
analyze_table=True, non_transactional=False,
log_to_s3=False, **kwargs):
"""Constructor for the LoadReloadAndPrimaryKeyStep class
Args:
input_node: A S3 data node as input
staging_table_definition(filepath):
staging table schema to store the data
production_table_definition(filepath):
schema file for the table to be reloaded into
**kwargs(optional): Keyword arguments directly passed to base class
"""
super(LoadReloadAndPrimaryKeyStep, self).__init__(id=id, **kwargs)

# TODO: Move the following three steps into lib after
# they support all the parameters
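        # Chain the three sub-steps: create and load the staging table,
        # reload it into the production table, then run the primary-key check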
create_and_load_pipeline_object = self.create_and_load_redshift(
table_definition=staging_table_definition,
input_node=input_node
)

reload_pipeline_object = self.reload(
source=staging_table_definition,
destination=production_table_definition,
depends_on=[create_and_load_pipeline_object],
analyze_table=analyze_table,
non_transactional=non_transactional
)

primary_key_check_object = self.primary_key_check(
table_definition=production_table_definition,
pipeline_name=pipeline_name,
depends_on=[reload_pipeline_object],
log_to_s3=log_to_s3
)


def primary_key_check(self, table_definition, pipeline_name, depends_on,
log_to_s3):
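        """Create a ShellCommandActivity that runs the primary-key-check
        script against the given table definition
        """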
table = self.get_table_from_def(table_definition)

# We initialize the table object to check valid strings
script_arguments = ['--table=%s' % table.sql()]

if log_to_s3:
script_arguments.append('--log_to_s3')

steps_path = os.path.abspath(os.path.dirname(__file__))
script = os.path.join(steps_path, const.PK_CHECK_SCRIPT_PATH)
script = self.create_script(S3File(path=script))

step_name = self.get_name("primary_key_check")
script_arguments.append('--test_name=%s' % (pipeline_name + "." + step_name))

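        # Pass the warning SNS topic to the check script if one is configured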
sns_topic_arn = config.etl.get('SNS_TOPIC_ARN_WARNING', None)
if sns_topic_arn:
script_arguments.append('--sns_topic_arn=%s' % sns_topic_arn)

primary_key_check_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=step_name,
input_node=[],
output_node=None,
resource=self.resource,
schedule=self.schedule,
script_uri=script,
script_arguments=script_arguments,
max_retries=self.max_retries,
depends_on=depends_on
)
return primary_key_check_pipeline_object


def reload(self, source, destination, depends_on,
analyze_table, non_transactional):
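        """Create a ShellCommandActivity that reloads the destination table
        from the source table using the generated upsert SQL
        """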
source_table = parse_path(source)
destination_table = parse_path(destination)

source_relation = Table(SqlScript(filename=source_table))
destination_relation = Table(SqlScript(filename=destination_table))

# Reload specific config
enforce_primary_key = True
delete_existing = True
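        # Build the SQL that upserts from the staging table into the
        # production table, replacing existing rows and enforcing primary keys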
sql_script = destination_relation.upsert_script(
source_relation, enforce_primary_key, delete_existing)

update_script = SqlScript(sql_script.sql())
script_arguments = [
'--table_definition=%s' % destination_relation.sql(),
'--sql=%s' % update_script.sql()
]

if analyze_table:
script_arguments.append('--analyze')

if non_transactional:
script_arguments.append('--non_transactional')

steps_path = os.path.abspath(os.path.dirname(__file__))
script = os.path.join(steps_path, const.SQL_RUNNER_SCRIPT_PATH)
script = self.create_script(S3File(path=script))

reload_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=self.get_name("reload"),
input_node=[],
output_node=None,
resource=self.resource,
schedule=self.schedule,
script_uri=script,
script_arguments=script_arguments,
max_retries=self.max_retries,
depends_on=depends_on
)
return reload_pipeline_object


def create_and_load_redshift(self, table_definition, input_node):
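        """Create a ShellCommandActivity that creates the staging table if
        needed and loads the S3 input data into it
        """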
table = self.get_table_from_def(table_definition)

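        # Collect the S3 URIs of the input data; the input may be a single
        # data node or a dict of data nodes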
if isinstance(input_node, dict):
input_paths = [i.path().uri for i in input_node.values()]
else:
input_paths = [input_node.path().uri]

script_arguments = [
'--table_definition=%s' % table.sql().sql(),
'--s3_input_paths'
]
script_arguments.extend(input_paths)

steps_path = os.path.abspath(os.path.dirname(__file__))
script = os.path.join(steps_path, const.CREATE_LOAD_SCRIPT_PATH)
script = self.create_script(S3File(path=script))

        # Create an S3 data node to hold this step's output
base_output_node = self.create_s3_data_node()

create_and_load_pipeline_object = self.create_pipeline_object(
object_class=ShellCommandActivity,
object_name=self.get_name("create_and_load"),
input_node=[input_node],
output_node=base_output_node,
resource=self.resource,
schedule=self.schedule,
script_uri=script,
script_arguments=script_arguments,
max_retries=self.max_retries,
depends_on=self.depends_on
)
return create_and_load_pipeline_object


def get_table_from_def(self, table_definition):
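        """Read a table definition file and return it as a Table object
        """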
with open(parse_path(table_definition)) as f:
table_def_string = f.read()

table = Table(SqlStatement(table_def_string))
return table


@classmethod
def arguments_processor(cls, etl, input_args):
"""Parse the step arguments according to the ETL pipeline
Args:
etl(ETLPipeline): Pipeline object containing resources and steps
            input_args(dict): Dictionary of the step arguments for the class
"""
step_args = cls.base_arguments_processor(etl, input_args)
step_args['resource'] = etl.ec2_resource
step_args['pipeline_name'] = etl.name
return step_args
13 changes: 13 additions & 0 deletions examples/example_load_reload_pk.yaml
@@ -0,0 +1,13 @@
name: example_load_reload_primary_key_check
frequency: one-time
load_time: 01:00 # Hour:Min in UTC

description: Example for the load-reload-pk step

steps:
- step_type: extract-local
path: data/test_table1.tsv

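  # The S3 data extracted above is passed as the input to the step below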
- step_type: load-reload-pk
staging_table_definition: tables/dev.test_table.sql
production_table_definition: tables/dev.test_table_2.sql
