From 6da7fa1c78e44aff9dcc20d32bb86cbf8543a78e Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Tue, 3 Mar 2026 17:12:10 +0800 Subject: [PATCH] [fix](stream-load) Fix LZ4 compressed stream load decompress made no progress error (#60852) Problem Summary: When LZ4F_decompress produces output from its internal buffer (tmpOut) without consuming new input, input_read_bytes is 0 but decompressed_len is positive. The original condition only checked input_read_bytes == 0 and incorrectly treated this as "no progress", returning an error. Add decompressed_len == 0 check to the condition so that real progress (output produced from decompressor internal buffer) is not mistakenly flagged as a stall. Also add a debug point to shrink the output buffer for regression testing, and a regression test that uses it to reproduce the bug scenario. --- .../new_plain_text_line_reader.cpp | 9 +- .../test_json_lz4_decompress_progress.groovy | 84 +++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index 275d2add8b36ac..5cad41dc22c705 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -232,6 +232,13 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, _decompress_timer(nullptr) { _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES); _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); + + DBUG_EXECUTE_IF("NewPlainTextLineReader.shrink_output_buf", { + size_t new_size = dp->param("output_buf_size", 64 * 1024); + delete[] _output_buf; + _output_buf = new uint8_t[new_size]; + _output_buf_size = new_size; + }); } NewPlainTextLineReader::~NewPlainTextLineReader() { @@ -467,7 +474,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len); // TODO(cmy): watch this case - if ((input_read_bytes == 0 /*decompressed_len == 0*/) && _more_input_bytes == 0 && + if (input_read_bytes == 0 && decompressed_len == 0 && _more_input_bytes == 0 && _more_output_bytes == 0) { // decompress made no progress, may be // A. input data is not enough to decompress data to output diff --git a/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy new file mode 100644 index 00000000000000..ed69c957d0b43b --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_json_lz4_decompress_progress", "p0,nonConcurrent") { + def tableName = "test_lz4_decompress_progress" + def debugPoint = "NewPlainTextLineReader.shrink_output_buf" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} + ( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL + + ) + DUPLICATE KEY(k00) + DISTRIBUTED BY HASH(k00) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + + try { + // Shrink output buffer to 2KB so that the decompressed LZ4 block + // (6.7KB) exceeds the buffer, triggering tmpOut internal buffering. + // Without the fix, this causes "decompress made no progress" error. + GetDebugPoint().enableDebugPointForAllBEs(debugPoint, [output_buf_size: 2048]) + + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', 'csv' + set 'compress_type', 'LZ4' + + file "basic_data.csv.lz4" + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) + } + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs(debugPoint) + sql """ DROP TABLE IF EXISTS ${tableName} """ + } +}