Skip to content

Commit

Permalink
fix: Stop query in SQL Lab with impala engine (#22635)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghong1314 committed Jan 10, 2023
1 parent 159dcd7 commit 8bf6d80
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
4 changes: 2 additions & 2 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,8 +1131,8 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name
TRACKING_URL_TRANSFORMER = lambda url: url


# Interval between consecutive polls when using Hive Engine
HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds())
# customize the polling time of each engine
DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {}

# Interval between consecutive polls when using Presto Engine
# See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression
Expand Down
10 changes: 9 additions & 1 deletion superset/db_engine_specs/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,15 @@ def handle_cursor( # pylint: disable=too-many-locals
last_log_line = len(log_lines)
if needs_commit:
session.commit()
time.sleep(current_app.config["HIVE_POLL_INTERVAL"])
if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"):
logger.warning(
"HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL_SECONDS instead"
)
else:
sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get(
cls.engine, 5
)
time.sleep(sleep_interval)
polled = cursor.poll()

@classmethod
Expand Down
90 changes: 90 additions & 0 deletions superset/db_engine_specs/impala.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

from flask import current_app
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import Session

from superset.constants import QUERY_EARLY_CANCEL_KEY
from superset.db_engine_specs.base import BaseEngineSpec
from superset.models.sql_lab import Query
from superset.utils import core as utils

logger = logging.getLogger(__name__)
# Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547)
QUERY_PROGRESS_REGEX = re.compile(r"Query.*: (?P<query_progress>[0-9]+)%")


class ImpalaEngineSpec(BaseEngineSpec):
"""Engine spec for Cloudera's Impala"""
Expand Down Expand Up @@ -63,3 +74,82 @@ def get_schema_names(cls, inspector: Inspector) -> List[str]:
if not row[0].startswith("_")
]
return schemas

@classmethod
def has_implicit_cancel(cls) -> bool:
"""
Return True if the live cursor handles the implicit cancelation of the query,
False otherise.
:return: Whether the live cursor implicitly cancels the query
:see: handle_cursor
"""

return True

@classmethod
def execute(
cls,
cursor: Any,
query: str,
**kwargs: Any, # pylint: disable=unused-argument
) -> None:
try:
cursor.execute_async(query)
except Exception as ex:
raise cls.get_dbapi_mapped_exception(ex)

@classmethod
def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
"""Stop query and updates progress information"""

query_id = query.id
unfinished_states = (
"INITIALIZED_STATE",
"RUNNING_STATE",
)

try:
status = cursor.status()
while status in unfinished_states:
session.refresh(query)
query = session.query(Query).filter_by(id=query_id).one()
# if query cancelation was requested prior to the handle_cursor call, but
# the query was still executed
# modified in stop_query in views / core.py is reflected here.
# stop query
if query.extra.get(QUERY_EARLY_CANCEL_KEY):
cursor.cancel_operation()
cursor.close_operation()
cursor.close()
break

# updates progress info by log
try:
log = cursor.get_log() or ""
except Exception: # pylint: disable=broad-except
logger.warning("Call to GetLog() failed")
log = ""

if log:
match = QUERY_PROGRESS_REGEX.match(log)
if match:
progress = int(match.groupdict()["query_progress"])
logger.debug(
"Query %s: Progress total: %s", str(query_id), str(progress)
)
needs_commit = False
if progress > query.progress:
query.progress = progress
needs_commit = True

if needs_commit:
session.commit()
sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get(
cls.engine, 5
)
time.sleep(sleep_interval)
status = cursor.status()
except Exception: # pylint: disable=broad-except
logger.debug("Call to status() failed ")
return
4 changes: 4 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
SqlMetric,
TableColumn,
)
from superset.constants import QUERY_EARLY_CANCEL_KEY
from superset.dashboards.commands.importers.v0 import ImportDashboardsCommand
from superset.dashboards.dao import DashboardDAO
from superset.dashboards.permalink.commands.get import GetDashboardPermalinkCommand
Expand Down Expand Up @@ -2318,6 +2319,9 @@ def stop_query(self) -> FlaskResponse:
raise SupersetCancelQueryException("Could not cancel query")

query.status = QueryStatus.STOPPED
# Add the stop identity attribute because the sqlalchemy thread is unsafe
# because of multiple updates to the status in the query table
query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True)
query.end_time = now_as_float()
db.session.commit()

Expand Down

0 comments on commit 8bf6d80

Please sign in to comment.