Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ByConity Projection #347

Merged
merged 5 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@
M(3004, ILLEAGL_ENFORCE_SINGLE_ROW) \
M(3005, OPTIMIZER_TIMEOUT) \
M(3006, REMOVE_SUBQUERY_ERROR) \
M(3007, PROJECTION_SELECTION_ERROR) \
\
M(4001, RESOURCE_GROUP_ILLEGAL_CONFIG) \
M(4002, RESOURCE_NOT_ENOUGH) \
Expand Down
16 changes: 16 additions & 0 deletions src/Common/LinkedHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <string_view>
#include <Common/ErrorCodes.h>
#include <Parsers/formatAST.h>
namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -135,6 +136,21 @@ class LinkedHashMap {
return ordered_storage.back();
}

/// Renders the map as text for debugging: one "key := <serialized AST>" line per entry,
/// in the iteration order of ordered_storage.
/// Only maps whose Value type is ConstASTPtr are rendered; for every other Value type
/// an empty string is returned.
String dump() const
{
    if constexpr (!std::is_same_v<Value, ConstASTPtr>)
    {
        return "";
    }
    else
    {
        /// Kept inside the constexpr branch so that serializeAST is only instantiated
        /// for maps that actually hold AST pointers.
        std::stringstream rendered;
        for (const auto & entry : ordered_storage)
            rendered << entry.first << " := " << serializeAST(*entry.second, true) << '\n';
        return rendered.str();
    }
}

LinkedHashMap(const LinkedHashMap &) = default;
LinkedHashMap(LinkedHashMap &&) = default;

Expand Down
4 changes: 4 additions & 0 deletions src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class Stopwatch
UInt64 elapsedNanoseconds() const { return is_running ? nanoseconds() - start_ns : stop_ns - start_ns; }
UInt64 elapsedMicroseconds() const { return elapsedNanoseconds() / 1000U; }
UInt64 elapsedMilliseconds() const { return elapsedNanoseconds() / 1000000UL; }
/// Elapsed time in milliseconds as a floating-point value.
/// Unlike elapsedMilliseconds(), the nanosecond count is converted to double before
/// dividing, so the fractional part of a millisecond is kept.
double elapsedMillisecondsAsDouble() const
{
    const auto elapsed_ns = static_cast<double>(elapsedNanoseconds());
    return elapsed_ns / 1000000UL;
}
double elapsedSeconds() const { return static_cast<double>(elapsedNanoseconds()) / 1000000000ULL; }

private:
Expand Down
10 changes: 10 additions & 0 deletions src/Common/checkStackSize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <common/scope_guard.h>
#include <pthread.h>
#include <cstdint>
#include <bthread/task_group.h>

#if defined(__FreeBSD__)
# include <pthread_np.h>
Expand All @@ -19,6 +20,10 @@ namespace DB
}
}

/// Declaration of the per-thread bthread TaskGroup pointer that checkStackSize() reads
/// to decide whether the stack-size check should be skipped.
/// NOTE(review): presumably defined inside the bthread library itself — confirm the
/// symbol is exported by the linked bthread/brpc build.
namespace bthread {
extern __thread bthread::TaskGroup* tls_task_group;
}

static thread_local void * stack_address = nullptr;
static thread_local size_t max_stack_size = 0;

Expand Down Expand Up @@ -79,6 +84,11 @@ __attribute__((__weak__)) void checkStackSize()
{
using namespace DB;

// do not check the stack size in bthread mode
bthread::TaskGroup* g = bthread::tls_task_group;
if (g && !g->is_current_pthread_task())
return;

if (!stack_address)
max_stack_size = getStackSize(&stack_address);

Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ class IColumn;
M(Bool, enable_materialized_view_join_rewriting, false, "Whether enable materialized view based rewriter for query using join materialized views", 0) \
M(Bool, enable_sharding_optimize, false, "Whether enable sharding optimization, eg. local join", 0) \
M(Bool, enable_filter_window_to_partition_topn, true, "Filter window to partition topn", 0) \
M(Bool, optimizer_projection_support, false, "Use projection in optimizer mode", 0) \
/** Exchange settings */ \
M(UInt64, exchange_parallel_size, 1, "Exchange parallel size", 0) \
M(UInt64, exchange_source_pipeline_threads, 16, "Recommend number of threads for pipeline which reading data from exchange, ingoned if exchange need keep data order", 0) \
Expand Down
18 changes: 9 additions & 9 deletions src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(


void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
Expand Down Expand Up @@ -1283,15 +1283,15 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns


/// Convenience overload for a whole Block: takes the block's row count and columns and
/// forwards them, together with all other arguments, to the Columns-based executeOnBlock
/// overload, returning that overload's result unchanged.
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}


bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -1413,7 +1413,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
{
Stopwatch watch;
size_t rows = data_variants.size();
Expand Down Expand Up @@ -1485,7 +1485,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
Expand Down Expand Up @@ -1547,7 +1547,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out)
IBlockOutputStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
Expand Down Expand Up @@ -2673,7 +2673,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
block.clear();
}

bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys)
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -3022,7 +3022,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
}


std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block) const
{
if (!block)
return {};
Expand Down Expand Up @@ -3114,7 +3114,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
}


void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) const
{
if (result.empty())
return;
Expand Down
25 changes: 13 additions & 12 deletions src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ struct AggregatedDataVariants : private boost::noncopyable
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
Aggregator * aggregator = nullptr;
const Aggregator * aggregator = nullptr;

size_t keys_size{}; /// Number of keys. NOTE do we need this field?
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
Expand Down Expand Up @@ -1047,12 +1047,15 @@ class Aggregator final
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

/// Used for aggregate projection.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
*
Expand All @@ -1068,8 +1071,6 @@ class Aggregator final
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);

bool mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys);

/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
Expand All @@ -1079,11 +1080,11 @@ class Aggregator final
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;

/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;

bool hasTemporaryFiles() const { return !temporary_files.empty(); }

Expand Down Expand Up @@ -1155,7 +1156,7 @@ class Aggregator final
Poco::Logger * log = &Poco::Logger::get("Aggregator");

/// For external aggregation.
TemporaryFiles temporary_files;
mutable TemporaryFiles temporary_files;

#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
Expand All @@ -1178,7 +1179,7 @@ class Aggregator final
/** Call `destroy` methods for states of aggregate functions.
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
*/
void destroyAllAggregateStates(AggregatedDataVariants & result);
void destroyAllAggregateStates(AggregatedDataVariants & result) const;


/// Process one data block, aggregate the data into a hash table.
Expand Down Expand Up @@ -1220,7 +1221,7 @@ class Aggregator final
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out);
IBlockOutputStream & out) const;

/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
Expand Down Expand Up @@ -1375,7 +1376,7 @@ class Aggregator final
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
NestedColumnsHolder & nested_columns_holder) const;

void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
Expand Down
10 changes: 8 additions & 2 deletions src/Interpreters/DatabaseCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ namespace ErrorCodes

TemporaryTableHolder::TemporaryTableHolder(ContextPtr context_, const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
: WithContext(context_->getGlobalContext())
, temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get())
{
auto & database_catalog = DatabaseCatalog::instance();
database_catalog.initializeAndLoadTemporaryDatabase();
temporary_tables = database_catalog.getDatabaseForTemporaryTables().get();

ASTPtr original_create;
ASTCreateQuery * create = dynamic_cast<ASTCreateQuery *>(query.get());
String global_name;
Expand Down Expand Up @@ -167,6 +170,8 @@ StoragePtr TemporaryTableHolder::getTable() const

void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{
if (isDatabaseExist(TEMPORARY_DATABASE))
return;
// coverity[store_truncates_time_t]
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);

Expand Down Expand Up @@ -399,7 +404,8 @@ void DatabaseCatalog::assertDatabaseDoesntExistUnlocked(const String & database_

void DatabaseCatalog::attachDatabase(const String & database_name, const DatabasePtr & database)
{
if (database->getEngineName() == "Cnch")
// TEMPORARY_DATABASE is stored in DatabaseCatalog because it is tiny
if (database->getEngineName() == "Cnch" && database_name != TEMPORARY_DATABASE)
return;
std::lock_guard lock{databases_mutex};
assertDatabaseDoesntExistUnlocked(database_name);
Expand Down
7 changes: 5 additions & 2 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ void ExpressionAnalyzer::analyzeAggregation()

ssize_t group_size = group_elements_ast.size();
const auto & column_name = group_elements_ast[j]->getColumnName();
const auto & column_ast = group_elements_ast[j];
const auto * node = temp_actions->tryFindInIndex(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
Expand Down Expand Up @@ -347,6 +348,7 @@ void ExpressionAnalyzer::analyzeAggregation()
unique_keys[key.name] = aggregation_keys.size();
grouping_set_indexes_list.push_back(aggregation_keys.size());
aggregation_keys.push_back(key);
aggregation_key_asts.push_back(column_ast);

/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
Expand All @@ -365,6 +367,7 @@ void ExpressionAnalyzer::analyzeAggregation()
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);

const auto & column_name = group_asts[i]->getColumnName();
const auto & column_ast = group_asts[i];
const auto * node = temp_actions->tryFindInIndex(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
Expand Down Expand Up @@ -396,7 +399,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
unique_keys[key.name] = aggregation_keys.size();
aggregation_keys.push_back(key);

aggregation_key_asts.push_back(column_ast);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
}
Expand Down Expand Up @@ -1166,7 +1169,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
* - this function shows the expression JOIN _data1.
*/
auto interpreter = interpretSubquery(
join_element.table_expression, getContext(), original_right_columns, query_options.copy().setWithAllColumns());
join_element.table_expression, getContext(), original_right_columns, query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
{
joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct ExpressionAnalyzerData

bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
ASTs aggregation_key_asts;
NamesAndTypesLists aggregation_keys_list;
ColumnNumbersList aggregation_keys_indexes_list;
bool has_const_aggregation_keys = false;
Expand Down Expand Up @@ -358,10 +359,13 @@ class SelectQueryExpressionAnalyzer : public ExpressionAnalyzer
bool useGroupingSetKey() const { return aggregation_keys_list.size() > 1; }

const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
const ASTs & aggregationKeyAsts() const { return aggregation_key_asts; }
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
const NamesAndTypesLists & aggregationKeysList() const { return aggregation_keys_list; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }

/// Returns the ASTFunction pointers provided by the base-class ExpressionAnalyzer::aggregates().
/// A separate name is needed because aggregates() declared in this class returns
/// AggregateDescriptions and hides the base-class function.
const std::vector<const ASTFunction *> & aggregateAsts() const { return ExpressionAnalyzer::aggregates(); }

const PreparedSets & getPreparedSets() const { return prepared_sets; }
std::unique_ptr<QueryPlan> getJoinedPlan();

Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
properties.indices = as_storage_metadata->getSecondaryIndices();

/// CREATE TABLE ... AS should also copy the projections of the source table
if (as_storage_metadata->hasProjections())
properties.projections = as_storage_metadata->getProjections().clone();

properties.constraints = as_storage_metadata->getConstraints();
}
else if (create.select)
Expand Down Expand Up @@ -951,6 +955,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)

if (create.storage && create.storage->unique_key && create.storage->cluster_by)
throw Exception("`CLUSTER BY` cannot be used together with `UNIQUE KEY`", ErrorCodes::BAD_ARGUMENTS);
if (create.storage && create.storage->unique_key && create.columns_list->projections)
throw Exception("`Projection` cannot be used together with `UNIQUE KEY`", ErrorCodes::BAD_ARGUMENTS);

String current_database = getContext()->getCurrentDatabase();
auto database_name = create.database.empty() ? current_database : create.database;
Expand Down