Clamp settings in distributed insert #9852

Merged
7 changes: 4 additions & 3 deletions dbms/src/DataStreams/RemoteBlockOutputStream.cpp
@@ -21,13 +21,14 @@ namespace ErrorCodes
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
    const ConnectionTimeouts & timeouts,
    const String & query_,
-    const Settings * settings_)
-    : connection(connection_), query(query_), settings(settings_)
+    const Settings * settings_,
+    const ClientInfo * client_info_)
+    : connection(connection_), query(query_), settings(settings_), client_info(client_info_)
{
    /** Send the query and receive the "header" that describes the table structure.
      * The header is needed to know what structure blocks passed to the 'write' method must have.
      */
-    connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
+    connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, client_info);

while (true)
{
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/RemoteBlockOutputStream.h
@@ -22,7 +22,8 @@ class RemoteBlockOutputStream : public IBlockOutputStream
    RemoteBlockOutputStream(Connection & connection_,
        const ConnectionTimeouts & timeouts,
        const String & query_,
-        const Settings * settings_ = nullptr);
+        const Settings * settings_ = nullptr,
+        const ClientInfo * client_info_ = nullptr);

Block getHeader() const override { return header; }

@@ -38,6 +39,7 @@ class RemoteBlockOutputStream : public IBlockOutputStream
Connection & connection;
String query;
const Settings * settings;
+    const ClientInfo * client_info;
Block header;
bool finished = false;
};
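For orientation, here is a minimal sketch of how a caller would use the extended constructor. The helper function, its include set, and the surrounding setup are illustrative assumptions, not code from this PR:

```cpp
#include <Client/Connection.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClientInfo.h>

using namespace DB;

/// Hypothetical helper: forward an already-serialized INSERT to a shard while
/// preserving the initiator's identity. Passing client_info (the new argument)
/// lets the receiving server see the query as a secondary, inter-server query
/// and clamp the forwarded settings against its own constraints.
void forwardInsert(Connection & connection, const ConnectionTimeouts & timeouts,
                   const String & insert_query, const Settings & settings,
                   const ClientInfo & client_info, ReadBuffer & marshalled_blocks)
{
    RemoteBlockOutputStream remote{connection, timeouts, insert_query, &settings, &client_info};
    remote.writePrefix();
    remote.writePrepared(marshalled_blocks);  /// as StorageDistributedDirectoryMonitor::processFile does
    remote.writeSuffix();
}
```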
33 changes: 21 additions & 12 deletions dbms/src/Storages/Distributed/DirectoryMonitor.cpp
@@ -279,9 +279,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa

Settings insert_settings;
std::string insert_query;
-    readHeader(in, insert_settings, insert_query, log);
+    ClientInfo client_info;
+    readHeader(in, insert_settings, insert_query, client_info, log);

-    RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
+    RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings, &client_info};

remote.writePrefix();
remote.writePrepared(in);
@@ -299,7 +300,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}

void StorageDistributedDirectoryMonitor::readHeader(
-    ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log)
+    ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log)
{
UInt64 query_size;
readVarUInt(query_size, in);
@@ -331,8 +332,11 @@ void StorageDistributedDirectoryMonitor::readHeader(
readStringBinary(insert_query, header_buf);
insert_settings.deserialize(header_buf);

+    if (header_buf.hasPendingData())
+        client_info.read(header_buf, initiator_revision);

/// Add handling new data here, for example:
-    /// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
+    /// if (header_buf.hasPendingData())
/// readVarUInt(my_new_data, header_buf);

return;
@@ -353,19 +357,21 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
{
Settings settings;
String query;
+    ClientInfo client_info;
Block sample_block;

-    BatchHeader(Settings settings_, String query_, Block sample_block_)
+    BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block sample_block_)
: settings(std::move(settings_))
, query(std::move(query_))
+        , client_info(std::move(client_info_))
, sample_block(std::move(sample_block_))
{
}

bool operator==(const BatchHeader & other) const
{
-        return settings == other.settings && query == other.query &&
-            blocksHaveEqualStructure(sample_block, other.sample_block);
+        return settings == other.settings && query == other.query && client_info.query_kind == other.client_info.query_kind
+            && blocksHaveEqualStructure(sample_block, other.sample_block);
}

struct Hash
@@ -445,6 +451,7 @@ struct StorageDistributedDirectoryMonitor::Batch
{
Settings insert_settings;
String insert_query;
+    ClientInfo client_info;
std::unique_ptr<RemoteBlockOutputStream> remote;
bool first = true;

@@ -459,12 +466,12 @@
}

ReadBufferFromFile in(file_path->second);
-        parent.readHeader(in, insert_settings, insert_query, parent.log);
+        parent.readHeader(in, insert_settings, insert_query, client_info, parent.log);

if (first)
{
first = false;
-            remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
+            remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings, &client_info);
remote->writePrefix();
}

@@ -541,7 +548,8 @@ class DirectoryMonitorBlockInputStream : public IBlockInputStream
{
Settings insert_settings;
String insert_query;
-        StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, log);
+        ClientInfo client_info;
+        StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, client_info, log);

block_in.readPrefix();
first_block = block_in.read();
@@ -610,11 +618,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
Block sample_block;
Settings insert_settings;
String insert_query;
+        ClientInfo client_info;
try
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
-            readHeader(in, insert_settings, insert_query, log);
+            readHeader(in, insert_settings, insert_query, client_info, log);

CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
@@ -641,7 +650,7 @@
throw;
}

-        BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
+        BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(client_info), std::move(sample_block));
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;

batch.file_indices.push_back(file_idx);
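The `header_buf.hasPendingData()` guard above is what keeps the on-disk header backward compatible: files written before this change simply end after the settings, so the ClientInfo read is skipped. A self-contained toy model of the idiom (the buffer type and field encoding here are invented for illustration):

```cpp
#include <iostream>
#include <string>
#include <vector>

/// Toy stand-in for ClickHouse's bounded header buffer: the header occupies a
/// known byte range, so "pending data" means a newer sender appended optional
/// trailing fields.
struct HeaderBuf
{
    std::vector<char> data;
    size_t pos = 0;

    bool hasPendingData() const { return pos < data.size(); }

    std::string readString()
    {
        size_t len = static_cast<unsigned char>(data[pos++]);
        std::string s(data.begin() + pos, data.begin() + pos + len);
        pos += len;
        return s;
    }
};

int main()
{
    /// Header written by an old sender: just the query.
    HeaderBuf old_header{{5, 'q', 'u', 'e', 'r', 'y'}};
    /// Header written by a new sender: query plus a trailing client-info field.
    HeaderBuf new_header{{5, 'q', 'u', 'e', 'r', 'y', 4, 'u', 's', 'e', 'r'}};

    for (HeaderBuf * buf : {&old_header, &new_header})
    {
        std::string query = buf->readString();   /// mandatory field, always present
        std::string initiator = "<unknown>";
        if (buf->hasPendingData())               /// optional field: new senders only
            initiator = buf->readString();
        std::cout << query << " initiated by " << initiator << '\n';
    }
}
```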
2 changes: 1 addition & 1 deletion dbms/src/Storages/Distributed/DirectoryMonitor.h
@@ -70,7 +70,7 @@ class StorageDistributedDirectoryMonitor
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};

/// Read the insert query and settings in a backward compatible manner.
-    static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log);
+    static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log);

friend class DirectoryMonitorBlockInputStream;
};
dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -290,7 +290,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (throttler)
job.connection_entry->setThrottler(throttler);

-        job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
+        job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings, &context.getClientInfo());
job.stream->writePrefix();
}

@@ -598,6 +598,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
+    context.getClientInfo().write(header_buf, ClickHouseRevision::get());

/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
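Note that `writeToShard()` appends the ClientInfo after every field that older servers already expect, mirroring `readHeader()` on the receiving side; old receivers stop reading after the settings and never see it. A toy model of that envelope ordering (all types and helpers here are invented for illustration):

```cpp
#include <cstdint>
#include <iostream>
#include <string>

/// Invented stand-ins: only the field order of the .bin header matters here.
struct Header
{
    uint64_t sender_revision;   /// writeVarUInt(ClickHouseRevision::get(), ...)
    std::string query;          /// writeStringBinary(query_string, ...)
    std::string settings_blob;  /// Settings::serialize(...)
    std::string client_info;    /// NEW: ClientInfo::write(..., revision), appended last
};

void writeHeader(std::ostream & out, const Header & h)
{
    out << h.sender_revision << '\n'
        << h.query << '\n'
        << h.settings_blob << '\n'
        << h.client_info << '\n';  /// old readers stop before this field and stay correct
}

int main()
{
    writeHeader(std::cout, {54426, "INSERT INTO sometable VALUES",
                            "max_memory_usage = 50000000", "query_kind = SECONDARY_QUERY"});
}
```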
configs/remote_servers.xml
@@ -5,14 +5,14 @@
<replica>
<host>node1</host>
<port>9000</port>
-                <user>normal</user>
+                <user>shard</user>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
-                <user>readonly</user>
+                <user>shard</user>
</replica>
</shard>
</test_cluster>

configs/users_on_cluster.xml: This file was deleted.

configs/users_on_distributed.xml: This file was deleted.

test.py
@@ -8,10 +8,9 @@

cluster = ClickHouseCluster(__file__)

-node1 = cluster.add_instance('node1', user_configs=['configs/users_on_cluster.xml'])
-node2 = cluster.add_instance('node2', user_configs=['configs/users_on_cluster.xml'])
-
-distributed = cluster.add_instance('distributed', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_on_distributed.xml'])
+node1 = cluster.add_instance('node1')
+node2 = cluster.add_instance('node2')
+distributed = cluster.add_instance('distributed', main_configs=['configs/remote_servers.xml'])


@pytest.fixture(scope="module")
@@ -20,19 +19,30 @@ def started_cluster():
cluster.start()

for node in [node1, node2]:
node.query("CREATE TABLE sometable(date Date, id UInt32, value Int32) ENGINE = MergeTree() ORDER BY id;")
node.query("INSERT INTO sometable VALUES (toDate('2020-01-20'), 1, 1)")
node.query("CREATE TABLE sometable (date Date, id UInt32, value Int32) ENGINE = MergeTree() ORDER BY id;")
node.query("INSERT INTO sometable VALUES (toDate('2010-01-10'), 1, 1)")
node.query("CREATE USER shard")
node.query("GRANT ALL ON *.* TO shard")

distributed.query("CREATE TABLE proxy (date Date, id UInt32, value Int32) ENGINE = Distributed(test_cluster, default, sometable);")
distributed.query("CREATE TABLE sysproxy (name String, value String) ENGINE = Distributed(test_cluster, system, settings);")
distributed.query("CREATE TABLE proxy (date Date, id UInt32, value Int32) ENGINE = Distributed(test_cluster, default, sometable, toUInt64(date));")
distributed.query("CREATE TABLE shard_settings (name String, value String) ENGINE = Distributed(test_cluster, system, settings);")
distributed.query("CREATE ROLE admin")
distributed.query("GRANT ALL ON *.* TO admin")

yield cluster

finally:
cluster.shutdown()


-def test_shard_doesnt_throw_on_constraint_violation(started_cluster):
+def test_select_clamps_settings(started_cluster):
+    distributed.query("CREATE USER normal DEFAULT ROLE admin SETTINGS max_memory_usage = 80000000")
+    distributed.query("CREATE USER wasteful DEFAULT ROLE admin SETTINGS max_memory_usage = 2000000000")
+    distributed.query("CREATE USER readonly DEFAULT ROLE admin SETTINGS readonly = 1")
+    node1.query("ALTER USER shard SETTINGS max_memory_usage = 50000000 MIN 11111111 MAX 99999999")
+    node2.query("ALTER USER shard SETTINGS readonly = 1")
+
+    # Check that the shards don't throw exceptions on constraint violations.
query = "SELECT COUNT() FROM proxy"
assert distributed.query(query) == '2\n'
assert distributed.query(query, user = 'normal') == '2\n'
@@ -47,9 +57,8 @@ def test_shard_doesnt_throw_on_constraint_violation(started_cluster):
assert distributed.query(query, user = 'normal') == '2\n'
assert distributed.query(query, user = 'wasteful') == '2\n'


-def test_shard_clamps_settings(started_cluster):
-    query = "SELECT hostName() as host, name, value FROM sysproxy WHERE name = 'max_memory_usage' OR name = 'readonly' ORDER BY host, name, value"
+    # Check that shards clamp passed settings.
+    query = "SELECT hostName() as host, name, value FROM shard_settings WHERE name = 'max_memory_usage' OR name = 'readonly' ORDER BY host, name, value"
assert distributed.query(query) == 'node1\tmax_memory_usage\t99999999\n'\
'node1\treadonly\t0\n'\
'node2\tmax_memory_usage\t10000000000\n'\
@@ -79,3 +88,11 @@ def test_shard_clamps_settings(started_cluster):
'node1\treadonly\t2\n'\
'node2\tmax_memory_usage\t10000000000\n'\
'node2\treadonly\t1\n'

+def test_insert_clamps_settings(started_cluster):
+    node1.query("ALTER USER shard SETTINGS max_memory_usage = 50000000 MIN 11111111 MAX 99999999")
+    node2.query("ALTER USER shard SETTINGS max_memory_usage = 50000000 MIN 11111111 MAX 99999999")
+
+    distributed.query("INSERT INTO proxy VALUES (toDate('2020-02-20'), 2, 2)")
+    distributed.query("INSERT INTO proxy VALUES (toDate('2020-02-21'), 2, 2)", settings={"max_memory_usage": 5000000})
+    assert distributed.query("SELECT COUNT() FROM proxy") == "4\n"