Skip to content

Commit

Permalink
address some feedback: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Jun 4, 2020
1 parent 551db18 commit 1615b56
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 145 deletions.
247 changes: 132 additions & 115 deletions src/backend/distributed/commands/create_citus_local_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "access/htup_details.h"
#include "catalog/pg_constraint.h"
#include "distributed/citus_local_table_utils.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
Expand All @@ -31,30 +30,27 @@
#include "utils/lsyscache.h"
#include "utils/syscache.h"

static void ErrorIfUnsupportedCreateCitusLocalTable(Oid relationId);
static void CreateCitusLocalTableShard(Oid relationId, uint64 shardId);
static void ErrorIfUnsupportedCreateCitusLocalTable(Relation relation);
static void ErrorIfUnsupportedCitusLocalTableKind(Oid relationId);
static uint64 ConvertLocalTableToShard(Oid relationId);
static void RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId);
static void RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId);
static List * GetConstraintNameList(Oid relationId);
static void RenameForeignConstraintsReferencingToShard(Oid shardRelationId,
uint64 shardId);
static void RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId);
static List * GetExplicitIndexNameList(Oid relationId);
static void CreateCitusLocalTable(Oid shellRelationId);
static void CreateCitusLocalTable(Oid relationId);
static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId);
static void CreateShellTableForCitusLocalTable(List *shellTableDDLEvents);
static void InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId);

PG_FUNCTION_INFO_V1(create_citus_local_table);

/*
* create_citus_local_table creates a citus table from the table with relationId.
* The created table would have the following properties:
* - it will have only one shard,
* - its distribution method will be DISTRIBUTE_BY_NONE,
* - its replication model will be ReplicationModel,
* - its replication factor will be set to 1.
* Similar to reference tables, it has only 1 placement. In addition to that, that
* single placement is only allowed to be on the coordinator.
* by executing the internal method CreateCitusLocalTable.
* (See CreateCitusLocalTable function's comment.)
*/
Datum
create_citus_local_table(PG_FUNCTION_ARGS)
Expand All @@ -63,123 +59,132 @@ create_citus_local_table(PG_FUNCTION_ARGS)

Oid relationId = PG_GETARG_OID(0);

/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
Relation relation = try_relation_open(relationId, ExclusiveLock);
CreateCitusLocalTable(relationId);

PG_RETURN_VOID();
}


/*
* ErrorIfUnsupportedCreateCitusLocalTable errors out if we cannot create the
* citus local table from the relation with relationId.
*/
static void
ErrorIfUnsupportedCreateCitusLocalTable(Relation relation)
{
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create citus local table: "
"relation does not exist")));
}

ErrorIfUnsupportedCreateCitusLocalTable(relationId);
Oid relationId = relation->rd_id;

ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
ErrorIfUnsupportedCitusLocalTableKind(relationId);

/*
* Ensure dependencies first as we will create shell table on the other nodes
* in the MX case.
*/
EnsureDependenciesExistOnAllNodes(&tableAddress);
EnsureTableNotDistributed(relationId);

CreateCitusLocalTable(relationId);
if (!CoordinatorAddedAsWorkerNode())
{
const char *relationName = get_rel_name(relationId);

relation_close(relation, NoLock);
Assert(relationName != NULL);

PG_RETURN_VOID();
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot create citus local table \"%s\", citus local "
"tables can only be created from coordinator node if "
"it is added as a worker node", relationName),
errhint(
"First, add the coordinator with master_add_node command")));
}
}


/*
* ErrorIfUnsupportedCreateCitusLocalTable errors out if we cannot create the
* citus local table from the relation relationId.
*/
/* TODO: @onurctirtir: add comment here */
static void
ErrorIfUnsupportedCreateCitusLocalTable(Oid relationId)
ErrorIfUnsupportedCitusLocalTableKind(Oid relationId)
{
/* citus local tables can only be created from coordinator for now */
EnsureCoordinator();

EnsureTableOwner(relationId);
const char *relationName = get_rel_name(relationId);

/* we allow creating citus local tables only from relations with RELKIND_RELATION */
EnsureRelationKindSupported(relationId);
Assert(relationName != NULL);

if (PartitionTable(relationId) || PartitionedTable(relationId))
if (IsChildTable(relationId) || IsParentTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus local tables can not be involved in a "
"partition relationship")));
errmsg("cannot create citus local table \"%s\", citus local "
"tables cannot be involved in a parent/child relationship ",
relationName)));
}

EnsureTableNotDistributed(relationId);

if (!CoordinatorAddedAsWorkerNode())
if (PartitionTable(relationId))
{
const char *relationName = get_rel_name(relationId);

Assert(relationName != NULL);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create citus local table \"%s\", citus local "
"tables cannot be partition of another table ",
relationName)));
}

ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg(
"cannot create citus local table \"%s\", citus local "
"tables can only be created from coordinator node if "
"it is added to pg_dist_node", relationName)));
char relationKind = get_rel_relkind(relationId);
if (!(relationKind == RELKIND_RELATION || relationKind == RELKIND_FOREIGN_TABLE))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create citus local table \"%s\", only regular "
"tables and foreign tables are supported for citus local "
"tables", relationName)));
}
}


/*
* CreateCitusLocalTable is the internal method to create citus local table's
* shard and metadata.
* Note that this function does not perform any validation on the table as it
* is already done in create_citus_local_table.
* CreateCitusLocalTable is the internal method that creates a citus table
* from the table with relationId. The created table would have the following
* properties:
* - it will have only one shard,
* - its distribution method will be DISTRIBUTE_BY_NONE,
* - its replication model will be ReplicationModel,
* - its replication factor will be set to 1.
* Similar to reference tables, it has only 1 placement. In addition to that, that
* single placement is only allowed to be on the coordinator.
*/
static void
CreateCitusLocalTable(Oid shellRelationId)
CreateCitusLocalTable(Oid relationId)
{
/*
* Make sure that existing reference tables have been replicated to all
* the nodes such that we can create foreign keys and joins work
* immediately after creation.
*/
EnsureReferenceTablesExistOnAllNodes();
/* these checks should be done before acquiring the lock on the table */
EnsureCoordinator();
EnsureTableOwner(relationId);

/*
* Get an exclusive lock on relation with shellRelationId as the operations
* done in this function should be automic.
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
LockRelationOid(shellRelationId, ExclusiveLock);
Relation relation = try_relation_open(relationId, ExclusiveLock);

ErrorIfUnsupportedCreateCitusLocalTable(relation);

ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);

/*
* Get necessary commands to recreate the shell table before renaming the
* given relation to the shard relation.
* Ensure dependencies first as we will create shell table on the other nodes
* in the MX case.
*/
List *foreignConstraintCommands =
GetForeignConstraintCommandsTableInvolved(shellRelationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);

/*
* Include DEFAULT clauses for columns getting their default values from
* a sequence.
* Make sure that existing reference tables have been replicated to all
* the nodes such that we can create foreign keys and joins work
* immediately after creation.
*/
bool includeSequenceDefaults = true;

List *shellTableDDLEvents = GetTableDDLEvents(shellRelationId,
includeSequenceDefaults);
shellTableDDLEvents = list_concat(shellTableDDLEvents, foreignConstraintCommands);
EnsureReferenceTablesExistOnAllNodes();

uint64 shardId = GetNextShardId();
List *shellTableDDLEvents = GetShellTableDDLEventsForCitusLocalTable(relationId);

char *shellRelationName = get_rel_name(shellRelationId);
Oid shellRelationSchemaId = get_rel_namespace(shellRelationId);
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);

/* below we convert relation with shellRelationId to the shard relation */
CreateCitusLocalTableShard(shellRelationId, shardId);
/* below we convert relation with relationId to the shard relation */
uint64 shardId = ConvertLocalTableToShard(relationId);

/*
* As we retrieved the DDL commands necessary to create the shell table
Expand All @@ -188,50 +193,67 @@ CreateCitusLocalTable(Oid shellRelationId)
*/
CreateShellTableForCitusLocalTable(shellTableDDLEvents);

/* update shellRelationId so it points to the shell table that we just created */
shellRelationId = get_relname_relid(shellRelationName, shellRelationSchemaId);
/*
* Get new relationId as the relation with relationId now points
* to the shard relation.
*/
Oid shellRelationId = get_relname_relid(relationName, relationSchemaId);

/* assert that we created the shell table properly in the same schema */
Assert(OidIsValid(shellRelationId));

InsertMetadataForCitusLocalTable(shellRelationId, shardId);

/* foreign tables does not support TRUNCATE trigger */
if (RegularTable(shellRelationId))
{
CreateTruncateTrigger(shellRelationId);
}
PostCreateTableOperations(shellRelationId);

if (ShouldSyncTableMetadata(shellRelationId))
{
CreateTableMetadataOnWorkers(shellRelationId);
}
relation_close(relation, ExclusiveLock);
}


/* TODO: @onurctirtir: improve comment here */

/*
* Get necessary commands to recreate the shell table before renaming the
* given relation to the shard relation.
*/
static List *
GetShellTableDDLEventsForCitusLocalTable(Oid relationId)
{
List *foreignConstraintCommands =
GetForeignConstraintCommandsTableInvolved(relationId);

/*
* We've a custom way of foreign key graph invalidation,
* see InvalidateForeignKeyGraph().
* Include DEFAULT clauses for columns getting their default values from
* a sequence.
*/
if (TableReferenced(shellRelationId) || TableReferencing(shellRelationId))
{
InvalidateForeignKeyGraph();
}
bool includeSequenceDefaults = true;

List *shellTableDDLEvents = GetTableDDLEvents(relationId,
includeSequenceDefaults);
shellTableDDLEvents = list_concat(shellTableDDLEvents, foreignConstraintCommands);

return shellTableDDLEvents;
}


/*
* CreateCitusLocalTableShard creates the one and only shard of the citus
* local table lazily. That means, this function suffixes shardId to:
* ConvertLocalTableToShard converts the givent relation with relationId to the
* shard relation with shardId. That means, this function suffixes shardId to:
* - relation name,
* - all the objects "defined on" the relation and
* - the foreign keys referencing to the relation.
*/
static void
CreateCitusLocalTableShard(Oid relationId, uint64 shardId)
static uint64
ConvertLocalTableToShard(Oid relationId)
{
uint64 shardId = GetNextShardId();

RenameRelationToShardRelation(relationId, shardId);
RenameShardRelationConstraints(relationId, shardId);
RenameForeignConstraintsReferencingToShard(relationId, shardId);
RenameShardRelationIndexes(relationId, shardId);

return shardId;
}


Expand Down Expand Up @@ -302,7 +324,7 @@ RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId)


/*
* GetConstraintNameList returns a list constraint names "defined on" the
* GetConstraintNameList returns a list of constraint names "defined on" the
* relation with relationId. Those constraints can be:
* - "check" constraints or,
* - "primary key" constraints or,
Expand Down Expand Up @@ -445,8 +467,6 @@ RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId)
static List *
GetExplicitIndexNameList(Oid relationId)
{
List *indexNameList = NIL;

int scanKeyCount = 1;
ScanKeyData scanKey[1];

Expand All @@ -470,6 +490,8 @@ GetExplicitIndexNameList(Oid relationId)
useIndex, NULL, scanKeyCount,
scanKey);

List *indexNameList = NIL;

HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Expand Down Expand Up @@ -518,12 +540,7 @@ CreateShellTableForCitusLocalTable(List *shellTableDDLEvents)
char *ddlCommand = NULL;
foreach_ptr(ddlCommand, shellTableDDLEvents)
{
StringInfo semicolonEndedCommand = makeStringInfo();
appendStringInfo(semicolonEndedCommand, "%s;", ddlCommand);

const char *commandString = semicolonEndedCommand->data;

Node *parseTree = ParseTreeNode(commandString);
Node *parseTree = ParseTreeNode(ddlCommand);

/*
* If the command defines a constraint, initially do not validate it
Expand All @@ -547,7 +564,7 @@ CreateShellTableForCitusLocalTable(List *shellTableDDLEvents)
}
}

CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
CitusProcessUtility(parseTree, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
}
Expand Down Expand Up @@ -593,7 +610,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId)


/*
* IscitusLocalTable returns whether the given relationId identifies a citus
* IsCitusLocalTable returns whether the given relationId identifies a citus
* local table.
*/
bool
Expand Down

0 comments on commit 1615b56

Please sign in to comment.