Skip to content

Commit

Permalink
Merge pull request #347 from gaoyuanning/ByConity_github
Browse files Browse the repository at this point in the history
feat: ByConity Projection
  • Loading branch information
hustnn committed Jun 15, 2023
2 parents 4c04c1d + 014ecc4 commit 6aceb7b
Show file tree
Hide file tree
Showing 90 changed files with 5,466 additions and 1,024 deletions.
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@
M(3004, ILLEAGL_ENFORCE_SINGLE_ROW) \
M(3005, OPTIMIZER_TIMEOUT) \
M(3006, REMOVE_SUBQUERY_ERROR) \
M(3007, PROJECTION_SELECTION_ERROR) \
\
M(4001, RESOURCE_GROUP_ILLEGAL_CONFIG) \
M(4002, RESOURCE_NOT_ENOUGH) \
Expand Down
16 changes: 16 additions & 0 deletions src/Common/LinkedHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <string_view>
#include <Common/ErrorCodes.h>
#include <Parsers/formatAST.h>
namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -135,6 +136,21 @@ class LinkedHashMap {
return ordered_storage.back();
}

/// Renders the map as text: one "<key> := <serialized AST>" line per entry,
/// walking ordered_storage from its first element to its last.
/// Only instantiations whose Value type is ConstASTPtr produce output;
/// for every other Value type an empty string is returned.
String dump() const
{
    if constexpr (std::is_same_v<Value, ConstASTPtr>)
    {
        /// Output-only stream. '\n' is used instead of std::endl so that no
        /// flush is requested after every entry; the produced text is the same.
        std::ostringstream os;
        for (const auto & item : ordered_storage)
            os << item.first << " := " << serializeAST(*item.second, true) << '\n';
        return os.str();
    }
    else
    {
        return {};
    }
}

LinkedHashMap(const LinkedHashMap &) = default;
LinkedHashMap(LinkedHashMap &&) = default;

Expand Down
4 changes: 4 additions & 0 deletions src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class Stopwatch
UInt64 elapsedNanoseconds() const { return is_running ? nanoseconds() - start_ns : stop_ns - start_ns; }
UInt64 elapsedMicroseconds() const { return elapsedNanoseconds() / 1000U; }
UInt64 elapsedMilliseconds() const { return elapsedNanoseconds() / 1000000UL; }
/// Elapsed time in milliseconds as a floating-point value, so the
/// sub-millisecond part is kept (elapsedMilliseconds() uses integer division).
double elapsedMillisecondsAsDouble() const
{
    constexpr double nanoseconds_per_millisecond = 1000000.0;
    const auto elapsed_ns = static_cast<double>(elapsedNanoseconds());
    return elapsed_ns / nanoseconds_per_millisecond;
}
double elapsedSeconds() const { return static_cast<double>(elapsedNanoseconds()) / 1000000000ULL; }

private:
Expand Down
10 changes: 10 additions & 0 deletions src/Common/checkStackSize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <common/scope_guard.h>
#include <pthread.h>
#include <cstdint>
#include <bthread/task_group.h>

#if defined(__FreeBSD__)
# include <pthread_np.h>
Expand All @@ -19,6 +20,10 @@ namespace DB
}
}

// Declares the thread-local TaskGroup pointer that checkStackSize() reads below
// to decide whether the stack-size check should be skipped.
// NOTE(review): presumably the variable is defined inside the bthread library and
// is re-declared here because it is not exposed by the included header - confirm.
namespace bthread {
extern __thread bthread::TaskGroup* tls_task_group;
}

static thread_local void * stack_address = nullptr;
static thread_local size_t max_stack_size = 0;

Expand Down Expand Up @@ -79,6 +84,11 @@ __attribute__((__weak__)) void checkStackSize()
{
using namespace DB;

// do not check the stack size in bthread mode
bthread::TaskGroup* g = bthread::tls_task_group;
if (g && !g->is_current_pthread_task())
return;

if (!stack_address)
max_stack_size = getStackSize(&stack_address);

Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ class IColumn;
M(Bool, enable_materialized_view_join_rewriting, false, "Whether enable materialized view based rewriter for query using join materialized views", 0) \
M(Bool, enable_sharding_optimize, false, "Whether enable sharding optimization, eg. local join", 0) \
M(Bool, enable_filter_window_to_partition_topn, true, "Filter window to partition topn", 0) \
M(Bool, optimizer_projection_support, false, "Use projection in optimizer mode", 0) \
/** Exchange settings */ \
M(UInt64, exchange_parallel_size, 1, "Exchange parallel size", 0) \
M(UInt64, exchange_source_pipeline_threads, 16, "Recommend number of threads for pipeline which reading data from exchange, ingoned if exchange need keep data order", 0) \
Expand Down
18 changes: 9 additions & 9 deletions src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(


void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
Expand Down Expand Up @@ -1283,15 +1283,15 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns


bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}


bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -1413,7 +1413,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
{
Stopwatch watch;
size_t rows = data_variants.size();
Expand Down Expand Up @@ -1485,7 +1485,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
}


void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
Expand Down Expand Up @@ -1547,7 +1547,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out)
IBlockOutputStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
Expand Down Expand Up @@ -2673,7 +2673,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
block.clear();
}

bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys)
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
Expand Down Expand Up @@ -3022,7 +3022,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
}


std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block) const
{
if (!block)
return {};
Expand Down Expand Up @@ -3114,7 +3114,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
}


void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) const
{
if (result.empty())
return;
Expand Down
25 changes: 13 additions & 12 deletions src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ struct AggregatedDataVariants : private boost::noncopyable
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
Aggregator * aggregator = nullptr;
const Aggregator * aggregator = nullptr;

size_t keys_size{}; /// Number of keys. NOTE do we need this field?
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
Expand Down Expand Up @@ -1047,12 +1047,15 @@ class Aggregator final
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;

/// Used for aggregate projection.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
*
Expand All @@ -1068,8 +1071,6 @@ class Aggregator final
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);

bool mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys);

/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
Expand All @@ -1079,11 +1080,11 @@ class Aggregator final
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;

/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;

bool hasTemporaryFiles() const { return !temporary_files.empty(); }

Expand Down Expand Up @@ -1155,7 +1156,7 @@ class Aggregator final
Poco::Logger * log = &Poco::Logger::get("Aggregator");

/// For external aggregation.
TemporaryFiles temporary_files;
mutable TemporaryFiles temporary_files;

#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
Expand All @@ -1178,7 +1179,7 @@ class Aggregator final
/** Call `destroy` methods for states of aggregate functions.
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
*/
void destroyAllAggregateStates(AggregatedDataVariants & result);
void destroyAllAggregateStates(AggregatedDataVariants & result) const;


/// Process one data block, aggregate the data into a hash table.
Expand Down Expand Up @@ -1220,7 +1221,7 @@ class Aggregator final
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out);
IBlockOutputStream & out) const;

/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
Expand Down Expand Up @@ -1375,7 +1376,7 @@ class Aggregator final
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
NestedColumnsHolder & nested_columns_holder) const;

void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
Expand Down
10 changes: 8 additions & 2 deletions src/Interpreters/DatabaseCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ namespace ErrorCodes

TemporaryTableHolder::TemporaryTableHolder(ContextPtr context_, const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
: WithContext(context_->getGlobalContext())
, temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get())
{
auto & database_catalog = DatabaseCatalog::instance();
database_catalog.initializeAndLoadTemporaryDatabase();
temporary_tables = database_catalog.getDatabaseForTemporaryTables().get();

ASTPtr original_create;
ASTCreateQuery * create = dynamic_cast<ASTCreateQuery *>(query.get());
String global_name;
Expand Down Expand Up @@ -167,6 +170,8 @@ StoragePtr TemporaryTableHolder::getTable() const

void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{
if (isDatabaseExist(TEMPORARY_DATABASE))
return;
// coverity[store_truncates_time_t]
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);

Expand Down Expand Up @@ -399,7 +404,8 @@ void DatabaseCatalog::assertDatabaseDoesntExistUnlocked(const String & database_

void DatabaseCatalog::attachDatabase(const String & database_name, const DatabasePtr & database)
{
if (database->getEngineName() == "Cnch")
// TEMPORARY_DATABASE is stored in DatabaseCatalog because it is tiny
if (database->getEngineName() == "Cnch" && database_name != TEMPORARY_DATABASE)
return;
std::lock_guard lock{databases_mutex};
assertDatabaseDoesntExistUnlocked(database_name);
Expand Down
7 changes: 5 additions & 2 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ void ExpressionAnalyzer::analyzeAggregation()

ssize_t group_size = group_elements_ast.size();
const auto & column_name = group_elements_ast[j]->getColumnName();
const auto & column_ast = group_elements_ast[j];
const auto * node = temp_actions->tryFindInIndex(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
Expand Down Expand Up @@ -347,6 +348,7 @@ void ExpressionAnalyzer::analyzeAggregation()
unique_keys[key.name] = aggregation_keys.size();
grouping_set_indexes_list.push_back(aggregation_keys.size());
aggregation_keys.push_back(key);
aggregation_key_asts.push_back(column_ast);

/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
Expand All @@ -365,6 +367,7 @@ void ExpressionAnalyzer::analyzeAggregation()
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);

const auto & column_name = group_asts[i]->getColumnName();
const auto & column_ast = group_asts[i];
const auto * node = temp_actions->tryFindInIndex(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
Expand Down Expand Up @@ -396,7 +399,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
unique_keys[key.name] = aggregation_keys.size();
aggregation_keys.push_back(key);

aggregation_key_asts.push_back(column_ast);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
}
Expand Down Expand Up @@ -1166,7 +1169,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
* - this function shows the expression JOIN _data1.
*/
auto interpreter = interpretSubquery(
join_element.table_expression, getContext(), original_right_columns, query_options.copy().setWithAllColumns());
join_element.table_expression, getContext(), original_right_columns, query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
{
joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct ExpressionAnalyzerData

bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
ASTs aggregation_key_asts;
NamesAndTypesLists aggregation_keys_list;
ColumnNumbersList aggregation_keys_indexes_list;
bool has_const_aggregation_keys = false;
Expand Down Expand Up @@ -358,10 +359,13 @@ class SelectQueryExpressionAnalyzer : public ExpressionAnalyzer
bool useGroupingSetKey() const { return aggregation_keys_list.size() > 1; }

const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
/// AST nodes of the GROUP BY keys. analyzeAggregation() appends to this list at the same
/// points where it appends to aggregation_keys, so the two are filled in the same order.
const ASTs & aggregationKeyAsts() const { return aggregation_key_asts; }
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
const NamesAndTypesLists & aggregationKeysList() const { return aggregation_keys_list; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }

/// Forwards to the base-class ExpressionAnalyzer::aggregates(), which yields ASTFunction pointers.
/// A separate name is needed because aggregates() in this class returns the AggregateDescriptions.
const std::vector<const ASTFunction *> & aggregateAsts() const { return ExpressionAnalyzer::aggregates(); }

const PreparedSets & getPreparedSets() const { return prepared_sets; }
std::unique_ptr<QueryPlan> getJoinedPlan();

Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
properties.indices = as_storage_metadata->getSecondaryIndices();

/// Create table as should set projections
if (as_storage_metadata->hasProjections())
properties.projections = as_storage_metadata->getProjections().clone();

properties.constraints = as_storage_metadata->getConstraints();
}
else if (create.select)
Expand Down Expand Up @@ -951,6 +955,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)

if (create.storage && create.storage->unique_key && create.storage->cluster_by)
throw Exception("`CLUSTER BY` cannot be used together with `UNIQUE KEY`", ErrorCodes::BAD_ARGUMENTS);
if (create.storage && create.storage->unique_key && create.columns_list->projections)
throw Exception("`Projection` cannot be used together with `UNIQUE KEY`", ErrorCodes::BAD_ARGUMENTS);

String current_database = getContext()->getCurrentDatabase();
auto database_name = create.database.empty() ? current_database : create.database;
Expand Down

0 comments on commit 6aceb7b

Please sign in to comment.