Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ebeb67a
[refactor](be) Simplify file reader schema layout
suxiaogang223 Jun 3, 2026
4214525
[chore](be) Clarify file block layout comments
suxiaogang223 Jun 3, 2026
a3f9c28
[refactor](be) Remove parquet shape-only reader
suxiaogang223 Jun 3, 2026
f494b9d
[refactor](be) Clarify parquet map reader shape
suxiaogang223 Jun 3, 2026
4dff7bd
[refactor](be) Share parquet map aligned stream logic
suxiaogang223 Jun 3, 2026
0a50a4b
[refactor](be) Extract parquet map list value helper
suxiaogang223 Jun 3, 2026
a15d20d
[doc](be) Mark parquet map refactor phase complete
suxiaogang223 Jun 3, 2026
2ce3c00
[refactor](be) Inline parquet list level assembly
suxiaogang223 Jun 3, 2026
d6d4be7
[refactor](be) Remove unused parquet list sinks
suxiaogang223 Jun 3, 2026
56e1686
[refactor](be) Simplify parquet nested scalar batch state
suxiaogang223 Jun 3, 2026
66ef662
[refactor](be) Localize parquet map repeated assembly
suxiaogang223 Jun 3, 2026
7af142c
[refactor](be) Omit dense parquet nested value indices
suxiaogang223 Jun 3, 2026
15e9130
[doc](be) Clean up completed refactoring documents, keep only remaini…
suxiaogang223 Jun 3, 2026
f90ff6f
[refactor](be) Clarify parquet list level traversal
suxiaogang223 Jun 3, 2026
cb9157b
[doc](be) Update parquet reader layering status
suxiaogang223 Jun 3, 2026
6dc3259
[doc](be) Document parquet page-level skip plan
suxiaogang223 Jun 3, 2026
646e5b5
[doc](be) Replace layering doc with focused task plans
suxiaogang223 Jun 3, 2026
4261332
[refactor](be) Use cursor for parquet nested scalar values
suxiaogang223 Jun 3, 2026
739bdc9
[fix](be) Fix nested scalar cursor null check
suxiaogang223 Jun 3, 2026
a8ab168
[doc](be) Remove completed parquet cursor refactor plan
suxiaogang223 Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 0 additions & 94 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@

#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_struct.h"
#include "format/new_parquet/parquet_column_schema.h"
#include "format/new_parquet/parquet_file_context.h"
#include "format/new_parquet/parquet_scan.h"
Expand Down Expand Up @@ -93,96 +89,6 @@ void ParquetReader::_fill_schema_field(const ParquetColumnSchema& column_schema,
}
}

// Fills `field` from `column_schema` and then narrows it to the children picked by `projection`.
//
// When there is nothing to narrow (`projection` is null, it requests all children, or the column
// has no children) the result of _fill_schema_field() is returned unchanged. Otherwise
// `field->children` is rebuilt from the projected children only (each one filled recursively) and
// `field->type` is replaced by a struct/array/map type built from the surviving child types,
// wrapped as nullable when `column_schema.type` is nullable.
//
// Returns:
//   - InvalidArgument if `field` is null, if a matched projection's field_id differs from the
//     child schema's field_id, or if the column is not a struct/array/map.
//   - NotSupported if no child survives the projection.
//   - Whatever the recursive call on a child returns, forwarded through RETURN_IF_ERROR.
Status ParquetReader::_fill_projected_schema_field(const ParquetColumnSchema& column_schema,
                                                   const reader::FieldProjection* projection,
                                                   reader::SchemaField* field) const {
    if (field == nullptr) {
        return Status::InvalidArgument("projected schema field is null");
    }
    _fill_schema_field(column_schema, field);

    const bool keep_full_schema = projection == nullptr || projection->project_all_children ||
                                  column_schema.children.empty();
    if (keep_full_schema) {
        return Status::OK();
    }

    // Index the requested child projections by their field_id. emplace() keeps the first entry
    // when two projections carry the same field_id.
    std::map<int32_t, const reader::FieldProjection*> requested;
    for (const auto& requested_child : projection->children) {
        requested.emplace(requested_child.field_id, &requested_child);
    }

    field->children.clear();
    DataTypes kept_types;
    Strings kept_names;
    const size_t total_children = column_schema.children.size();
    for (size_t pos = 0; pos < total_children; ++pos) {
        // The lookup key is the child's position within the parent schema.
        const auto match = requested.find(static_cast<int32_t>(pos));
        if (match == requested.end()) {
            continue;
        }
        const auto& child_schema = *column_schema.children[pos];
        const reader::FieldProjection* child_projection = match->second;
        if (child_projection->field_id != child_schema.field_id) {
            return Status::InvalidArgument("Invalid parquet projection field_id for column {}",
                                           child_schema.name);
        }

        reader::SchemaField projected_child;
        RETURN_IF_ERROR(
                _fill_projected_schema_field(child_schema, child_projection, &projected_child));
        // Record type and name before the child is moved into the output vector.
        kept_types.push_back(projected_child.type);
        kept_names.push_back(projected_child.name);
        field->children.push_back(std::move(projected_child));
    }

    if (field->children.empty()) {
        return Status::NotSupported("Parquet projection for column {} contains no children",
                                    column_schema.name);
    }

    // Rebuild the complex type from the children that survived the projection.
    const auto primitive_type = remove_nullable(column_schema.type)->get_primitive_type();
    DataTypePtr narrowed_type;
    if (primitive_type == TYPE_STRUCT) {
        narrowed_type = std::make_shared<DataTypeStruct>(kept_types, kept_names);
    } else if (primitive_type == TYPE_ARRAY) {
        DORIS_CHECK(kept_types.size() == 1);
        narrowed_type = std::make_shared<DataTypeArray>(kept_types[0]);
    } else if (primitive_type == TYPE_MAP) {
        DORIS_CHECK(kept_types.size() == 1);
        DORIS_CHECK(remove_nullable(kept_types[0])->get_primitive_type() == TYPE_STRUCT);
        DORIS_CHECK(remove_nullable(column_schema.type)->get_primitive_type() == TYPE_MAP);

        // The single surviving child is the map's entry struct. With two elements the value is
        // the second one; with a single element that element is used as the value.
        const auto entry_holder = remove_nullable(kept_types[0]);
        const auto* entry_struct = assert_cast<const DataTypeStruct*>(entry_holder.get());
        DORIS_CHECK(entry_struct->get_elements().size() == 1 ||
                    entry_struct->get_elements().size() == 2);
        const auto value_pos = entry_struct->get_elements().size() == 1 ? 0 : 1;

        // The key type always comes from the unprojected map type.
        const auto map_holder = remove_nullable(column_schema.type);
        const auto* full_map = assert_cast<const DataTypeMap*>(map_holder.get());
        narrowed_type = std::make_shared<DataTypeMap>(full_map->get_key_type(),
                                                      entry_struct->get_element(value_pos));
    } else {
        return Status::InvalidArgument("Cannot project children from non-complex parquet column {}",
                                       column_schema.name);
    }

    // Keep the nullability of the original column type.
    if (column_schema.type->is_nullable()) {
        field->type = make_nullable(narrowed_type);
    } else {
        field->type = narrowed_type;
    }
    return Status::OK();
}

// Looks up the top-level file column `file_column_id` in `_state->file_schema`, fills `field`
// with its projected schema via _fill_projected_schema_field(), and stamps `field->id` with the
// column id.
//
// Returns InvalidArgument when `file_column_id` is negative or not smaller than the number of
// entries in `_state->file_schema`; otherwise forwards any error from the fill step.
Status ParquetReader::_get_projected_schema_field(reader::ColumnId file_column_id,
                                                  const reader::FieldProjection* projection,
                                                  reader::SchemaField* field) const {
    // The size is only consulted once the id is known to be non-negative.
    const bool id_in_range =
            file_column_id >= 0 &&
            file_column_id < static_cast<reader::ColumnId>(_state->file_schema.size());
    if (!id_in_range) {
        return Status::InvalidArgument("Invalid parquet field id {}", file_column_id);
    }

    const auto& top_level_schema = *_state->file_schema[file_column_id];
    RETURN_IF_ERROR(_fill_projected_schema_field(top_level_schema, projection, field));
    field->id = file_column_id;
    return Status::OK();
}

ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
std::unique_ptr<io::FileDescription>& file_description,
std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
Expand Down
6 changes: 0 additions & 6 deletions be/src/format/new_parquet/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ class ParquetReader : public reader::FileReader {
};
void _fill_schema_field(const ParquetColumnSchema& column_schema,
reader::SchemaField* field) const;
Status _fill_projected_schema_field(const ParquetColumnSchema& column_schema,
const reader::FieldProjection* projection,
reader::SchemaField* field) const;
Status _get_projected_schema_field(reader::ColumnId file_column_id,
const reader::FieldProjection* projection,
reader::SchemaField* field) const;

std::unique_ptr<ParquetReaderScanState> _state;
ParquetProfile _parquet_profile;
Expand Down
41 changes: 11 additions & 30 deletions be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <algorithm>
#include <exception>
#include <limits>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -216,6 +217,8 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
context.column_name());
}
*batch = NestedScalarBatch();
batch->value_slot_definition_level = value_slot_definition_level;
batch->value_slot_repetition_level = value_slot_repetition_level;

::parquet::internal::RecordReader* record_reader = nullptr;
RETURN_IF_ERROR(read_leaf_records(context, batch_rows, &record_reader, &batch->records_read));
Expand All @@ -225,24 +228,22 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
context.column_name());
}
batch->levels_written = record_reader->levels_position();
batch->values_written = record_reader->values_written();
const int64_t values_written = record_reader->values_written();
if (batch->levels_written > record_reader->levels_written()) {
return Status::Corruption(
"Invalid nested parquet level position for column {}: position={}, levels={}",
context.column_name(), batch->levels_written, record_reader->levels_written());
}
if (batch->levels_written == 0 && batch->records_read > 0 &&
batch->values_written == batch->records_read &&
context.descriptor->max_definition_level() == 0 &&
values_written == batch->records_read && context.descriptor->max_definition_level() == 0 &&
context.descriptor->max_repetition_level() == 0) {
batch->levels_written = batch->records_read;
}
if (batch->levels_written < batch->records_read || batch->values_written < 0 ||
batch->values_written > batch->levels_written) {
if (batch->levels_written < batch->records_read || values_written < 0 ||
values_written > batch->levels_written) {
return Status::Corruption(
"Invalid nested parquet read result for column {}: rows={}, levels={}, values={}",
context.column_name(), batch->records_read, batch->levels_written,
batch->values_written);
context.column_name(), batch->records_read, batch->levels_written, values_written);
}
if (batch->levels_written == 0) {
return Status::OK();
Expand Down Expand Up @@ -275,33 +276,13 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
std::copy(rep_levels, rep_levels + batch->levels_written, batch->rep_levels.begin());
}

batch->value_indices.resize(static_cast<size_t>(batch->levels_written), -1);
int64_t value_idx = 0;
const bool dense_value_slots = batch->values_written == batch->levels_written;
for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) {
if (batch->def_levels[level_idx] < value_slot_definition_level ||
batch->rep_levels[level_idx] > value_slot_repetition_level) {
continue;
}
if (dense_value_slots) {
batch->value_indices[level_idx] = level_idx;
} else {
if (value_idx >= batch->values_written) {
return Status::Corruption(
"Nested parquet reader returned fewer values than definition levels for "
"column {}",
context.column_name());
}
batch->value_indices[level_idx] = value_idx++;
}
}
const auto value_type = remove_nullable(context.data_type());
batch->values_column = value_type->create_column();
if (batch->values_written > 0) {
if (values_written > 0) {
ArrowLeafReaderContext value_context = context;
value_context.type = &value_type;
RETURN_IF_ERROR(append_leaf_values(value_context, *record_reader, batch->values_written,
nullptr, batch->values_column));
RETURN_IF_ERROR(append_leaf_values(value_context, *record_reader, values_written, nullptr,
batch->values_column));
}
return Status::OK();
}
Expand Down
5 changes: 1 addition & 4 deletions be/src/format/new_parquet/reader/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
#include "format/new_parquet/parquet_column_schema.h"
#include "format/new_parquet/reader/list_column_reader.h"
#include "format/new_parquet/reader/map_column_reader.h"
#include "format/new_parquet/reader/row_position_column_reader.h"
#include "format/new_parquet/reader/scalar_column_reader.h"
#include "format/new_parquet/reader/shape_only_column_reader.h"
#include "format/new_parquet/reader/struct_column_reader.h"
#include "format/reader/file_reader.h"

Expand Down Expand Up @@ -271,9 +271,6 @@ Status ParquetColumnReaderFactory::create_struct_column_reader(
projected_child_types.push_back(child_reader->type());
projected_child_names.push_back(child_reader->name());
} else {
if (child_schema->kind != ParquetColumnSchemaKind::PRIMITIVE) {
child_reader = std::make_unique<ShapeOnlyColumnReader>(std::move(child_reader));
}
child_output_indices.push_back(-1);
}
child_readers.push_back(std::move(child_reader));
Expand Down
Loading
Loading