Skip to content

Commit

Permalink
Fix inserting to pg_dist_object for queries from other nodes (#7402)
Browse files Browse the repository at this point in the history
Running a query from a Citus non-main database that inserts to
pg_dist_object requires a new connection to the main database itself.
This PR adds that connection to the main database.

---------

Co-authored-by: Jelte Fennema-Nio <github-tech@jeltef.nl>
  • Loading branch information
halilozanakgul and JelteF committed Jan 11, 2024
1 parent 00068e0 commit 739c6d2
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 20 deletions.
36 changes: 32 additions & 4 deletions src/backend/distributed/metadata/distobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
Expand Down Expand Up @@ -80,7 +81,14 @@ mark_object_distributed(PG_FUNCTION_ARGS)
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);

/*
* This function is called when a query is run from a Citus non-main database.
* We need to insert into local pg_dist_object over a connection to make sure
* 2PC still works.
*/
bool useConnectionForLocalQuery = true;
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery);
PG_RETURN_VOID();
}

Expand Down Expand Up @@ -184,7 +192,8 @@ ObjectExists(const ObjectAddress *address)
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
MarkObjectDistributedWithName(distAddress, "");
bool useConnectionForLocalQuery = false;
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery);
}


Expand All @@ -194,13 +203,32 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
* that is used in case the object does not exists for the current transaction.
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName,
bool useConnectionForLocalQuery)
{
if (!CitusHasBeenLoaded())
{
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
}
MarkObjectDistributedLocally(distAddress);

/*
* When a query is run from a Citus non-main database we need to insert into pg_dist_object
* over a connection to make sure 2PC still works.
*/
if (useConnectionForLocalQuery)
{
StringInfo insertQuery = makeStringInfo();
appendStringInfo(insertQuery,
"INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid)"
"VALUES (%d, %d, %d) ON CONFLICT DO NOTHING",
distAddress->classId, distAddress->objectId,
distAddress->objectSubId);
SendCommandToWorker(LocalHostName, PostPortNumber, insertQuery->data);
}
else
{
MarkObjectDistributedLocally(distAddress);
}

if (EnableMetadataSync)
{
Expand Down
9 changes: 9 additions & 0 deletions src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
/*
* FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL.
*
* NOTE: A special case that this handles is when nodeName and nodePort are set
* to LocalHostName and PostPortNumber. In that case we return the primary node
* for the local group.
*/
WorkerNode *
FindWorkerNode(const char *nodeName, int32 nodePort)
Expand All @@ -1773,6 +1777,11 @@ FindWorkerNode(const char *nodeName, int32 nodePort)
return workerNode;
}

if (strcmp(LocalHostName, nodeName) == 0 && nodePort == PostPortNumber)
{
return PrimaryNodeForGroup(GetLocalGroupId(), NULL);
}

return NULL;
}

Expand Down
3 changes: 2 additions & 1 deletion src/include/distributed/metadata/distobject.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name,
bool useConnectionForLocalQuery);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);
Expand Down
6 changes: 6 additions & 0 deletions src/test/regress/citus_tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ def sudo(command, *args, shell=True, **kwargs):

def notice_handler(diag: psycopg.errors.Diagnostic):
print(f"{diag.severity}: {diag.message_primary}")
if diag.message_detail:
print(f"DETAIL: {diag.message_detail}")
if diag.message_hint:
print(f"HINT: {diag.message_hint}")
if diag.context:
print(f"CONTEXT: {diag.context}")


def cleanup_test_leftovers(nodes):
Expand Down
74 changes: 59 additions & 15 deletions src/test/regress/citus_tests/test/test_other_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,25 @@ def test_main_commited_outer_not_yet(cluster):

assert (
int(role_before_commit) == 0
), "role is on pg_dist_object despite not committing"
), "role is in pg_dist_object despite not committing"

# user should not be in pg_dist_object on the coordinator because outer transaction is not committed yet
pdo_coordinator_before_commit = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
)

assert (
int(pdo_coordinator_before_commit) == 0
), "role is in pg_dist_object on coordinator despite not committing"

# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
pdo_before_commit = w0.sql_value(
pdo_worker_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)

assert int(pdo_before_commit) == 0, "role is created despite not committing"
assert (
int(pdo_worker_before_commit) == 0
), "role is in pg_dist_object on worker despite not committing"

# commit in cur1 so the transaction recovery thinks this is a successful transaction
cur1.execute("COMMIT")
Expand All @@ -60,14 +71,23 @@ def test_main_commited_outer_not_yet(cluster):
int(role_after_commit) == 1
), "role is not created during recovery despite committing"

# check that the user is on pg_dist_object on the worker after transaction recovery
pdo_after_commit = w0.sql_value(
# check that the user is in pg_dist_object on the coordinator after transaction recovery
pdo_coordinator_after_commit = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
)

assert (
int(pdo_coordinator_after_commit) == 1
), "role is not in pg_dist_object on coordinator after recovery despite committing"

# check that the user is in pg_dist_object on the worker after transaction recovery
pdo_worker_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)

assert (
int(pdo_after_commit) == 1
), "role is not on pg_dist_object after recovery despite committing"
int(pdo_worker_after_commit) == 1
), "role is not in pg_dist_object on worker after recovery despite committing"

c.sql("DROP DATABASE db1")
c.sql(
Expand All @@ -81,6 +101,12 @@ def test_main_commited_outer_not_yet(cluster):
$$)
"""
)
c.sql(
"""
DELETE FROM pg_dist_object
WHERE objid = 123123
"""
)


def test_main_commited_outer_aborted(cluster):
Expand Down Expand Up @@ -121,14 +147,23 @@ def test_main_commited_outer_aborted(cluster):

assert int(role_before_recovery) == 0, "role is already created before recovery"

# check that the user is not on pg_dist_object on the worker
pdo_before_recovery = w0.sql_value(
# check that the user is not in pg_dist_object on the coordinator
pdo_coordinator_before_recovery = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
)

assert (
int(pdo_coordinator_before_recovery) == 0
), "role is already in pg_dist_object on coordinator before recovery"

# check that the user is not in pg_dist_object on the worker
pdo_worker_before_recovery = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
)

assert (
int(pdo_before_recovery) == 0
), "role is already on pg_dist_object before recovery"
int(pdo_worker_before_recovery) == 0
), "role is already in pg_dist_object on worker before recovery"

# run the transaction recovery
c.sql("SELECT recover_prepared_transactions()")
Expand All @@ -142,13 +177,22 @@ def test_main_commited_outer_aborted(cluster):
int(role_after_recovery) == 0
), "role is created during recovery despite aborting"

# check that the user is not on pg_dist_object on the worker after transaction recovery
pdo_after_recovery = w0.sql_value(
# check that the user is not in pg_dist_object on the coordinator after transaction recovery
pdo_coordinator_after_recovery = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
)

assert (
int(pdo_coordinator_after_recovery) == 0
), "role is in pg_dist_object on coordinator after recovery despite aborting"

# check that the user is not in pg_dist_object on the worker after transaction recovery
pdo_worker_after_recovery = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
)

assert (
int(pdo_after_recovery) == 0
), "role is on pg_dist_object after recovery despite aborting"
int(pdo_worker_after_recovery) == 0
), "role is in pg_dist_object on worker after recovery despite aborting"

c.sql("DROP DATABASE db2")

0 comments on commit 739c6d2

Please sign in to comment.