Skip to content

Commit

Permalink
fixup: move sequences to shell table
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Jun 9, 2020
1 parent 34bd9ff commit fbc981f
Showing 1 changed file with 156 additions and 10 deletions.
166 changes: 156 additions & 10 deletions src/backend/distributed/commands/create_citus_local_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "access/htup_details.h"
#include "catalog/pg_constraint.h"
#include "distributed/create_citus_local_table.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
Expand All @@ -40,15 +41,21 @@ static void RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId);
static List * GetConstraintNameList(Oid relationId);
static void RenameForeignConstraintsReferencingToShard(Oid shardRelationId,
uint64 shardId);
static char * GetRenameShardConstraintCommand(Oid relationId, char *constraintName, uint64
shardId);
static char * GetRenameShardConstraintCommand(Oid relationId, char *constraintName,
uint64 shardId);
static void RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId);
static char * GetRenameShardIndexCommand(char *indexName, uint64 shardId);
static List * GetExplicitIndexNameList(Oid relationId);
static void CreateCitusLocalTable(Oid relationId);
static void FinalizeCitusLocalTableCreation(Oid relationId);
static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId);
static void CreateShellTableForCitusLocalTable(List *shellTableDDLEvents);
static void MoveSequenceOwnerships(Oid sourceRelationId, Oid targetRelationId);
static void ExtractColumnsOwningSequences(Oid relationId, List **columnNameList,
List **ownedSequenceIdList);
static void DropDefaultColumnDefinition(Oid relationId, char *columnName);
static void TransferSequenceOwnership(Oid ownedSequenceId, Oid targetRelationId,
char *columnName);
static void InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId);


Expand Down Expand Up @@ -157,11 +164,15 @@ CreateCitusLocalTable(Oid relationId)
EnsureTableOwner(relationId);

/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* Lock target relation with an exclusive lock as we don't want multiple
* backends manipulating this relation.
* We should also note that here we intentionally do not use relation_open
* to lock the table. This is because, in this function, we may execute
* ALTER TABLE commands modifying relation's column definitions and postgres
* does not allow us to do so when the table is still open.
* (See the postgres function CheckTableNotInUse for more information.)
*/
Relation relation = relation_open(relationId, ExclusiveLock);
LockRelationOid(relationId, ExclusiveLock);

ErrorIfUnsupportedCreateCitusLocalTable(relationId);

Expand Down Expand Up @@ -197,19 +208,22 @@ CreateCitusLocalTable(Oid relationId)
CreateShellTableForCitusLocalTable(shellTableDDLEvents);

/*
* Get new relationId as the relation with relationId now points
* Set shellRelationId as the relation with relationId now points
* to the shard relation.
*/
Oid shellRelationId = get_relname_relid(relationName, relationSchemaId);

/* assert that we created the shell table properly in the same schema */
Assert(OidIsValid(shellRelationId));

Oid shardRelationId = relationId;
MoveSequenceOwnerships(shardRelationId, shellRelationId);

InsertMetadataForCitusLocalTable(shellRelationId, shardId);

FinalizeCitusLocalTableCreation(shellRelationId);

relation_close(relation, ExclusiveLock);
UnlockRelationOid(relationId, ExclusiveLock);
}


Expand Down Expand Up @@ -314,7 +328,6 @@ RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId)
const char *commandString = renameCommand->data;

Node *parseTree = ParseTreeNode(commandString);

CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
Expand Down Expand Up @@ -422,7 +435,6 @@ RenameForeignConstraintsReferencingToShard(Oid shardRelationId, uint64 shardId)
GetRenameShardConstraintCommand(referencingTableId, constraintName, shardId);

Node *parseTree = ParseTreeNode(commandString);

CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);

Expand Down Expand Up @@ -474,7 +486,6 @@ RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId)
const char *commandString = GetRenameShardIndexCommand(indexName, shardId);

Node *parseTree = ParseTreeNode(commandString);

CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
Expand Down Expand Up @@ -593,6 +604,141 @@ CreateShellTableForCitusLocalTable(List *shellTableDDLEvents)
}


/*
* MoveSequenceOwnerships finds each column of relation with sourceRelationId
* defaulting to an owned sequence. Then, drops default definitions for those
* columns and grants ownership of each of the owned sequences to the same named
* column of the relation with targetRelationId.
*/
static void
MoveSequenceOwnerships(Oid sourceRelationId, Oid targetRelationId)
{
List *columnNameList = NIL;
List *ownedSequenceIdList = NIL;
ExtractColumnsOwningSequences(sourceRelationId, &columnNameList,
&ownedSequenceIdList);

Assert(list_length(columnNameList) == list_length(ownedSequenceIdList));

ListCell *columnNameCell = NULL;
ListCell *ownedSequenceIdCell = NULL;
forboth(columnNameCell, columnNameList, ownedSequenceIdCell, ownedSequenceIdList)
{
char *columnName = (char *) lfirst(columnNameCell);
Oid ownedSequenceId = lfirst_oid(ownedSequenceIdCell);

DropDefaultColumnDefinition(sourceRelationId, columnName);
TransferSequenceOwnership(ownedSequenceId, targetRelationId, columnName);
}
}


/*
* ExtractColumnsOwningSequences finds each column of relation with relationId
* defaulting to an owned sequence. Then, appends the column name and id of the
* owned sequence -that the column defaults- to the lists passed as NIL initially.
*/
static void
ExtractColumnsOwningSequences(Oid relationId, List **columnNameList,
List **ownedSequenceIdList)
{
Assert(*columnNameList == NIL && *ownedSequenceIdList == NIL);

Relation relation = relation_open(relationId, ExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);

/* iterate on columns that are not dropped and have DEFAULT definitions */
AttrNumber defaultValueIndex = 0;

for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attisdropped || !attributeForm->atthasdef)
{
/*
* If this column has already been dropped or it has no DEFAULT
* definition, skip it.
*/
continue;
}

List *columnOwnedSequences = getOwnedSequences(relationId, defaultValueIndex++);
if (list_length(columnOwnedSequences) == 0)
{
/*
* Skip if the column does not own any sequences. In that case, column
* either does not take the default value from a sequence or it does
* not own the sequence, i.e, sequence is created and attached to the
* column explicitly by the user.
*/
continue;
}

/* TODO: this assumption should be validated */
Assert(list_length(columnOwnedSequences) == 1);

Oid ownedSequenceId = linitial_oid(columnOwnedSequences);
*ownedSequenceIdList = lappend_oid(*ownedSequenceIdList, ownedSequenceId);

char *columnName = NameStr(attributeForm->attname);
*columnNameList = lappend(*columnNameList, columnName);
}

relation_close(relation, NoLock);
}


/*
* DropDefaultColumnDefinition drops the DEFAULT definiton of the column with
* columnName of the relation with relationId via process utility.
*/
static void
DropDefaultColumnDefinition(Oid relationId, char *columnName)
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
const char *quotedColumnName = quote_identifier(columnName);

StringInfo sequenceDropCommand = makeStringInfo();
appendStringInfo(sequenceDropCommand,
"ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT",
qualifiedRelationName, quotedColumnName);

const char *commandString = sequenceDropCommand->data;

Node *parseTree = ParseTreeNode(commandString);
CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}


/*
* TransferSequenceOwnership grants ownership of the sequence with sequenceId
* to the column with targetColumnName of relation with targetRelationId via
* process utility. Note that this function assumes that the target relation
* has a column with targetColumnName which can default to the given sequence.
*/
static void
TransferSequenceOwnership(Oid sequenceId, Oid targetRelationId, char *targetColumnName)
{
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceId);
char *qualifiedTargetRelationName =
generate_qualified_relation_name(targetRelationId);
const char *quotedTargetColumnName = quote_identifier(targetColumnName);

StringInfo sequenceOwnershipCommand = makeStringInfo();
appendStringInfo(sequenceOwnershipCommand, "ALTER SEQUENCE %s OWNED BY %s.%s",
qualifiedSequenceName, qualifiedTargetRelationName,
quotedTargetColumnName);

const char *commandString = sequenceOwnershipCommand->data;

Node *parseTree = ParseTreeNode(commandString);
CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}


/*
* InsertMetadataForCitusLocalTable inserts necessary metadata for the citus
* local table to the following metadata tables:
Expand Down

0 comments on commit fbc981f

Please sign in to comment.