diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d90d3230ea12a6..1566b3767ede29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1950,6 +1950,10 @@ public List executeInternalQuery() { if (originStmt.originStmt != null) { context.setSqlHash(DigestUtils.md5Hex(originStmt.originStmt)); } + // Mark state up front so audit log records this as an internal query even if parse/plan fails. + context.getState().setNereids(true); + context.getState().setIsQuery(true); + context.getState().setInternal(true); try { List resultRows = new ArrayList<>(); try { @@ -1957,9 +1961,6 @@ public List executeInternalQuery() { Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, "Nereids only process LogicalPlanAdapter," + " but parsedStmt is " + parsedStmt.getClass().getName()); - context.getState().setNereids(true); - context.getState().setIsQuery(true); - context.getState().setInternal(true); planner = new NereidsPlanner(statementContext); planner.plan(parsedStmt, context.getSessionVariable().toThrift()); } catch (Exception e) { @@ -2015,6 +2016,16 @@ public List executeInternalQuery() { } catch (Exception e) { throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e); } + } catch (Exception e) { + // Surface failure into ConnectContext state so AuditLogHelper records ERR instead of OK. 
+ if (context.getState().getStateType() != MysqlStateType.ERR) { + String msg = e.getMessage(); + if (Strings.isNullOrEmpty(msg)) { + msg = Util.getRootCauseMessage(e); + } + context.getState().setError(ErrorCode.ERR_INTERNAL_ERROR, msg); + } + throw e; } finally { if (coord != null) { coord.close(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorInternalQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorInternalQueryTest.java index 5091ea20dc6397..6eec28f02298f8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorInternalQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorInternalQueryTest.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.common.ErrorCode; import org.apache.doris.nereids.NereidsPlanner; import mockit.Mock; @@ -42,4 +43,27 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions } Assert.assertEquals("a8ec30e5ad0820f8c5bd16a82a4491ca", executor.getContext().getSqlHash()); } + + @Test + public void testExecuteInternalQuerySetsErrorStateOnFailure() { + // Regression test for CIR-20019: when the internal SQL execution throws, + // ConnectContext state must be set to ERR so AuditLogHelper records the failure + // instead of misleadingly logging State=OK with empty error message. 
+ ConnectContext ctx = new ConnectContext(); + StmtExecutor executor = new StmtExecutor(ctx, "select * from table1"); + try (MockedConstruction mocked = Mockito.mockConstruction(NereidsPlanner.class, + (mock, context) -> Mockito.doThrow(new RuntimeException("mock plan failure")) + .when(mock).plan(Mockito.any(StatementBase.class), Mockito.any(TQueryOptions.class)))) { + Assert.assertThrows(RuntimeException.class, executor::executeInternalQuery); + } + Assert.assertEquals(QueryState.MysqlStateType.ERR, ctx.getState().getStateType()); + Assert.assertEquals(ErrorCode.ERR_INTERNAL_ERROR, ctx.getState().getErrorCode()); + Assert.assertNotNull(ctx.getState().getErrorMessage()); + Assert.assertTrue("error message should mention root cause, got: " + ctx.getState().getErrorMessage(), + ctx.getState().getErrorMessage().contains("mock plan failure")); + Assert.assertTrue("internal query should be flagged as internal in audit state", + ctx.getState().isInternal()); + Assert.assertTrue("internal query should be flagged as query in audit state", + ctx.getState().isQuery()); + } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 244e4f720d5235..01ed92f3e70115 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -2150,6 +2150,37 @@ class Suite implements GroovyInterceptable { } } + /** + * Set the given global variables for the duration of {@code actionSupplier}, + * restoring their original values on exit. The original values are captured via + * {@code SHOW GLOBAL VARIABLES} before anything is changed, and the restore runs in a + * {@code finally} block, so it still happens if a SET or {@code actionSupplier} throws. 
+ */ + void setGlobalVarTemporary(Map tempVars, Closure actionSupplier) { + def quote = { Object v -> + if (v == null) { + return "''" + } + if (v instanceof Boolean || v instanceof Number) { + return v.toString() + } + return "'" + v.toString().replace("'", "''") + "'" + } + Map origin = [:] + tempVars.keySet().each { key -> + def rows = sql_return_maparray "show global variables like '${key}'" + if (!rows.isEmpty()) { + origin.put(key, rows[0].Value as String) + } + } + try { + tempVars.each { key, value -> sql "set global ${key} = ${quote(value)}" } + actionSupplier() + } finally { + origin.each { key, value -> sql "set global ${key} = ${quote(value)}" } + } + } + void setBeConfigTemporary(Map tempConfig, Closure actionSupplier) { Map> originConf = Maps.newHashMap() tempConfig.each{ k, v -> diff --git a/regression-test/suites/fault_injection_p0/test_audit_log_internal_query_failure.groovy b/regression-test/suites/fault_injection_p0/test_audit_log_internal_query_failure.groovy new file mode 100644 index 00000000000000..4db56802e99da6 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_audit_log_internal_query_failure.groovy @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Regression test for CIR-20019: when an internal query fails (for example +// the column-statistics gathering SQL that ANALYZE issues against a user +// table), the audit log entry must record state=ERR with a descriptive +// error_message instead of the previous misleading state=OK / return_rows=0. +suite('test_audit_log_internal_query_failure', 'nonConcurrent') { + def tbl = 'test_audit_log_internal_query_failure_t1' + + setGlobalVarTemporary([enable_audit_plugin: true], { + try { + sql "drop table if exists ${tbl}" + sql """ + create table ${tbl} (k int, v int) + duplicate key(k) + distributed by hash(k) buckets 1 + properties('replication_num'='1') + """ + sql "insert into ${tbl} values(1,10),(2,20),(3,30)" + + // Limit the injected IO error to this table's tablet so concurrent + // reads on system tables (such as audit_log itself) are unaffected. + def tabletRows = sql_return_maparray "show tablets from ${tbl}" + assertFalse(tabletRows.isEmpty(), "expected at least one tablet for ${tbl}") + def tabletId = tabletRows[0].TabletId + + // Capture the FE-side current timestamp to filter audit rows so + // stale entries from previous runs do not satisfy the assertion. + def startTime = (sql_return_maparray "select now() as ts")[0].ts.toString() + + GetDebugPoint().clearDebugPointsForAllBEs() + try { + GetDebugPoint().enableDebugPointForAllBEs( + "LocalFileReader::read_at_impl.io_error", + [ sub_path: "/${tabletId}/" ]) + + test { + sql "analyze table ${tbl} with sync" + exception "IO_ERROR" + } + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // Force a flush so the failed internal query is queryable from + // __internal_schema.audit_log. + // The failed gather SQL reads from our user table and runs as an + // internal query; it must show up with state=ERR. Filter by start + // time to avoid matching stale entries from previous runs. 
+ def query = """select state, error_code, error_message + from __internal_schema.audit_log + where is_internal = 1 + and stmt like '%${tbl}%' + and state = 'ERR' + and `time` >= '${startTime}' + order by `time` desc limit 1""" + def res = [] + int retry = 60 + while (res.isEmpty() && retry-- > 0) { + sql "call flush_audit_log()" + sleep(2000) + res = sql_return_maparray "${query}" + } + assertFalse(res.isEmpty(), + "expected an audit_log entry with state=ERR for the failed gather query") + assertEquals('ERR', res[0].state.toString()) + assertNotEquals('0', res[0].error_code.toString()) + assertNotNull(res[0].error_message) + assertTrue(!res[0].error_message.toString().isEmpty(), + "audit_log error_message should not be empty, got: ${res[0].error_message}") + } finally { + try { + sql "drop table if exists ${tbl}" + } catch (Throwable ignored) { + } + } + }) +}