Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update only affected rows in KV storage #48435

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/Interpreters/MutationsInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: MutationsInterpreter(
Source(std::move(storage_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
can_execute_, return_all_columns_, return_mutated_rows_)
{
if (can_execute_ && dynamic_cast<const MergeTreeData *>(source.getStorage().get()))
{
Expand All @@ -396,11 +396,11 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
can_execute_, return_all_columns_, return_mutated_rows_)
{
}

Expand All @@ -411,15 +411,15 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: source(std::move(source_))
, metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_))
, context(Context::createCopy(context_))
, can_execute(can_execute_)
, select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits().ignoreProjections())
, return_all_columns(return_all_columns_)
, return_deleted_rows(return_deleted_rows_)
, return_mutated_rows(return_mutated_rows_)
{
prepare(!can_execute);
}
Expand Down Expand Up @@ -600,7 +600,7 @@ void MutationsInterpreter::prepare(bool dry_run)
for (auto & command : commands)
{
// we can return deleted rows only if it's the only present command
assert(command.type == MutationCommand::DELETE || !return_deleted_rows);
assert(command.type == MutationCommand::DELETE || command.type == MutationCommand::UPDATE || !return_mutated_rows);

if (command.type == MutationCommand::DELETE)
{
Expand All @@ -610,7 +610,7 @@ void MutationsInterpreter::prepare(bool dry_run)

auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command);

if (!return_deleted_rows)
if (!return_mutated_rows)
predicate = makeASTFunction("isZeroOrNull", predicate);

stages.back().filters.push_back(predicate);
Expand Down Expand Up @@ -697,6 +697,9 @@ void MutationsInterpreter::prepare(bool dry_run)
type_literal);

stages.back().column_to_updated.emplace(column, updated_column);

if (condition && return_mutated_rows)
stages.back().filters.push_back(condition);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How ALTER UPDATE WHERE worked before if we didn't apply filters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It correctly mutates the rows it needs by creating SELECT if(condition, mutate, old_value).... The problem for KV store is that MutationIntepreter returns all the rows (changed and unchanged) causing an update on ALL keys while we only need a subset we would like to override.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. So, for rocksdb we will not rewrite values with the same old_value. And for keepermap we will also not update _version.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly 😄

}

if (!affected_materialized.empty())
Expand Down
10 changes: 5 additions & 5 deletions src/Interpreters/MutationsInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MutationsInterpreter
ContextPtr context_,
bool can_execute_,
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
bool return_mutated_rows_ = false);

/// Special case for MergeTree
MutationsInterpreter(
Expand All @@ -59,7 +59,7 @@ class MutationsInterpreter
ContextPtr context_,
bool can_execute_,
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
bool return_mutated_rows_ = false);

void validate();
size_t evaluateCommandsSize();
Expand Down Expand Up @@ -136,7 +136,7 @@ class MutationsInterpreter
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_);
bool return_mutated_rows_);

void prepare(bool dry_run);

Expand Down Expand Up @@ -210,8 +210,8 @@ class MutationsInterpreter
// whether all columns should be returned, not just updated
bool return_all_columns;

// whether we should return deleted or nondeleted rows on DELETE mutation
bool return_deleted_rows;
// whether we should return mutated or all existing rows
bool return_mutated_rows;
};

}
10 changes: 8 additions & 2 deletions src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
context_,
/*can_execute_*/ true,
/*return_all_columns_*/ true,
/*return_deleted_rows_*/ true);
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

Expand Down Expand Up @@ -279,7 +279,13 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated");

auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr, metadata_snapshot, commands, context_, /*can_execute_*/ true, /*return_all_columns*/ true);
storage_ptr,
metadata_snapshot,
commands,
context_,
/*can_execute_*/ true,
/*return_all_columns*/ true,
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

Expand Down
10 changes: 8 additions & 2 deletions src/Storages/StorageKeeperMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
local_context,
/*can_execute_*/ true,
/*return_all_columns_*/ true,
/*return_deleted_rows_*/ true);
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

Expand Down Expand Up @@ -927,7 +927,13 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated");

auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr, metadata_snapshot, commands, local_context, /*can_execute_*/ true, /*return_all_columns*/ true);
storage_ptr,
metadata_snapshot,
commands,
local_context,
/*can_execute_*/ true,
/*return_all_columns*/ true,
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

Expand Down
52 changes: 26 additions & 26 deletions tests/queries/0_stateless/02577_keepermap_delete_update.reference
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
1 Some string 0 0
2 Some other string 0 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
4 random2 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
3 random 0 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
1 String 10 0
2 String 20 0
3 String 30 0
4 String 40 0
-----------
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 102 1
2 String 202 1
3 Another 302 2
4 Another 402 2
-----------
40 changes: 20 additions & 20 deletions tests/queries/0_stateless/02577_keepermap_delete_update.sql
Original file line number Diff line number Diff line change
@@ -1,44 +1,44 @@
-- Tags: no-ordinary-database, no-fasttest

DROP TABLE IF EXISTS 02661_keepermap_delete_update;
DROP TABLE IF EXISTS 02577_keepermap_delete_update;

CREATE TABLE 02661_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02661_keepermap_delete_update') PRIMARY KEY(key);
CREATE TABLE 02577_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02577_keepermap_delete_update') PRIMARY KEY(key);

INSERT INTO 02661_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
INSERT INTO 02577_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);

SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

DELETE FROM 02661_keepermap_delete_update WHERE value LIKE 'Some%string';
DELETE FROM 02577_keepermap_delete_update WHERE value LIKE 'Some%string';

SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02661_keepermap_delete_update DELETE WHERE key >= 4;
ALTER TABLE 02577_keepermap_delete_update DELETE WHERE key >= 4;

SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

DELETE FROM 02661_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02661_keepermap_delete_update;
DELETE FROM 02577_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02577_keepermap_delete_update;
SELECT '-----------';

INSERT INTO 02661_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
INSERT INTO 02577_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02661_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02661_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError BAD_ARGUMENTS }
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError BAD_ARGUMENTS }
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02661_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02661_keepermap_delete_update ON CLUSTER test_shard_localhost UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100; -- { serverError BAD_ARGUMENTS }
ALTER TABLE 02577_keepermap_delete_update ON CLUSTER test_shard_localhost UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100; -- { serverError BAD_ARGUMENTS }

DROP TABLE IF EXISTS 02661_keepermap_delete_update;
DROP TABLE IF EXISTS 02577_keepermap_delete_update;
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
1 Some string 0 0 0
2 Some other string 0 0 0
3 random 0 0 0
4 random2 0 0 0
-----------
3 random 0
4 random2 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
3 random 0 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
1 String 10 0
2 String 20 0
3 String 30 0
4 String 40 0
-----------
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 102 1
2 String 202 1
3 Another 302 2
4 Another 402 2
-----------
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,37 @@ CREATE TABLE 02707_keepermap_delete_update (key UInt64, value String, value2 UIn

INSERT INTO 02707_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);

SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

DELETE FROM 02707_keepermap_delete_update WHERE value LIKE 'Some%string';

SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02707_keepermap_delete_update DELETE WHERE key >= 4;

SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

DELETE FROM 02707_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02707_keepermap_delete_update;
SELECT '-----------';

INSERT INTO 02707_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02707_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02707_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError 36 }
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

ALTER TABLE 02707_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';

DROP TABLE IF EXISTS 02707_keepermap_delete_update;