Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch): time_zone setting does not work for cast datetime expressions #17048

Merged
merged 13 commits into from
Nov 25, 2021
20 changes: 20 additions & 0 deletions docs/src/pages/docs/Connecting to Databases/elasticsearch.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,23 @@ POST /_aliases
```

Then register your table with the alias name logstasg_all

**Time zone**

By default, Superset uses UTC time zone for elasticsearch query. If you need to specify a time zone,
please edit your Database and enter the settings of your specified time zone in the Other > ENGINE PARAMETERS:


```
{
"connect_args": {
"time_zone": "Asia/Shanghai"
}
}
```

Another issue to note about the time zone problem is that before elasticsearch7.8, if you want to convert a string into a `DATETIME` object,
you need to use the `CAST` function,but this function does not support our `time_zone` setting. So it is recommended to upgrade to the version after elasticsearch7.8.
After elasticsearch7.8, you can use the `DATETIME_PARSE` function to solve this problem.
The DATETIME_PARSE function is to support our `time_zone` setting, and here you need to fill in your elasticsearch version number in the Other > VERSION setting.
the superset will use the `DATETIME_PARSE` function for conversion.
29 changes: 20 additions & 9 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,23 @@ def is_temporal(self) -> bool:
def db_engine_spec(self) -> Type[BaseEngineSpec]:
return self.table.db_engine_spec

@property
def db_extra(self) -> Dict[str, Any]:
return self.table.database.get_extra()

@property
def type_generic(self) -> Optional[utils.GenericDataType]:
if self.is_dttm:
return GenericDataType.TEMPORAL
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
return column_spec.generic_type if column_spec else None

def get_sqla_col(self, label: Optional[str] = None) -> Column:
label = label or self.column_name
db_engine_spec = self.db_engine_spec
column_spec = db_engine_spec.get_column_spec(self.type)
column_spec = db_engine_spec.get_column_spec(self.type, db_extra=self.db_extra)
type_ = column_spec.sqla_type if column_spec else None
if self.expression:
tp = self.table.get_template_processor()
Expand Down Expand Up @@ -332,7 +338,9 @@ def get_timestamp_expression(

pdf = self.python_date_format
is_epoch = pdf in ("epoch_s", "epoch_ms")
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
type_ = column_spec.sqla_type if column_spec else DateTime
if not self.expression and not time_grain and not is_epoch:
sqla_col = column(self.column_name, type_=type_)
Expand All @@ -357,7 +365,11 @@ def dttm_sql_literal(
],
) -> str:
"""Convert datetime object to a SQL expression string"""
sql = self.db_engine_spec.convert_dttm(self.type, dttm) if self.type else None
sql = (
self.db_engine_spec.convert_dttm(self.type, dttm, db_extra=self.db_extra)
if self.type
else None
)

if sql:
return sql
Expand All @@ -370,10 +382,8 @@ def dttm_sql_literal(
utils.TimeRangeEndpoint.INCLUSIVE,
utils.TimeRangeEndpoint.EXCLUSIVE,
):
tf = (
self.table.database.get_extra()
.get("python_date_format_by_column_name", {})
.get(self.column_name)
tf = self.db_extra.get("python_date_format_by_column_name", {}).get(
self.column_name
)

if tf:
Expand Down Expand Up @@ -1523,10 +1533,11 @@ def _normalize_prequery_result_type(
value = value.item()

column_ = columns_by_name[dimension]
db_extra: Dict[str, Any] = self.database.get_extra()

if column_.type and column_.is_temporal and isinstance(value, str):
sql = self.db_engine_spec.convert_dttm(
column_.type, dateutil.parser.parse(value),
column_.type, dateutil.parser.parse(value), db_extra=db_extra
)

if sql:
Expand Down
4 changes: 3 additions & 1 deletion superset/connectors/sqla/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def get_physical_table_metadata(
db_type = db_engine_spec.column_datatype_to_string(
col["type"], db_dialect
)
type_spec = db_engine_spec.get_column_spec(db_type)
type_spec = db_engine_spec.get_column_spec(
db_type, db_extra=database.get_extra()
)
col.update(
{
"type": db_type,
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class AthenaEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"from_iso8601_date('{dttm.date().isoformat()}')"
Expand Down
9 changes: 5 additions & 4 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType
from superset.utils.hashing import md5_sha_from_str
from superset.utils.memoized import memoized
from superset.utils.network import is_hostname_valid, is_port_open

if TYPE_CHECKING:
Expand Down Expand Up @@ -692,13 +691,14 @@ def df_to_sql(

@classmethod
def convert_dttm( # pylint: disable=unused-argument
cls, target_type: str, dttm: datetime,
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""
Convert Python datetime object to a SQL expression

:param target_type: The target type of expression
:param dttm: The datetime object
:param db_extra: The database extra object
:return: The SQL expression
"""
return None
Expand Down Expand Up @@ -1286,10 +1286,10 @@ def is_select_query(cls, parsed_query: ParsedQuery) -> bool:
return parsed_query.is_select()

@classmethod
@memoized
def get_column_spec( # pylint: disable=unused-argument
cls,
native_type: Optional[str],
db_extra: Optional[Dict[str, Any]] = None,
source: utils.ColumnTypeSource = utils.ColumnTypeSource.GET_TABLE,
column_type_mappings: Tuple[
Tuple[
Expand All @@ -1304,6 +1304,7 @@ def get_column_spec( # pylint: disable=unused-argument
Converts native database type to sqlalchemy column type.
:param native_type: Native database typee
:param source: Type coming from the database table or cursor description
:param db_extra: The database extra object
:return: ColumnSpec object
"""
col_types = cls.get_sqla_column_type(
Expand All @@ -1315,7 +1316,7 @@ def get_column_spec( # pylint: disable=unused-argument
# using datetimes
if generic_type == GenericDataType.TEMPORAL:
column_type = literal_dttm_type_factory(
column_type, cls, native_type or ""
column_type, cls, native_type or "", db_extra=db_extra or {}
)
is_dttm = generic_type == GenericDataType.TEMPORAL
return ColumnSpec(
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ class BigQueryEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST('{dttm.date().isoformat()}' AS DATE)"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Dict, List, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING

from urllib3.exceptions import NewConnectionError

Expand Down Expand Up @@ -72,7 +72,9 @@ def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
return new_exception(str(exception))

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"toDate('{dttm.date().isoformat()}')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from typing import Any, Dict, Optional, TYPE_CHECKING

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -50,7 +50,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "{col}"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
return f"{dttm.timestamp() * 1000}"
Expand Down
8 changes: 5 additions & 3 deletions superset/db_engine_specs/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.o

from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.hive import HiveEngineSpec
Expand All @@ -40,8 +40,10 @@ class DatabricksODBCEngineSpec(BaseEngineSpec):
_time_grain_expressions = HiveEngineSpec._time_grain_expressions

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
return HiveEngineSpec.convert_dttm(target_type, dttm)
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
return HiveEngineSpec.convert_dttm(target_type, dttm, db_extra=db_extra)

@classmethod
def epoch_to_dttm(cls) -> str:
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/dremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -43,7 +43,9 @@ def epoch_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'YYYY-MM-DD')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional
from urllib import parse

from sqlalchemy.engine.url import URL
Expand Down Expand Up @@ -55,7 +55,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'yyyy-MM-dd')"
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def get_extra_params(database: "Database") -> Dict[str, Any]:
return extra

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST(TIME_PARSE('{dttm.date().isoformat()}') AS DATE)"
Expand Down
37 changes: 34 additions & 3 deletions superset/db_engine_specs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from typing import Dict, Optional, Type
from distutils.version import StrictVersion
from typing import Any, Dict, Optional, Type

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.exceptions import (
Expand All @@ -25,6 +27,8 @@
)
from superset.utils import core as utils

logger = logging.getLogger()


class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "elasticsearch"
Expand Down Expand Up @@ -59,9 +63,34 @@ def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:

db_extra = db_extra or {}
if target_type.upper() == utils.TemporalType.DATETIME:
es_version = db_extra.get("version")
# The elasticsearch CAST function does not take effect for the time zone
# setting. In elasticsearch7.8 and above, we can use the DATETIME_PARSE
# function to solve this problem.
supports_dttm_parse = False
try:
if es_version:
supports_dttm_parse = StrictVersion(es_version) >= StrictVersion(
"7.8"
)
except Exception as ex: # pylint: disable=broad-except
logger.error("Unexpected error while convert es_version", exc_info=True)
logger.exception(ex)

if supports_dttm_parse:
datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
return (
f"""DATETIME_PARSE('{datetime_formatted}', 'yyyy-MM-dd HH:mm:ss')"""
)

return f"""CAST('{dttm.isoformat(timespec="seconds")}' AS DATETIME)"""

return None


Expand All @@ -87,7 +116,9 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine_name = "ElasticSearch (OpenDistro SQL)"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
if target_type.upper() == utils.TemporalType.DATETIME:
return f"""'{dttm.isoformat(timespec="seconds")}'"""
return None
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/firebird.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec, LimitMethod
from superset.utils import core as utils
Expand Down Expand Up @@ -70,7 +70,9 @@ def epoch_to_dttm(cls) -> str:
return "DATEADD(second, {col}, CAST('00:00:00' AS TIMESTAMP))"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
dttm_formatted = dttm.isoformat(sep=" ")
Expand Down