Skip to content

Commit

Permalink
[Hive Engine Spec] Fix latest partition logic
Browse files Browse the repository at this point in the history
  • Loading branch information
erik_ritter committed Aug 23, 2019
1 parent 0d76fc7 commit aa6630b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
37 changes: 25 additions & 12 deletions superset/db_engine_specs/hive.py
Expand Up @@ -28,7 +28,7 @@
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql.expression import ColumnClause
from sqlalchemy.sql.expression import ColumnClause, Select
from werkzeug.utils import secure_filename

from superset import app, conf
Expand Down Expand Up @@ -297,19 +297,29 @@ def get_columns(
return inspector.get_columns(table_name, schema)

@classmethod
def where_latest_partition(cls, table_name, schema, database, qry, columns=None):
def where_latest_partition(
cls,
table_name: str,
schema: Optional[str],
database,
qry: Select,
columns: Optional[List] = None,
) -> Optional[Select]:
try:
col_name, value = cls.latest_partition(
col_names, values = cls.latest_partition(
table_name, schema, database, show_first=True
)
except Exception:
# table is not partitioned
return False
if value is not None:
for c in columns:
if c.get("name") == col_name:
return qry.where(Column(col_name) == value)
return False
return None
if values is not None and columns is not None:
for col_name, value in zip(col_names, values):
for c in columns:
if c.get("name") == col_name:
qry = qry.where(Column(col_name) == value)

return qry
return None

@classmethod
def _get_fields(cls, cols: List[dict]) -> List[ColumnClause]:
Expand All @@ -321,10 +331,11 @@ def latest_sub_partition(cls, table_name, schema, database, **kwargs):
pass

@classmethod
def _latest_partition_from_df(cls, df):
def _latest_partition_from_df(cls, df) -> Optional[List[str]]:
"""Hive partitions look like ds={partition name}"""
if not df.empty:
return df.ix[:, 0].max().split("=")[1]
return [df.ix[:, 0].max().split("=")[1]]
return None

@classmethod
def _partition_query(cls, table_name, limit=0, order_by=None, filters=None):
Expand All @@ -343,7 +354,9 @@ def select_star(
latest_partition: bool = True,
cols: Optional[List[Dict[str, Any]]] = None,
) -> str:
return BaseEngineSpec.select_star(
return super( # pylint: disable=bad-super-call
PrestoEngineSpec, cls
).select_star(
database,
table_name,
engine,
Expand Down
6 changes: 3 additions & 3 deletions superset/db_engine_specs/presto.py
Expand Up @@ -950,10 +950,10 @@ def where_latest_partition(
)
except Exception:
# table is not partitioned
return False
return None

if values is None:
return False
return None

column_names = {column.get("name") for column in columns or []}
for col_name, value in zip(col_names, values):
Expand All @@ -962,7 +962,7 @@ def where_latest_partition(
return query

@classmethod
def _latest_partition_from_df(cls, df):
def _latest_partition_from_df(cls, df) -> Optional[List[str]]:
if not df.empty:
return df.to_records(index=False)[0].item()
return None
Expand Down

0 comments on commit aa6630b

Please sign in to comment.