From 895f115f3562d8d380011a85bf3926a3e5413a2c Mon Sep 17 00:00:00 2001 From: yujun Date: Wed, 22 Apr 2026 16:45:01 +0800 Subject: [PATCH] [feature](statistics) Skip collecting stats for long string columns (#62686) Issue Number: close #0 Problem Summary: When a string column contains extremely long values, running ANALYZE on it can consume excessive memory and may even OOM the BE. This PR introduces an inline per-row guard that short-circuits stats collection for any string column whose rows exceed `Config.statistics_max_string_column_length` (default 1024 bytes). The guard is injected into every statistics-collection SQL template (FULL_ANALYZE_TEMPLATE, LINEAR_ANALYZE_TEMPLATE, DUJ1_ANALYZE_TEMPLATE, HMS full/sample) as a `NoneMovableFunction`: assert_true(col IS NULL OR LENGTH(col) <= N, 'ANALYZE_SKIP_LONG_STRING_COLUMN') AS __lc When any row violates the predicate, `assert_true` triggers a controlled error carrying the `ANALYZE_SKIP_LONG_STRING_COLUMN` marker. `BaseAnalysisTask` walks the exception cause chain for this marker and, when detected, converts the failure into a FINISHED task with a skip message so users see why the column was dropped (via SHOW ANALYZE and via OK-packet info for sync analyze). Other columns in the same job are unaffected. Setting `statistics_max_string_column_length = 0` disables the guard entirely (upstream behavior). Added FE config `statistics_max_string_column_length` (default 1024). String columns whose row byte length exceeds this value are skipped during ANALYZE with a visible skip message, instead of risking OOM. Set to 0 to disable. 
--------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> (cherry picked from commit 34b5c6c7c9a6c695e8c066e0b328a98fa5c44527) --- .../java/org/apache/doris/common/Config.java | 17 ++ .../doris/catalog/BuiltinScalarFunctions.java | 2 + .../apache/doris/statistics/AnalysisJob.java | 15 +- .../doris/statistics/AnalysisManager.java | 27 +- .../statistics/AnalyzeSkipException.java | 39 +++ .../doris/statistics/BaseAnalysisTask.java | 117 ++++++- .../statistics/ExternalAnalysisTask.java | 1 + .../doris/statistics/OlapAnalysisTask.java | 5 + .../doris/statistics/AnalysisManagerTest.java | 96 ++++++ .../doris/statistics/HMSAnalysisTaskTest.java | 2 +- .../statistics/OlapAnalysisTaskTest.java | 122 ++++++++ .../hive/test_hive_analyze_long_string.groovy | 125 ++++++++ .../suites/statistics/analyze_stats.groovy | 2 +- .../test_analyze_long_string.groovy | 288 ++++++++++++++++++ 14 files changed, 846 insertions(+), 12 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyzeSkipException.java create mode 100644 regression-test/suites/external_table_p0/hive/test_hive_analyze_long_string.groovy create mode 100644 regression-test/suites/statistics/test_analyze_long_string.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 83f49b16175431..63d8134757f3d8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2728,6 +2728,23 @@ public class Config extends ConfigBase { @ConfField public static int auto_analyze_simultaneously_running_task_num = 1; + @ConfField(mutable = true, masterOnly = true, description = { + "统计信息收集时 string 列允许的最大字节长度。若列中存在长度超过该值的行," + + "该列的统计信息将被跳过收集(task 仍标记为 FINISHED,在 SHOW ANALYZE 中显示跳过原因)。" + + "≤ 0 表示关闭此保护。默认 1024 (1KB)。" + + "注意:此保护只覆盖 FULL / LINEAR / DUJ1 统计收集路径(即 analyze 全表和 sample 的主 SQL)。" + + "当 
enable_partition_analyze=true 时的 per-partition 路径(PARTITION_ANALYZE_TEMPLATE)" + + "出于正确性考虑不启用该保护,详见 BaseAnalysisTask 中的 NOTE。", + "Max byte length allowed for a string column when collecting statistics. " + + "If any row in a string column is longer than this value, the column's stats " + + "collection is skipped (the task is still marked FINISHED, with the skip reason " + + "shown in SHOW ANALYZE). A value <= 0 disables this protection. Default: 1024 (1KB). " + + "Note: this protection applies to the FULL / LINEAR / DUJ1 collection paths " + + "(i.e. the main SQL used by full-table and sample analyze). The per-partition path " + + "(PARTITION_ANALYZE_TEMPLATE, used when enable_partition_analyze=true) is intentionally " + + "not guarded for correctness reasons; see the NOTE in BaseAnalysisTask."}) + public static long statistics_max_string_column_length = 1024; + @Deprecated @ConfField public static final int period_analyze_simultaneously_running_task_num = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java index c9ba5ccc61a6a3..99c6b77016859f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java @@ -89,6 +89,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Ascii; import org.apache.doris.nereids.trees.expressions.functions.scalar.Asin; import org.apache.doris.nereids.trees.expressions.functions.scalar.Asinh; +import org.apache.doris.nereids.trees.expressions.functions.scalar.AssertTrue; import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan; import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan2; import org.apache.doris.nereids.trees.expressions.functions.scalar.Atanh; @@ -655,6 +656,7 @@ public class BuiltinScalarFunctions implements FunctionHelper { 
scalar(Ascii.class, "ascii"), scalar(Asin.class, "asin"), scalar(Asinh.class, "asinh"), + scalar(AssertTrue.class, "assert_true"), scalar(Atan.class, "atan"), scalar(Atanh.class, "atanh"), scalar(Atan2.class, "atan2"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index bc43f80cbc21a0..8d0f3b892b9492 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -108,7 +108,20 @@ public void updateTaskState(AnalysisState state, String msg) { killed = true; case FINISHED: for (BaseAnalysisTask task : queryFinished) { - analysisManager.updateTaskStatus(task.info, state, msg, time); + // When flushBuffer passes an empty msg, fall back to the + // task's own info.message. This propagates a skip reason + // previously stashed by BaseAnalysisTask.handleSkip into + // the single FINISHED update for this task, so job.message + // accumulation in AnalysisManager sees it and SHOW ANALYZE + // surfaces the skip reason at the job level. 
+ String taskMsg = msg; + if ((taskMsg == null || taskMsg.isEmpty()) && task.info != null) { + taskMsg = task.info.message; + } + if (taskMsg == null) { + taskMsg = ""; + } + analysisManager.updateTaskStatus(task.info, state, taskMsg, time); } default: // DO NOTHING diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 26bc81b985cfc7..f8d5b4cfce1c23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -235,6 +235,18 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTableCommand command) throws Ddl syncExecute(analysisTaskInfos.values()); jobInfo.state = AnalysisState.FINISHED; updateTableStats(jobInfo); + // Sync analyze never populates analysisJobIdToTaskMap, so updateTaskStatus + // skip-message accumulation does not fire for it. Surface any per-task skip + // reasons (e.g. long-string column skip) as an OK-packet info message so + // the user still sees why a column was dropped from collection. + List skipMessages = analysisTaskInfos.values().stream() + .map(t -> t.info == null ? null : t.info.message) + .filter(m -> m != null && !m.isEmpty()) + .collect(Collectors.toList()); + if (!skipMessages.isEmpty() && ConnectContext.get() != null) { + ConnectContext.get().getState().setOk(0, skipMessages.size(), + String.join(" ", skipMessages)); + } return null; } recordAnalysisJob(jobInfo); @@ -471,7 +483,12 @@ public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String return; } info.state = taskState; - info.message = message; + // Preserve the existing info.message when flushBuffer calls updateTaskState(FINISHED, "") + // for already-finished tasks, so that a previously-set skip message (from + // BaseAnalysisTask.handleSkip) is not wiped by the subsequent batch FINISHED update. 
+ if (!(taskState.equals(AnalysisState.FINISHED) && StringUtils.isEmpty(message))) { + info.message = message; + } // Update the task cost time when task finished or failed. And only log the final state. if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) { info.timeCostInMs = time - info.lastExecTimeInMs; @@ -496,6 +513,14 @@ public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String String errMessage = String.format("%s:[%s] ", info.colName, message); job.message = job.message == null ? errMessage : job.message + errMessage; } + // Accumulate a non-empty FINISHED message (e.g. long-string skip reason) into + // job.message so it is visible in SHOW ANALYZE at job level. Guard on the + // incoming message being non-empty to avoid double-counting when flushBuffer + // later calls updateTaskState(FINISHED, "") for the same already-skipped task. + if (taskState.equals(AnalysisState.FINISHED) && !StringUtils.isEmpty(message)) { + String skipMessage = String.format("%s:[%s] ", info.colName, message); + job.message = job.message == null ? skipMessage : job.message + skipMessage; + } // Set the job state to RUNNING when its first task becomes RUNNING. if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) { job.state = AnalysisState.RUNNING; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyzeSkipException.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyzeSkipException.java new file mode 100644 index 00000000000000..25f088e5aa4031 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyzeSkipException.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +/** + * Control-flow signal thrown by an analysis task when it decides to skip + * statistics collection for a specific column (e.g. a string column contains + * at least one row whose byte length exceeds + * {@code Config.statistics_max_string_column_length}). + * + * This is NOT an error. The task that catches this exception should mark + * itself as FINISHED (not FAILED) and surface the skip reason via + * {@code info.message} / {@code SHOW ANALYZE}. + */ +public class AnalyzeSkipException extends RuntimeException { + + public AnalyzeSkipException(String message) { + super(message); + } + + public AnalyzeSkipException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 71bd7feeea6e24..8ace3876b5f9d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -65,6 +65,14 @@ public abstract class BaseAnalysisTask { public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB public static final double LIMIT_FACTOR = 1.2; + /** + * Marker string embedded in {@code assert_true} inside statistics collection SQL. 
+ * When any row's string column length exceeds the configured limit, BE throws an + * error whose message contains this marker; FE detects it and converts the task + * result to a skip signal ({@link AnalyzeSkipException}) rather than a failure. + */ + public static final String ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER = "ANALYZE_SKIP_LONG_STRING_COLUMN"; + protected static final String FULL_ANALYZE_TEMPLATE = "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " + "${catalogId} AS `catalog_id`, " @@ -81,10 +89,11 @@ public abstract class BaseAnalysisTask { + "${dataSizeFunction} AS `data_size`, " + "NOW() AS `update_time`, " + "null as `hot_value` " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}"; + + "FROM (SELECT `${colName}`${lengthAssert} " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}) __lc_t"; protected static final String LINEAR_ANALYZE_TEMPLATE = "WITH cte1 AS (" - + "SELECT `${colName}` " + + "SELECT `${colName}`${lengthAssert} " + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit} ${preAggHint}), " + "cte2 AS (" + "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " @@ -120,7 +129,7 @@ public abstract class BaseAnalysisTask { + "(SELECT " + "${subStringColName} AS `hash_value`, " + "`${colName}` AS `col_value`, " - + "LENGTH(`${colName}`) as `len` " + + "LENGTH(`${colName}`) as `len`${lengthAssert} " + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit}) as `t0` " + "${preAggHint} GROUP BY `t0`.`hash_value`), " + "cte2 AS ( " @@ -166,6 +175,14 @@ public abstract class BaseAnalysisTask { + "${data_size} AS `data_size`, " + "NOW() "; + // NOTE: PARTITION_ANALYZE_TEMPLATE intentionally does NOT apply the long-string + // skip guard (statistics_max_string_column_length). Partition-granularity analyze + // commits per-batch INSERTs into __partition_stats incrementally. 
Aborting mid-loop + // via assert_true would leave a mix of fresh and stale rows in __partition_stats + // that is non-trivial to roll back. Since partition-level statistics are seldom + // relied upon today, we accept that long-string columns are NOT protected on this + // path. Only the full / sample OLAP paths and the external-table path enforce the + // per-row byte-length ceiling. protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT " + "${catalogId} AS `catalog_id`, " + "${dbId} AS `db_id`, " @@ -181,7 +198,7 @@ public abstract class BaseAnalysisTask { + "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, " + "${dataSizeFunction} AS `data_size`, " + "NOW() AS `update_time` " - + " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}"; + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}"; protected static final String MERGE_PARTITION_TEMPLATE = "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " @@ -254,8 +271,66 @@ protected void init(AnalysisInfo info) { public void execute() throws Exception { prepareExecution(); - doExecute(); - afterExecution(); + try { + doExecute(); + } catch (AnalyzeSkipException e) { + handleSkip(e); + } catch (Exception e) { + if (containsSkipMarker(e)) { + handleSkip(new AnalyzeSkipException(buildSkipMessage(), e)); + return; + } + throw e; + } + } + + /** + * Walk the cause chain and inspect every Throwable's message for the + * long-string skip marker. More robust than only checking the root cause, + * since some execution paths wrap the BE error in an outer exception that + * reformats the message without preserving the original cause. 
+ */ + protected static boolean containsSkipMarker(Throwable e) { + Throwable cur = e; + while (cur != null) { + String m = cur.getMessage(); + if (m != null && m.contains(ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER)) { + return true; + } + cur = cur.getCause(); + } + return false; + } + + /** + * Mark this task as FINISHED with a skip message. Called when the task + * detects that the column should be skipped (e.g. long string column). + */ + private void handleSkip(AnalyzeSkipException e) { + String skipMsg = e.getMessage(); + LOG.info("Analyze task skip column [{}] in table [{}]. Reason: {}", + info.colName, tbl == null ? "?" : tbl.getName(), skipMsg); + // Stash the skip message on info.message. The job-level flushBuffer path + // (AnalysisJob.updateTaskState -> AnalysisManager.updateTaskStatus) will + // pick it up as the single FINISHED transition for this task, so we + // avoid a redundant updateTaskStatus call here. Doing the state update + // twice used to overwrite AnalysisInfo.timeCostInMs with the near-zero + // delta between the two FINISHED calls. + info.message = skipMsg; + // Route through taskDoneWithoutData: adds this task to queryFinished + // and triggers flushBuffer, which will call updateTaskState(FINISHED, + // "") exactly once per task. AnalysisJob.updateTaskState substitutes + // the task's own info.message when the outer msg is empty, so skipMsg + // reaches job.message for SHOW ANALYZE visibility. + job.taskDoneWithoutData(this); + } + + private String buildSkipMessage() { + return String.format( + "Column [%s] has row(s) whose byte length exceeds %d (Config.statistics_max_string_column_length), " + + "skip collecting statistics for this column.", + info == null ? "?" 
: info.colName, + org.apache.doris.common.Config.statistics_max_string_column_length); } protected void prepareExecution() { @@ -266,8 +341,6 @@ protected void prepareExecution() { protected abstract void doSample() throws Exception; - protected void afterExecution() {} - protected void setTaskStateToRunning() { Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis()); @@ -498,6 +571,31 @@ protected Map buildSqlParams() { return Maps.newHashMap(); } + /** + * Populate the {@code ${lengthAssert}} placeholder into SQL params map. + * For string columns with config > 0, the placeholder expands into a per-row + * {@code , assert_true(col IS NULL OR LENGTH(col) <= N, 'marker') AS __lc} + * clause that gets inserted into the inner-most SELECT list of statistics + * collection SQL. For non-string columns or when config <= 0, the placeholder + * is an empty string so the SQL stays unchanged. + * + * Note: the {@code IS NULL OR} guard is required because Doris's + * {@code assert_true} BE function throws on NULL inputs. + */ + protected void addLengthAssertParam(Map params) { + long maxLen = org.apache.doris.common.Config.statistics_max_string_column_length; + if (col != null && col.getType().isStringType() && maxLen > 0) { + String escapedColName = StatisticsUtil.escapeColumnName(String.valueOf(info.colName)); + // The StringSubstitutor used by callers already has ${colName} populated, + // so we inline the escaped column name directly here. 
+ params.put("lengthAssert", + ", assert_true(`" + escapedColName + "` IS NULL OR LENGTH(`" + escapedColName + "`) <= " + + maxLen + ", '" + ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER + "') AS `__lc`"); + } else { + params.put("lengthAssert", ""); + } + } + protected String castToNumeric(String colName) { Type type = col.getType(); if (type.isNumericType()) { @@ -535,6 +633,9 @@ protected void runQuery(String sql) { job.appendBuf(this, Collections.singletonList(colStatsData)); } catch (Exception e) { LOG.warn("Failed to execute sql {}", sql); + if (containsSkipMarker(e)) { + throw new AnalyzeSkipException(buildSkipMessage(), e); + } throw e; } finally { if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java index 72beb343956253..b9a2525ac6fb90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java @@ -94,6 +94,7 @@ protected Map buildSqlParams() { params.put("type", col.getType().toString()); } params.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); + addLengthAssertParam(params); return params; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 187eff6d40a147..bd0c5d94f12fdf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -32,6 +32,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.qe.AutoCloseConnectContext; import 
org.apache.doris.qe.SessionVariable; @@ -398,6 +399,7 @@ protected Map buildSqlParams() { params.put("tblName", String.valueOf(tbl.getName())); params.put("index", getIndex()); params.put("preAggHint", ""); + addLengthAssertParam(params); return params; } @@ -524,6 +526,9 @@ protected long getSampleRows() { * @return True for single unique key column and single distribution column. */ protected boolean useLinearAnalyzeTemplate() { + if (DebugPointUtil.isEnable("OlapAnalysisTask.useDUJ1Template")) { + return false; + } if (partitionColumnSampleTooManyRows || scanFullTable) { return true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 0f37721022b09d..cb164412c4f04b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -42,6 +42,7 @@ import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.HashMap; @@ -108,6 +109,101 @@ public String toString() { Assertions.assertEquals(job.state, AnalysisState.FINISHED); } + @Test + public void testUpdateTaskStatusPreservesSkipMessage() { + // Verify that a subsequent updateTaskStatus(FINISHED, "") call (e.g. from + // flushBuffer) does NOT wipe a previously-set skip message on info.message, + // and that job.message only accumulates the skip reason once. 
+ BaseAnalysisTask task1 = Mockito.mock(BaseAnalysisTask.class); + + AnalysisManager manager = Mockito.spy(new AnalysisManager()); + Mockito.doNothing().when(manager).logCreateAnalysisTask(Mockito.any()); + Mockito.doNothing().when(manager).logCreateAnalysisJob(Mockito.any()); + Mockito.doNothing().when(manager).updateTableStats(Mockito.any()); + + AnalysisInfo job = new AnalysisInfoBuilder().setJobId(10) + .setState(AnalysisState.PENDING).setAnalysisType(AnalysisType.FUNDAMENTALS) + .setJobType(AnalysisInfo.JobType.MANUAL).build(); + AnalysisInfo taskInfo = new AnalysisInfoBuilder().setJobId(10) + .setTaskId(11).setJobType(JobType.MANUAL).setAnalysisType(AnalysisType.FUNDAMENTALS) + .setColName("big_str").setState(AnalysisState.PENDING).build(); + manager.replayCreateAnalysisJob(job); + manager.replayCreateAnalysisTask(taskInfo); + + task1.info = taskInfo; + Map tasks = new HashMap<>(); + tasks.put(11L, task1); + manager.addToJobIdTasksMap(10, tasks); + + String skipMsg = "Column [big_str] has row(s) whose byte length exceeds 1024" + + " (Config.statistics_max_string_column_length), skip collecting statistics for this column."; + manager.updateTaskStatus(taskInfo, AnalysisState.FINISHED, skipMsg, 0); + Assertions.assertEquals(skipMsg, taskInfo.message); + Assertions.assertTrue(job.message != null && job.message.contains(skipMsg), + "expected skip msg in job.message, got: " + job.message); + String firstJobMessage = job.message; + + // Simulate flushBuffer replay: subsequent FINISHED with empty message should + // NOT wipe info.message NOR re-append skip reason. 
+ manager.updateTaskStatus(taskInfo, AnalysisState.FINISHED, "", 0); + Assertions.assertEquals(skipMsg, taskInfo.message); + Assertions.assertEquals(firstJobMessage, job.message, + "job.message should not accumulate again on empty-message update"); + } + + @Test + public void testUpdateTaskStatusAccumulatesMultipleSkipMessages() { + // Two string columns get skipped -> job.message must contain both entries keyed + // by their respective colName, and repeated flushBuffer (FINISHED,"") replays + // must NOT duplicate them. + BaseAnalysisTask task1 = Mockito.mock(BaseAnalysisTask.class); + BaseAnalysisTask task2 = Mockito.mock(BaseAnalysisTask.class); + + AnalysisManager manager = Mockito.spy(new AnalysisManager()); + Mockito.doNothing().when(manager).logCreateAnalysisTask(Mockito.any()); + Mockito.doNothing().when(manager).logCreateAnalysisJob(Mockito.any()); + Mockito.doNothing().when(manager).updateTableStats(Mockito.any()); + + AnalysisInfo job = new AnalysisInfoBuilder().setJobId(20) + .setState(AnalysisState.PENDING).setAnalysisType(AnalysisType.FUNDAMENTALS) + .setJobType(AnalysisInfo.JobType.MANUAL).build(); + AnalysisInfo ti1 = new AnalysisInfoBuilder().setJobId(20).setTaskId(21) + .setColName("s1").setJobType(JobType.MANUAL) + .setAnalysisType(AnalysisType.FUNDAMENTALS).setState(AnalysisState.PENDING).build(); + AnalysisInfo ti2 = new AnalysisInfoBuilder().setJobId(20).setTaskId(22) + .setColName("s2").setJobType(JobType.MANUAL) + .setAnalysisType(AnalysisType.FUNDAMENTALS).setState(AnalysisState.PENDING).build(); + manager.replayCreateAnalysisJob(job); + manager.replayCreateAnalysisTask(ti1); + manager.replayCreateAnalysisTask(ti2); + task1.info = ti1; + task2.info = ti2; + Map tasks = new HashMap<>(); + tasks.put(21L, task1); + tasks.put(22L, task2); + manager.addToJobIdTasksMap(20, tasks); + + String skip1 = "Column [s1] has row(s) whose byte length exceeds 1024 ..."; + String skip2 = "Column [s2] has row(s) whose byte length exceeds 1024 ..."; + 
manager.updateTaskStatus(ti1, AnalysisState.FINISHED, skip1, 0); + manager.updateTaskStatus(ti2, AnalysisState.FINISHED, skip2, 0); + Assertions.assertNotNull(job.message); + Assertions.assertTrue(job.message.contains("s1:[" + skip1 + "]"), + "expected s1 skip in job.message, got: " + job.message); + Assertions.assertTrue(job.message.contains("s2:[" + skip2 + "]"), + "expected s2 skip in job.message, got: " + job.message); + String afterFirstRound = job.message; + + // Simulate flushBuffer replay with empty message for both tasks. Neither entry + // should be duplicated. + manager.updateTaskStatus(ti1, AnalysisState.FINISHED, "", 0); + manager.updateTaskStatus(ti2, AnalysisState.FINISHED, "", 0); + Assertions.assertEquals(afterFirstRound, job.message, + "job.message must remain stable across flushBuffer replays"); + Assertions.assertEquals(skip1, ti1.message); + Assertions.assertEquals(skip2, ti2.message); + } + @Test public void testRecordLimit1() { Config.analyze_record_limit = 2; diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java index e870b0b3bfd4e2..f629f36d07c8a7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java @@ -209,7 +209,7 @@ public void runQuery(String sql) { + "SUBSTRING(CAST(MIN(`hour`) AS STRING), 1, 1024) AS `min`, " + "SUBSTRING(CAST(MAX(`hour`) AS STRING), 1, 1024) AS `max`, " + "COUNT(1) * 4 AS `data_size`, NOW() AS `update_time`, " - + "null as `hot_value` FROM `hms`.`default`.`test` ", sql); + + "null as `hot_value` FROM (SELECT `hour` FROM `hms`.`default`.`test` ) __lc_t", sql); } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java index fa1879db5f3bfc..b4a838852f2be0 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java @@ -753,4 +753,126 @@ public void testMergePartitionSql() { + "WHERE `catalog_id` = 0 AND `db_id` = 1 AND `tbl_id` = 2 AND `idx_id` = 3 AND `col_id` = 'col1'", sql); } + + @Test + public void testAddLengthAssertParamForStringColumn() throws Exception { + // String column with positive config -> emits per-row assert_true guard + Column strCol = new Column("s", PrimitiveType.VARCHAR); + long savedLen = org.apache.doris.common.Config.statistics_max_string_column_length; + org.apache.doris.common.Config.statistics_max_string_column_length = 1024; + try { + OlapAnalysisTask task = new OlapAnalysisTask(); + AnalysisInfo info = new AnalysisInfoBuilder() + .setJobId(1L) + .setTaskId(2L) + .setColName("s") + + .build(); + task.info = info; + task.col = strCol; + Map params = Maps.newHashMap(); + task.addLengthAssertParam(params); + String lengthAssert = params.get("lengthAssert"); + Assertions.assertNotNull(lengthAssert); + Assertions.assertTrue(lengthAssert.contains("assert_true"), + "expected assert_true in placeholder, got: " + lengthAssert); + Assertions.assertTrue(lengthAssert.contains("IS NULL OR LENGTH"), + "expected NULL guard, got: " + lengthAssert); + Assertions.assertTrue(lengthAssert.contains("1024"), + "expected max length value, got: " + lengthAssert); + Assertions.assertTrue( + lengthAssert.contains(BaseAnalysisTask.ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER), + "expected marker in placeholder, got: " + lengthAssert); + } finally { + org.apache.doris.common.Config.statistics_max_string_column_length = savedLen; + } + } + + @Test + public void testAddLengthAssertParamForNonStringColumn() { + // Non-string columns must emit an empty placeholder so SQL stays unchanged + Column intCol = new Column("id", PrimitiveType.INT); + OlapAnalysisTask task = new OlapAnalysisTask(); + AnalysisInfo info = new 
AnalysisInfoBuilder() + .setJobId(1L).setTaskId(2L).setColName("id") + .build(); + task.info = info; + task.col = intCol; + Map params = Maps.newHashMap(); + task.addLengthAssertParam(params); + Assertions.assertEquals("", params.get("lengthAssert")); + } + + @Test + public void testAddLengthAssertParamConfigDisabled() { + Column strCol = new Column("s", PrimitiveType.VARCHAR); + long savedLen = org.apache.doris.common.Config.statistics_max_string_column_length; + org.apache.doris.common.Config.statistics_max_string_column_length = 0; + try { + OlapAnalysisTask task = new OlapAnalysisTask(); + AnalysisInfo info = new AnalysisInfoBuilder() + .setJobId(1L).setTaskId(2L).setColName("s") + .build(); + task.info = info; + task.col = strCol; + Map params = Maps.newHashMap(); + task.addLengthAssertParam(params); + Assertions.assertEquals("", params.get("lengthAssert")); + } finally { + org.apache.doris.common.Config.statistics_max_string_column_length = savedLen; + } + } + + @Test + public void testFullAnalyzeTemplateRendersLengthAssert() { + // Confirm the rendered FULL_ANALYZE_TEMPLATE wraps the base table in a subquery + // and carries the assert_true clause for a string column. 
+ Map params = new HashMap<>(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.TABLE_STATISTIC_TBL_NAME); + params.put("catalogId", "0"); + params.put("dbId", "1"); + params.put("tblId", "2"); + params.put("idxId", "3"); + params.put("colId", "s"); + params.put("dataSizeFunction", "100"); + params.put("catalogName", "internal"); + params.put("dbName", "db1"); + params.put("colName", "s"); + params.put("tblName", "tbl1"); + params.put("index", ""); + params.put("lengthAssert", + ", assert_true(`s` IS NULL OR LENGTH(`s`) <= 1024, '" + + BaseAnalysisTask.ANALYZE_SKIP_LONG_STRING_COLUMN_MARKER + "') AS `__lc`"); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(BaseAnalysisTask.FULL_ANALYZE_TEMPLATE); + Assertions.assertTrue(sql.contains("FROM (SELECT `s`, assert_true("), sql); + Assertions.assertTrue(sql.contains("IS NULL OR LENGTH(`s`) <= 1024"), sql); + Assertions.assertTrue(sql.endsWith(") __lc_t"), sql); + } + + @Test + public void testFullAnalyzeTemplateRendersWithoutLengthAssert() { + // Non-string columns yield a plain inline view that Nereids can collapse. 
+ Map params = new HashMap<>(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.TABLE_STATISTIC_TBL_NAME); + params.put("catalogId", "0"); + params.put("dbId", "1"); + params.put("tblId", "2"); + params.put("idxId", "3"); + params.put("colId", "id"); + params.put("dataSizeFunction", "100"); + params.put("catalogName", "internal"); + params.put("dbName", "db1"); + params.put("colName", "id"); + params.put("tblName", "tbl1"); + params.put("index", ""); + params.put("lengthAssert", ""); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(BaseAnalysisTask.FULL_ANALYZE_TEMPLATE); + Assertions.assertFalse(sql.contains("assert_true"), sql); + Assertions.assertTrue(sql.contains("FROM (SELECT `id` FROM `internal`.`db1`.`tbl1`"), sql); + Assertions.assertTrue(sql.endsWith(") __lc_t"), sql); + } } diff --git a/regression-test/suites/external_table_p0/hive/test_hive_analyze_long_string.groovy b/regression-test/suites/external_table_p0/hive/test_hive_analyze_long_string.groovy new file mode 100644 index 00000000000000..faf8dd539ab495 --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hive_analyze_long_string.groovy @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
// See the License for the
// specific language governing permissions and limitations
// under the License.

import org.awaitility.Awaitility
import java.util.concurrent.TimeUnit

// Verifies the long-string skip guard on the HMS (external Hive) analyze path:
// a string column containing a row longer than statistics_max_string_column_length
// must be skipped (task FINISHED with a visible skip reason, no stats persisted),
// while other columns on the same table still analyze normally.
suite("test_hive_analyze_long_string", "p0,external,hive,external_docker,external_docker_hive") {
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable Hive test.")
        return
    }

    // Feature only triggers when values exceed 1024 bytes; this test assumes the FE default.
    def cfgRows = sql "admin show frontend config like 'statistics_max_string_column_length'"
    assertEquals(1, cfgRows.size())
    assertEquals("1024", cfgRows[0][1].toString())

    // Groovy String multiply: 2048-byte value, i.e. twice the 1024-byte default limit.
    String longVal = "x" * 2048

    for (String hivePrefix : ["hive3"]) {
        setHivePrefix(hivePrefix)
        String extHiveHmsHost = context.config.otherConfigs.get("externalEnvIp")
        String extHiveHmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
        String hdfsPort = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
        String catalogName = hivePrefix + "_test_analyze_long_string"
        String dbName = "test_analyze_long_string_db"
        String tblName = "t1"

        // Seed the hive side via hive_docker so this suite is self-contained and
        // does not require any pre-installed HQL.
        hive_docker """drop table if exists ${dbName}.${tblName}"""
        hive_docker """drop database if exists ${dbName} cascade"""
        hive_docker """create database ${dbName}"""
        hive_docker """create table ${dbName}.${tblName} (id int, s string) stored as parquet"""
        hive_docker """insert into ${dbName}.${tblName} values (1, 'short'), (2, 'another'), (3, '${longVal}')"""

        sql "drop catalog if exists ${catalogName}"
        sql """
            create catalog if not exists ${catalogName} properties (
                'type'='hms',
                'hadoop.username' = 'hadoop',
                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
                'fs.defaultFS' = 'hdfs://${extHiveHmsHost}:${hdfsPort}'
            )
        """
        sql "refresh catalog ${catalogName}"

        // Analyze column `s` which contains a >1024-byte row: task should be marked
        // FINISHED with a skip message, and no column stats should be persisted.
        def analyzeRes = sql "analyze table ${catalogName}.${dbName}.${tblName}(s)"
        assertEquals(1, analyzeRes.size())
        long jobId = Long.parseLong(analyzeRes[0][0].toString())
        logger.info("hive analyze long string jobId=${jobId}")

        // Poll until the async analyze job reaches a terminal state (column 9 = state).
        Awaitility.await()
                .atMost(120, TimeUnit.SECONDS)
                .pollInterval(2, TimeUnit.SECONDS)
                .until {
                    def rows = sql "show analyze ${jobId}"
                    if (rows.isEmpty()) {
                        return false
                    }
                    String state = rows[0][9].toString()
                    return state == "FINISHED" || state == "FAILED"
                }

        def jobRows = sql "show analyze ${jobId}"
        assertEquals("FINISHED", jobRows[0][9].toString())
        String jobMessage = jobRows[0][7].toString()
        assertTrue(jobMessage.contains("statistics_max_string_column_length")
                || jobMessage.contains("exceeds"),
                "expected skip reason in job message but got: ${jobMessage}")

        // Per-task view: column `s` must also carry the skip reason (column 3 = message,
        // column 6 = state) even though its state is FINISHED rather than FAILED.
        def taskRows = sql "show analyze task status ${jobId}"
        def strTaskRow = taskRows.find { it[1].toString() == "s" }
        assertNotNull(strTaskRow, "missing analyze task row for column s")
        assertEquals("FINISHED", strTaskRow[6].toString())
        String taskMessage = strTaskRow[3].toString()
        assertTrue(taskMessage.contains("statistics_max_string_column_length")
                || taskMessage.contains("exceeds"),
                "expected skip reason in task message but got: ${taskMessage}")

        // The skipped column must NOT have any stored column statistics row.
        def colStats = sql "show column stats ${catalogName}.${dbName}.${tblName}(s)"
        assertTrue(colStats.isEmpty(), "expected no column stats for skipped column s")

        // A non-string column on the same table should still analyze successfully.
        def idAnalyze = sql "analyze table ${catalogName}.${dbName}.${tblName}(id)"
        long idJobId = Long.parseLong(idAnalyze[0][0].toString())
        Awaitility.await()
                .atMost(120, TimeUnit.SECONDS)
                .pollInterval(2, TimeUnit.SECONDS)
                .until {
                    def rows = sql "show analyze ${idJobId}"
                    if (rows.isEmpty()) {
                        return false
                    }
                    String state = rows[0][9].toString()
                    return state == "FINISHED" || state == "FAILED"
                }
        def idJobRows = sql "show analyze ${idJobId}"
        assertEquals("FINISHED", idJobRows[0][9].toString())
        def idColStats = sql "show column stats ${catalogName}.${dbName}.${tblName}(id)"
        assertEquals(1, idColStats.size())

        sql "drop catalog if exists ${catalogName}"
        hive_docker """drop table if exists ${dbName}.${tblName}"""
        hive_docker """drop database if exists ${dbName} cascade"""
    }
}
diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy
index 7de06f23c6d7b2..46aa9adb529296 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -1378,7 +1378,7 @@ PARTITION `p599` VALUES IN (599)
     sql """ INSERT INTO `${tbl}` VALUES (-2103297891,1,101,15248,4761818404925265645,939926.283,
         'UTmCFKMbprf0zSVOIlBJRNOl3JcNBdOsnCDt','2022-09-28','2022-10-28 01:56:56','tVvGDSrN6kyn',
-954349107.187117,-40.46286,'1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111g1ZP9nqVgaGKya3kPERdBofTWJQ4TIJEz972Xvw4hfPpTpWwlmondiLVTCyld7rSBlSWrE7NJRB0pvPGEFQKOx1s3', + 
-954349107.187117,-40.46286,'1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111', '-1559301292834325905', NULL, NULL, NULL, NULL) """ diff --git a/regression-test/suites/statistics/test_analyze_long_string.groovy b/regression-test/suites/statistics/test_analyze_long_string.groovy new file mode 100644 index 00000000000000..a234f44bad1239 --- /dev/null +++ b/regression-test/suites/statistics/test_analyze_long_string.groovy @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// End-to-end coverage of the statistics_max_string_column_length guard on internal
// (OLAP) tables: FULL / LINEAR / DUJ1 templates skip over-long string columns with a
// visible reason; the partition-analyze path and a disabled (<= 0) limit do not skip.
suite("test_analyze_long_string", "nonConcurrent") {
    // `analyze ... with sync` does not persist the job in the in-memory map,
    // so we use async analyze and poll SHOW ANALYZE / SHOW ANALYZE TASK STATUS.

    // Returns the newest (largest) job id for the given catalog/db/table, or null
    // if no matching analyze job is visible in SHOW ANALYZE.
    def findJobId = { String ctl, String db, String tbl ->
        def rows = sql """show analyze"""
        def match = -1L
        for (row in rows) {
            // columns: job_id, catalog_name, db_name, tbl_name, ...
            if (row[1].toString() == ctl
                    && row[2].toString() == db
                    && row[3].toString() == tbl) {
                long id = Long.parseLong(row[0].toString())
                if (id > match) {
                    match = id
                }
            }
        }
        return match == -1L ? null : match
    }

    // Blocks (up to 120s) until the job reaches a terminal state (column 9 = state).
    def waitJobFinished = { long jobId ->
        Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until {
            def rows = sql """show analyze ${jobId}"""
            if (rows.isEmpty()) {
                return false
            }
            def state = rows[0][9].toString()
            logger.info("job ${jobId} state=${state}")
            return state == "FINISHED" || state == "FAILED"
        }
    }

    // Maps column name -> [state, message] from SHOW ANALYZE TASK STATUS.
    def collectTaskStatuses = { long jobId ->
        def rows = sql """show analyze task status ${jobId}"""
        def result = [:]
        for (row in rows) {
            // columns: task_id, col_name, index_name, message,
            //          last_state_change_time, time_cost_in_ms, state
            def colName = row[1].toString()
            def msg = row[3] == null ? "" : row[3].toString()
            def state = row[6].toString()
            result[colName] = [state: state, message: msg]
        }
        return result
    }

    // Asserts skipped columns finished WITH a skip reason in their message, and
    // ordinary columns finished with an empty message.
    def assertTaskSkipped = { long jobId, List expectedSkipped, List expectedOk ->
        def statuses = collectTaskStatuses(jobId)
        logger.info("job ${jobId} task statuses=${statuses}")
        for (c in expectedSkipped) {
            assertTrue(statuses.containsKey(c), "missing task for skipped col ${c}")
            assertEquals("FINISHED", statuses[c].state,
                    "expected FINISHED for skipped col ${c}, got ${statuses[c]}")
            assertTrue(statuses[c].message.contains("statistics_max_string_column_length")
                    || statuses[c].message.contains("exceeds"),
                    "expected skip reason visible for col ${c}, got msg=${statuses[c].message}")
        }
        for (c in expectedOk) {
            assertTrue(statuses.containsKey(c), "missing task for col ${c}")
            assertEquals("FINISHED", statuses[c].state,
                    "expected FINISHED for col ${c}, got ${statuses[c]}")
            assertEquals("", statuses[c].message,
                    "expected empty message for col ${c}, got ${statuses[c].message}")
        }
    }

    // Set of column names that have a persisted row in SHOW COLUMN STATS.
    def collectedColumns = { String tbl ->
        def rows = sql """show column stats ${tbl}"""
        def set = [] as Set
        for (r in rows) {
            set.add(r[0].toString())
        }
        return set
    }

    sql """drop database if exists test_analyze_long_string"""
    sql """create database test_analyze_long_string"""
    sql """use test_analyze_long_string"""
    sql """set global enable_auto_analyze=false"""

    // ---------- Case 1: full analyze on a non-partitioned table ----------
    sql """drop table if exists test_analyze_long_string_full"""
    sql """
        CREATE TABLE test_analyze_long_string_full (
            `id` bigint,
            `name` varchar(100),
            `small_str` varchar(100),
            `fixed` char(16),
            `big_str` string
        ) ENGINE=OLAP
        DISTRIBUTED BY HASH(`id`) BUCKETS 4
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """
    sql """insert into test_analyze_long_string_full values(1, 'alice', 'aaa', 'abc', NULL)"""
    sql """insert into test_analyze_long_string_full values(2, NULL, 'bbb', 'defg', repeat('y', 100))"""
    sql """insert into test_analyze_long_string_full values(3, 'charlie', 'ccc', 'hij', repeat('z', 2048))"""

    setFeConfigTemporary([statistics_max_string_column_length: 1024]) {
        sql """analyze table test_analyze_long_string_full"""
        def jobId = findJobId("internal", "test_analyze_long_string", "test_analyze_long_string_full")
        assertNotNull(jobId, "must find analyze job for test_analyze_long_string_full")
        waitJobFinished(jobId)
        assertTaskSkipped(jobId, ["big_str"], ["id", "name", "small_str", "fixed"])
        def collected = collectedColumns("test_analyze_long_string_full")
        logger.info("test_analyze_long_string_full collected=${collected}")
        assertFalse(collected.contains("big_str"),
                "big_str should be skipped, got ${collected}")
        assertTrue(collected.contains("id"))
        assertTrue(collected.contains("name"))
        assertTrue(collected.contains("small_str"))
        assertTrue(collected.contains("fixed"))
        def bigRows = sql """show column stats test_analyze_long_string_full (big_str)"""
        assertEquals(0, bigRows.size(),
                "expected no stats row for skipped column big_str, got ${bigRows}")
    }

    // ---------- Case 2: disabled (limit <= 0) must collect everything ----------
    setFeConfigTemporary([statistics_max_string_column_length: 0]) {
        sql """analyze table test_analyze_long_string_full with sync"""
        def bigRows = sql """show column stats test_analyze_long_string_full (big_str)"""
        assertEquals(1, bigRows.size(),
                "protection disabled: big_str should be collected, got ${bigRows}")
        // column 2 of SHOW COLUMN STATS is the row count; all 3 rows were seen.
        assertEquals("3.0", bigRows[0][2].toString())
    }

    // ---------- Case 3: sample analyze (LINEAR / DUJ1 paths) ----------
    sql """drop table if exists test_analyze_long_string_sample"""
    sql """
        CREATE TABLE test_analyze_long_string_sample (
            `id` bigint,
            `small_str` varchar(100),
            `big_str` string
        ) ENGINE=OLAP
        DISTRIBUTED BY HASH(`id`) BUCKETS 2
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """
    sql """insert into test_analyze_long_string_sample values(1, 'aa', repeat('z', 2048))"""
    sql """insert into test_analyze_long_string_sample values(2, 'bb', 'short1')"""
    sql """insert into test_analyze_long_string_sample values(3, 'cc', 'short2')"""
    sql """insert into test_analyze_long_string_sample values(4, 'dd', 'short3')"""
    sql """insert into test_analyze_long_string_sample values(5, 'ee', 'short4')"""

    setFeConfigTemporary([statistics_max_string_column_length: 1024]) {
        sql """analyze table test_analyze_long_string_sample with sample percent 100"""
        def jobId = findJobId("internal", "test_analyze_long_string", "test_analyze_long_string_sample")
        assertNotNull(jobId, "must find sample analyze job for test_analyze_long_string_sample")
        waitJobFinished(jobId)
        assertTaskSkipped(jobId, ["big_str"], ["id", "small_str"])
        def collected = collectedColumns("test_analyze_long_string_sample")
        assertFalse(collected.contains("big_str"), "sample analyze: big_str should be skipped")
        assertTrue(collected.contains("small_str"))
    }

    // ---------- Case 4: partitioned table — long-string guard does NOT apply ----------
    // Partition-granularity analyze uses PARTITION_ANALYZE_TEMPLATE and commits per-batch
    // inserts into __partition_stats incrementally. To avoid leaving a mix of fresh and
    // stale partition rows on mid-loop abort, the long-string skip guard is intentionally
    // NOT wired into this path. Analyze should FINISH normally and produce statistics
    // for big_str even when rows exceed statistics_max_string_column_length.
    sql """drop table if exists test_analyze_long_string_part"""
    sql """
        CREATE TABLE test_analyze_long_string_part (
            `d` date NOT NULL,
            `id` bigint,
            `big_str` string
        ) ENGINE=OLAP
        DUPLICATE KEY(`d`)
        PARTITION BY RANGE(`d`) (
            PARTITION p1 VALUES LESS THAN ('2024-01-02'),
            PARTITION p2 VALUES LESS THAN ('2024-01-03')
        )
        DISTRIBUTED BY HASH(`id`) BUCKETS 2
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """
    sql """insert into test_analyze_long_string_part values('2024-01-01', 1, 'short_a'),
            ('2024-01-01', 2, repeat('x', 2048)),
            ('2024-01-02', 3, 'short_b')"""

    setFeConfigTemporary([statistics_max_string_column_length: 1024]) {
        sql """set global enable_partition_analyze = true"""
        try {
            sql """analyze table test_analyze_long_string_part"""
            def jobId = findJobId("internal", "test_analyze_long_string", "test_analyze_long_string_part")
            assertNotNull(jobId, "must find analyze job for test_analyze_long_string_part")
            waitJobFinished(jobId)
            // No task should be skipped: partition path bypasses the long-string guard.
            assertTaskSkipped(jobId, [], ["d", "id", "big_str"])
            def collected = collectedColumns("test_analyze_long_string_part")
            assertTrue(collected.contains("big_str"),
                    "partition analyze should still collect big_str (guard does not apply)")
            assertTrue(collected.contains("d"))
            assertTrue(collected.contains("id"))
        } finally {
            sql """set global enable_partition_analyze = false"""
        }
    }

    // ---------- Case 5: sample analyze forced onto DUJ1 template ----------
    // useLinearAnalyzeTemplate() normally returns true on small tables (scanFullTable),
    // so LINEAR covers the sample path. Use a FE debug point to force DUJ1 so its
    // ${lengthAssert} injection is exercised end-to-end.
    sql """drop table if exists test_analyze_long_string_duj1"""
    sql """
        CREATE TABLE test_analyze_long_string_duj1 (
            `id` bigint,
            `small_str` varchar(100),
            `big_str` string
        ) ENGINE=OLAP
        DISTRIBUTED BY HASH(`id`) BUCKETS 2
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """
    sql """insert into test_analyze_long_string_duj1 values(1, 'aa', repeat('z', 2048))"""
    sql """insert into test_analyze_long_string_duj1 values(2, 'bb', 'short1')"""
    sql """insert into test_analyze_long_string_duj1 values(3, 'cc', 'short2')"""
    sql """insert into test_analyze_long_string_duj1 values(4, 'dd', 'short3')"""
    sql """insert into test_analyze_long_string_duj1 values(5, 'ee', 'short4')"""

    setFeConfigTemporary([statistics_max_string_column_length: 1024]) {
        GetDebugPoint().enableDebugPointForAllFEs('OlapAnalysisTask.useDUJ1Template')
        try {
            sql """analyze table test_analyze_long_string_duj1 with sample rows 3"""
            def jobId = findJobId("internal", "test_analyze_long_string", "test_analyze_long_string_duj1")
            assertNotNull(jobId, "must find analyze job for test_analyze_long_string_duj1")
            waitJobFinished(jobId)
            assertTaskSkipped(jobId, ["big_str"], ["id", "small_str"])
            def collected = collectedColumns("test_analyze_long_string_duj1")
            assertFalse(collected.contains("big_str"),
                    "DUJ1 analyze: big_str should be skipped")
            assertTrue(collected.contains("small_str"))
        } finally {
            GetDebugPoint().disableDebugPointForAllFEs('OlapAnalysisTask.useDUJ1Template')
        }
    }

    // ---------- Case 6: WITH SYNC — skipped column must have no stats ----------
    // Sync analyze never populates analysisJobIdToTaskMap, so SHOW ANALYZE cannot see
    // the skip reason. AnalysisManager.buildAndAssignJob surfaces skip messages via
    // ConnectContext OK-packet info for interactive visibility. We cannot easily read
    // that info string from JDBC in regression, so we verify the functional outcome:
    // the skipped column produces no row in column_statistics, while other columns do.
    sql """drop table if exists test_analyze_long_string_full"""
    sql """
        CREATE TABLE test_analyze_long_string_full (
            `id` bigint,
            `name` varchar(100),
            `big_str` string
        ) ENGINE=OLAP
        DISTRIBUTED BY HASH(`id`) BUCKETS 4
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """
    sql """insert into test_analyze_long_string_full values(1, 'alice', NULL)"""
    sql """insert into test_analyze_long_string_full values(2, 'bob', repeat('y', 100))"""
    sql """insert into test_analyze_long_string_full values(3, 'chris', repeat('z', 2048))"""
    setFeConfigTemporary([statistics_max_string_column_length: 1024]) {
        sql """analyze table test_analyze_long_string_full with sync"""
        def bigRows = sql """show column stats test_analyze_long_string_full (big_str)"""
        assertEquals(0, bigRows.size(),
                "WITH SYNC: big_str should be skipped, got ${bigRows}")
        def nameRows = sql """show column stats test_analyze_long_string_full (name)"""
        assertEquals(1, nameRows.size(),
                "WITH SYNC: name should be collected, got ${nameRows}")
    }

    // ---------- Cleanup ----------
    sql """drop database if exists test_analyze_long_string"""
}