Skip to content

Commit

Permalink
Allow different default namespaces for database read/write operations. (
Browse files Browse the repository at this point in the history
#112)

* Allow different default namespaces for database read/write operations.

* Work around the limitations of Python 2.
  • Loading branch information
matthewwardrop committed Oct 27, 2021
1 parent af0148e commit 8818b57
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
26 changes: 19 additions & 7 deletions omniduct/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,21 @@ class DatabaseClient(Duct, MagicsProvider):
NAMESPACE_QUOTECHAR = '"'
NAMESPACE_SEPARATOR = '.'

NAMESPACE_DEFAULT = None # DEPRECATED (use NAMESPACE_DEFAULTS_READ instead): Will be removed in Omniduct 2.0.0

@property
def NAMESPACE_DEFAULT(self):
pass
def NAMESPACE_DEFAULTS_READ(self):
"""
Backwards compatible shim for `NAMESPACE_DEFAULTS`.
"""
return self.NAMESPACE_DEFAULT

@property
def NAMESPACE_DEFAULTS_WRITE(self):
"""
Unless overridden, this is the same as `NAMESPACE_DEFAULTS_READ`.
"""
return self.NAMESPACE_DEFAULTS_READ

@quirk_docs('_init', mro=True)
def __init__(
Expand Down Expand Up @@ -682,7 +694,7 @@ def query_to_table(self, statement, table, if_exists='fail', **kwargs):
DB-API cursor: The cursor object associated with the execution.
"""
assert if_exists in {'fail', 'replace', 'append'}
table = self._parse_namespaces(table)
table = self._parse_namespaces(table, write=True)
return self._query_to_table(statement, table, if_exists=if_exists, **kwargs)

@logging_scope('Dataframe Upload', timed=True)
Expand All @@ -704,7 +716,7 @@ def dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
`DatabaseClient._dataframe_to_table`.
"""
assert if_exists in {'fail', 'replace', 'append'}
self._dataframe_to_table(df, self._parse_namespaces(table), if_exists=if_exists, **kwargs)
self._dataframe_to_table(df, self._parse_namespaces(table, write=True), if_exists=if_exists, **kwargs)

# Table properties

Expand All @@ -721,13 +733,13 @@ def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
def _cursor_empty(self, cursor):
return cursor is None

def _parse_namespaces(self, name, level=0, defaults=None):
def _parse_namespaces(self, name, level=0, defaults=None, write=False):
return ParsedNamespaces.from_name(
name,
self.NAMESPACE_NAMES[:-level] if level > 0 else self.NAMESPACE_NAMES,
quote_char=self.NAMESPACE_QUOTECHAR,
separator=self.NAMESPACE_SEPARATOR,
defaults=self.NAMESPACE_DEFAULT if defaults is None else defaults
defaults=defaults if defaults else (self.NAMESPACE_DEFAULTS_WRITE if write else self.NAMESPACE_DEFAULTS_READ),
)

@quirk_docs('_table_list')
Expand Down Expand Up @@ -782,7 +794,7 @@ def table_drop(self, table, **kwargs):
Returns:
DB-API cursor: The cursor associated with this execution.
"""
return self._table_drop(table=self._parse_namespaces(table), **kwargs)
return self._table_drop(table=self._parse_namespaces(table, write=True), **kwargs)

@abstractmethod
def _table_drop(self, table, **kwargs):
Expand Down
8 changes: 7 additions & 1 deletion omniduct/databases/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def NAMESPACE_DEFAULT(self):
'schema': self.schema
}

@property
@override
def NAMESPACE_DEFAULTS_WRITE(self):
defaults = self.NAMESPACE_DEFAULTS_READ.copy()
defaults['schema'] = self.username
return defaults

@override
def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL',
push_using_hive_cli=False, default_table_props=None,
Expand Down Expand Up @@ -297,7 +304,6 @@ def _dataframe_to_table(
dtype_overrides (dict): Mapping of column names to Hive datatypes to
use instead of default mapping.
"""
table = self._parse_namespaces(table, defaults={'schema': self.username})
use_hive_cli = use_hive_cli or self.push_using_hive_cli
partition = partition or {}
table_props = table_props or {}
Expand Down
8 changes: 7 additions & 1 deletion omniduct/databases/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ def NAMESPACE_DEFAULT(self):
'schema': self.schema
}

@property
@override
def NAMESPACE_DEFAULTS_WRITE(self):
defaults = self.NAMESPACE_DEFAULTS_READ.copy()
defaults['schema'] = self.username
return defaults

@override
def _init(self, catalog='default', schema='default', server_protocol='http', source=None, requests_session=None):
"""
Expand Down Expand Up @@ -192,7 +199,6 @@ def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
defaulted to your username. Catalog overrides will be ignored, and will
default to `self.catalog`.
"""
table = self._parse_namespaces(table, defaults={'schema': self.username})
return _pandas.to_sql(
df=df, name=table.table, schema=table.schema, con=self._sqlalchemy_engine,
index=False, if_exists=if_exists, **kwargs
Expand Down

0 comments on commit 8818b57

Please sign in to comment.