Skip to content

Commit

Permalink
Merge branch 'main' into alter_database_additional_options
Browse files Browse the repository at this point in the history
  • Loading branch information
gurkanindibay committed Dec 26, 2023
2 parents 09a7a33 + b877d60 commit ce32be6
Show file tree
Hide file tree
Showing 38 changed files with 1,146 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/backend/distributed/commands/function.c
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,

char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
NIL,
distArgumentIndexList,
colocationIdList,
forceDelegationList);
Expand Down
68 changes: 68 additions & 0 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
Expand All @@ -44,6 +45,7 @@
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
Expand Down Expand Up @@ -77,13 +79,21 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"

#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d)"


bool EnableDDLPropagation = true; /* ddl propagation is enabled */
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
Expand Down Expand Up @@ -112,6 +122,8 @@ static void PostStandardProcessUtility(Node *parsetree);
static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
static void RunPostprocessMainDBCommand(Node *parsetree);

/*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
Expand Down Expand Up @@ -243,13 +255,23 @@ citus_ProcessUtility(PlannedStmt *pstmt,

if (!CitusHasBeenLoaded())
{
if (!IsMainDB)
{
RunPreprocessMainDBCommand(parsetree, queryString);
}

/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
*/
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);

if (!IsMainDB)
{
RunPostprocessMainDBCommand(parsetree);
}

return;
}
else if (IsA(parsetree, CallStmt))
Expand Down Expand Up @@ -1572,3 +1594,49 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}


/*
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main
* database before query is run on the local node with PrevProcessUtility
*/
static void
RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION,
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);
mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER,
quote_literal_cstr(queryString),
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data);
}
}


/*
* RunPostprocessMainDBCommand runs the necessary commands for a query, in main
* database after query is run on the local node with PrevProcessUtility
*/
static void
RunPostprocessMainDBCommand(Node *parsetree)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree);
Oid roleOid = get_role_oid(createRoleStmt->role, false);
appendStringInfo(mainDBQuery,
MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role),
roleOid);
RunCitusMainDBQuery(mainDBQuery->data);
}
}
22 changes: 17 additions & 5 deletions src/backend/distributed/connection/connection_configuration.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,13 @@ GetConnParam(const char *keyword)

/*
* GetEffectiveConnKey checks whether there is any pooler configuration for the
* provided key (host/port combination). The one case where this logic is not
* applied is for loopback connections originating within the task tracker. If
* a corresponding row is found in the poolinfo table, a modified (effective)
* key is returned with the node, port, and dbname overridden, as applicable,
* otherwise, the original key is returned unmodified.
* provided key (host/port combination). If a corresponding row is found in the
* poolinfo table, a modified (effective) key is returned with the node, port,
* and dbname overridden, as applicable, otherwise, the original key is returned
* unmodified.
*
* In the case of Citus non-main databases we just return the key, since we
* would not have access to tables with worker information.
*/
ConnectionHashKey *
GetEffectiveConnKey(ConnectionHashKey *key)
Expand All @@ -444,7 +446,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
return key;
}

if (!CitusHasBeenLoaded())
{
/*
* This happens when we connect to main database over localhost
* from some non Citus database.
*/
return key;
}

WorkerNode *worker = FindWorkerNode(key->hostname, key->port);

if (worker == NULL)
{
/* this can be hit when the key references an unknown node */
Expand Down
52 changes: 48 additions & 4 deletions src/backend/distributed/metadata/distobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,42 @@
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"

static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress,
char *objectName);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
static bool IsObjectDistributed(const ObjectAddress *address);

PG_FUNCTION_INFO_V1(mark_object_distributed);
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);


/*
* mark_object_distributed adds an object to pg_dist_object
* in all of the nodes.
*/
Datum
mark_object_distributed(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();

Oid classId = PG_GETARG_OID(0);
text *objectNameText = PG_GETARG_TEXT_P(1);
char *objectName = text_to_cstring(objectNameText);
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);
PG_RETURN_VOID();
}


/*
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
*
Expand Down Expand Up @@ -160,12 +184,28 @@ ObjectExists(const ObjectAddress *address)
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
MarkObjectDistributedWithName(distAddress, "");
}


/*
* MarkObjectDistributedWithName marks an object as a distributed object.
* Same as MarkObjectDistributed but this function also allows passing an objectName
* that is used in case the object does not exists for the current transaction.
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
{
if (!CitusHasBeenLoaded())
{
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
}
MarkObjectDistributedLocally(distAddress);

if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
}
}
Expand All @@ -188,7 +228,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, "");
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}
Expand Down Expand Up @@ -279,17 +319,21 @@ ShouldMarkRelationDistributed(Oid relationId)
* for the given object address.
*/
static char *
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName)
{
/* create a list by adding the address of value to not to have warning */
List *objectAddressList =
list_make1((ObjectAddress *) objectAddress);

/* names also require a list so we create a nested list here */
List *objectNameList = list_make1(list_make1((char *) objectName));
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);

char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
objectNameList,
distArgumetIndexList,
colocationIdList,
forceDelegationList);
Expand Down
9 changes: 9 additions & 0 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/utils/array_type.h"
Expand Down Expand Up @@ -5722,6 +5723,14 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
/*
* Citus will not be loaded when we run a global DDL command from a
* Citus non-main database.
*/
if (!CitusHasBeenLoaded())
{
return "";
}
char *authinfo = "";
Datum nodeIdDatumArray[2] = {
Int32GetDatum(nodeId),
Expand Down
23 changes: 21 additions & 2 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
Expand Down Expand Up @@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
*/
char *
MarkObjectsDistributedCreateCommand(List *addresses,
List *namesArg,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations)
Expand All @@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses,
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
List *names = NIL;
List *args = NIL;
char *objectType = NULL;

char *objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, false);
if (IsMainDBCommand)
{
/*
* When we try to distribute an object that's being created in a non Citus
* main database, we cannot find the name, since the object is not visible
* in Citus main database.
* Because of that we need to pass the name to this function.
*/
names = list_nth(namesArg, currentObjectCounter);
bool missingOk = false;
objectType = getObjectTypeDescription(address, missingOk);
}
else
{
objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, IsMainDBCommand);
}

if (!isFirstObject)
{
Expand Down Expand Up @@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context)

char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(list_make1(address),
NIL,
list_make1_int(distributionArgumentIndex),
list_make1_int(colocationId),
list_make1_int(forceDelegation));
Expand Down
14 changes: 14 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/repartition_executor.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
Expand Down Expand Up @@ -2570,6 +2571,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NoticeIfSubqueryPushdownEnabled, NULL, NULL);

DefineCustomStringVariable(
"citus.superuser",
gettext_noop("Name of a superuser role to be used in Citus main database "
"connections"),
NULL,
&SuperuserRole,
"",
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomEnumVariable(
"citus.task_assignment_policy",
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
Expand Down Expand Up @@ -3149,6 +3161,8 @@ CitusAuthHook(Port *port, int status)
*/
InitializeBackendData(port->application_name);

IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 ||
strncmp(MainDb, port->database_name, NAMEDATALEN) == 0);

/* let other authentication hooks to kick in first */
if (original_client_auth_hook)
Expand Down
7 changes: 7 additions & 0 deletions src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@

#include "udfs/citus_internal_database_command/12.2-1.sql"
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"

#include "udfs/start_management_transaction/12.2-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
#include "udfs/mark_object_distributed/12.2-1.sql"
#include "udfs/commit_management_command_2pc/12.2-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;
Loading

0 comments on commit ce32be6

Please sign in to comment.