-
Notifications
You must be signed in to change notification settings - Fork 12.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SIP-15A] Presto types (PoC) #7682
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,20 +16,26 @@ | |
# under the License. | ||
# pylint: disable=C,R,W | ||
from collections import OrderedDict | ||
from datetime import datetime | ||
from functools import reduce | ||
import logging | ||
import re | ||
import textwrap | ||
import time | ||
from typing import List, Set, Tuple | ||
from typing import Any, List, Optional, Set, Tuple | ||
from urllib import parse | ||
|
||
from sqlalchemy import Column, literal_column, types | ||
import networkx as nx | ||
from networkx.utils import pairwise | ||
from sqlalchemy import Column, DateTime, literal_column, types | ||
from sqlalchemy.engine.base import Engine | ||
from sqlalchemy.engine.reflection import Inspector | ||
from sqlalchemy.engine.result import RowProxy | ||
from sqlalchemy.sql.expression import ColumnClause | ||
from sqlalchemy.types import BigInteger, DATE, String, TIMESTAMP | ||
|
||
from superset.db_engine_specs.base import BaseEngineSpec | ||
from superset import app | ||
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression | ||
from superset.exceptions import SupersetTemplateException | ||
from superset.models.sql_types.presto_sql_types import type_map as presto_type_map | ||
from superset.utils import core as utils | ||
|
@@ -38,26 +44,34 @@ | |
|
||
|
||
class PrestoEngineSpec(BaseEngineSpec): | ||
import pyhive.sqlalchemy_presto | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this is essentially the same as importing in module scope; it'll fail if |
||
|
||
engine = 'presto' | ||
type_map = pyhive.sqlalchemy_presto._type_map | ||
|
||
time_grain_functions = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic would probably be refactored if we rolled this out globally. |
||
None: '{col}', | ||
'PT1S': "date_trunc('second', CAST({col} AS TIMESTAMP))", | ||
'PT1M': "date_trunc('minute', CAST({col} AS TIMESTAMP))", | ||
'PT1H': "date_trunc('hour', CAST({col} AS TIMESTAMP))", | ||
'P1D': "date_trunc('day', CAST({col} AS TIMESTAMP))", | ||
'P1W': "date_trunc('week', CAST({col} AS TIMESTAMP))", | ||
'P1M': "date_trunc('month', CAST({col} AS TIMESTAMP))", | ||
'P0.25Y': "date_trunc('quarter', CAST({col} AS TIMESTAMP))", | ||
'P1Y': "date_trunc('year', CAST({col} AS TIMESTAMP))", | ||
'PT1S': "date_trunc('second', {col})", | ||
'PT1M': "date_trunc('minute', {col})", | ||
'PT1H': "date_trunc('hour', {col})", | ||
'P1D': "date_trunc('day', {col})", | ||
'P1W': "date_trunc('week', {col})", | ||
'P1M': "date_trunc('month', {col})", | ||
'P0.25Y': "date_trunc('quarter', {col})", | ||
'P1Y': "date_trunc('year', {col})", | ||
'P1W/1970-01-03T00:00:00Z': | ||
"date_add('day', 5, date_trunc('week', date_add('day', 1, " | ||
'CAST({col} AS TIMESTAMP))))', | ||
'{col})))', | ||
'1969-12-28T00:00:00Z/P1W': | ||
"date_add('day', -1, date_trunc('week', " | ||
"date_add('day', 1, CAST({col} AS TIMESTAMP))))", | ||
"date_add('day', 1, {col})))", | ||
} | ||
|
||
type_graph = nx.DiGraph() | ||
type_graph.add_edge(DATE, TIMESTAMP, sql='CAST({col} AS TIMESTAMP)') | ||
|
||
for grain, sql in time_grain_functions.items(): | ||
type_graph.add_edge(TIMESTAMP, grain, sql=sql) | ||
|
||
@classmethod | ||
def get_view_names(cls, inspector, schema): | ||
"""Returns an empty list | ||
|
@@ -349,13 +363,127 @@ def adjust_database_uri(cls, uri, selected_schema=None): | |
return uri | ||
|
||
@classmethod | ||
def convert_dttm(cls, target_type, dttm): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic would probably be refactored if we rolled this out globally. |
||
tt = target_type.upper() | ||
if tt == 'DATE': | ||
return "from_iso8601_date('{}')".format(dttm.isoformat()[:10]) | ||
if tt == 'TIMESTAMP': | ||
return "from_iso8601_timestamp('{}')".format(dttm.isoformat()) | ||
return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) | ||
def get_timestamp_expr( | ||
cls, | ||
col: ColumnClause, | ||
pdf: Optional[str], | ||
time_grain: Optional[str], | ||
) -> TimestampExpression: | ||
""" | ||
Construct a TimeExpression to be used in a SQLAlchemy query. | ||
|
||
:param col: Target column for the TimeExpression | ||
:param pdf: date format (seconds or milliseconds) | ||
:param time_grain: time grain, e.g. P1Y for 1 year | ||
:returns: TimestampExpression object | ||
""" | ||
|
||
type_graph = cls.type_graph.copy() | ||
|
||
if time_grain: | ||
if not cls.time_grain_functions.get(time_grain): | ||
raise NotImplementedError( | ||
f'No grain spec for {time_grain} for database {cls.engine}', | ||
) | ||
|
||
target = time_grain | ||
else: | ||
target = TIMESTAMP | ||
|
||
if pdf in ('epoch_ms', 'epoch_s'): | ||
source = BigInteger | ||
|
||
if pdf == 'epoch_s': | ||
type_graph.add_edge( | ||
BigInteger, | ||
TIMESTAMP, | ||
sql='unix_timestamp({col})', | ||
) | ||
elif pdf == 'epoch_ms': | ||
type_graph.add_edge( | ||
BigInteger, | ||
TIMESTAMP, | ||
sql='unix_timestamp({col} / 1000.0)', | ||
) | ||
else: | ||
# TODO(john-bodley): SIP-15 add column type information. | ||
source = String | ||
|
||
# TODO(john-bodley): The CAST(STRING TO TIMESTAMP) is not valid for all current | ||
# string date/timestamp encodings. In SIP-15 all strings need to adhere to the | ||
# ISO 8601 format. In the future we should use the `date_parse` function. | ||
if not app.config['SIP_15_ENABLED']: | ||
type_graph.add_edge(String, TIMESTAMP, sql='CAST({col} AS TIMESTAMP)') | ||
|
||
return TimestampExpression( | ||
cls.get_type_sql(type_graph, source, target), | ||
col, | ||
type_=DateTime, | ||
) | ||
|
||
@classmethod | ||
def get_type_sql(cls, graph: nx.DiGraph, source: Any, target: Any) -> Optional[str]: | ||
""" | ||
Return the SQL expression mapping from the source to target type/format. | ||
|
||
:param graph: The type mapping | ||
:param source: The source type/format | ||
:param target: The target type/format | ||
:returns: The SQL expression | ||
:raises TypeError: If the mapping between types does not exist | ||
""" | ||
|
||
try: | ||
path = nx.shortest_path(graph, source, target) | ||
|
||
return reduce( | ||
lambda x, y: x.format(col=y), | ||
reversed([graph.get_edge_data(*edge)['sql'] for edge in pairwise(path)]), | ||
) | ||
except nx.NodeNotFound: | ||
if app.config['SIP_15_ENABLED']: | ||
raise TypeError( | ||
f"No mapping exists between the '{source}' and '{target}' types.", | ||
) | ||
else: | ||
return None | ||
|
||
@classmethod | ||
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]: | ||
""" | ||
Converts the date-time to the target type. | ||
|
||
Note this should be deprecated after a major refactor. | ||
|
||
:param target_type: The target type | ||
:param dttm: The date-time | ||
:returns: The SQL expression | ||
""" | ||
|
||
type_graph = cls.type_graph.copy() | ||
|
||
tt = target_type.lower() | ||
|
||
if app.config['SIP_15_ENABLED'] and tt not in cls.type_map: | ||
raise TypeError(f"The '{target_type}' type is not supported.") | ||
|
||
type_graph.add_edge( | ||
datetime, | ||
DATE, | ||
sql=f"from_iso8601_date('{dttm.isoformat()[:10]}')", | ||
) | ||
|
||
type_graph.add_edge( | ||
datetime, | ||
TIMESTAMP, | ||
sql=f"from_iso8601_timestamp('{dttm.isoformat()}')", | ||
) | ||
|
||
return cls.get_type_sql( | ||
type_graph, | ||
source=datetime, | ||
target=cls.type_map.get(tt, String), | ||
) | ||
|
||
@classmethod | ||
def epoch_to_dttm(cls): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be
{self.type}
?