Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query 'ALTER ... MATERIALIZE TTL' #8775

Merged
merged 10 commits into from Feb 23, 2020
4 changes: 3 additions & 1 deletion dbms/src/Access/AccessFlags.h
Expand Up @@ -282,6 +282,8 @@ class AccessFlags::Impl

auto modify_ttl = std::make_unique<Node>("MODIFY TTL", next_flag++, TABLE_LEVEL);
modify_ttl->aliases.push_back("ALTER MODIFY TTL");
auto materialize_ttl = std::make_unique<Node>("MATERIALIZE TTL", next_flag++, TABLE_LEVEL);
materialize_ttl->aliases.push_back("ALTER MATERIALIZE TTL");

auto modify_setting = std::make_unique<Node>("MODIFY SETTING", next_flag++, TABLE_LEVEL);
modify_setting->aliases.push_back("ALTER MODIFY SETTING");
Expand All @@ -293,7 +295,7 @@ class AccessFlags::Impl
auto freeze_partition = std::make_unique<Node>("FREEZE PARTITION", next_flag++, TABLE_LEVEL);
ext::push_back(freeze_partition->aliases, "ALTER FREEZE PARTITION");

auto alter_table = std::make_unique<Node>("ALTER TABLE", std::move(update), std::move(delet), std::move(alter_column), std::move(index), std::move(alter_constraint), std::move(modify_ttl), std::move(modify_setting), std::move(move_partition), std::move(fetch_partition), std::move(freeze_partition));
auto alter_table = std::make_unique<Node>("ALTER TABLE", std::move(update), std::move(delet), std::move(alter_column), std::move(index), std::move(alter_constraint), std::move(modify_ttl), std::move(materialize_ttl), std::move(modify_setting), std::move(move_partition), std::move(fetch_partition), std::move(freeze_partition));

auto refresh_view = std::make_unique<Node>("REFRESH VIEW", next_flag++, VIEW_LEVEL);
ext::push_back(refresh_view->aliases, "ALTER LIVE VIEW REFRESH");
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Access/AccessType.h
Expand Up @@ -45,6 +45,7 @@ enum class AccessType
ALTER_CONSTRAINT, /// allows to execute ALTER {ADD|DROP} CONSTRAINT

MODIFY_TTL, /// allows to execute ALTER MODIFY TTL
MATERIALIZE_TTL, /// allows to execute ALTER MATERIALIZE TTL
MODIFY_SETTING, /// allows to execute ALTER MODIFY SETTING

MOVE_PARTITION,
Expand Down Expand Up @@ -204,6 +205,7 @@ namespace impl
ACCESS_TYPE_TO_KEYWORD_CASE(ALTER_CONSTRAINT);

ACCESS_TYPE_TO_KEYWORD_CASE(MODIFY_TTL);
ACCESS_TYPE_TO_KEYWORD_CASE(MATERIALIZE_TTL);
ACCESS_TYPE_TO_KEYWORD_CASE(MODIFY_SETTING);

ACCESS_TYPE_TO_KEYWORD_CASE(MOVE_PARTITION);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/TTLBlockInputStream.cpp
Expand Up @@ -150,6 +150,10 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
std::vector<String> columns_to_remove;
for (const auto & [name, ttl_entry] : storage.column_ttl_entries_by_name)
{
/// Not all table columns may be present in the block, e.g. during a mutation.
if (!block.has(name))
continue;

const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
auto & new_ttl_info = new_ttl_infos.columns_ttl[name];

Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Interpreters/InterpreterAlterQuery.cpp
Expand Up @@ -25,6 +25,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_QUERY;
}


Expand Down Expand Up @@ -69,7 +70,13 @@ BlockIO InterpreterAlterQuery::execute()
partition_commands.emplace_back(std::move(*partition_command));
}
else if (auto mut_command = MutationCommand::parse(command_ast))
{
if (mut_command->type == MutationCommand::MATERIALIZE_TTL && !table->hasAnyTTL())
throw Exception("Cannot MATERIALIZE TTL as there is no TTL set for table "
+ table->getStorageID().getNameForLogs(), ErrorCodes::INCORRECT_QUERY);

mutation_commands.emplace_back(std::move(*mut_command));
}
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
live_view_commands.emplace_back(std::move(*live_view_command));
else
Expand Down Expand Up @@ -207,6 +214,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const
required_access.emplace_back(AccessType::MODIFY_TTL, alter.database, alter.table);
break;
}
case ASTAlterCommand::MATERIALIZE_TTL:
{
required_access.emplace_back(AccessType::MATERIALIZE_TTL, alter.database, alter.table);
break;
}
case ASTAlterCommand::MODIFY_SETTING:
{
required_access.emplace_back(AccessType::MODIFY_SETTING, alter.database, alter.table);
Expand Down
155 changes: 117 additions & 38 deletions dbms/src/Interpreters/MutationsInterpreter.cpp
Expand Up @@ -124,6 +124,28 @@ ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands)
return select;
}

/// Collects every column dependency reachable from @updated_columns.
/// Each round asks the storage for the dependencies of the current set of columns.
/// A dependency seen for the first time is stored in the result and, unless it is read-only,
/// its column is queued for the next round. The search ends when a round queues no columns.
ColumnDependencies getAllColumnDependencies(const StoragePtr & storage, const NameSet & updated_columns)
{
    ColumnDependencies collected;
    NameSet frontier = updated_columns;

    while (!frontier.empty())
    {
        const auto round_dependencies = storage->getColumnDependencies(frontier);
        frontier.clear();

        for (const auto & candidate : round_dependencies)
        {
            /// Skip dependencies that were already collected; only new ones may extend the next round.
            if (!collected.insert(candidate).second)
                continue;

            if (!candidate.isReadOnly())
                frontier.insert(candidate.column_name);
        }
    }

    return collected;
}

};

bool isStorageTouchedByMutations(
Expand Down Expand Up @@ -274,7 +296,6 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
/// We need to know which columns affect which MATERIALIZED columns and data skipping indices
/// to recalculate them if dependencies are updated.
std::unordered_map<String, Names> column_to_affected_materialized;
NameSet affected_indices_columns;
if (!updated_columns.empty())
{
for (const auto & column : columns_desc)
Expand All @@ -290,25 +311,13 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
}
}
}
for (const auto & index : indices_desc.indices)
{
auto query = index->expr->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
const auto required_columns = syntax_result->requiredSourceColumns();

for (const String & dependency : required_columns)
{
if (updated_columns.count(dependency))
{
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
break;
}
}
}

validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
}

/// Columns, that we need to read for calculation of skip indices or TTL expressions.
auto dependencies = getAllColumnDependencies(storage, updated_columns);

/// First, break a sequence of commands into stages.
for (const auto & command : commands)
{
Expand Down Expand Up @@ -380,40 +389,110 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
auto query = (*it)->expr->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
const auto required_columns = syntax_result->requiredSourceColumns();
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
for (const auto & column : required_columns)
dependencies.emplace(column, ColumnDependency::SKIP_INDEX);
}
else if (command.type == MutationCommand::MATERIALIZE_TTL)
{
if (storage->hasRowsTTL())
{
for (const auto & column : all_columns)
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
}
else
{
NameSet new_updated_columns;
auto column_ttls = storage->getColumns().getColumnTTLs();
for (const auto & elem : column_ttls)
{
dependencies.emplace(elem.first, ColumnDependency::TTL_TARGET);
new_updated_columns.insert(elem.first);
}

auto all_columns_vec = all_columns.getNames();
auto all_dependencies = getAllColumnDependencies(storage, NameSet(all_columns_vec.begin(), all_columns_vec.end()));

for (const auto & dependency : all_dependencies)
{
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
dependencies.insert(dependency);
}

/// Recalc only skip indices of columns, that could be updated by TTL.
auto new_dependencies = storage->getColumnDependencies(new_updated_columns);
for (const auto & dependency : new_dependencies)
{
if (dependency.kind == ColumnDependency::SKIP_INDEX)
dependencies.insert(dependency);
}

if (dependencies.empty())
{
/// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
/// But we still have to read at least one column.
dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
}
}
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}

/// We care about affected indices because we also need to rewrite them
/// when one of index columns updated or filtered with delete
if (!affected_indices_columns.empty())
/// when one of index columns updated or filtered with delete.
/// The same applies to columns that are needed for the calculation of TTL expressions.
if (!dependencies.empty())
{
if (!stages.empty())
NameSet changed_columns;
NameSet unchanged_columns;
for (const auto & dependency : dependencies)
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;
stages_copy.back().output_columns = stage.output_columns;
stages_copy.back().filters = stage.filters;
}
if (dependency.isReadOnly())
unchanged_columns.insert(dependency.column_name);
else
changed_columns.insert(dependency.column_name);
}

const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
if (!changed_columns.empty())
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);

auto first_stage_header = interpreter.getSampleBlock();
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
for (const auto & column : changed_columns)
stages.back().column_to_updated.emplace(
column, std::make_shared<ASTIdentifier>(column));
}
/// Special step to recalculate affected indices.
stages.emplace_back(context);
for (const auto & column : affected_indices_columns)
stages.back().column_to_updated.emplace(

if (!unchanged_columns.empty())
{
if (!stages.empty())
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;
stages_copy.back().output_columns = stage.output_columns;
stages_copy.back().filters = stage.filters;
}

const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};

auto first_stage_header = interpreter.getSampleBlock();
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
}

/// Special step to recalculate affected indices and TTL expressions.
stages.emplace_back(context);
for (const auto & column : unchanged_columns)
stages.back().column_to_updated.emplace(
column, std::make_shared<ASTIdentifier>(column));
}
}

is_prepared = true;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Parsers/ASTAlterQuery.cpp
Expand Up @@ -261,6 +261,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY TTL " << (settings.hilite ? hilite_none : "");
ttl->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MATERIALIZE_TTL)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MATERIALIZE TTL"
<< (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::MODIFY_SETTING)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTAlterQuery.h
Expand Up @@ -31,6 +31,7 @@ class ASTAlterCommand : public IAST
COMMENT_COLUMN,
MODIFY_ORDER_BY,
MODIFY_TTL,
MATERIALIZE_TTL,
MODIFY_SETTING,
MODIFY_QUERY,

Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Parsers/ParserAlterQuery.cpp
Expand Up @@ -30,6 +30,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_comment_column("COMMENT COLUMN");
ParserKeyword s_modify_order_by("MODIFY ORDER BY");
ParserKeyword s_modify_ttl("MODIFY TTL");
ParserKeyword s_materialize_ttl("MATERIALIZE TTL");
ParserKeyword s_modify_setting("MODIFY SETTING");
ParserKeyword s_modify_query("MODIFY QUERY");

Expand Down Expand Up @@ -458,6 +459,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->type = ASTAlterCommand::MODIFY_TTL;
}
else if (s_materialize_ttl.ignore(pos, expected))
{
command->type = ASTAlterCommand::MATERIALIZE_TTL;
}
else if (s_modify_setting.ignore(pos, expected))
{
if (!parser_settings.parse(pos, command->settings_changes, expected))
Expand Down
57 changes: 57 additions & 0 deletions dbms/src/Storages/ColumnDependency.h
@@ -0,0 +1,57 @@
#pragma once

#include <Common/SipHash.h>
#include <Core/Types.h>
#include <unordered_set>

namespace DB
{

/// Describes a dependency on a column.
/// Used to determine which columns have to be read when some other column is updated.
/// Necessary because a table can have dependencies that require several columns for their calculation.
struct ColumnDependency
{
    enum Kind : UInt8
    {
        /// Some skip index requires @column_name.
        SKIP_INDEX,

        /// Some TTL expression requires @column_name.
        TTL_EXPRESSION,

        /// A TTL is set for @column_name itself.
        TTL_TARGET
    };

    String column_name;
    Kind kind;

    ColumnDependency(const String & column_name_, Kind kind_)
        : column_name(column_name_), kind(kind_) {}

    /// True when the column is only required as an input (by a skip index or a TTL expression),
    /// false for TTL_TARGET.
    bool isReadOnly() const
    {
        const bool needed_by_skip_index = (kind == SKIP_INDEX);
        const bool needed_by_ttl_expression = (kind == TTL_EXPRESSION);
        return needed_by_skip_index || needed_by_ttl_expression;
    }

    /// Two dependencies are equal when both the kind and the column name match.
    bool operator==(const ColumnDependency & other) const
    {
        if (kind != other.kind)
            return false;
        return column_name == other.column_name;
    }

    /// Hash functor that feeds the column name and then the kind into SipHash.
    struct Hash
    {
        UInt64 operator()(const ColumnDependency & dependency) const
        {
            SipHash state;
            state.update(dependency.column_name);
            state.update(dependency.kind);
            return state.get64();
        }
    };
};

using ColumnDependencies = std::unordered_set<ColumnDependency, ColumnDependency::Hash>;

}