Add option to specify object names
sb2nov committed Nov 2, 2015
1 parent 0d9b328 commit c2d330a
Showing 2 changed files with 19 additions and 9 deletions.
11 changes: 6 additions & 5 deletions dataduct/data_access/connection.py
@@ -1,14 +1,14 @@
 """
 Connections to various databases such as RDS and Redshift
 """
-import psycopg2
 import MySQLdb
 import MySQLdb.cursors
+import psycopg2
 
 from ..config import Config
-from ..utils.helpers import retry
-from ..utils.helpers import exactly_one
 from ..utils.exceptions import ETLConfigError
+from ..utils.helpers import exactly_one
+from ..utils.helpers import retry
 from ..utils.hook import hook
 
 config = Config()
@@ -25,7 +25,8 @@ def get_redshift_config():
 
 @retry(CONNECTION_RETRIES, 60)
 @hook('connect_to_redshift')
-def redshift_connection(redshift_creds=None, autocommit=True, **kwargs):
+def redshift_connection(redshift_creds=None, autocommit=True,
+                        connect_timeout=30, **kwargs):
     """Fetch a psql connection object to redshift
     """
     if redshift_creds is None:
@@ -37,7 +38,7 @@ def redshift_connection(redshift_creds=None, autocommit=True, **kwargs):
         password=redshift_creds['PASSWORD'],
         port=redshift_creds['PORT'],
         database=redshift_creds['DATABASE_NAME'],
-        connect_timeout=10,
+        connect_timeout=connect_timeout,
         **kwargs)
     connection.autocommit = autocommit
     return connection
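
The connect timeout is now a parameter (default 30 seconds) rather than the hard-coded 10. A minimal usage sketch, assuming credentials are resolved from the dataduct config when redshift_creds is None:

    from dataduct.data_access.connection import redshift_connection

    # Allow a slower initial handshake than the new 30-second default;
    # psycopg2 receives this as its standard connect_timeout argument.
    connection = redshift_connection(connect_timeout=60)
    cursor = connection.cursor()
    cursor.execute('SELECT 1')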
17 changes: 13 additions & 4 deletions dataduct/steps/etl_step.py
@@ -125,7 +125,7 @@ def _resolve_dependencies(self, dependencies):
             result |= self._resolve_dependencies(dependency.depends_on)
         return result
 
-    def create_pipeline_object(self, object_class, **kwargs):
+    def create_pipeline_object(self, object_class, object_name=None, **kwargs):
         """Create the pipeline objects associated with the step
 
         Args:
@@ -140,8 +140,11 @@ def create_pipeline_object(self, object_class, **kwargs):
                                if isinstance(o, object_class)])
 
         # Object name/ids are given by [step_id].[object_class][index]
-        object_id = self.id + "." + object_class.__name__ + \
-            str(instance_count)
+        if object_name is None:
+            object_id = self.id + "." + object_class.__name__ + \
+                str(instance_count)
+        else:
+            object_id = object_name
 
         new_object = object_class(object_id, **kwargs)
 
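In effect, object_name short-circuits the auto-numbering. A sketch of the naming rule, assuming a hypothetical ETLStep instance `step` with id 'extract' that already holds one S3Node (other required S3Node kwargs omitted for brevity):

    # Auto-generated: object_id becomes 'extract.S3Node1'
    # following the [step_id].[object_class][index] scheme.
    node = step.create_pipeline_object(object_class=S3Node)

    # Caller-supplied: object_id becomes 'extract_output_node',
    # bypassing the index scheme entirely.
    node = step.create_pipeline_object(object_class=S3Node,
                                       object_name='extract_output_node')
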
@@ -154,7 +157,7 @@ def create_pipeline_object(self, object_class, **kwargs):
         self._objects[object_id] = new_object
         return new_object
 
-    def create_s3_data_node(self, s3_object=None, **kwargs):
+    def create_s3_data_node(self, s3_object=None, object_name=None, **kwargs):
         """Create an S3 DataNode for s3_file or s3_path
 
         Args:
@@ -180,6 +183,7 @@ def create_s3_data_node(self, s3_object=None, **kwargs):
 
         s3_node = self.create_pipeline_object(
             object_class=S3Node,
+            object_name=object_name,
             schedule=self.schedule,
             s3_object=s3_object,
             **kwargs
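
Data nodes can therefore be named at creation time as well, since the argument is simply forwarded. A sketch with the same hypothetical `step` (s3_object left as None, in which case the step presumably provisions a default location):

    # The S3Node is registered under the supplied id instead of an
    # auto-numbered one.
    data_node = step.create_s3_data_node(object_name='extract_staging_node')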
@@ -296,6 +300,11 @@ def merge_s3_nodes(self, input_nodes):
 
         return combined_node, depends_on
 
+    def get_name(self, *suffixes):
+        if all(suffixes):
+            return '_'.join((self.id,) + suffixes)
+        return None
+
     @property
     def input(self):
         """Get the input node for the etl step
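The new get_name helper derives qualified object names from the step id and returns None when any suffix is falsy, so callers fall back cleanly to the auto-generated ids. A behavior sketch, assuming self.id is 'extract':

    step.get_name('output', 'node')  # -> 'extract_output_node'
    step.get_name(None)              # -> None; a falsy suffix disables naming
    step.get_name()                  # -> 'extract' (all() of an empty tuple is True)

    # Typical pairing: feed the result into create_s3_data_node, where a None
    # object_name preserves the old auto-numbered behavior.
    node = step.create_s3_data_node(object_name=step.get_name('output', 'node'))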
