Skip to content

Commit

Permalink
Add support for alter/drop role propagation from non-main databases (#…
Browse files Browse the repository at this point in the history
…7461)

DESCRIPTION: Adds support for distributed `ALTER/DROP ROLE` commands
from the databases where Citus is not installed

---------

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
  • Loading branch information
gurkanindibay and onurctirtir committed Feb 28, 2024
1 parent f424268 commit 51009d0
Show file tree
Hide file tree
Showing 13 changed files with 711 additions and 88 deletions.
206 changes: 159 additions & 47 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,40 @@
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
#define UNMARK_OBJECT_DISTRIBUTED \
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d,%s)"

/* see NonMainDbDistributedStatementInfo for the explanation of these flags */
typedef enum DistObjectOperation
{
NO_DIST_OBJECT_OPERATION,
MARK_DISTRIBUTED_GLOBALLY,
UNMARK_DISTRIBUTED_LOCALLY
} DistObjectOperation;


/*
* NonMainDbDistributedStatementInfo is used to determine whether a statement is
* supported from non-main databases and whether it should be marked as
* distributed explicitly (*).
* supported from non-main databases and whether it should be marked or unmarked
* as distributed.
*
* When creating a distributed object, we always have to mark such objects as
* "distributed" but while for some object types we can delegate this to main
* database, for some others we have to explicitly send a command to all nodes
* in this code-path to achieve this. Callers need to provide
* MARK_DISTRIBUTED_GLOBALLY when that is the case.
*
* (*) We always have to mark such objects as "distributed" but while for some
* object types we can delegate this to main database, for some others we have
* to explicitly send a command to all nodes in this code-path to achieve this.
* Similarly when dropping a distributed object, we always have to unmark such
* objects as "distributed" and our utility hook on remote nodes achieve this
* via UnmarkNodeWideObjectsDistributed() because the commands that we send to
* workers are executed via main db. However for the local node, this is not the
* case as we're not in the main db. For this reason, callers need to provide
* UNMARK_DISTRIBUTED_LOCALLY to unmark an object for local node as well.
*/
typedef struct NonMainDbDistributedStatementInfo
{
int statementType;
bool explicitlyMarkAsDistributed;
DistObjectOperation DistObjectOperation;

/*
* checkSupportedObjectTypes is a callback function that checks whether
Expand All @@ -118,15 +138,16 @@ typedef struct NonMainDbDistributedStatementInfo
} NonMainDbDistributedStatementInfo;

/*
* MarkObjectDistributedParams is used to pass parameters to the
* MarkObjectDistributedFromNonMainDb function.
* DistObjectOperationParams is used to pass parameters to the
* MarkObjectDistributedGloballyFromNonMainDb function and
* UnMarkObjectDistributedLocallyFromNonMainDb functions.
*/
typedef struct MarkObjectDistributedParams
typedef struct DistObjectOperationParams
{
char *name;
Oid id;
uint16 catalogRelId;
} MarkObjectDistributedParams;
} DistObjectOperationParams;


bool EnableDDLPropagation = true; /* ddl propagation is enabled */
Expand Down Expand Up @@ -166,9 +187,11 @@ static bool IsCommandToCreateOrDropMainDB(Node *parsetree);
static void RunPreprocessMainDBCommand(Node *parsetree);
static void RunPostprocessMainDBCommand(Node *parsetree);
static bool IsStatementSupportedFromNonMainDb(Node *parsetree);
static bool StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree);
static void MarkObjectDistributedFromNonMainDb(Node *parsetree);
static MarkObjectDistributedParams GetMarkObjectDistributedParams(Node *parsetree);
static bool StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree);
static bool StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree);
static void MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree);
static void UnMarkObjectDistributedLocallyFromNonMainDb(List *unmarkDistributedList);
static List * GetDistObjectOperationParams(Node *parsetree);

/*
* checkSupportedObjectTypes callbacks for
Expand All @@ -184,12 +207,15 @@ static bool NonMainDbCheckSupportedObjectTypeForSecLabel(Node *node);
*/
ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE };
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = {
{ T_GrantRoleStmt, false, NULL },
{ T_CreateRoleStmt, true, NULL },
{ T_GrantStmt, false, NonMainDbCheckSupportedObjectTypeForGrant },
{ T_CreatedbStmt, false, NULL },
{ T_DropdbStmt, false, NULL },
{ T_SecLabelStmt, false, NonMainDbCheckSupportedObjectTypeForSecLabel },
{ T_GrantRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_CreateRoleStmt, MARK_DISTRIBUTED_GLOBALLY, NULL },
{ T_DropRoleStmt, UNMARK_DISTRIBUTED_LOCALLY, NULL },
{ T_AlterRoleStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_GrantStmt, NO_DIST_OBJECT_OPERATION, NonMainDbCheckSupportedObjectTypeForGrant },
{ T_CreatedbStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_DropdbStmt, NO_DIST_OBJECT_OPERATION, NULL },
{ T_SecLabelStmt, NO_DIST_OBJECT_OPERATION,
NonMainDbCheckSupportedObjectTypeForSecLabel },
};


Expand Down Expand Up @@ -1743,12 +1769,19 @@ RunPreprocessMainDBCommand(Node *parsetree)
START_MANAGEMENT_TRANSACTION,
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);

mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
quote_literal_cstr(queryString),
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);

if (StatementRequiresUnmarkDistributedLocallyFromNonMainDb(parsetree))
{
List *unmarkParams = GetDistObjectOperationParams(parsetree);
UnMarkObjectDistributedLocallyFromNonMainDb(unmarkParams);
}
}


Expand All @@ -1760,9 +1793,9 @@ static void
RunPostprocessMainDBCommand(Node *parsetree)
{
if (IsStatementSupportedFromNonMainDb(parsetree) &&
StatementRequiresMarkDistributedFromNonMainDb(parsetree))
StatementRequiresMarkDistributedGloballyFromNonMainDb(parsetree))
{
MarkObjectDistributedFromNonMainDb(parsetree);
MarkObjectDistributedGloballyFromNonMainDb(parsetree);
}
}

Expand Down Expand Up @@ -1793,11 +1826,11 @@ IsStatementSupportedFromNonMainDb(Node *parsetree)


/*
* StatementRequiresMarkDistributedFromNonMainDb returns true if the statement should be marked
* StatementRequiresMarkDistributedGloballyFromNonMainDb returns true if the statement should be marked
* as distributed when executed from a non-main database.
*/
static bool
StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree)
StatementRequiresMarkDistributedGloballyFromNonMainDb(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);

Expand All @@ -1806,7 +1839,8 @@ StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].explicitlyMarkAsDistributed;
return NonMainDbSupportedStatements[i].DistObjectOperation ==
MARK_DISTRIBUTED_GLOBALLY;
}
}

Expand All @@ -1815,47 +1849,125 @@ StatementRequiresMarkDistributedFromNonMainDb(Node *parsetree)


/*
* MarkObjectDistributedFromNonMainDb marks the given object as distributed on the
* StatementRequiresUnmarkDistributedLocallyFromNonMainDb returns true if the statement should be unmarked
* as distributed when executed from a non-main database.
*/
static bool
StatementRequiresUnmarkDistributedLocallyFromNonMainDb(Node *parsetree)
{
NodeTag type = nodeTag(parsetree);

for (int i = 0; i < sizeof(NonMainDbSupportedStatements) /
sizeof(NonMainDbSupportedStatements[0]); i++)
{
if (type == NonMainDbSupportedStatements[i].statementType)
{
return NonMainDbSupportedStatements[i].DistObjectOperation ==
UNMARK_DISTRIBUTED_LOCALLY;
}
}

return false;
}


/*
* MarkObjectDistributedGloballyFromNonMainDb marks the given object as distributed on the
* non-main database.
*/
static void
MarkObjectDistributedFromNonMainDb(Node *parsetree)
MarkObjectDistributedGloballyFromNonMainDb(Node *parsetree)
{
MarkObjectDistributedParams markObjectDistributedParams =
GetMarkObjectDistributedParams(parsetree);
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
markObjectDistributedParams.catalogRelId,
quote_literal_cstr(markObjectDistributedParams.name),
markObjectDistributedParams.id,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
List *distObjectOperationParams =
GetDistObjectOperationParams(parsetree);

DistObjectOperationParams *distObjectOperationParam = NULL;

foreach_ptr(distObjectOperationParam, distObjectOperationParams)
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
distObjectOperationParam->catalogRelId,
quote_literal_cstr(distObjectOperationParam->name),
distObjectOperationParam->id,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
}


/*
* UnMarkObjectDistributedLocallyFromNonMainDb unmarks the given object as distributed on the
* non-main database.
*/
static void
UnMarkObjectDistributedLocallyFromNonMainDb(List *markObjectDistributedParamList)
{
DistObjectOperationParams *markObjectDistributedParam = NULL;
int subObjectId = 0;
char *checkObjectExistence = "false";
foreach_ptr(markObjectDistributedParam, markObjectDistributedParamList)
{
StringInfo query = makeStringInfo();
appendStringInfo(query,
UNMARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
markObjectDistributedParam->id,
subObjectId, checkObjectExistence);
RunCitusMainDBQuery(query->data);
}
}


/*
* GetMarkObjectDistributedParams returns MarkObjectDistributedParams for the target
* GetDistObjectOperationParams returns DistObjectOperationParams for the target
* object of given parsetree.
*/
static MarkObjectDistributedParams
GetMarkObjectDistributedParams(Node *parsetree)
List *
GetDistObjectOperationParams(Node *parsetree)
{
List *paramsList = NIL;
if (IsA(parsetree, CreateRoleStmt))
{
CreateRoleStmt *stmt = castNode(CreateRoleStmt, parsetree);
MarkObjectDistributedParams info = {
.name = stmt->role,
.catalogRelId = AuthIdRelationId,
.id = get_role_oid(stmt->role, false)
};
DistObjectOperationParams *params =
(DistObjectOperationParams *) palloc(sizeof(DistObjectOperationParams));
params->name = stmt->role;
params->catalogRelId = AuthIdRelationId;
params->id = get_role_oid(stmt->role, false);

return info;
paramsList = lappend(paramsList, params);
}
else if (IsA(parsetree, DropRoleStmt))
{
DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree);
RoleSpec *roleSpec;
foreach_ptr(roleSpec, stmt->roles)
{
DistObjectOperationParams *params = (DistObjectOperationParams *) palloc(
sizeof(DistObjectOperationParams));

Oid roleOid = get_role_oid(roleSpec->rolename, true);

if (roleOid == InvalidOid)
{
continue;
}

params->id = roleOid;
params->name = roleSpec->rolename;
params->catalogRelId = AuthIdRelationId;

/* Add else if branches for other statement types */
paramsList = lappend(paramsList, params);
}
}
else
{
elog(ERROR, "unsupported statement type");
}

elog(ERROR, "unsupported statement type");
return paramsList;
}


Expand Down
14 changes: 10 additions & 4 deletions src/backend/distributed/metadata/distobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,23 @@ mark_object_distributed(PG_FUNCTION_ARGS)


/*
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int,checkobjectexistence bool)
*
* removes the entry for an object address from pg_dist_object. Only removes the entry if
* the object does not exist anymore.
* Removes the entry for an object address from pg_dist_object. If checkobjectexistence is true,
* throws an error if the object still exists.
*/
Datum
citus_unmark_object_distributed(PG_FUNCTION_ARGS)
{
Oid classid = PG_GETARG_OID(0);
Oid objid = PG_GETARG_OID(1);
int32 objsubid = PG_GETARG_INT32(2);
bool checkObjectExistence = true;
if (!PG_ARGISNULL(3))
{
checkObjectExistence = PG_GETARG_BOOL(3);
}


ObjectAddress address = { 0 };
ObjectAddressSubSet(address, classid, objid, objsubid);
Expand All @@ -119,7 +125,7 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

if (ObjectExists(&address))
if (checkObjectExistence && ObjectExists(&address))
{
ereport(ERROR, (errmsg("object still exists"),
errdetail("the %s \"%s\" still exists",
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "udfs/start_management_transaction/12.2-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
#include "udfs/mark_object_distributed/12.2-1.sql"
DROP FUNCTION pg_catalog.citus_unmark_object_distributed(oid, oid, int);
#include "udfs/citus_unmark_object_distributed/12.2-1.sql"
#include "udfs/commit_management_command_2pc/12.2-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ DROP FUNCTION citus_internal.mark_object_distributed(
classId Oid, objectName text, objectId Oid, connectionUser text
);

DROP FUNCTION pg_catalog.citus_unmark_object_distributed(oid,oid,int,boolean);
#include "../udfs/citus_unmark_object_distributed/10.0-1.sql"

DROP FUNCTION citus_internal.commit_management_command_2pc();

ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE FUNCTION pg_catalog.citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
CREATE FUNCTION pg_catalog.citus_unmark_object_distributed(classid oid, objid oid, objsubid int, checkobjectexistence boolean DEFAULT true)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_unmark_object_distributed$$;
COMMENT ON FUNCTION pg_catalog.citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
IS 'remove an object address from citus.pg_dist_object once the object has been deleted';
COMMENT ON FUNCTION pg_catalog.citus_unmark_object_distributed(classid oid, objid oid, objsubid int, checkobjectexistence boolean)
IS 'Removes an object from citus.pg_dist_object after deletion. If checkobjectexistence is true, object existence check performed.'
'Otherwise, object existence check is skipped.';
Loading

0 comments on commit 51009d0

Please sign in to comment.