Skip to content

Commit

Permalink
[PoC] SIP-15 Presto types
Browse files Browse the repository at this point in the history
  • Loading branch information
John Bodley committed Jun 10, 2019
1 parent c6179b1 commit c270b96
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 26 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ colorama==0.3.9
contextlib2==0.5.5
croniter==0.3.29
cryptography==2.4.2
decorator==4.3.0 # via retry
decorator==4.3.0 # via networkx, retry
defusedxml==0.5.0 # via python3-openid
flask-appbuilder==2.1.4
flask-babel==0.11.1 # via flask-appbuilder
Expand Down Expand Up @@ -50,6 +50,7 @@ markupsafe==1.0 # via jinja2, mako
marshmallow-enum==1.4.1 # via flask-appbuilder
marshmallow-sqlalchemy==0.16.2 # via flask-appbuilder
marshmallow==2.19.2 # via flask-appbuilder, marshmallow-enum, marshmallow-sqlalchemy
networkx==2.3
numpy==1.15.2 # via pandas
pandas==0.23.4
parsedatetime==2.0.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def get_git_sha():
'idna',
'isodate',
'markdown>=3.0',
'networkx',
'pandas>=0.18.0, <0.24.0', # `pandas`>=0.24.0 changes datetimelike API
'parsedatetime',
'pathlib2',
Expand Down
4 changes: 4 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,7 @@ class CeleryConfig(object):
superset_config.__file__))
except ImportError:
pass

# Whether to enable SIP-15 functionality. Note currently this should never be
# set to True as these features are not ready for prime time.
SIP_15_ENABLED = False
14 changes: 12 additions & 2 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,21 @@ def dttm_sql_literal(self, dttm, is_epoch_in_utc):
return str(seconds_since_epoch)
elif tf == 'epoch_ms':
return str(seconds_since_epoch * 1000)
return "'{}'".format(dttm.strftime(tf))

# TODO(john-bodley): SIP-15 will only support ISO 8601 formats as this adheres
# to the lexigraphical ordering.
return f"'{dttm.strftime(tf)}'"
else:
s = self.table.database.db_engine_spec.convert_dttm(
self.type or '', dttm)
return s or "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S.%f'))

if s:
return s

if app.config['SIP_15_ENABLED']:
raise TypeError(f"The '{type}' type is unsupported.")
else:
return f"'{dttm.strftime('%Y-%m-%d %H:%M:%S.%f')}'"


class SqlMetric(Model, BaseMetric):
Expand Down
176 changes: 153 additions & 23 deletions superset/db_engine_specs/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@
# under the License.
# pylint: disable=C,R,W
from collections import OrderedDict
from datetime import datetime
from functools import reduce
import logging
import re
import textwrap
import time
from typing import List, Set, Tuple
from typing import Any, List, Optional, Set, Tuple
from urllib import parse

from sqlalchemy import Column, literal_column, types
import networkx as nx
from networkx.utils import pairwise
from sqlalchemy import Column, DateTime, literal_column, types
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.result import RowProxy
from sqlalchemy.sql.expression import ColumnClause
from sqlalchemy.types import BigInteger, DATE, String, TIMESTAMP

from superset.db_engine_specs.base import BaseEngineSpec
from superset import app
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression
from superset.exceptions import SupersetTemplateException
from superset.models.sql_types.presto_sql_types import type_map as presto_type_map
from superset.utils import core as utils
Expand All @@ -38,26 +44,38 @@


class PrestoEngineSpec(BaseEngineSpec):
import pyhive.sqlalchemy_presto

engine = 'presto'
type_map = pyhive.sqlalchemy_presto._type_map

time_grain_functions = {
None: '{col}',
'PT1S': "date_trunc('second', CAST({col} AS TIMESTAMP))",
'PT1M': "date_trunc('minute', CAST({col} AS TIMESTAMP))",
'PT1H': "date_trunc('hour', CAST({col} AS TIMESTAMP))",
'P1D': "date_trunc('day', CAST({col} AS TIMESTAMP))",
'P1W': "date_trunc('week', CAST({col} AS TIMESTAMP))",
'P1M': "date_trunc('month', CAST({col} AS TIMESTAMP))",
'P0.25Y': "date_trunc('quarter', CAST({col} AS TIMESTAMP))",
'P1Y': "date_trunc('year', CAST({col} AS TIMESTAMP))",
'PT1S': "date_trunc('second', {col})",
'PT1M': "date_trunc('minute', {col})",
'PT1H': "date_trunc('hour', {col})",
'P1D': "date_trunc('day', {col})",
'P1W': "date_trunc('week', {col})",
'P1M': "date_trunc('month', {col})",
'P0.25Y': "date_trunc('quarter', {col})",
'P1Y': "date_trunc('year', {col})",
'P1W/1970-01-03T00:00:00Z':
"date_add('day', 5, date_trunc('week', date_add('day', 1, "
'CAST({col} AS TIMESTAMP))))',
'{col})))',
'1969-12-28T00:00:00Z/P1W':
"date_add('day', -1, date_trunc('week', "
"date_add('day', 1, CAST({col} AS TIMESTAMP))))",
"date_add('day', 1, {col})))",
}

edges = [
(DATE, TIMESTAMP, {'sql': 'CAST({col} AS TIMESTAMP)'}),
]

type_graph = nx.DiGraph()
type_graph.add_edges_from(edges)

for type_, sql in time_grain_functions.items():
type_graph.add_edge(TIMESTAMP, type_, sql=sql)

@classmethod
def get_view_names(cls, inspector, schema):
"""Returns an empty list
Expand Down Expand Up @@ -349,15 +367,127 @@ def adjust_database_uri(cls, uri, selected_schema=None):
return uri

@classmethod
def convert_dttm(cls, target_type, dttm):
tt = target_type.upper()
if tt == 'DATE':
return "from_iso8601_date('{}')".format(dttm.isoformat()[:10])
if tt == 'TIMESTAMP':
return "from_iso8601_timestamp('{}')".format(dttm.isoformat())
if tt == 'BIGINT':
return "to_unixtime(from_iso8601_timestamp('{}'))".format(dttm.isoformat())
return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S'))
def get_timestamp_expr(
cls,
col: ColumnClause,
pdf: Optional[str],
time_grain: Optional[str],
) -> TimestampExpression:
"""
Construct a TimeExpression to be used in a SQLAlchemy query.
:param col: Target column for the TimeExpression
:param pdf: date format (seconds or milliseconds)
:param time_grain: time grain, e.g. P1Y for 1 year
:returns: TimestampExpression object
"""

type_graph = cls.type_graph.copy()

if time_grain:
if not cls.time_grain_functions.get(time_grain):
raise NotImplementedError(
f'No grain spec for {time_grain} for database {cls.engine}',
)

target = time_grain
else:
target = TIMESTAMP

if pdf in ('epoch_ms', 'epoch_s'):
source = BigInteger

if pdf == 'epoch_s':
type_graph.add_edge(
BigInteger,
TIMESTAMP,
sql='unix_timestamp({col})',
)
elif pdf == 'epoch_ms':
type_graph.add_edge(
BigInteger,
TIMESTAMP,
sql='unix_timestamp({col} / 1000.0)',
)
else:
# TODO(john-bodley): SIP-15 add column type information.
source = String

# TODO(john-bodley): The CAST(STRING TO TIMESTAMP) is not valid for all current
# string date/timestamp encodings. In SIP-15 all strings need to adhere to the
# ISO 8601 format. In the future we should use the `date_parse` function.
if not app.config['SIP_15_ENABLED']:
type_graph.add_edge(String, TIMESTAMP, sql='CAST({col} AS TIMESTAMP)')

return TimestampExpression(
cls.get_type_sql(type_graph, source, target),
col,
type_=DateTime,
)

@classmethod
def get_type_sql(cls, graph: nx.DiGraph, source: Any, target: Any) -> Optional[str]:
"""
Return the SQL expression mapping from the source to target type/format.
:param graph: The type mapping
:param source: The source type/format
:param target: The target type/format
:returns: The SQL expression
:raises TypeError: If the mapping between types does not exist
"""

try:
path = nx.shortest_path(graph, source, target)

return reduce(
lambda x, y: x.format(col=y),
reversed([graph.get_edge_data(*edge)['sql'] for edge in pairwise(path)]),
)
except nx.NodeNotFound:
if app.config['SIP_15_ENABLED']:
raise TypeError(
f"No mapping exists between the '{source}' and '{target}' types.",
)
else:
return None

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
"""
Converts the date-time to the target type.
Note this should be deprecated after a major refactor.
:param target_type: The target type
:param dttm: The date-time
:returns: The SQL expression
"""

type_graph = cls.type_graph.copy()

tt = target_type.lower()

if app.config['SIP_15_ENABLED'] and tt not in cls.type_map:
raise TypeError(f"The '{target_type}' type is not supported.")

type_graph.add_edge(
datetime,
DATE,
sql=f"from_iso8601_date('{dttm.isoformat()[:10]}')",
)

type_graph.add_edge(
datetime,
TIMESTAMP,
sql=f"from_iso8601_timestamp('{dttm.isoformat()}')",
)

return cls.get_type_sql(
type_graph,
source=datetime,
target=cls.type_map.get(tt, String),
)

@classmethod
def epoch_to_dttm(cls):
Expand Down

0 comments on commit c270b96

Please sign in to comment.