branch-4.1 cherry-pick [feat](format) support native format #61286

yiguolei merged 1 commit into apache:branch-4.1 from
Conversation

Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
Cherry-picks upstream work to add Doris Native (“native”) file format support across FE/BE for export/outfile and scanning, with specific improvements for VARIANT schema properties.
Changes:
- Add FORMAT_NATIVE to file format enums/properties and wire it through FE utilities and Nereids load scanning.
- Implement the BE native writer/reader (VNativeTransformer, NativeReader) and integrate them with the file result writer, the scanner, and the internal-service schema fetch.
- Add regression and BE unit tests covering the native round-trip and S3 OUTFILE/export flows.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| regression-test/suites/export_p0/test_export_variant_10k_columns.groovy | New regression test for exporting/loading wide VARIANT data using native format |
| regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy | New OUTFILE-to-S3 + S3 TVF queryback regression test for native |
| regression-test/data/export_p0/outfile/native/test_outfile_native.out | Expected output for the OUTFILE native regression test |
| gensrc/thrift/PlanNodes.thrift | Adds FORMAT_NATIVE to TFileFormatType |
| fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java | Preserves VARIANT properties when reconstructing types from external schema |
| fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java | Treats native like arrow for using real column types during load |
| fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java | New file format properties class for native |
| fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java | Registers native in file format factory |
| fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java | Maps "native" format name to FORMAT_NATIVE |
| fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java | Adds "native" constant |
| be/test/vec/exec/format/native/native_reader_writer_test.cpp | New BE UT suite covering native read/write, schema probing, and many types |
| be/src/vec/sink/writer/vfile_result_writer.cpp | Adds native writer support and format name mapping |
| be/src/vec/runtime/vnative_transformer.h / .cpp | Implements native file writer with header + block framing |
| be/src/vec/functions/cast/cast_to_variant.h | Refactors variant casting logic and improves wrapper behavior |
| be/src/vec/functions/cast/cast_to_jsonb.h | Comment typo fix |
| be/src/vec/exec/scan/file_scanner.cpp | Adds native reader integration and enables casting with FunctionContext |
| be/src/vec/exec/format/native/native_reader.h / .cpp | Implements native file reader with header validation + schema probing |
| be/src/vec/exec/format/native/native_format.h | Defines file header/constants and documents native layout |
| be/src/service/internal_service.cpp | Adds native format handling for fetch_table_schema |
```cpp
buff->assign(len, '\0');
Slice data_slice(buff->data(), len);
bytes_read = 0;
RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read));
if (bytes_read != len) {
    return Status::InternalError(
            "Failed to read native block body from file {}, expect {}, "
            "actual {}",
            _scan_range.path, len, bytes_read);
}
```
NativeReader allocates len bytes directly from the file-provided block length without validating it against remaining file size (or any upper bound). A malformed/corrupt file can trigger huge allocations (OOM) before read_at fails. Add a validation before buff->assign(...), e.g. ensure len <= (_file_size - _current_offset) and optionally enforce a reasonable maximum native block size to protect BE.
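The guard this comment asks for can be sketched in isolation as follows. This is a hedged, standalone sketch, not the Doris implementation: the function name `is_valid_block_len`, the parameter layout, and the 512 MiB cap are all illustrative assumptions; the real upper bound would presumably come from a BE config value.

```cpp
#include <cstdint>

// Illustrative upper bound on a single native block; in a real BE this would
// be a configurable limit, not a hard-coded constant.
constexpr uint64_t kMaxNativeBlockBytes = 512ULL * 1024 * 1024; // 512 MiB

// Returns true when a block of `len` bytes starting at `current_offset` can
// plausibly exist inside a file of `file_size` bytes. Checking this before
// buff->assign(len, ...) prevents huge allocations from a corrupt length.
bool is_valid_block_len(uint64_t len, uint64_t current_offset, uint64_t file_size) {
    if (current_offset > file_size) return false;       // offset already past EOF
    if (len > file_size - current_offset) return false; // block overruns the file
    if (len > kMaxNativeBlockBytes) return false;       // defend against corrupt lengths
    return true;
}
```

With such a check in place, a malformed length fails fast with a clear error instead of attempting an allocation that may OOM the BE.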
```cpp
reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx,
                                                 nullptr);
```
NativeReader is constructed with RuntimeState* state = nullptr here, but NativeReader::init_reader() passes _state into FileFactory::get_reader_options(_state, ...). If get_reader_options (or other downstream logic) assumes non-null, this will crash during schema fetch. Pass a valid RuntimeState (if available in this RPC path) or update NativeReader/FileFactory usage to safely handle a null state for schema-only operations.
Suggested change:

```diff
- reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx,
-                                                  nullptr);
+ RuntimeState dummy_state;
+ reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx,
+                                                  &dummy_state);
```
```cpp
// - Detects the optional file header by checking the first 8 bytes against DORIS_NATIVE_MAGIC.
// - If the header is present, it skips 12 bytes and then starts reading blocks as
//   [uint64_t block_size][PBlock bytes]...
// - If the header is absent (legacy files), it starts reading blocks from offset 0.
```
The documentation says the header is optional and NativeReader can read legacy files without it, but validate_and_consume_header() currently requires the magic header and errors if it is absent. Either implement the documented backward-compatible behavior (detect header presence and fall back to offset 0) or update this comment to match the actual requirement.
Suggested change:

```diff
- // - Detects the optional file header by checking the first 8 bytes against DORIS_NATIVE_MAGIC.
- // - If the header is present, it skips 12 bytes and then starts reading blocks as
- //   [uint64_t block_size][PBlock bytes]...
- // - If the header is absent (legacy files), it starts reading blocks from offset 0.
+ // - Expects a file header at the beginning of the file and verifies the first 8 bytes
+ //   against DORIS_NATIVE_MAGIC.
+ // - After validating the header, it skips 12 bytes and then starts reading blocks as
+ //   [uint64_t block_size][PBlock bytes]...
```
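If the documented backward-compatible behavior were implemented instead, the detection could look roughly like this standalone sketch. The magic bytes here are placeholders (not the real DORIS_NATIVE_MAGIC value), and `first_block_offset` is an illustrative helper, not the Doris API; the 12-byte header length mirrors the layout described in the comment (8-byte magic plus 4 trailing bytes).

```cpp
#include <cstdint>
#include <cstring>

// Placeholder 8-byte magic; the real constant is DORIS_NATIVE_MAGIC.
constexpr char kNativeMagic[8] = {'D', 'O', 'R', 'N', 'A', 'T', 'V', '1'};
constexpr uint64_t kHeaderBytes = 12; // 8-byte magic + 4 more header bytes, per the comment

// Returns the offset at which block framing starts: past the header when the
// magic matches, or 0 for legacy files written without a header.
uint64_t first_block_offset(const char* file_bytes, uint64_t file_size) {
    if (file_size >= kHeaderBytes &&
        std::memcmp(file_bytes, kNativeMagic, sizeof(kNativeMagic)) == 0) {
        return kHeaderBytes; // header present: skip it before reading blocks
    }
    return 0; // legacy file (or file too short to hold a header): blocks begin at offset 0
}
```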
```groovy
def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" = \"2048\")>"
if (new Random().nextInt(2) == 0) {
    value_type = "text"
}
```
This introduces randomness into a P0 regression test, which can make CI runs flaky and non-reproducible (schema differs between runs). Make the choice deterministic (e.g., run both cases explicitly in separate steps / separate suites, or base it on a fixed seed) so failures are reproducible.
```groovy
def waiting_export = { export_label ->
    while (true) {
        def res = sql """ show export where label = "${export_label}" """
        logger.info("export state: " + res[0][2])
        if (res[0][2] == "FINISHED") {
            def json = parseJson(res[0][11])
            assert json instanceof List
            // assertEquals("1", json.fileNumber[0][0])
            log.info("outfile_path: ${json.url[0][0]}")
            return json.url[0][0];
        } else if (res[0][2] == "CANCELLED") {
            throw new IllegalStateException("""export failed: ${res[0][10]}""")
        } else {
            sleep(5000)
        }
    }
}
```
The export wait loop has no timeout, so a stuck export can hang the suite indefinitely. Add a maximum wait time (or max iterations) and fail with a clear message when exceeded; alternatively reuse Awaitility here (similar to the load wait later) for consistent timeouts.
```groovy
// Insert 10 rows of test data (the last row is all NULL)
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 10000; i ++) {
    sb.append("""
        (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i % 128}, true, ${i}.${i}),
    """)
}
sb.append("""
    (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, NULL, NULL)
""")
sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """
```
The comment says 'Insert 10 rows', but the loop inserts 9,999 rows plus a final row (10,000 total). Update the comment to match the actual data volume so future readers understand the test’s intent and runtime characteristics.
```groovy
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
```
This import is not used anywhere in the file. Removing it reduces noise and avoids misleading readers about expected I/O helpers.
Suggested change:

```diff
- import org.codehaus.groovy.runtime.IOGroovyMethods
```
```java
// Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and default
// values for other properties.
type = new VariantType(new ArrayList<>(), maxSubcolumns,
        /*enableTypedPathsToSparse*/ false,
        /*variantMaxSparseColumnStatisticsSize*/ 10000,
        /*variantSparseHashShardCount*/ 0);
```
This constructs VariantType with several hard-coded magic values (e.g. variantMaxSparseColumnStatisticsSize = 10000, shard count 0). If defaults/configs change elsewhere, ExternalFileTVF may diverge from normal VARIANT semantics. Prefer using a centralized factory/default-builder (or Config-backed defaults) so only variant_max_subcolumns_count is overridden here, and other properties stay consistent with the rest of FE/BE.
Suggested change:

```diff
- // Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and default
- // values for other properties.
- type = new VariantType(new ArrayList<>(), maxSubcolumns,
-         /*enableTypedPathsToSparse*/ false,
-         /*variantMaxSparseColumnStatisticsSize*/ 10000,
-         /*variantSparseHashShardCount*/ 0);
+ // Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and
+ // Config-backed defaults for other properties to keep semantics consistent across FE/BE.
+ type = new VariantType(new ArrayList<>(), maxSubcolumns,
+         Config.enable_typed_paths_to_sparse,
+         Config.variant_max_sparse_column_statistics_size,
+         Config.variant_sparse_hash_shard_count);
```
```diff
+ DCHECK(_state != nullptr);
+ auto ctx = FunctionContext::create_context(_state, {}, {});
  RETURN_IF_ERROR(
-         func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
+         func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
```
A new FunctionContext is created for every column cast inside the loop. This can add overhead for wide schemas (and native/arrow explicitly targets real typed columns). Consider creating a single FunctionContext once per _cast_to_input_block invocation and reusing it for all casts in the loop, unless specific casts require isolated contexts.
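The suggested hoisting can be illustrated with a minimal stand-in. The `Context` type and the `make`/`cast` helpers below are hypothetical, not the Doris `FunctionContext` API; the point is only the shape of the fix: allocate the context once per invocation and reuse it across the per-column loop, instead of allocating inside the loop body.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// Hypothetical stand-in for FunctionContext; counts how many were created.
struct Context {
    static inline int allocations = 0;
    Context() { ++allocations; }
};

// Simulated per-column cast that needs a non-null context.
void cast_column(Context* ctx, std::size_t /*column_idx*/) { assert(ctx != nullptr); }

// Hoisted version: one context per invocation, reused for every column cast.
// Returns the number of contexts allocated (1, regardless of column count),
// versus num_columns allocations if the creation sat inside the loop.
int cast_all_columns(std::size_t num_columns) {
    Context::allocations = 0;
    auto ctx = std::make_unique<Context>(); // created once, outside the loop
    for (std::size_t idx = 0; idx < num_columns; ++idx) {
        cast_column(ctx.get(), idx);
    }
    return Context::allocations;
}
```

For a wide schema (e.g. the 10k-column VARIANT test in this PR), that turns thousands of context allocations per block into one, provided no individual cast requires an isolated context.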
run buildall
cherry-pick #58711