Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to have several MaterializedPostgreSQL tables following the same Postgres table #55145

Merged
5 changes: 5 additions & 0 deletions docs/en/engines/database-engines/materialized-postgresql.md
Expand Up @@ -197,6 +197,11 @@ Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.htm
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
```

### `materialized_postgresql_use_unique_replication_consumer_identifier` {#materialized_postgresql_use_unique_replication_consumer_identifier}

Use a unique replication consumer identifier for replication. Default: `0`.
If set to `1`, allows to setup several `MaterializedPostgreSQL` tables pointing to the same `PostgreSQL` table.

## Notes {#notes}

### Failover of the logical replication slot {#logical-replication-slot-failover}
Expand Down
3 changes: 2 additions & 1 deletion src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
Expand Up @@ -64,9 +64,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
return;

replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */ TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
remote_database_name,
/* table_name */"",
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
toString(getUUID()),
connection_info,
getContext(),
is_attach,
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
Expand Up @@ -566,6 +566,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl

void MaterializedPostgreSQLConsumer::syncTables()
{
size_t synced_tables = 0;
while (!tables_to_sync.empty())
{
auto table_name = *tables_to_sync.begin();
Expand Down Expand Up @@ -596,6 +597,7 @@ void MaterializedPostgreSQLConsumer::syncTables()

CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
++synced_tables;
}
}
catch (...)
Expand All @@ -608,7 +610,8 @@ void MaterializedPostgreSQLConsumer::syncTables()
tables_to_sync.erase(tables_to_sync.begin());
}

LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})",
synced_tables, current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));

updateLsn();
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h
Expand Up @@ -24,6 +24,7 @@ namespace DB
M(UInt64, materialized_postgresql_backoff_min_ms, 200, "Poll backoff start point", 0) \
M(UInt64, materialized_postgresql_backoff_max_ms, 10000, "Poll backoff max point", 0) \
M(UInt64, materialized_postgresql_backoff_factor, 2, "Poll backoff factor", 0) \
M(Bool, materialized_postgresql_use_unique_replication_consumer_identifier, false, "Should a unique consumer be registered for table replication", 0) \

DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)

Expand Down
92 changes: 77 additions & 15 deletions src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
Expand Up @@ -17,12 +17,14 @@
#include <Interpreters/Context.h>
#include <Databases/DatabaseOnDisk.h>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/String.h>


namespace DB
{

static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min
static constexpr size_t replication_slot_name_max_size = 64;

namespace ErrorCodes
{
Expand Down Expand Up @@ -56,10 +58,70 @@ class TemporaryReplicationSlot
};


namespace
{
/// There can be several replication slots per publication, but one publication per table/database replication.
/// Replication slot might be unique (contain uuid) to allow have multiple replicas for the same PostgreSQL table/database.

String getPublicationName(const String & postgres_database, const String & postgres_table)
{
return fmt::format(
"{}_ch_publication",
postgres_table.empty() ? postgres_database : fmt::format("{}_{}", postgres_database, postgres_table));
}

void checkReplicationSlot(String name)
{
for (const auto & c : name)
{
const bool ok = (std::isalpha(c) && std::islower(c)) || std::isdigit(c) || c == '_';
if (!ok)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Replication slot can contain lower-case letters, numbers, and the underscore character. "
"Got: {}", name);
}
}

if (name.size() > replication_slot_name_max_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Too big replication slot size: {}", name);
}

String normalizeReplicationSlot(String name)
{
name = Poco::toLower(name);
for (auto & c : name)
if (c == '-')
c = '_';
return name;
}

String getReplicationSlotName(
const String & postgres_database,
const String & postgres_table,
const String & clickhouse_uuid,
const MaterializedPostgreSQLSettings & replication_settings)
{
String slot_name = replication_settings.materialized_postgresql_replication_slot;
if (slot_name.empty())
{
if (replication_settings.materialized_postgresql_use_unique_replication_consumer_identifier)
slot_name = clickhouse_uuid;
else
slot_name = postgres_table.empty() ? postgres_database : fmt::format("{}_{}_ch_replication_slot", postgres_database, postgres_table);

slot_name = normalizeReplicationSlot(slot_name);
}
return slot_name;
}
}

PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
Expand All @@ -70,14 +132,18 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, is_attach(is_attach_)
, postgres_database(postgres_database_)
, postgres_schema(replication_settings.materialized_postgresql_schema)
, current_database_name(current_database_name_)
, current_database_name(clickhouse_database_)
, connection_info(connection_info_)
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, schema_list(replication_settings.materialized_postgresql_schema_list)
, schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema)
, user_managed_slot(!replication_settings.materialized_postgresql_replication_slot.value.empty())
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, replication_slot(getReplicationSlotName(postgres_database_, postgres_table_, clickhouse_uuid_, replication_settings))
, tmp_replication_slot(replication_slot + "_tmp")
, publication_name(getPublicationName(postgres_database_, postgres_table_))
, reschedule_backoff_min_ms(replication_settings.materialized_postgresql_backoff_min_ms)
, reschedule_backoff_max_ms(replication_settings.materialized_postgresql_backoff_max_ms)
, reschedule_backoff_factor(replication_settings.materialized_postgresql_backoff_factor)
Expand All @@ -89,13 +155,9 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
if (!schema_list.empty() && !postgres_schema.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and common schema at the same time");

replication_slot = replication_settings.materialized_postgresql_replication_slot;
if (replication_slot.empty())
{
user_managed_slot = false;
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
checkReplicationSlot(replication_slot);

LOG_INFO(log, "Using replication slot {} and publication {}", replication_slot, publication_name);

startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
Expand Down Expand Up @@ -496,7 +558,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table found to be replicated");

/// 'ONLY' means just a table, without descendants.
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list);
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", doubleQuoteString(publication_name), tables_list);
try
{
tx.exec(query_str);
Expand All @@ -519,7 +581,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
{
String slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

Expand All @@ -546,11 +608,11 @@ void PostgreSQLReplicationHandler::createReplicationSlot(

String query_str, slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", slot_name);
query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", doubleQuoteString(slot_name));

try
{
Expand All @@ -573,7 +635,7 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx

std::string slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

Expand Down
14 changes: 8 additions & 6 deletions src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h
Expand Up @@ -21,9 +21,10 @@ friend class TemporaryReplicationSlot;
using ConsumerPtr = std::shared_ptr<MaterializedPostgreSQLConsumer>;

PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
Expand Down Expand Up @@ -128,10 +129,11 @@ friend class TemporaryReplicationSlot;
/// This is possible to allow replicating tables from multiple schemas in the same MaterializedPostgreSQL database engine.
mutable bool schema_as_a_part_of_table_name = false;

bool user_managed_slot = true;
String user_provided_snapshot;

String replication_slot, publication_name;
const bool user_managed_slot;
const String user_provided_snapshot;
const String replication_slot;
const String tmp_replication_slot;
const String publication_name;

/// Replication consumer. Manages decoding of replication stream and syncing into tables.
ConsumerPtr consumer;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
Expand Up @@ -74,13 +74,13 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(

setInMemoryMetadata(storage_metadata);

String replication_identifier = remote_database_name + "_" + remote_table_name_;
replication_settings->materialized_postgresql_tables_list = remote_table_name_;

replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
remote_table_name_,
table_id_.database_name,
toString(table_id_.uuid),
connection_info,
getContext(),
is_attach,
Expand Down
19 changes: 17 additions & 2 deletions tests/integration/helpers/postgres_utility.py
Expand Up @@ -113,11 +113,19 @@ def __init__(self):
self.created_materialized_postgres_db_list = set()
self.created_ch_postgres_db_list = set()

def init(self, instance, ip, port, default_database="postgres_database"):
def init(
self,
instance,
ip,
port,
default_database="postgres_database",
postgres_db_exists=False,
):
self.instance = instance
self.ip = ip
self.port = port
self.default_database = default_database
self.postgres_db_exists = postgres_db_exists
self.prepare()

def get_default_database(self):
Expand All @@ -138,7 +146,8 @@ def prepare(self):
self.conn = get_postgres_conn(ip=self.ip, port=self.port)
self.cursor = self.conn.cursor()
if self.default_database != "":
self.create_postgres_db(self.default_database)
if not self.postgres_db_exists:
self.create_postgres_db(self.default_database)
self.conn = get_postgres_conn(
ip=self.ip,
port=self.port,
Expand Down Expand Up @@ -364,6 +373,12 @@ def check_tables_are_synchronized(
time.sleep(1)
result = instance.query(result_query)

if result != expected:
count = int(instance.query(f"select count() from {table_path}"))
expected_count = int(
instance.query(f"select count() from {postgres_database}.{table_name}")
)
print(f"Having {count}, expected {expected_count}")
assert result == expected


Expand Down
Expand Up @@ -719,6 +719,51 @@ def test_too_many_parts(started_cluster):
pg_manager2.drop_materialized_db()


def test_replica_consumer(started_cluster):
table = "test_replica_consumer"

pg_manager_replica = PostgresManager()
pg_manager_replica.init(
instance2,
cluster.postgres_ip,
cluster.postgres_port,
default_database="postgres_database",
postgres_db_exists=True,
)

for pm in [pg_manager, pg_manager_replica]:
pm.create_and_fill_postgres_table(table)
pm.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
"materialized_postgresql_use_unique_replication_consumer_identifier = 1",
],
)

assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))

instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
)

check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_replica.get_default_database()
)

assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))

pg_manager_replica.clear()


if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
Expand Down