Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/DataTypes/NestedUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ std::string concatenateName(const std::string & nested_table_name, const std::st
*/
std::pair<std::string, std::string> splitName(const std::string & name, bool reverse)
{
    /// Split at the last dot when `reverse` is set, otherwise at the first dot.
    const auto idx = reverse ? name.rfind('.') : name.find('.');

    /// A name is treated as not nested when it has no dot at all, or when the chosen dot
    /// is the first or the last character (one of the two parts would be empty).
    /// In that case the whole name is returned as the first element and the second is empty.
    const bool is_nested = idx != std::string::npos && idx != 0 && idx + 1 != name.size();
    if (!is_nested)
        return {name, {}};

    return {name.substr(0, idx), name.substr(idx + 1)};
}

std::pair<std::string_view, std::string_view> splitName(std::string_view name, bool reverse)
Expand Down
2 changes: 2 additions & 0 deletions src/DataTypes/NestedUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace Nested
std::string concatenateName(const std::string & nested_table_name, const std::string & nested_field_name);

/// Splits name of compound identifier by first/last dot (depending on 'reverse' parameter).
/// If the name is not nested (no dot or dot at start/end),
/// returns {name, ""}.
std::pair<std::string, std::string> splitName(const std::string & name, bool reverse = false);
std::pair<std::string_view, std::string_view> splitName(std::string_view name, bool reverse = false);

Expand Down
2 changes: 1 addition & 1 deletion src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class FormatFactory final : private boost::noncopyable
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt,
FormatParserSharedResourcesPtr parser_shared_resources = nullptr,
FormatFilterInfoPtr format_filter_info = std::make_shared<FormatFilterInfo>(),
FormatFilterInfoPtr format_filter_info = nullptr,
// affects things like buffer sizes and parallel reading
bool is_remote_fs = false,
// allows to do: buf -> parallel read -> decompression,
Expand Down
5 changes: 5 additions & 0 deletions src/Formats/FormatFilterInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ICEBERG_SPECIFICATION_VIOLATION;
}

void ColumnMapper::setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_)
{
chassert(storage_encoding.empty());
storage_encoding = std::move(storage_encoding_);
for (const auto & [column_name, field_id] : storage_encoding)
if (!field_id_to_clickhouse_name.emplace(field_id, column_name).second)
throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Duplicate field id {}", field_id);
}

std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> ColumnMapper::makeMapping(
Expand Down
6 changes: 6 additions & 0 deletions src/Formats/FormatFilterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ class ColumnMapper
{
public:
/// clickhouse_column_name -> field_id
/// For tuples, the map contains both the tuple itself and all its elements, e.g. {t, t.x, t.y}.
/// Note that parquet schema reader has to apply the mapping to all tuple fields recursively
/// even if the whole tuple was requested, because the names of the fields may be different.
/// Also fills the inverse field_id -> clickhouse_column_name map; a field id shared by
/// two columns is reported as an exception.
void setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_);

/// Read-only access to the clickhouse_column_name -> field_id map set by setStorageColumnEncoding.
const std::unordered_map<String, Int64> & getStorageColumnEncoding() const { return storage_encoding; }
/// Read-only access to the inverse map: field_id -> clickhouse_column_name.
const std::unordered_map<Int64, String> & getFieldIdToClickHouseName() const { return field_id_to_clickhouse_name; }

/// clickhouse_column_name -> format_column_name (just join the maps above by field_id).
/// `format_encoding` maps field_id -> format_column_name.
/// NOTE(review): two maps are returned; presumably the second one is the reverse direction
/// (format_column_name -> clickhouse_column_name) — confirm against the implementation.
std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> makeMapping(const std::unordered_map<Int64, String> & format_encoding);

private:
/// clickhouse_column_name -> field_id, as passed to setStorageColumnEncoding.
std::unordered_map<String, Int64> storage_encoding;
/// field_id -> clickhouse_column_name, derived from storage_encoding in setStorageColumnEncoding.
std::unordered_map<Int64, String> field_id_to_clickhouse_name;
};

using ColumnMapperPtr = std::shared_ptr<ColumnMapper>;
Expand Down
13 changes: 12 additions & 1 deletion src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,18 @@ Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(
auto arrow_field = table->schema()->GetFieldByName(column_name);

if (parquet_columns_to_clickhouse)
column_name = parquet_columns_to_clickhouse->at(column_name);
{
auto column_name_it = parquet_columns_to_clickhouse->find(column_name);
if (column_name_it == parquet_columns_to_clickhouse->end())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Column '{}' is not present in input data. Column name mapping has {} columns",
column_name,
parquet_columns_to_clickhouse->size());
}
column_name = column_name_it->second;
}

if (case_insensitive_matching)
boost::to_lower(column_name);
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Formats/Impl/Parquet/Decoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ struct ByteStreamSplitDecoder : public PageDecoder

bool PageDecoderInfo::canReadDirectlyIntoColumn(parq::Encoding::type encoding, size_t num_values, IColumn & col, std::span<char> & out) const
{
if (encoding == parq::Encoding::PLAIN && fixed_size_converter && fixed_size_converter->isTrivial())
if (encoding == parq::Encoding::PLAIN && fixed_size_converter && physical_type != parq::Type::BOOLEAN && fixed_size_converter->isTrivial())
{
chassert(col.sizeOfValueIfFixed() == fixed_size_converter->input_size);
out = col.insertRawUninitialized(num_values);
Expand Down Expand Up @@ -1417,7 +1417,7 @@ void GeoConverter::convertColumn(std::span<const char> chars, const UInt64 * off
{
col.reserve(col.size() + num_values);
chassert(chars.size() >= offsets[num_values - 1]);
for (size_t i = 0; i < num_values; ++i)
for (ssize_t i = 0; i < ssize_t(num_values); ++i)
{
char * ptr = const_cast<char*>(chars.data() + offsets[i - 1]);
size_t length = offsets[i] - offsets[i - 1] - separator_bytes;
Expand Down
3 changes: 2 additions & 1 deletion src/Processors/Formats/Impl/Parquet/ReadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro
}
case ReadStage::MainData:
{
row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed);

/// Must add to delivery_queue before advancing read_ptr to deliver subgroups in order.
/// (If we advanced read_ptr first, another thread could start and finish reading the
/// next subgroup before we add this one to delivery_queue, and ReadManager::read could
Expand All @@ -368,7 +370,6 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro
size_t prev = row_group.read_ptr.exchange(row_subgroup_idx + 1);
chassert(prev == row_subgroup_idx);
advanced_read_ptr = prev + 1;
row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed);
delivery_cv.notify_one();
break; // proceed to advancing read_ptr
}
Expand Down
91 changes: 58 additions & 33 deletions src/Processors/Formats/Impl/Parquet/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,31 +274,34 @@ void Reader::prefilterAndInitRowGroups()
SchemaConverter schemer(file_metadata, options, &extended_sample_block);
if (prewhere_info && !prewhere_info->remove_prewhere_column)
schemer.external_columns.push_back(prewhere_info->prewhere_column_name);
schemer.column_mapper = format_filter_info->column_mapper.get();
schemer.prepareForReading();
primitive_columns = std::move(schemer.primitive_columns);
total_primitive_columns_in_file = schemer.primitive_column_idx;
output_columns = std::move(schemer.output_columns);

/// Precalculate some column index mappings.

sample_block_to_output_columns_idx.resize(extended_sample_block.columns(), UINT64_MAX);
sample_block_to_output_columns_idx.resize(extended_sample_block.columns());
for (size_t i = 0; i < output_columns.size(); ++i)
{
const auto & idx = output_columns[i].idx_in_output_block;
if (idx.has_value())
{
chassert(sample_block_to_output_columns_idx.at(*idx) == UINT64_MAX);
chassert(!sample_block_to_output_columns_idx.at(*idx).has_value());
sample_block_to_output_columns_idx.at(*idx) = i;
}
}
chassert(std::all_of(sample_block_to_output_columns_idx.begin(), sample_block_to_output_columns_idx.end(), [](size_t x) { return x != UINT64_MAX; }));

if (format_filter_info->key_condition)
{
for (size_t idx_in_output_block : format_filter_info->key_condition->getUsedColumns())
{
size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
const OutputColumnInfo & output_info = output_columns[output_idx];
const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
if (!output_idx.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeyCondition uses PREWHERE output");
const OutputColumnInfo & output_info = output_columns[output_idx.value()];

if (output_info.is_primitive)
primitive_columns[output_info.primitive_start].used_by_key_condition = idx_in_output_block;
}
Expand Down Expand Up @@ -363,7 +366,11 @@ void Reader::prefilterAndInitRowGroups()
const auto & column_conditions = static_cast<FilterInfoExt *>(format_filter_info->opaque.get())->column_conditions;
for (const auto & [idx_in_output_block, key_condition] : column_conditions)
{
const OutputColumnInfo & output_info = output_columns[sample_block_to_output_columns_idx.at(idx_in_output_block)];
const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
if (!output_idx.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column condition uses PREWHERE output");
const OutputColumnInfo & output_info = output_columns[output_idx.value()];

if (!output_info.is_primitive)
continue;
primitive_columns[output_info.primitive_start].column_index_condition = key_condition.get();
Expand Down Expand Up @@ -602,44 +609,47 @@ void Reader::initializePrefetches()
void Reader::preparePrewhere()
{
PrewhereInfoPtr prewhere_info = format_filter_info->prewhere_info;
if (!prewhere_info)
return;
if (prewhere_info)
{
/// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row
/// subgroup, in one thread per row group. Instead, we could extract single-column conditions
/// and run them after decoding the corresponding columns, in parallel.
/// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.)
/// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression.

/// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row
/// subgroup, in one thread per row group. Instead, we could extract single-column conditions
/// and run them after decoding the corresponding columns, in parallel.
/// (Still run multi-column conditions, like `col1 = 42 or col2 = 'yes'`, after reading all columns.)
/// Probably reuse tryBuildPrewhereSteps from MergeTree for splitting the expression.

/// Convert ActionsDAG to ExpressionActions.
ExpressionActionsSettings actions_settings;
if (prewhere_info->row_level_filter.has_value())
{
ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings);
/// Convert ActionsDAG to ExpressionActions.
ExpressionActionsSettings actions_settings;
if (prewhere_info->row_level_filter.has_value())
{
ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings);
prewhere_steps.push_back(PrewhereStep
{
.actions = std::move(actions),
.result_column_name = prewhere_info->row_level_column_name,
});
}
ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings);
prewhere_steps.push_back(PrewhereStep
{
.actions = std::move(actions),
.result_column_name = prewhere_info->row_level_column_name
.result_column_name = prewhere_info->prewhere_column_name,
.need_filter = prewhere_info->need_filter,
});
if (!prewhere_info->remove_prewhere_column)
prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name);
}
ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings);
prewhere_steps.push_back(PrewhereStep
{
.actions = std::move(actions),
.result_column_name = prewhere_info->prewhere_column_name,
.need_filter = prewhere_info->need_filter,
});
if (!prewhere_info->remove_prewhere_column)
prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name);

/// Look up expression inputs in extended_sample_block.
for (PrewhereStep & step : prewhere_steps)
{
for (const auto & col : step.actions.getRequiredColumnsWithTypes())
{
size_t idx_in_output_block = extended_sample_block.getPositionByName(col.name, /* case_insensitive= */ false);
size_t output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
OutputColumnInfo & output_info = output_columns[output_idx];
const auto & output_idx = sample_block_to_output_columns_idx.at(idx_in_output_block);
if (!output_idx.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input");
OutputColumnInfo & output_info = output_columns[output_idx.value()];

output_info.use_prewhere = true;
bool only_for_prewhere = idx_in_output_block >= sample_block->columns();

Expand All @@ -649,7 +659,21 @@ void Reader::preparePrewhere()
primitive_columns[primitive_idx].only_for_prewhere = only_for_prewhere;
}

step.input_column_idxs.push_back(output_idx);
step.input_column_idxs.push_back(output_idx.value());
}
}

/// Assert that sample_block_to_output_columns_idx is valid.
for (size_t i = 0; i < sample_block_to_output_columns_idx.size(); ++i)
{
/// (`prewhere_steps` has at most two elements)
size_t is_prewhere_output = std::count_if(prewhere_steps.begin(), prewhere_steps.end(),
[&](const PrewhereStep & step) { return step.idx_in_output_block == i; });
if (is_prewhere_output > 1 ||
/// Column must appear in exactly one of {output_columns, prewhere output}.
sample_block_to_output_columns_idx[i].has_value() != !is_prewhere_output)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected column in sample block: {}", extended_sample_block.getByPosition(i).name);
}
}
}
Expand Down Expand Up @@ -974,7 +998,8 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group)
bytes_per_row += estimateColumnMemoryBytesPerRow(row_group.columns.at(i), row_group, primitive_columns.at(i));

size_t n = size_t(options.format.parquet.prefer_block_bytes / std::max(bytes_per_row, 1.));
rows_per_subgroup = std::min(rows_per_subgroup, std::max(n, 1ul));
n = std::max(n, size_t(128)); // avoid super tiny blocks if something is wrong with stats
rows_per_subgroup = std::min(rows_per_subgroup, n);
}
chassert(rows_per_subgroup > 0);

Expand Down
16 changes: 7 additions & 9 deletions src/Processors/Formats/Impl/Parquet/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,17 @@ namespace DB::Parquet
{

// TODO [parquet]:
// * column_mapper
// * allow_geoparquet_parser
// * either multistage PREWHERE or make query optimizer selectively move parts of the condition to prewhere instead of the whole condition
// * test on files from https://github.com/apache/parquet-testing
// * check fields for false sharing, add cacheline padding as needed
// * make sure userspace page cache read buffer supports readBigAt
// * assert that memory usage is zero at the end, the reset()s are easy to miss
// * support newer parquet versions: https://github.com/apache/parquet-format/blob/master/CHANGES.md
// * make writer write DataPageV2
// * make writer write PageEncodingStats
// * make writer write DELTA_LENGTH_BYTE_ARRAY
// * try adding [[unlikely]] to all ifs
// * try adding __restrict to pointers on hot paths
// * support or deprecate the preserve-order setting
// * stats (reuse the ones from the other PR?)
// - number of row groups that were split into chunks
// * add comments everywhere
// * progress indication and estimating bytes to read; allow negative total_bytes_to_read?
// * cache FileMetaData in something like SchemaCache
Expand Down Expand Up @@ -156,7 +152,7 @@ struct Reader
size_t column_idx;
/// Index in parquet `schema` (in FileMetaData).
size_t schema_idx;
String name;
String name; // possibly mapped by ColumnMapper (e.g. using iceberg metadata)
PageDecoderInfo decoder;

DataTypePtr raw_decoded_type; // not Nullable
Expand Down Expand Up @@ -192,7 +188,7 @@ struct Reader

struct OutputColumnInfo
{
String name;
String name; // possibly mapped by ColumnMapper
/// Range in primitive_columns.
size_t primitive_start = 0;
size_t primitive_end = 0;
Expand Down Expand Up @@ -455,8 +451,10 @@ struct Reader
size_t total_primitive_columns_in_file = 0;
std::vector<OutputColumnInfo> output_columns;
/// Maps idx_in_output_block to index in output_columns. I.e.:
/// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i
std::vector<size_t> sample_block_to_output_columns_idx;
/// sample_block_to_output_columns_idx[output_columns[i].idx_in_output_block] = i
/// nullopt if the column is produced by PREWHERE expression:
/// prewhere_steps[?].idx_in_output_block == i
std::vector<std::optional<size_t>> sample_block_to_output_columns_idx;

/// sample_block with maybe some columns added at the end.
/// The added columns are used as inputs to prewhere expression, then discarded.
Expand Down
Loading
Loading