From a53c42118d2129b6f76bd7a1a3a03398271bd323 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 23 Mar 2026 14:59:31 +0800 Subject: [PATCH 1/2] [fix](metrics) Fix prepared statement QPS metrics not counted when audit log disabled When enable_prepared_stmt_audit_log is false (default), auditAfterExec() is skipped entirely for prepared statements. Since QPS metric counting (COUNTER_QUERY_ALL, COUNTER_QUERY_ERR, HISTO_QUERY_LATENCY, etc.) lives inside auditAfterExec -> AuditLogHelper.logAuditLog(), these metrics are also lost for prepared statement SELECT queries. Add AuditLogHelper.updateMetrics() to count metrics independently of audit log writing, and call it in the else branch when audit log is disabled. --- .../org/apache/doris/qe/AuditLogHelper.java | 85 ++++++++++++++ .../doris/qe/MysqlConnectProcessor.java | 3 + .../apache/doris/qe/AuditLogHelperTest.java | 104 ++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/qe/AuditLogHelperTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index fb5040be30e393..e0daef4268409f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -458,6 +458,91 @@ private static long getQueueTimeMs(ConnectContext ctx) { return queueToken == null ? -1 : queueToken.getQueueEndTime() - queueToken.getQueueStartTime(); } + /** + * Update query metrics without writing audit log. This is used when + * enable_prepared_stmt_audit_log is disabled, to ensure QPS metrics + * are still counted for prepared statement executions. + */ + public static void updateMetrics(ConnectContext ctx) { + if (Config.enable_bdbje_debug_mode) { + return; + } + try { + updateMetricsImpl(ctx); + } catch (Throwable t) { + LOG.warn("Failed to update query metrics.", t); + } + } + + private static void updateMetricsImpl(ConnectContext ctx) { + if (!ctx.getState().isQuery()) { + return; + } + if (!MetricRepo.isInit) { + return; + } + + long elapseMs = System.currentTimeMillis() - ctx.getStartTime(); + + if (!ctx.getState().isInternal()) { + MetricRepo.COUNTER_QUERY_ALL.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); + } + + String cloudCluster = ""; + String physicalClusterName = ""; + try { + if (Config.isCloudMode()) { + cloudCluster = ctx.getCloudCluster(false); + physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getPhysicalCluster(cloudCluster); + if (!cloudCluster.equals(physicalClusterName)) { + // vcg + MetricRepo.increaseClusterQueryAll(physicalClusterName); + } + } + } catch (ComputeGroupException e) { + LOG.warn("Failed to get cloud cluster, cloudCluster={}, physicalClusterName={} ", + cloudCluster, physicalClusterName, e); + return; + } + + MetricRepo.increaseClusterQueryAll(cloudCluster); + + if (!ctx.getState().isInternal()) { + if (ctx.getState().getStateType() == MysqlStateType.ERR + && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { + // err query + MetricRepo.COUNTER_QUERY_ERR.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); + if (cloudCluster.equals(physicalClusterName)) { + // not vcg + MetricRepo.increaseClusterQueryErr(cloudCluster); + } else { + // vcg + MetricRepo.increaseClusterQueryErr(cloudCluster); + MetricRepo.increaseClusterQueryErr(physicalClusterName); + } + } else if (ctx.getState().getStateType() == MysqlStateType.OK + || ctx.getState().getStateType() == MysqlStateType.EOF) { + // ok query + MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); + MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); + if (cloudCluster.equals(physicalClusterName)) { + // not vcg + MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); + } else { + // vcg + MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); + MetricRepo.updateClusterQueryLatency(physicalClusterName, elapseMs); + } + if (elapseMs > Config.qe_slow_log_ms) { + MetricRepo.COUNTER_QUERY_SLOW.increase(1L); + } + } + } + } + private static String getStmtType(StatementBase stmt) { if (stmt == null) { return StmtType.OTHER.name(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index 50d3bed474a98f..5fe5223f9a19e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -186,6 +186,9 @@ protected void handleExecute(PrepareCommand prepareCommand, long stmtId, Prepare } if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) { auditAfterExec(stmtStr, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); + } else { + // When audit log is disabled for prepared statements, still update QPS metrics. + AuditLogHelper.updateMetrics(ctx); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditLogHelperTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditLogHelperTest.java new file mode 100644 index 00000000000000..e1b6b8c961cb9e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditLogHelperTest.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.metric.MetricRepo; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class AuditLogHelperTest { + + @BeforeClass + public static void setUp() { + FeConstants.runningUnitTest = true; + MetricRepo.init(); + } + + private ConnectContext createMockContext(boolean isQuery, boolean isInternal) { + ConnectContext ctx = new ConnectContext(); + ctx.setStartTime(); + ctx.setCurrentUserIdentity(UserIdentity.ROOT); + ctx.getState().setIsQuery(isQuery); + ctx.getState().setInternal(isInternal); + return ctx; + } + + @Test + public void testUpdateMetricsQueryOk() { + ConnectContext ctx = createMockContext(true, false); + ctx.getState().setOk(0, 0, ""); + + long before = MetricRepo.COUNTER_QUERY_ALL.getValue(); + AuditLogHelper.updateMetrics(ctx); + long after = MetricRepo.COUNTER_QUERY_ALL.getValue(); + + Assert.assertEquals(1, after - before); + } + + @Test + public void testUpdateMetricsQueryErr() { + ConnectContext ctx = createMockContext(true, false); + ctx.getState().setError("test error"); + + long beforeAll = MetricRepo.COUNTER_QUERY_ALL.getValue(); + long beforeErr = MetricRepo.COUNTER_QUERY_ERR.getValue(); + AuditLogHelper.updateMetrics(ctx); + long afterAll = MetricRepo.COUNTER_QUERY_ALL.getValue(); + long afterErr = MetricRepo.COUNTER_QUERY_ERR.getValue(); + + Assert.assertEquals(1, afterAll - beforeAll); + Assert.assertEquals(1, afterErr - beforeErr); + } + + @Test + public void testUpdateMetricsNotQuery() { + // INSERT scenario: isQuery = false, counters should not change + ConnectContext ctx = createMockContext(false, false); + ctx.getState().setOk(1, 0, ""); + + long before = MetricRepo.COUNTER_QUERY_ALL.getValue(); + AuditLogHelper.updateMetrics(ctx); + long after = MetricRepo.COUNTER_QUERY_ALL.getValue(); + + Assert.assertEquals(0, after - before); + } + + @Test + public void testUpdateMetricsDebugModeShortCircuit() { + boolean original = Config.enable_bdbje_debug_mode; + try { + Config.enable_bdbje_debug_mode = true; + + ConnectContext ctx = createMockContext(true, false); + ctx.getState().setOk(0, 0, ""); + + long before = MetricRepo.COUNTER_QUERY_ALL.getValue(); + AuditLogHelper.updateMetrics(ctx); + long after = MetricRepo.COUNTER_QUERY_ALL.getValue(); + + Assert.assertEquals(0, after - before); + } finally { + Config.enable_bdbje_debug_mode = original; + } + } +} From bd3e53559ad06183f8221ad31eb9d1c32bd4e6e3 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 23 Mar 2026 15:08:19 +0800 Subject: [PATCH 2/2] [refactor](metrics) Reuse updateMetricsImpl in logAuditLogImpl to eliminate duplicate code Let logAuditLogImpl() call updateMetricsImpl() for metric counting instead of duplicating the same logic. Only audit-specific fields (setSqlDigest, setScanBytes) remain in logAuditLogImpl. --- .../org/apache/doris/qe/AuditLogHelper.java | 65 ++----------------- 1 file changed, 7 insertions(+), 58 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index e0daef4268409f..8b6b307634af3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -345,64 +345,13 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme } if (ctx.getState().isQuery()) { - if (MetricRepo.isInit) { - if (!ctx.getState().isInternal()) { - MetricRepo.COUNTER_QUERY_ALL.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); - } - String physicalClusterName = ""; - try { - if (Config.isCloudMode()) { - cloudCluster = ctx.getCloudCluster(false); - physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getPhysicalCluster(cloudCluster); - if (!cloudCluster.equals(physicalClusterName)) { - // vcg - MetricRepo.increaseClusterQueryAll(physicalClusterName); - } - } - } catch (ComputeGroupException e) { - LOG.warn("Failed to get cloud cluster, cloudCluster={}, physicalClusterName={} ", - cloudCluster, physicalClusterName, e); - return; - } - - MetricRepo.increaseClusterQueryAll(cloudCluster); - if (!ctx.getState().isInternal()) { - if (ctx.getState().getStateType() == MysqlStateType.ERR - && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { - // err query - MetricRepo.COUNTER_QUERY_ERR.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); - if (cloudCluster.equals(physicalClusterName)) { - // not vcg - MetricRepo.increaseClusterQueryErr(cloudCluster); - } else { - // vcg - MetricRepo.increaseClusterQueryErr(cloudCluster); - MetricRepo.increaseClusterQueryErr(physicalClusterName); - } - } else if (ctx.getState().getStateType() == MysqlStateType.OK - || ctx.getState().getStateType() == MysqlStateType.EOF) { - // ok query - MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); - MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); - if (cloudCluster.equals(physicalClusterName)) { - // not vcg - MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); - } else { - // vcg - MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); - MetricRepo.updateClusterQueryLatency(physicalClusterName, elapseMs); - } - if (elapseMs > Config.qe_slow_log_ms) { - MetricRepo.COUNTER_QUERY_SLOW.increase(1L); - } - if (elapseMs > Config.sql_digest_generation_threshold_ms) { - String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - auditEventBuilder.setSqlDigest(sqlDigest); - } - } + updateMetricsImpl(ctx); + if (MetricRepo.isInit && !ctx.getState().isInternal()) { + if ((ctx.getState().getStateType() == MysqlStateType.OK + || ctx.getState().getStateType() == MysqlStateType.EOF) + && elapseMs > Config.sql_digest_generation_threshold_ms) { + String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); + auditEventBuilder.setSqlDigest(sqlDigest); } } auditEventBuilder.setScanBytesFromLocalStorage(