Skip to content

Commit

Permalink
Merge pull request #55145 from kssenii/materialized-pg-allow-unique-i…
Browse files Browse the repository at this point in the history
…dentifier

Allow to have several MaterializedPostgreSQL tables following the same Postgres table
  • Loading branch information
kssenii committed Oct 17, 2023
2 parents e85bf21 + c220cc6 commit 7cedfef
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 43 deletions.
5 changes: 5 additions & 0 deletions docs/en/engines/database-engines/materialized-postgresql.md
Expand Up @@ -197,6 +197,11 @@ Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.htm
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
```

### `materialized_postgresql_use_unique_replication_consumer_identifier` {#materialized_postgresql_use_unique_replication_consumer_identifier}

Use a unique replication consumer identifier for replication. Default: `0`.
If set to `1`, allows setting up several `MaterializedPostgreSQL` tables pointing to the same `PostgreSQL` table.

## Notes {#notes}

### Failover of the logical replication slot {#logical-replication-slot-failover}
Expand Down
36 changes: 24 additions & 12 deletions src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
Expand Up @@ -52,10 +52,29 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
, settings(std::move(settings_))
, startup_task(getContext()->getSchedulePool().createTask("MaterializedPostgreSQLDatabaseStartup", [this]{ startSynchronization(); }))
, startup_task(getContext()->getSchedulePool().createTask("MaterializedPostgreSQLDatabaseStartup", [this]{ tryStartSynchronization(); }))
{
}

/// Entry point of the startup task: runs startSynchronization() and, instead of
/// letting an exception escape, logs it and re-schedules the same task.
/// Does nothing once shutdown has been requested.
void DatabaseMaterializedPostgreSQL::tryStartSynchronization()
{
    /// Delay before the next attempt after a failed start.
    constexpr auto retry_delay_ms = 5000;

    if (!shutdown_called)
    {
        try
        {
            startSynchronization();
            LOG_INFO(log, "Successfully loaded tables from PostgreSQL and started replication");
        }
        catch (...)
        {
            LOG_ERROR(log, "Failed to start replication from PostgreSQL, "
                           "will retry. Error: {}", getCurrentExceptionMessage(true));

            /// Re-check the flag: do not re-schedule once shutdown has been requested.
            if (shutdown_called)
                return;

            startup_task->scheduleAfter(retry_delay_ms);
        }
    }
}

void DatabaseMaterializedPostgreSQL::startSynchronization()
{
Expand All @@ -64,9 +83,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
return;

replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */ TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
remote_database_name,
/* table_name */"",
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
toString(getUUID()),
connection_info,
getContext(),
is_attach,
Expand Down Expand Up @@ -114,15 +134,7 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()

LOG_TRACE(log, "Loaded {} tables. Starting synchronization", materialized_tables.size());

try
{
replication_handler->startup(/* delayed */false);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
replication_handler->startup(/* delayed */false);
}


Expand Down Expand Up @@ -401,6 +413,7 @@ void DatabaseMaterializedPostgreSQL::detachTablePermanently(ContextPtr, const St

void DatabaseMaterializedPostgreSQL::shutdown()
{
shutdown_called = true;
startup_task->deactivate();
stopReplication();
DatabaseAtomic::shutdown();
Expand All @@ -413,7 +426,6 @@ void DatabaseMaterializedPostgreSQL::stopReplication()
if (replication_handler)
replication_handler->shutdown();

shutdown_called = true;
/// Clear wrappers over nested, all access is not done to nested tables directly.
materialized_tables.clear();
}
Expand Down
1 change: 1 addition & 0 deletions src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
Expand Up @@ -73,6 +73,7 @@ class DatabaseMaterializedPostgreSQL : public DatabaseAtomic
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const override;

private:
void tryStartSynchronization();
void startSynchronization();

ASTPtr createAlterSettingsQuery(const SettingChange & new_setting);
Expand Down
15 changes: 13 additions & 2 deletions src/Databases/PostgreSQL/DatabasePostgreSQL.cpp
Expand Up @@ -322,8 +322,19 @@ void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, Load
void DatabasePostgreSQL::removeOutdatedTables()
{
std::lock_guard lock{mutex};
auto connection_holder = pool->get();
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);

std::set<std::string> actual_tables;
try
{
auto connection_holder = pool->get();
actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
cleaner_task->scheduleAfter(cleaner_reschedule_ms);
return;
}

if (cache_tables)
{
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
Expand Up @@ -566,6 +566,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl

void MaterializedPostgreSQLConsumer::syncTables()
{
size_t synced_tables = 0;
while (!tables_to_sync.empty())
{
auto table_name = *tables_to_sync.begin();
Expand Down Expand Up @@ -596,6 +597,7 @@ void MaterializedPostgreSQLConsumer::syncTables()

CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
++synced_tables;
}
}
catch (...)
Expand All @@ -608,7 +610,8 @@ void MaterializedPostgreSQLConsumer::syncTables()
tables_to_sync.erase(tables_to_sync.begin());
}

LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})",
synced_tables, current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));

updateLsn();
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h
Expand Up @@ -24,6 +24,7 @@ namespace DB
M(UInt64, materialized_postgresql_backoff_min_ms, 200, "Poll backoff start point", 0) \
M(UInt64, materialized_postgresql_backoff_max_ms, 10000, "Poll backoff max point", 0) \
M(UInt64, materialized_postgresql_backoff_factor, 2, "Poll backoff factor", 0) \
M(Bool, materialized_postgresql_use_unique_replication_consumer_identifier, false, "Should a unique consumer be registered for table replication", 0) \

DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)

Expand Down
92 changes: 77 additions & 15 deletions src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
Expand Up @@ -17,12 +17,14 @@
#include <Interpreters/Context.h>
#include <Databases/DatabaseOnDisk.h>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/String.h>


namespace DB
{

static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min
static constexpr size_t replication_slot_name_max_size = 64;

namespace ErrorCodes
{
Expand Down Expand Up @@ -56,10 +58,70 @@ class TemporaryReplicationSlot
};


namespace
{
/// There can be several replication slots per publication, but only one publication per table/database replication.
/// A replication slot name may be unique (contain a UUID) to allow multiple replicas of the same PostgreSQL table/database.

/// Builds the publication name: "<database>_ch_publication" when no table is given,
/// otherwise "<database>_<table>_ch_publication".
String getPublicationName(const String & postgres_database, const String & postgres_table)
{
    String prefix = postgres_database;
    if (!postgres_table.empty())
        prefix = fmt::format("{}_{}", postgres_database, postgres_table);

    return fmt::format("{}_ch_publication", prefix);
}

void checkReplicationSlot(String name)
{
for (const auto & c : name)
{
const bool ok = (std::isalpha(c) && std::islower(c)) || std::isdigit(c) || c == '_';
if (!ok)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Replication slot can contain lower-case letters, numbers, and the underscore character. "
"Got: {}", name);
}
}

if (name.size() > replication_slot_name_max_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Too big replication slot size: {}", name);
}

/// Returns a copy of `name` converted to lower case with every '-' replaced by '_'.
String normalizeReplicationSlot(String name)
{
    String normalized = Poco::toLower(name);

    for (size_t i = 0; i < normalized.size(); ++i)
    {
        if (normalized[i] == '-')
            normalized[i] = '_';
    }

    return normalized;
}

/// Chooses the replication slot name.
///
/// Order of precedence:
///  1. the `materialized_postgresql_replication_slot` setting, returned as is (not normalized);
///  2. the ClickHouse UUID, if `materialized_postgresql_use_unique_replication_consumer_identifier` is set;
///  3. the PostgreSQL database name when no table is given,
///     otherwise "<database>_<table>_ch_replication_slot".
/// Names from cases 2 and 3 are passed through normalizeReplicationSlot().
String getReplicationSlotName(
    const String & postgres_database,
    const String & postgres_table,
    const String & clickhouse_uuid,
    const MaterializedPostgreSQLSettings & replication_settings)
{
    String configured_slot = replication_settings.materialized_postgresql_replication_slot;
    if (!configured_slot.empty())
        return configured_slot;

    if (replication_settings.materialized_postgresql_use_unique_replication_consumer_identifier)
        return normalizeReplicationSlot(clickhouse_uuid);

    if (postgres_table.empty())
        return normalizeReplicationSlot(postgres_database);

    return normalizeReplicationSlot(
        fmt::format("{}_{}_ch_replication_slot", postgres_database, postgres_table));
}
}

PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
Expand All @@ -70,14 +132,18 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, is_attach(is_attach_)
, postgres_database(postgres_database_)
, postgres_schema(replication_settings.materialized_postgresql_schema)
, current_database_name(current_database_name_)
, current_database_name(clickhouse_database_)
, connection_info(connection_info_)
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, schema_list(replication_settings.materialized_postgresql_schema_list)
, schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema)
, user_managed_slot(!replication_settings.materialized_postgresql_replication_slot.value.empty())
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, replication_slot(getReplicationSlotName(postgres_database_, postgres_table_, clickhouse_uuid_, replication_settings))
, tmp_replication_slot(replication_slot + "_tmp")
, publication_name(getPublicationName(postgres_database_, postgres_table_))
, reschedule_backoff_min_ms(replication_settings.materialized_postgresql_backoff_min_ms)
, reschedule_backoff_max_ms(replication_settings.materialized_postgresql_backoff_max_ms)
, reschedule_backoff_factor(replication_settings.materialized_postgresql_backoff_factor)
Expand All @@ -89,13 +155,9 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
if (!schema_list.empty() && !postgres_schema.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and common schema at the same time");

replication_slot = replication_settings.materialized_postgresql_replication_slot;
if (replication_slot.empty())
{
user_managed_slot = false;
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
checkReplicationSlot(replication_slot);

LOG_INFO(log, "Using replication slot {} and publication {}", replication_slot, publication_name);

startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
Expand Down Expand Up @@ -496,7 +558,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table found to be replicated");

/// 'ONLY' means just a table, without descendants.
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list);
std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", doubleQuoteString(publication_name), tables_list);
try
{
tx.exec(query_str);
Expand All @@ -519,7 +581,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
{
String slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

Expand All @@ -546,11 +608,11 @@ void PostgreSQLReplicationHandler::createReplicationSlot(

String query_str, slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", slot_name);
query_str = fmt::format("CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT", doubleQuoteString(slot_name));

try
{
Expand All @@ -573,7 +635,7 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx

std::string slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
slot_name = tmp_replication_slot;
else
slot_name = replication_slot;

Expand Down
14 changes: 8 additions & 6 deletions src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h
Expand Up @@ -21,9 +21,10 @@ friend class TemporaryReplicationSlot;
using ConsumerPtr = std::shared_ptr<MaterializedPostgreSQLConsumer>;

PostgreSQLReplicationHandler(
const String & replication_identifier,
const String & postgres_database_,
const String & current_database_name_,
const String & postgres_table_,
const String & clickhouse_database_,
const String & clickhouse_uuid_,
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
Expand Down Expand Up @@ -128,10 +129,11 @@ friend class TemporaryReplicationSlot;
/// This is possible to allow replicating tables from multiple schemas in the same MaterializedPostgreSQL database engine.
mutable bool schema_as_a_part_of_table_name = false;

bool user_managed_slot = true;
String user_provided_snapshot;

String replication_slot, publication_name;
const bool user_managed_slot;
const String user_provided_snapshot;
const String replication_slot;
const String tmp_replication_slot;
const String publication_name;

/// Replication consumer. Manages decoding of replication stream and syncing into tables.
ConsumerPtr consumer;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
Expand Up @@ -74,13 +74,13 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(

setInMemoryMetadata(storage_metadata);

String replication_identifier = remote_database_name + "_" + remote_table_name_;
replication_settings->materialized_postgresql_tables_list = remote_table_name_;

replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
remote_table_name_,
table_id_.database_name,
toString(table_id_.uuid),
connection_info,
getContext(),
is_attach,
Expand Down

0 comments on commit 7cedfef

Please sign in to comment.