Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow creating replicated table with inconsistent merge params #56833

Merged
merged 8 commits into from Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/Processors/Merges/Algorithms/Graphite.h
@@ -1,5 +1,6 @@
#pragma once

#include <Common/SipHash.h>
#include <Common/OptimizedRegularExpression.h>
#include <AggregateFunctions/IAggregateFunction.h>

Expand Down Expand Up @@ -122,6 +123,17 @@ struct Pattern
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
void updateHash(SipHash & hash) const
{
hash.update(rule_type);
hash.update(regexp_str);
hash.update(function->getName());
for (const auto & r : retentions)
{
hash.update(r.age);
hash.update(r.precision);
}
}
};

bool operator==(const Pattern & a, const Pattern & b);
Expand All @@ -142,6 +154,21 @@ struct Params
Graphite::Patterns patterns;
Graphite::Patterns patterns_plain;
Graphite::Patterns patterns_tagged;
void updateHash(SipHash & hash) const
{
hash.update(path_column_name);
hash.update(time_column_name);
hash.update(value_column_name);
hash.update(value_column_name);
hash.update(version_column_name);
hash.update(patterns_typed);
for (const auto & p : patterns)
p.updateHash(hash);
for (const auto & p : patterns_plain)
p.updateHash(hash);
for (const auto & p : patterns_tagged)
p.updateHash(hash);
}
};

using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
Expand Down
82 changes: 82 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp
Expand Up @@ -6,6 +6,9 @@
#include <Parsers/ExpressionListParsers.h>
#include <IO/Operators.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Common/SipHash.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>


namespace DB
Expand Down Expand Up @@ -49,6 +52,17 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
is_deleted_column = data.merging_params.is_deleted_column;
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
version_column = data.merging_params.version_column;
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
{
SipHash graphite_hash;
data.merging_params.graphite_params.updateHash(graphite_hash);
WriteBufferFromOwnString wb;
writeText(graphite_hash.get128(), wb);
graphite_params_hash = std::move(wb.str());
}

/// This code may looks strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
/// Now we have two different entities ORDER BY and it's optional prefix -- PRIMARY KEY.
Expand Down Expand Up @@ -90,6 +104,22 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr

void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
{
/// Important notes: new added field must always be append to the end of serialized metadata
/// for backward compatible.

/// In addition, two consecutive fields should not share any prefix, otherwise deserialize may fails.
/// For example, if you have two field `v1` and `v2` serialized as:
/// if (!v1.empty()) out << "v1: " << v1 << "\n";
/// if (!v2.empty()) out << "v2: " << v2 << "\n";
/// Let say if `v1` is empty and v2 is non-empty, then `v1` is not in serialized metadata.
/// Later, to deserialize the metadata, `read` will sequentially check if each field with `checkString`.
/// When it begin to check for `v1` and `v2`, the metadata buffer look like this:
/// v2: <v2 value>
/// ^
/// cursor
/// `checkString("v1: ", in)` will be called first and it moves the cursor to `2` instead of `v`, so the
/// subsequent call `checkString("v2: ", in)` will also fails.

out << "metadata format version: 1\n"
<< "date column: " << date_column << "\n"
<< "sampling expression: " << sampling_expression << "\n"
Expand Down Expand Up @@ -121,6 +151,19 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const

if (!constraints.empty())
out << "constraints: " << constraints << "\n";

if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
out << "merge parameters format version: " << merge_params_version << "\n";
canhld94 marked this conversation as resolved.
Show resolved Hide resolved
if (!version_column.empty())
out << "version column: " << version_column << "\n";
if (!is_deleted_column.empty())
out << "is_deleted column: " << is_deleted_column << "\n";
if (!columns_to_sum.empty())
out << "columns to sum: " << columns_to_sum << "\n";
if (!graphite_params_hash.empty())
out << "graphite hash: " << graphite_params_hash << "\n";
}
}

String ReplicatedMergeTreeTableMetadata::toString() const
Expand Down Expand Up @@ -170,6 +213,26 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)

if (checkString("constraints: ", in))
in >> constraints >> "\n";

if (checkString("merge parameters format version: ", in))
in >> merge_params_version >> "\n";
else
merge_params_version = REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION;

if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (checkString("version column: ", in))
in >> version_column >> "\n";

if (checkString("is_deleted column: ", in))
in >> is_deleted_column >> "\n";

if (checkString("columns to sum: ", in))
in >> columns_to_sum >> "\n";

if (checkString("graphite hash: ", in))
in >> graphite_params_hash >> "\n";
}
}

ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)
Expand Down Expand Up @@ -210,6 +273,25 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sign column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.sign_column, sign_column);

if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS && from_zk.merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (version_column != from_zk.version_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);

if (is_deleted_column != from_zk.is_deleted_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);

if (columns_to_sum != from_zk.columns_to_sum)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);

if (graphite_params_hash != from_zk.graphite_params_hash)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in graphite params. "
"Stored in ZooKeeper hash: {}, local hash: {}", from_zk.graphite_params_hash, graphite_params_hash);
}

/// NOTE: You can make a less strict check of match expressions so that tables do not break from small changes
/// in formatAST code.
String parsed_zk_primary_key = formattedAST(KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h
Expand Up @@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <base/types.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <IO/ReadBufferFromString.h>

namespace DB
{
Expand All @@ -17,11 +18,20 @@ class ReadBuffer;
*/
struct ReplicatedMergeTreeTableMetadata
{
static constexpr int REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION = 1;
static constexpr int REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS = 2;

String date_column;
String sampling_expression;
UInt64 index_granularity;
/// Merging related params
int merging_params_mode;
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
String sign_column;
String version_column;
String is_deleted_column;
String columns_to_sum;
String graphite_params_hash;
String primary_key;
MergeTreeDataFormatVersion data_format_version;
String partition_key;
Expand Down
29 changes: 29 additions & 0 deletions tests/config/config.d/graphite_alternative.xml
@@ -0,0 +1,29 @@
<!-- alternative graphite config, for testing 02910_replicated_merge_parameters_must_consistent -->
<clickhouse>
<graphite_rollup_alternative>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>sum</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</pattern>
<default>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</default>
</graphite_rollup_alternative>
</clickhouse>
1 change: 1 addition & 0 deletions tests/config/install.sh
Expand Up @@ -24,6 +24,7 @@ ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
Expand Down
@@ -1,3 +1,3 @@
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\n
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\nmerge parameters format version: 2\nversion column: version\n
1
1
@@ -1,2 +1,2 @@
CREATE TABLE default.x\n(\n `i` Int32,\n INDEX mm log2(i) TYPE minmax GRANULARITY 1,\n INDEX nn log2(i) TYPE minmax GRANULARITY 1,\n PROJECTION p\n (\n SELECT max(i)\n ),\n PROJECTION p2\n (\n SELECT min(i)\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/default/x\', \'r\')\nORDER BY i\nSETTINGS index_granularity = 8192
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\n
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\nmerge parameters format version: 2\n
@@ -0,0 +1,80 @@
-- Tags: zookeeper, no-replicated-database
CREATE TABLE t
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r1', legacy_ver)
ORDER BY id;

CREATE TABLE t_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
ORDER BY id; -- { serverError METADATA_MISMATCH }

CREATE TABLE t2
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
ORDER BY id;

CREATE TABLE t2_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
ORDER BY id; -- { serverError METADATA_MISMATCH }

CREATE TABLE t3
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r1', metrics1)
ORDER BY key;

CREATE TABLE t3_r
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r2', metrics2)
ORDER BY key; -- { serverError METADATA_MISMATCH }

CREATE TABLE t4
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r1', 'graphite_rollup')
ORDER BY key;

CREATE TABLE t4_r
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r2', 'graphite_rollup_alternative')
ORDER BY key; -- { serverError METADATA_MISMATCH }