Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2013,16 +2013,17 @@ public List<ResultRow> executeInternalQuery() {
if (originStmt.originStmt != null) {
context.setSqlHash(DigestUtils.md5Hex(originStmt.originStmt));
}
// Mark state up front so audit log records this as an internal query even if parse/plan fails.
context.getState().setNereids(true);
context.getState().setIsQuery(true);
context.getState().setInternal(true);
try {
List<ResultRow> resultRows = new ArrayList<>();
try {
parseByNereids();
Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
"Nereids only process LogicalPlanAdapter,"
+ " but parsedStmt is " + parsedStmt.getClass().getName());
context.getState().setNereids(true);
context.getState().setIsQuery(true);
context.getState().setInternal(true);
planner = new NereidsPlanner(statementContext);
planner.plan(parsedStmt, context.getSessionVariable().toThrift());
} catch (Exception e) {
Expand Down Expand Up @@ -2078,6 +2079,16 @@ public List<ResultRow> executeInternalQuery() {
} catch (Exception e) {
throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e);
}
} catch (Exception e) {
// Surface failure into ConnectContext state so AuditLogHelper records ERR instead of OK.
if (context.getState().getStateType() != MysqlStateType.ERR) {
String msg = e.getMessage();
if (Strings.isNullOrEmpty(msg)) {
msg = Util.getRootCauseMessage(e);
}
context.getState().setError(ErrorCode.ERR_INTERNAL_ERROR, msg);
}
throw e;
} finally {
if (coord != null) {
coord.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.qe;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.thrift.TQueryOptions;

Expand All @@ -43,4 +44,27 @@ public void testSetSqlHash() {
}
Assert.assertEquals("a8ec30e5ad0820f8c5bd16a82a4491ca", executor.getContext().getSqlHash());
}

@Test
public void testExecuteInternalQuerySetsErrorStateOnFailure() {
// Regression test for CIR-20019: when the internal SQL execution throws,
// ConnectContext state must be set to ERR so AuditLogHelper records the failure
// instead of misleadingly logging State=OK with empty error message.
ConnectContext ctx = new ConnectContext();
StmtExecutor executor = new StmtExecutor(ctx, "select * from table1");
try (MockedConstruction<NereidsPlanner> mocked = Mockito.mockConstruction(NereidsPlanner.class,
(mock, context) -> Mockito.doThrow(new RuntimeException("mock plan failure"))
.when(mock).plan(Mockito.any(StatementBase.class), Mockito.any(TQueryOptions.class)))) {
Assert.assertThrows(RuntimeException.class, executor::executeInternalQuery);
}
Assert.assertEquals(QueryState.MysqlStateType.ERR, ctx.getState().getStateType());
Assert.assertEquals(ErrorCode.ERR_INTERNAL_ERROR, ctx.getState().getErrorCode());
Assert.assertNotNull(ctx.getState().getErrorMessage());
Assert.assertTrue("error message should mention root cause, got: " + ctx.getState().getErrorMessage(),
ctx.getState().getErrorMessage().contains("mock plan failure"));
Assert.assertTrue("internal query should be flagged as internal in audit state",
ctx.getState().isInternal());
Assert.assertTrue("internal query should be flagged as query in audit state",
ctx.getState().isQuery());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2408,6 +2408,37 @@ class Suite implements GroovyInterceptable {
}
}

/**
* Set the given global variables for the duration of {@code actionSupplier},
* restoring their original values on exit. The variable values are read via
* {@code SHOW GLOBAL VARIABLES} before being changed, so any kind of
* exception inside {@code actionSupplier} still triggers the restore.
*/
void setGlobalVarTemporary(Map<String, Object> tempVars, Closure actionSupplier) {
def quote = { Object v ->
if (v == null) {
return "''"
}
if (v instanceof Boolean || v instanceof Number) {
return v.toString()
}
return "'" + v.toString().replace("'", "''") + "'"
}
Map<String, String> origin = [:]
tempVars.keySet().each { key ->
def rows = sql_return_maparray "show global variables like '${key}'"
if (!rows.isEmpty()) {
origin.put(key, rows[0].Value as String)
}
}
try {
tempVars.each { key, value -> sql "set global ${key} = ${quote(value)}" }
actionSupplier()
} finally {
origin.each { key, value -> sql "set global ${key} = ${quote(value)}" }
}
}

void setBeConfigTemporary(Map<String, Object> tempConfig, Closure actionSupplier) {
Map<String, Map<String, String>> originConf = Maps.newHashMap()
tempConfig.each{ k, v ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test for CIR-20019: when an internal query fails (for example
// the column-statistics gathering SQL that ANALYZE issues against a user
// table), the audit log entry must record state=ERR with a descriptive
// error_message instead of the previous misleading state=OK / return_rows=0.
suite('test_audit_log_internal_query_failure', 'nonConcurrent') {
def tbl = 'test_audit_log_internal_query_failure_t1'

setGlobalVarTemporary([enable_audit_plugin: true], {
try {
sql "drop table if exists ${tbl}"
sql """
create table ${tbl} (k int, v int)
duplicate key(k)
distributed by hash(k) buckets 1
properties('replication_num'='1')
"""
sql "insert into ${tbl} values(1,10),(2,20),(3,30)"

// Limit the injected IO error to this table's tablet so concurrent
// reads on system tables (such as audit_log itself) are unaffected.
def tabletRows = sql_return_maparray "show tablets from ${tbl}"
assertFalse(tabletRows.isEmpty(), "expected at least one tablet for ${tbl}")
def tabletId = tabletRows[0].TabletId

// Capture the FE-side current timestamp to filter audit rows so
// stale entries from previous runs do not satisfy the assertion.
def startTime = (sql_return_maparray "select now() as ts")[0].ts.toString()

GetDebugPoint().clearDebugPointsForAllBEs()
try {
GetDebugPoint().enableDebugPointForAllBEs(
"LocalFileReader::read_at_impl.io_error",
[ sub_path: "/${tabletId}/" ])

test {
sql "analyze table ${tbl} with sync"
exception "IO_ERROR"
}
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}

// Force a flush so the failed internal query is queryable from
// __internal_schema.audit_log.
// The failed gather SQL reads from our user table and runs as an
// internal query; it must show up with state=ERR. Filter by start
// time to avoid matching stale entries from previous runs.
def query = """select state, error_code, error_message
from __internal_schema.audit_log
where is_internal = 1
and stmt like '%${tbl}%'
and state = 'ERR'
and `time` >= '${startTime}'
order by `time` desc limit 1"""
def res = []
int retry = 60
while (res.isEmpty() && retry-- > 0) {
sql "call flush_audit_log()"
sleep(2000)
res = sql_return_maparray "${query}"
}
assertFalse(res.isEmpty(),
"expected an audit_log entry with state=ERR for the failed gather query")
assertEquals('ERR', res[0].state.toString())
assertNotEquals('0', res[0].error_code.toString())
assertNotNull(res[0].error_message)
assertTrue(!res[0].error_message.toString().isEmpty(),
"audit_log error_message should not be empty, got: ${res[0].error_message}")
} finally {
try {
sql "drop table if exists ${tbl}"
} catch (Throwable ignored) {
}
}
})
}
Loading