Skip to content

Commit

Permalink
Implement "citus local table" creation logic (#3852)
Browse files Browse the repository at this point in the history
(A detailed commit message will be added later)
  • Loading branch information
onurctirtir committed Sep 7, 2020
1 parent ba208ea commit 2b6ec2a
Show file tree
Hide file tree
Showing 25 changed files with 2,104 additions and 27 deletions.
818 changes: 818 additions & 0 deletions src/backend/distributed/commands/create_citus_local_table.c

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureTableNotDistributed(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
Expand Down Expand Up @@ -398,7 +397,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel);

/* foreign tables does not support TRUNCATE trigger */
/* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId))
{
CreateTruncateTrigger(relationId);
Expand All @@ -424,7 +423,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
CreateReferenceTableShard(relationId);
}


if (ShouldSyncTableMetadata(relationId))
{
CreateTableMetadataOnWorkers(relationId);
Expand Down Expand Up @@ -955,7 +953,7 @@ EnsureLocalTableEmpty(Oid relationId)
/*
* EnsureTableNotDistributed errors out if the table is distributed.
*/
static void
void
EnsureTableNotDistributed(Oid relationId)
{
char *relationName = get_rel_name(relationId);
Expand Down
50 changes: 32 additions & 18 deletions src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,6 @@
#include "utils/ruleutils.h"
#include "utils/syscache.h"

/*
* Flags that can be passed to GetForeignKeyOids to indicate
* which foreign key constraint OIDs are to be extracted
*/
typedef enum ExtractForeignKeyConstrainstMode
{
/* extract the foreign key OIDs where the table is the referencing one */
INCLUDE_REFERENCING_CONSTRAINTS = 1 << 0,

/* extract the foreign key OIDs the table is the referenced one */
INCLUDE_REFERENCED_CONSTRAINTS = 1 << 1,

/* exclude the self-referencing foreign keys */
EXCLUDE_SELF_REFERENCES = 1 << 2
} ExtractForeignKeyConstraintMode;

/* Local functions forward declarations */
static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,
Oid relationId,
Expand All @@ -67,7 +51,6 @@ static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
static List * GetForeignKeyOidsToReferenceTables(Oid relationId);
static List * GetForeignKeyOids(Oid relationId, int flags);

/*
* ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a
Expand Down Expand Up @@ -674,13 +657,44 @@ FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName)
}


/*
* ErrorIfTableHasExternalForeignKeys errors out if the relation with relationId
* is involved in a foreign key relationship other than the self-referencing ones.
*/
void
ErrorIfTableHasExternalForeignKeys(Oid relationId)
{
int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES);
List *foreignKeyIdsTableReferencing = GetForeignKeyOids(relationId, flags);

flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES);
List *foreignKeyIdsTableReferenced = GetForeignKeyOids(relationId, flags);

List *foreignKeysWithOtherTables = list_concat(foreignKeyIdsTableReferencing,
foreignKeyIdsTableReferenced);

if (list_length(foreignKeysWithOtherTables) == 0)
{
return;
}

const char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("relation \"%s\" is involved in a foreign key relationship "
"with another table", relationName),
errhint("Drop foreign keys with other tables and re-define them "
"with ALTER TABLE commands after the current operation "
"is done.")));
}


/*
* GetForeignKeyOids takes in a relationId, and returns a list of OIDs for
* foreign constraints that the relation with relationId is involved according
* to "flags" argument. See ExtractForeignKeyConstrainstMode enum definition
* for usage of the flags.
*/
static List *
List *
GetForeignKeyOids(Oid relationId, int flags)
{
AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber;
Expand Down
68 changes: 68 additions & 0 deletions src/backend/distributed/commands/trigger.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#else
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#endif
#include "catalog/indexing.h"
#include "catalog/namespace.h"
Expand All @@ -28,6 +29,8 @@
#include "distributed/namespace_utils.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"


/*
* GetExplicitTriggerCommandList returns the list of DDL commands to create
Expand Down Expand Up @@ -59,6 +62,71 @@ GetExplicitTriggerCommandList(Oid relationId)
}


/*
* GetExplicitTriggerNameList returns a list of trigger names that are explicitly
* created for the table with relationId. See comment of GetExplicitTriggerIdList
* function.
*/
List *
GetExplicitTriggerNameList(Oid relationId)
{
List *triggerNameList = NIL;

List *triggerIdList = GetExplicitTriggerIdList(relationId);

Oid triggerId = InvalidOid;
foreach_oid(triggerId, triggerIdList)
{
char *triggerHame = GetTriggerNameById(triggerId);
triggerNameList = lappend(triggerNameList, triggerHame);
}

return triggerNameList;
}


/*
* GetTriggerNameById returns name of the trigger identified by triggerId if it
* exists. Otherwise, returns NULL.
*/
char *
GetTriggerNameById(Oid triggerId)
{
Relation pgTrigger = heap_open(TriggerRelationId, AccessShareLock);

int scanKeyCount = 1;
ScanKeyData scanKey[1];

#if PG_VERSION_NUM >= PG_VERSION_12
AttrNumber attrNumber = Anum_pg_trigger_oid;
#else
AttrNumber attrNumber = ObjectIdAttributeNumber;
#endif

ScanKeyInit(&scanKey[0], attrNumber, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(triggerId));

bool useIndex = true;
SysScanDesc scanDescriptor = systable_beginscan(pgTrigger, TriggerOidIndexId,
useIndex, NULL, scanKeyCount,
scanKey);

char *triggerName = NULL;

HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple);
triggerName = pstrdup(NameStr(triggerForm->tgname));
}

systable_endscan(scanDescriptor);
heap_close(pgTrigger, NoLock);

return triggerName;
}


/*
* GetExplicitTriggerIdList returns a list of OIDs corresponding to the triggers
* that are explicitly created on the relation with relationId. That means,
Expand Down
16 changes: 11 additions & 5 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -3809,16 +3809,22 @@ ReferenceTableOidList()
while (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum);
char partitionMethod = heap_getattr(heapTuple,
Anum_pg_dist_partition_partmethod,
tupleDescriptor, &isNull);
char replicationModel = heap_getattr(heapTuple,
Anum_pg_dist_partition_repmodel,
tupleDescriptor, &isNull);

if (partitionMethod == DISTRIBUTE_BY_NONE)
if (partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC)
{
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);

Oid relationId = DatumGetObjectId(relationIdDatum);

referenceTableOidList = lappend_oid(referenceTableOidList, relationId);
}

Expand Down
59 changes: 59 additions & 0 deletions src/backend/distributed/operations/worker_node_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "miscadmin.h"

#include "commands/dbcommands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
Expand Down Expand Up @@ -404,6 +405,25 @@ NodeIsPrimaryWorker(WorkerNode *node)
}


/*
* CoordinatorAddedAsWorkerNode returns true if coordinator is added to the
* pg_dist_node. This function also acquires RowExclusiveLock on pg_dist_node
* and does not release it to ensure that existency of the coordinator in
* metadata won't be changed until the end of transaction.
*/
bool
CoordinatorAddedAsWorkerNode()
{
bool groupContainsNodes = false;

LockRelationOid(DistNodeRelationId(), RowExclusiveLock);

PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &groupContainsNodes);

return groupContainsNodes;
}


/*
* ReferenceTablePlacementNodeList returns the set of nodes that should have
* reference table placements. This includes all primaries, including the
Expand All @@ -417,6 +437,45 @@ ReferenceTablePlacementNodeList(LOCKMODE lockMode)
}


/*
* CoordinatorNode returns the WorkerNode object for coordinator node if it is
* added to pg_dist_node, otherwise errors out.
* Also, as CoordinatorAddedAsWorkerNode acquires AccessShareLock on
* pg_dist_node and doesn't release it, callers can safely assume coordinator
* won't be removed from metadata until the end of transaction when this function
* returns coordinator node.
*/
WorkerNode *
CoordinatorNode()
{
ErrorIfCoordinatorNotAddedAsWorkerNode();

WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID);

WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode));
*coordinatorNodeCopy = *coordinatorNode;

return coordinatorNodeCopy;
}


/*
* ErrorIfCoordinatorNotAddedAsWorkerNode errors out if coordinator is not added
* to metadata.
*/
void
ErrorIfCoordinatorNotAddedAsWorkerNode()
{
if (CoordinatorAddedAsWorkerNode())
{
return;
}

ereport(ERROR, (errmsg("could not find the coordinator node in "
"metadata as it is not added as a worker")));
}


/*
* DistributedTablePlacementNodeList returns a list of all active, primary
* worker nodes that can store new data, i.e shouldstoreshards is 'true'
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--9.3-2--9.4-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
-- bump version to 9.4-1
#include "udfs/worker_last_saved_explain_analyze/9.4-1.sql"
#include "udfs/worker_save_query_explain_analyze/9.4-1.sql"
#include "udfs/create_citus_local_table/9.4-1.sql"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.create_citus_local_table(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_citus_local_table$$;
COMMENT ON FUNCTION pg_catalog.create_citus_local_table(table_name regclass)
IS 'create a citus local table';
7 changes: 7 additions & 0 deletions src/backend/distributed/utils/colocation_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "catalog/indexing.h"
#include "catalog/pg_type.h"
#include "commands/sequence.h"
#include "distributed/create_citus_local_table.h"
#include "distributed/colocation_utils.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
Expand Down Expand Up @@ -241,6 +242,12 @@ CreateColocationGroupForRelation(Oid sourceRelationId)
static void
MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
{
if (IsCitusLocalTable(sourceRelationId) || IsCitusLocalTable(targetRelationId))
{
ereport(ERROR, (errmsg(
"citus local tables cannot be colocated with other tables")));
}

CheckReplicationModel(sourceRelationId, targetRelationId);
CheckDistributionColumnType(sourceRelationId, targetRelationId);

Expand Down
19 changes: 19 additions & 0 deletions src/backend/distributed/worker/worker_shard_visibility.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,25 @@ citus_table_is_visible(PG_FUNCTION_ARGS)
}


/*
* ErrorIfRelationIsAKnownShard errors out if the relation with relationId is
* a shard relation.
*/
void
ErrorIfRelationIsAKnownShard(Oid relationId)
{
/* search the relation in all schemas */
bool onlySearchPath = false;
if (!RelationIsAKnownShard(relationId, onlySearchPath))
{
return;
}

const char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("relation \"%s\" is a shard relation ", relationName)));
}


/*
* RelationIsAKnownShard gets a relationId, check whether it's a shard of
* any distributed table. If onlySearchPath is true, then it searches
Expand Down

0 comments on commit 2b6ec2a

Please sign in to comment.