Skip to content

Commit

Permalink
refactor(pinot): The python_date_format for a temporal column was n…
Browse files Browse the repository at this point in the history
…ot being passed to `get_timestamp_expr` (#24942)
  • Loading branch information
ege-st authored Aug 27, 2023
1 parent 69fb309 commit c2a21d2
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 105 deletions.
134 changes: 46 additions & 88 deletions superset/db_engine_specs/pinot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,109 +14,67 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

from sqlalchemy.sql.expression import ColumnClause
from sqlalchemy import types
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.types import TypeEngine

from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression
from superset.db_engine_specs.base import BaseEngineSpec


class PinotEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
class PinotEngineSpec(BaseEngineSpec):
engine = "pinot"
engine_name = "Apache Pinot"
allows_subqueries = False
allows_joins = False
allows_alias_in_select = False
allows_alias_in_orderby = False

# Pinot does its own conversion below
# https://docs.pinot.apache.org/users/user-guide-query/supported-transformations#datetime-functions
_time_grain_expressions = {
TimeGrain.SECOND: "1:SECONDS",
TimeGrain.MINUTE: "1:MINUTES",
TimeGrain.FIVE_MINUTES: "5:MINUTES",
TimeGrain.TEN_MINUTES: "10:MINUTES",
TimeGrain.FIFTEEN_MINUTES: "15:MINUTES",
TimeGrain.THIRTY_MINUTES: "30:MINUTES",
TimeGrain.HOUR: "1:HOURS",
TimeGrain.DAY: "1:DAYS",
TimeGrain.WEEK: "week",
TimeGrain.MONTH: "month",
TimeGrain.QUARTER: "quarter",
TimeGrain.YEAR: "year",
}

_python_to_java_time_patterns: dict[str, str] = {
"%Y": "yyyy",
"%m": "MM",
"%d": "dd",
"%H": "HH",
"%M": "mm",
"%S": "ss",
}

_use_date_trunc_function: dict[str, bool] = {
TimeGrain.SECOND: False,
TimeGrain.MINUTE: False,
TimeGrain.FIVE_MINUTES: False,
TimeGrain.TEN_MINUTES: False,
TimeGrain.FIFTEEN_MINUTES: False,
TimeGrain.THIRTY_MINUTES: False,
TimeGrain.HOUR: False,
TimeGrain.DAY: False,
TimeGrain.WEEK: True,
TimeGrain.MONTH: True,
TimeGrain.QUARTER: True,
TimeGrain.YEAR: True,
None: "{col}",
TimeGrain.SECOND: "CAST(DATE_TRUNC('second', "
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.MINUTE: "CAST(DATE_TRUNC('minute', "
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.FIVE_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
+ "CAST({col} AS TIMESTAMP)), 300000) AS TIMESTAMP)",
TimeGrain.TEN_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
+ "CAST({col} AS TIMESTAMP)), 600000) AS TIMESTAMP)",
TimeGrain.FIFTEEN_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
+ "CAST({col} AS TIMESTAMP)), 900000) AS TIMESTAMP)",
TimeGrain.THIRTY_MINUTES: "CAST(ROUND(DATE_TRUNC('minute', "
+ "CAST({col} AS TIMESTAMP)), 1800000) AS TIMESTAMP)",
TimeGrain.HOUR: "CAST(DATE_TRUNC('hour', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.DAY: "CAST(DATE_TRUNC('day', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.WEEK: "CAST(DATE_TRUNC('week', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.MONTH: "CAST(DATE_TRUNC('month', "
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.QUARTER: "CAST(DATE_TRUNC('quarter', "
+ "CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
TimeGrain.YEAR: "CAST(DATE_TRUNC('year', CAST({col} AS TIMESTAMP)) AS TIMESTAMP)",
}

@classmethod
def get_timestamp_expr(
cls,
col: ColumnClause,
pdf: Optional[str],
time_grain: Optional[str],
) -> TimestampExpression:
if not pdf:
raise NotImplementedError(f"Empty date format for '{col}'")
is_epoch = pdf in ("epoch_s", "epoch_ms")
def epoch_to_dttm(cls) -> str:
return (
"DATETIMECONVERT({col}, '1:SECONDS:EPOCH', '1:SECONDS:EPOCH', '1:SECONDS')"
)

# The DATETIMECONVERT pinot udf is documented at
# Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF
# We are not really converting any time units, just bucketing them.
tf = ""
java_date_format = ""
if not is_epoch:
java_date_format = pdf
for (
python_pattern,
java_pattern,
) in cls._python_to_java_time_patterns.items():
java_date_format = java_date_format.replace(
python_pattern, java_pattern
)
tf = f"1:SECONDS:SIMPLE_DATE_FORMAT:{java_date_format}"
else:
seconds_or_ms = "MILLISECONDS" if pdf == "epoch_ms" else "SECONDS"
tf = f"1:{seconds_or_ms}:EPOCH"
if time_grain:
granularity = cls.get_time_grain_expressions().get(time_grain)
if not granularity:
raise NotImplementedError(f"No pinot grain spec for '{time_grain}'")
else:
return TimestampExpression("{{col}}", col)
@classmethod
def epoch_ms_to_dttm_(cls) -> str:
return (
"DATETIMECONVERT({col}, '1:MILLISECONDS:EPOCH', "
+ "'1:MILLISECONDS:EPOCH', '1:MILLISECONDS')"
)

# In pinot the output is a string since there is no timestamp column like pg
if cls._use_date_trunc_function.get(time_grain):
if is_epoch:
time_expr = f"DATETRUNC('{granularity}', {{col}}, '{seconds_or_ms}')"
else:
time_expr = (
f"ToDateTime(DATETRUNC('{granularity}', "
+ f"FromDateTime({{col}}, '{java_date_format}'), "
+ f"'MILLISECONDS'), '{java_date_format}')"
)
else:
time_expr = f"DATETIMECONVERT({{col}}, '{tf}', '{tf}', '{granularity}')"
@classmethod
def column_datatype_to_string(
cls, sqla_column_type: TypeEngine, dialect: Dialect
) -> str:
# Pinot driver infers TIMESTAMP column as LONG, so make the quick fix.
# When the Pinot driver fix this bug, current method could be removed.
if isinstance(sqla_column_type, types.TIMESTAMP):
return sqla_column_type.compile().upper()

return TimestampExpression(time_expr, col)
return super().column_datatype_to_string(sqla_column_type, dialect)
39 changes: 22 additions & 17 deletions tests/integration_tests/db_engine_specs/pinot_tests.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -27,61 +27,66 @@ def test_pinot_time_expression_sec_one_1d_grain(self):
col = column("tstamp")
expr = PinotEngineSpec.get_timestamp_expr(col, "epoch_s", "P1D")
result = str(expr.compile())
expected = (
"CAST(DATE_TRUNC('day', CAST("
+ "DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', "
+ "'1:SECONDS:EPOCH', '1:SECONDS') AS TIMESTAMP)) AS TIMESTAMP)"
)
self.assertEqual(
result,
"DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', '1:SECONDS:EPOCH', '1:DAYS')",
expected,
)

def test_pinot_time_expression_simple_date_format_1d_grain(self):
col = column("tstamp")
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1D")
result = str(expr.compile())
expected = "CAST(DATE_TRUNC('day', CAST(tstamp AS TIMESTAMP)) AS TIMESTAMP)"
self.assertEqual(
result,
(
"DATETIMECONVERT(tstamp, "
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', "
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '1:DAYS')"
),
expected,
)

def test_pinot_time_expression_simple_date_format_10m_grain(self):
col = column("tstamp")
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "PT10M")
result = str(expr.compile())
expected = (
"CAST(ROUND(DATE_TRUNC('minute', CAST(tstamp AS "
+ "TIMESTAMP)), 600000) AS TIMESTAMP)"
)
self.assertEqual(
result,
(
"DATETIMECONVERT(tstamp, "
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', "
+ "'1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '10:MINUTES')"
),
expected,
)

def test_pinot_time_expression_simple_date_format_1w_grain(self):
col = column("tstamp")
expr = PinotEngineSpec.get_timestamp_expr(col, "%Y-%m-%d %H:%M:%S", "P1W")
result = str(expr.compile())
expected = "CAST(DATE_TRUNC('week', CAST(tstamp AS TIMESTAMP)) AS TIMESTAMP)"
self.assertEqual(
result,
(
"ToDateTime(DATETRUNC('week', FromDateTime(tstamp, "
+ "'yyyy-MM-dd HH:mm:ss'), 'MILLISECONDS'), 'yyyy-MM-dd HH:mm:ss')"
),
expected,
)

def test_pinot_time_expression_sec_one_1m_grain(self):
col = column("tstamp")
expr = PinotEngineSpec.get_timestamp_expr(col, "epoch_s", "P1M")
result = str(expr.compile())
expected = (
"CAST(DATE_TRUNC('month', CAST("
+ "DATETIMECONVERT(tstamp, '1:SECONDS:EPOCH', "
+ "'1:SECONDS:EPOCH', '1:SECONDS') AS TIMESTAMP)) AS TIMESTAMP)"
)
self.assertEqual(
result,
"DATETRUNC('month', tstamp, 'SECONDS')",
expected,
)

def test_invalid_get_time_expression_arguments(self):
with self.assertRaises(NotImplementedError):
PinotEngineSpec.get_timestamp_expr(column("tstamp"), None, "P1M")
PinotEngineSpec.get_timestamp_expr(column("tstamp"), None, "P0.25Y")

with self.assertRaises(NotImplementedError):
PinotEngineSpec.get_timestamp_expr(
Expand Down
57 changes: 57 additions & 0 deletions tests/unit_tests/db_engine_specs/test_pinot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import mock

import pytest
from sqlalchemy import column


@pytest.mark.parametrize(
"time_grain,expected_result",
[
("PT1S", "CAST(DATE_TRUNC('second', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
(
"PT5M",
"CAST(ROUND(DATE_TRUNC('minute', CAST(col AS TIMESTAMP)), 300000) AS TIMESTAMP)",
),
("P1W", "CAST(DATE_TRUNC('week', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
("P1M", "CAST(DATE_TRUNC('month', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
("P3M", "CAST(DATE_TRUNC('quarter', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
("P1Y", "CAST(DATE_TRUNC('year', CAST(col AS TIMESTAMP)) AS TIMESTAMP)"),
],
)
def test_timegrain_expressions(time_grain: str, expected_result: str) -> None:
"""
DB Eng Specs (pinot): Test time grain expressions
"""
from superset.db_engine_specs.pinot import PinotEngineSpec as spec

actual = str(
spec.get_timestamp_expr(col=column("col"), pdf=None, time_grain=time_grain)
)
assert actual == expected_result


def test_extras_without_ssl() -> None:
from superset.db_engine_specs.pinot import PinotEngineSpec as spec
from tests.integration_tests.fixtures.database import default_db_extra

db = mock.Mock()
db.extra = default_db_extra
db.server_cert = None
extras = spec.get_extra_params(db)
assert "connect_args" not in extras["engine_params"]

0 comments on commit c2a21d2

Please sign in to comment.