From 739c6d26df56d38cda7f5420cccca947874d71e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20Ozan=20Akg=C3=BCl?= Date: Thu, 11 Jan 2024 16:05:14 +0300 Subject: [PATCH] Fix inserting to pg_dist_object for queries from other nodes (#7402) Running a query from a Citus non-main database that inserts to pg_dist_object requires a new connection to the main database itself. This PR adds that connection to the main database. --------- Co-authored-by: Jelte Fennema-Nio --- src/backend/distributed/metadata/distobject.c | 36 ++++++++- .../distributed/metadata/node_metadata.c | 9 +++ src/include/distributed/metadata/distobject.h | 3 +- src/test/regress/citus_tests/common.py | 6 ++ .../citus_tests/test/test_other_databases.py | 74 +++++++++++++++---- 5 files changed, 108 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index edccd86b989..1d07be8c3a4 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -31,6 +31,7 @@ #include "nodes/makefuncs.h" #include "nodes/pg_list.h" #include "parser/parse_type.h" +#include "postmaster/postmaster.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" @@ -80,7 +81,14 @@ mark_object_distributed(PG_FUNCTION_ARGS) Oid objectId = PG_GETARG_OID(2); ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*objectAddress, classId, objectId); - MarkObjectDistributedWithName(objectAddress, objectName); + + /* + * This function is called when a query is run from a Citus non-main database. + * We need to insert into local pg_dist_object over a connection to make sure + * 2PC still works. + */ + bool useConnectionForLocalQuery = true; + MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery); PG_RETURN_VOID(); } @@ -184,7 +192,8 @@ ObjectExists(const ObjectAddress *address) void MarkObjectDistributed(const ObjectAddress *distAddress) { - MarkObjectDistributedWithName(distAddress, ""); + bool useConnectionForLocalQuery = false; + MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery); } @@ -194,13 +203,32 @@ MarkObjectDistributed(const ObjectAddress *distAddress) * that is used in case the object does not exists for the current transaction. */ void -MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName) +MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName, + bool useConnectionForLocalQuery) { if (!CitusHasBeenLoaded()) { elog(ERROR, "Cannot mark object distributed because Citus has not been loaded."); } - MarkObjectDistributedLocally(distAddress); + + /* + * When a query is run from a Citus non-main database we need to insert into pg_dist_object + * over a connection to make sure 2PC still works. + */ + if (useConnectionForLocalQuery) + { + StringInfo insertQuery = makeStringInfo(); + appendStringInfo(insertQuery, + "INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid)" + "VALUES (%d, %d, %d) ON CONFLICT DO NOTHING", + distAddress->classId, distAddress->objectId, + distAddress->objectSubId); + SendCommandToWorker(LocalHostName, PostPortNumber, insertQuery->data); + } + else + { + MarkObjectDistributedLocally(distAddress); + } if (EnableMetadataSync) { diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index a2df0a410a5..094986c855e 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1751,6 +1751,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. + * + * NOTE: A special case that this handles is when nodeName and nodePort are set + * to LocalHostName and PostPortNumber. In that case we return the primary node + * for the local group. */ WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort) @@ -1773,6 +1777,11 @@ FindWorkerNode(const char *nodeName, int32 nodePort) return workerNode; } + if (strcmp(LocalHostName, nodeName) == 0 && nodePort == PostPortNumber) + { + return PrimaryNodeForGroup(GetLocalGroupId(), NULL); + } + return NULL; } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index bbbbdf9da20..13f38178bd4 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,7 +23,8 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); -extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); +extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name, + bool useConnectionForLocalQuery); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 40c727189b9..99e419267b2 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -431,6 +431,12 @@ def sudo(command, *args, shell=True, **kwargs): def notice_handler(diag: psycopg.errors.Diagnostic): print(f"{diag.severity}: {diag.message_primary}") + if diag.message_detail: + print(f"DETAIL: {diag.message_detail}") + if diag.message_hint: + print(f"HINT: {diag.message_hint}") + if diag.context: + print(f"CONTEXT: {diag.context}") def cleanup_test_leftovers(nodes): diff --git a/src/test/regress/citus_tests/test/test_other_databases.py b/src/test/regress/citus_tests/test/test_other_databases.py index cf824f926f0..925b065a74a 100644 --- a/src/test/regress/citus_tests/test/test_other_databases.py +++ b/src/test/regress/citus_tests/test/test_other_databases.py @@ -36,14 +36,25 @@ def test_main_commited_outer_not_yet(cluster): assert ( int(role_before_commit) == 0 - ), "role is on pg_dist_object despite not committing" + ), "role is in pg_dist_object despite not committing" + + # user should not be in pg_dist_object on the coordinator because outer transaction is not committed yet + pdo_coordinator_before_commit = c.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid = 123123" + ) + + assert ( + int(pdo_coordinator_before_commit) == 0 + ), "role is in pg_dist_object on coordinator despite not committing" # user should not be in pg_dist_object on the worker because outer transaction is not committed yet - pdo_before_commit = w0.sql_value( + pdo_worker_before_commit = w0.sql_value( "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" ) - assert int(pdo_before_commit) == 0, "role is created despite not committing" + assert ( + int(pdo_worker_before_commit) == 0 + ), "role is in pg_dist_object on worker despite not committing" # commit in cur1 so the transaction recovery thinks this is a successful transaction cur1.execute("COMMIT") @@ -60,14 +71,23 @@ def test_main_commited_outer_not_yet(cluster): int(role_after_commit) == 1 ), "role is not created during recovery despite committing" - # check that the user is on pg_dist_object on the worker after transaction recovery - pdo_after_commit = w0.sql_value( + # check that the user is in pg_dist_object on the coordinator after transaction recovery + pdo_coordinator_after_commit = c.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid = 123123" + ) + + assert ( + int(pdo_coordinator_after_commit) == 1 + ), "role is not in pg_dist_object on coordinator after recovery despite committing" + + # check that the user is in pg_dist_object on the worker after transaction recovery + pdo_worker_after_commit = w0.sql_value( "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" ) assert ( - int(pdo_after_commit) == 1 - ), "role is not on pg_dist_object after recovery despite committing" + int(pdo_worker_after_commit) == 1 + ), "role is not in pg_dist_object on worker after recovery despite committing" c.sql("DROP DATABASE db1") c.sql( @@ -81,6 +101,12 @@ def test_main_commited_outer_not_yet(cluster): $$) """ ) + c.sql( + """ + DELETE FROM pg_dist_object + WHERE objid = 123123 + """ + ) def test_main_commited_outer_aborted(cluster): @@ -121,14 +147,23 @@ def test_main_commited_outer_aborted(cluster): assert int(role_before_recovery) == 0, "role is already created before recovery" - # check that the user is not on pg_dist_object on the worker - pdo_before_recovery = w0.sql_value( + # check that the user is not in pg_dist_object on the coordinator + pdo_coordinator_before_recovery = c.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid = 321321" + ) + + assert ( + int(pdo_coordinator_before_recovery) == 0 + ), "role is already in pg_dist_object on coordinator before recovery" + + # check that the user is not in pg_dist_object on the worker + pdo_worker_before_recovery = w0.sql_value( "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" ) assert ( - int(pdo_before_recovery) == 0 - ), "role is already on pg_dist_object before recovery" + int(pdo_worker_before_recovery) == 0 + ), "role is already in pg_dist_object on worker before recovery" # run the transaction recovery c.sql("SELECT recover_prepared_transactions()") @@ -142,13 +177,22 @@ def test_main_commited_outer_aborted(cluster): int(role_after_recovery) == 0 ), "role is created during recovery despite aborting" - # check that the user is not on pg_dist_object on the worker after transaction recovery - pdo_after_recovery = w0.sql_value( + # check that the user is not in pg_dist_object on the coordinator after transaction recovery + pdo_coordinator_after_recovery = c.sql_value( + "SELECT count(*) FROM pg_dist_object WHERE objid = 321321" + ) + + assert ( + int(pdo_coordinator_after_recovery) == 0 + ), "role is in pg_dist_object on coordinator after recovery despite aborting" + + # check that the user is not in pg_dist_object on the worker after transaction recovery + pdo_worker_after_recovery = w0.sql_value( "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" ) assert ( - int(pdo_after_recovery) == 0 - ), "role is on pg_dist_object after recovery despite aborting" + int(pdo_worker_after_recovery) == 0 + ), "role is in pg_dist_object on worker after recovery despite aborting" c.sql("DROP DATABASE db2")