Add strict mode for KeeperMap #48293

Merged 7 commits on Apr 4, 2023
5 changes: 4 additions & 1 deletion docs/en/engines/table-engines/special/keepermap.md
@@ -78,7 +78,8 @@ Of course, it's possible to manually run `CREATE TABLE` with same path on nonrel

### Inserts

When new rows are inserted into `KeeperMap`, if the key already exists, the value will be updated, otherwise new key is created.
When new rows are inserted into `KeeperMap`, if the key does not exist, a new entry for the key is created.
If the key exists and the setting `keeper_map_strict_mode` is set to `true`, an exception is thrown; otherwise, the value for the key is overwritten.
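
The insert rule above can be modelled with a minimal C++ sketch. `ToyKeeperMap` and its `insert` method are hypothetical names used only for illustration, not ClickHouse code; the real sink batches Keeper requests, while this toy handles one key at a time in memory.

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical in-memory stand-in for a KeeperMap table (assumption, not ClickHouse code).
struct ToyKeeperMap
{
    std::map<std::string, std::string> data;

    // A new key is always created. An existing key is overwritten,
    // unless strict mode is enabled, in which case the insert throws.
    void insert(const std::string & key, const std::string & value, bool strict_mode)
    {
        auto it = data.find(key);
        if (it == data.end())
            data.emplace(key, value);
        else if (strict_mode)
            throw std::runtime_error("key already exists: " + key);
        else
            it->second = value;
    }
};
```

With `strict_mode` set to `false`, a second insert for the same key silently replaces the value; with `true`, the same call throws and the stored value stays unchanged.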

Example:

@@ -89,6 +90,7 @@ INSERT INTO keeper_map_table VALUES ('some key', 1, 'value', 3.2);
### Deletes

Rows can be deleted using `DELETE` query or `TRUNCATE`.
If the key exists and the setting `keeper_map_strict_mode` is set to `true`, fetching and deleting data will succeed only if it can be executed atomically.

```sql
DELETE FROM keeper_map_table WHERE key LIKE 'some%' AND v1 > 1;
@@ -105,6 +107,7 @@ TRUNCATE TABLE keeper_map_table;
### Updates

Values can be updated using `ALTER TABLE` query. Primary key cannot be updated.
If the setting `keeper_map_strict_mode` is set to `true`, fetching and updating data will succeed only if it's executed atomically.
Member:

To be sure: we execute ALTER in one mutate call and update the actual values in the `sink->finalize<true>` call. So it is impossible to update one chunk of data and then get an exception later, right?

Member Author:

Yes, we actually send requests only on finalize.
Until then we fetch and modify data locally.
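
The buffering described in this thread can be sketched as a toy model in C++. All names here (`Node`, `PendingSet`, `ToyKeeper`, `ToyUpdateSink`) are hypothetical and stand in for the Keeper client and the sink; the sketch assumes a store where every write bumps a per-key version and a batch is applied all-or-nothing, which is how the version-checked set requests in this PR are meant to behave.

```cpp
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a Keeper node: a value plus a version bumped on every write.
struct Node
{
    std::string value;
    int32_t version;
};

// One buffered update: the new value and the version seen when the row was read.
struct PendingSet
{
    std::string key;
    std::string value;
    int32_t expected_version; // -1 means "skip the version check"
};

struct ToyKeeper
{
    std::map<std::string, Node> nodes;

    // All-or-nothing batch: validate every request first, then apply them all.
    void multiSet(const std::vector<PendingSet> & requests)
    {
        for (const auto & req : requests)
        {
            auto it = nodes.find(req.key);
            if (it == nodes.end())
                throw std::runtime_error("no such key: " + req.key);
            if (req.expected_version != -1 && it->second.version != req.expected_version)
                throw std::runtime_error("version mismatch for key: " + req.key);
        }
        for (const auto & req : requests)
        {
            auto & node = nodes.at(req.key);
            node.value = req.value;
            ++node.version;
        }
    }
};

// Collects updates locally and sends nothing until finalize() is called.
struct ToyUpdateSink
{
    ToyKeeper & keeper;
    std::vector<PendingSet> pending;

    explicit ToyUpdateSink(ToyKeeper & keeper_) : keeper(keeper_) {}

    void consume(const std::string & key, const std::string & new_value, int32_t read_version)
    {
        pending.push_back({key, new_value, read_version});
    }

    // In strict mode the versions read earlier are enforced; otherwise they are ignored.
    void finalize(bool strict)
    {
        std::vector<PendingSet> requests = pending;
        if (!strict)
            for (auto & req : requests)
                req.expected_version = -1;
        keeper.multiSet(requests);
    }
};
```

Because nothing is written before `finalize`, a version mismatch in strict mode rejects the whole batch and leaves every key untouched, which is the property the reviewer asks about.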


```sql
ALTER TABLE keeper_map_table UPDATE v1 = v1 * 10 + 2 WHERE key LIKE 'some%' AND v3 > 3.1;
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -724,6 +724,7 @@ class IColumn;
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, allow_experimental_undrop_table_query, false, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function to return complex type, such as: struct, array, map.", 0) \
// End of COMMON_SETTINGS
9 changes: 9 additions & 0 deletions src/Interpreters/MutationsInterpreter.cpp
@@ -550,6 +550,12 @@ void MutationsInterpreter::prepare(bool dry_run)
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});

if (return_all_columns)
{
for (const auto & column : source.getStorage()->getVirtuals())
all_columns.push_back(column);
}

NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();

@@ -906,6 +912,8 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
{
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
if (return_all_columns)
options.withVirtuals();
auto all_columns = storage_snapshot->getColumns(options);

/// Add _row_exists column if it is present in the part
@@ -1256,6 +1264,7 @@ void MutationsInterpreter::validate()
}

QueryPlan plan;

initQueryPlan(stages.front(), plan);
auto pipeline = addStreamsForLaterStages(stages, plan);
}
138 changes: 118 additions & 20 deletions src/Storages/StorageKeeperMap.cpp
@@ -59,6 +59,8 @@ namespace ErrorCodes
namespace
{

constexpr std::string_view version_column_name = "_version";

std::string formattedAST(const ASTPtr & ast)
{
if (!ast)
@@ -77,7 +79,6 @@ void verifyTableId(const StorageID & table_id)
table_id.getDatabaseName(),
database->getEngineName());
}

}

}
@@ -86,11 +87,13 @@ class StorageKeeperMapSink : public SinkToStorage
{
StorageKeeperMap & storage;
std::unordered_map<std::string, std::string> new_values;
std::unordered_map<std::string, int32_t> versions;
size_t primary_key_pos;
ContextPtr context;

public:
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
StorageKeeperMapSink(StorageKeeperMap & storage_, Block header, ContextPtr context_)
: SinkToStorage(header), storage(storage_), context(std::move(context_))
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
@@ -113,18 +116,36 @@ class StorageKeeperMapSink : public SinkToStorage
wb_value.restart();

size_t idx = 0;

int32_t version = -1;
for (const auto & elem : block)
{
if (elem.name == version_column_name)
{
version = assert_cast<const ColumnVector<Int32> &>(*elem.column).getData()[i];
continue;
}

elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value, {});
++idx;
}

auto key = base64Encode(wb_key.str(), /* url_encoding */ true);

if (version != -1)
versions[key] = version;

new_values[std::move(key)] = std::move(wb_value.str());
}
}

void onFinish() override
{
finalize<false>(/*strict*/ context->getSettingsRef().keeper_map_strict_mode);
}

template <bool for_update>
void finalize(bool strict)
{
auto zookeeper = storage.getClient();

@@ -147,21 +168,39 @@ class StorageKeeperMapSink : public SinkToStorage
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));

auto results = zookeeper->exists(key_paths);
zkutil::ZooKeeper::MultiExistsResponse results;

if constexpr (!for_update)
{
if (!strict)
results = zookeeper->exists(key_paths);
}

Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
auto key = fs::path(key_paths[i]).filename();
if (results[i].error == Coordination::Error::ZOK)

if constexpr (for_update)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
int32_t version = -1;
if (strict)
version = versions.at(key);

requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
if (!strict && results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}
}

@@ -193,15 +232,29 @@ class StorageKeeperMapSource : public ISource
KeyContainerIter it;
KeyContainerIter end;

bool with_version_column = false;

static Block getHeader(Block header, bool with_version_column)
{
if (with_version_column)
header.insert(
{DataTypeInt32{}.createColumn(),
std::make_shared<DataTypeInt32>(), std::string{version_column_name}});

return header;
}

public:
StorageKeeperMapSource(
const StorageKeeperMap & storage_,
const Block & header,
size_t max_block_size_,
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_)
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
KeyContainerIter end_,
bool with_version_column_)
: ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
, with_version_column(with_version_column_)
{
}

@@ -225,12 +278,12 @@ class StorageKeeperMapSource : public ISource
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key, /* url_encoding */ true);

return storage.getBySerializedKeys(raw_keys, nullptr);
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column);
}
else
{
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column);
it += elem_num;
return chunk;
}
@@ -426,6 +479,16 @@ Pipe StorageKeeperMap::read(
auto primary_key_type = sample_block.getByName(primary_key).type;
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);

bool with_version_column = false;
for (const auto & column : column_names)
{
if (column == version_column_name)
{
with_version_column = true;
break;
}
}

const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
{
if (keys->empty())
@@ -449,7 +512,7 @@ Pipe StorageKeeperMap::read(

using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column));
}
return Pipe::unitePipes(std::move(pipes));
};
@@ -461,10 +524,10 @@ Pipe StorageKeeperMap::read(
return process_keys(std::move(filtered_keys));
}

SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
checkTable<true>();
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
}

void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
@@ -554,6 +617,12 @@ void StorageKeeperMap::drop()
dropTable(client, metadata_drop_lock);
}

NamesAndTypesList StorageKeeperMap::getVirtuals() const
{
return NamesAndTypesList{
{std::string{version_column_name}, std::make_shared<DataTypeInt32>()}};
}

zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
{
std::lock_guard lock{zookeeper_mutex};
@@ -670,13 +739,18 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD
if (raw_keys.size() != keys[0].column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());

return getBySerializedKeys(raw_keys, &null_map);
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false);
}

Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const
{
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
MutableColumnPtr version_column = nullptr;

if (with_version)
version_column = ColumnVector<Int32>::create();

size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());

if (null_map)
@@ -706,6 +780,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
if (code == Coordination::Error::ZOK)
{
fillColumns(base64Decode(keys[i], true), response.data, primary_key_pos, sample_block, columns);

if (version_column)
version_column->insert(response.stat.version);
}
else if (code == Coordination::Error::ZNONODE)
{
@@ -714,6 +791,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
(*null_map)[i] = 0;
for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());

if (version_column)
version_column->insert(-1);
}
}
else
@@ -723,6 +803,10 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
}

size_t num_rows = columns.at(0)->size();

if (version_column)
columns.push_back(std::move(version_column));

return Chunk(std::move(columns), num_rows);
}

@@ -763,6 +847,8 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
if (commands.empty())
return;

bool strict = local_context->getSettingsRef().keeper_map_strict_mode;

assert(commands.size() == 1);

auto metadata_snapshot = getInMemoryMetadataPtr();
@@ -784,23 +870,34 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca

auto header = interpreter->getUpdatedHeader();
auto primary_key_pos = header.getPositionByName(primary_key);
auto version_position = header.getPositionByName(std::string{version_column_name});

auto client = getClient();

Block block;
while (executor.pull(block))
{
auto & column_type_name = block.getByPosition(primary_key_pos);
auto column = column_type_name.column;
auto size = column->size();


WriteBufferFromOwnString wb_key;
Coordination::Requests delete_requests;

for (size_t i = 0; i < size; ++i)
{
int32_t version = -1;
if (strict)
{
const auto & version_column = block.getByPosition(version_position).column;
version = assert_cast<const ColumnVector<Int32> &>(*version_column).getData()[i];
}

wb_key.restart();

column_type_name.type->getDefaultSerialization()->serializeBinary(*column, i, wb_key, {});
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), -1));
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), version));
}

Coordination::Responses responses;
@@ -834,12 +931,13 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

auto sink = std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
auto sink = std::make_shared<StorageKeeperMapSink>(*this, executor.getHeader(), local_context);

Block block;
while (executor.pull(block))
sink->consume(Chunk{block.getColumns(), block.rows()});
sink->onFinish();

sink->finalize<true>(strict);
}

namespace