Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
DECLARE(UInt32, max_database_replicated_create_table_thread_pool_size, 1, R"(The number of threads to create tables during replica recovery in DatabaseReplicated. Zero means number of threads equal number of cores.)", 0) \
DECLARE(Bool, database_replicated_allow_detach_permanently, true, R"(Allow detaching tables permanently in Replicated databases)", 0) \
DECLARE(Bool, database_replicated_drop_broken_tables, false, R"(Drop unexpected tables from Replicated databases instead of moving them to a separate local database)", 0) \
DECLARE(Bool, distributed_ddl_use_initial_user_and_roles, false, R"(If enabled, ON CLUSTER queries will preserve and use the initiator's user and roles for execution on remote shards. This ensures consistent access control across the cluster but requires that the user and roles exist on all nodes.)", 0) \
DECLARE(String, default_replica_path, "/clickhouse/tables/{uuid}/{shard}", R"(
The path to the table in ZooKeeper.

Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
void setUsersConfig(const ConfigurationPtr & config);
ConfigurationPtr getUsersConfig();

/// Sets the current user assuming that he/she is already authenticated.
/// WARNING: This function doesn't check password!
/// Sets the current user, assuming they are already authenticated.
/// WARNING: This function doesn't check the password!
void setUser(const UUID & user_id_, const std::vector<UUID> & external_roles_ = {});
UserPtr getUser() const;

Expand Down
48 changes: 48 additions & 0 deletions src/Interpreters/DDLTask.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#include <Access/AccessControl.h>
#include <Access/Role.h>
#include <Access/User.h>
#include <Core/ServerSettings.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
Expand Down Expand Up @@ -35,13 +38,20 @@ namespace Setting
extern const SettingsUInt64 max_query_size;
}

namespace ServerSetting
{
extern const ServerSettingsBool distributed_ddl_use_initial_user_and_roles;
}

namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
extern const int UNKNOWN_TYPE_OF_QUERY;
extern const int INCONSISTENT_CLUSTER_DEFINITION;
extern const int LOGICAL_ERROR;
extern const int DNS_ERROR;
extern const int UNKNOWN_USER;
extern const int UNKNOWN_ROLE;
}


Expand Down Expand Up @@ -170,6 +180,12 @@ String DDLLogEntry::toString() const
wb << "\n";
}

if (version >= INITIATOR_USER_VERSION)
{
wb << "initiator_user: " << initiator_user << "\n";
wb << "initiator_roles: " << initiator_user_roles << "\n";
}

return wb.str();
}

Expand Down Expand Up @@ -242,6 +258,12 @@ void DDLLogEntry::parse(const String & data)
rb >> "\n";
}

if (version >= INITIATOR_USER_VERSION)
{
rb >> "initiator_user: " >> initiator_user >> "\n";
rb >> "initiator_roles: " >> initiator_user_roles >> "\n";
}

assertEOF(rb);

if (!host_id_strings.empty())
Expand Down Expand Up @@ -276,8 +298,34 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);

const bool preserve_user = from_context->getServerSettings()[ServerSetting::distributed_ddl_use_initial_user_and_roles];
if (preserve_user && !entry.initiator_user.empty())
{
const auto & access_control = from_context->getAccessControl();

/// Find the user by name
auto user_id = access_control.find<User>(entry.initiator_user);
if (!user_id)
throw Exception(ErrorCodes::UNKNOWN_USER, "User '{}' required for executing distributed DDL query is not found on this instance", entry.initiator_user);

/// Find all roles by name
std::vector<UUID> role_ids;
role_ids.reserve(entry.initiator_user_roles.size());
for (const auto & role_name : entry.initiator_user_roles)
{
auto role_id = access_control.find<Role>(role_name);
if (!role_id)
throw Exception(ErrorCodes::UNKNOWN_ROLE, "Role '{}' required for executing distributed DDL query is not found on this instance", role_name);
role_ids.push_back(*role_id);
}

query_context->setUser(*user_id, role_ids);
}

if (entry.settings)
query_context->applySettingsChanges(*entry.settings);

return query_context;
}

Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/DDLTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ struct DDLLogEntry
static constexpr const UInt64 PRESERVE_INITIAL_QUERY_ID_VERSION = 5;
static constexpr const UInt64 BACKUP_RESTORE_FLAG_IN_ZK_VERSION = 6;
static constexpr const UInt64 PARENT_TABLE_UUID_VERSION = 7;
static constexpr const UInt64 INITIATOR_USER_VERSION = 8;
/// Add new version here

/// Remember to update the value below once new version is added
static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 7;
static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 8;

UInt64 version = 1;
String query;
Expand All @@ -96,6 +97,9 @@ struct DDLLogEntry
/// Only for DatabaseReplicated.
std::optional<UUID> parent_table_uuid;

String initiator_user;
Strings initiator_user_roles;

void setSettingsIfRequired(ContextPtr context);
String toString() const;
void parse(const String & data);
Expand Down
12 changes: 12 additions & 0 deletions src/Interpreters/executeDDLQueryOnCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <filesystem>
#include <Access/AccessControl.h>
#include <Access/Common/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
Expand Down Expand Up @@ -37,6 +39,11 @@ namespace Setting
extern const SettingsBool throw_on_unsupported_query_inside_transaction;
}

namespace ServerSetting
{
extern const ServerSettingsBool distributed_ddl_use_initial_user_and_roles;
}

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
Expand Down Expand Up @@ -188,6 +195,11 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext();
entry.initial_query_id = context->getClientInfo().initial_query_id;
if (context->getServerSettings()[ServerSetting::distributed_ddl_use_initial_user_and_roles])
{
entry.initiator_user = context->getUserName();
entry.initiator_user_roles = context->getAccessControl().tryReadNames(context->getCurrentRoles());
}
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);

return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
Expand Down
Empty file.
23 changes: 23 additions & 0 deletions tests/integration/test_replicated_access/configs/config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<clickhouse>
<distributed_ddl_use_initial_user_and_roles>1</distributed_ddl_use_initial_user_and_roles>
<remote_servers>
<default>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
<user_directories replace="replace">
<replicated>
<zookeeper_path>/clickhouse/access</zookeeper_path>
</replicated>
</user_directories>
</clickhouse>
17 changes: 17 additions & 0 deletions tests/integration/test_replicated_access/configs/zookeeper.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</clickhouse>
87 changes: 87 additions & 0 deletions tests/integration/test_replicated_access/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from os import path as p

import pytest

from helpers.cluster import ClickHouseCluster

default_zk_config = p.join(p.dirname(p.realpath(__file__)), "configs/zookeeper.xml")
cluster = ClickHouseCluster(__file__, zookeeper_config_path="configs/zookeeper.xml")

node1 = cluster.add_instance(
"node1",
main_configs=["configs/config.xml"],
with_zookeeper=True,
stay_alive=True,
)

node2 = cluster.add_instance(
"node2",
main_configs=["configs/config.xml"],
with_zookeeper=True,
stay_alive=True,
)

all_nodes = [node1, node2]


@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()


@pytest.fixture(scope="function", autouse=True)
def prepare_test():
node1.query("CREATE USER test")
node1.query("CREATE TABLE IF NOT EXISTS table ON CLUSTER default (x UInt64) ENGINE=MergeTree ORDER BY x")
node1.query("CREATE TABLE IF NOT EXISTS secret ON CLUSTER default (value String) ENGINE=MergeTree ORDER BY value")
try:
yield
finally:
node1.query("DROP USER IF EXISTS test")
node1.query("DROP TABLE IF EXISTS table ON CLUSTER default")
node1.query("DROP TABLE IF EXISTS secret ON CLUSTER default")


def test_initiator_user_in_ddl(started_cluster):
node1.query("INSERT INTO secret VALUES ('super_secret')")

node1.query("GRANT ALTER ON table TO test")
node1.query("GRANT CLUSTER ON *.* TO test")

query = """
ALTER TABLE table ON CLUSTER default
ADD PROJECTION test (
SELECT
x,
(SELECT * FROM secret LIMIT 1) as bar
ORDER BY x
)
SETTINGS distributed_ddl_entry_format_version = 8
"""

error = node1.query_and_get_error(query, user="test")
assert "Not enough privileges" in error


for node in all_nodes:
node.replace_in_config(
"/etc/clickhouse-server/config.d/config.xml",
"<distributed_ddl_use_initial_user_and_roles>1</distributed_ddl_use_initial_user_and_roles>",
"<distributed_ddl_use_initial_user_and_roles>0</distributed_ddl_use_initial_user_and_roles>",
)
node.restart_clickhouse()

error = node1.query_and_get_error(query, user="test")
assert "Not enough privileges" not in error

for node in all_nodes:
node.replace_in_config(
"/etc/clickhouse-server/config.d/config.xml",
"<distributed_ddl_use_initial_user_and_roles>0</distributed_ddl_use_initial_user_and_roles>",
"<distributed_ddl_use_initial_user_and_roles>1</distributed_ddl_use_initial_user_and_roles>",
)
node.restart_clickhouse()
Loading