Skip to content

Commit

Permalink
Merge pull request #4133 from citusdata/single-placement-table/planner
Browse files Browse the repository at this point in the history
Citus Local Tables: planner + some other last pieces of changes
  • Loading branch information
onurctirtir committed Sep 8, 2020
2 parents d4c31f9 + 873e2c6 commit 07cbaac
Show file tree
Hide file tree
Showing 38 changed files with 4,079 additions and 158 deletions.
47 changes: 42 additions & 5 deletions src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -773,18 +773,31 @@ TableReferencing(Oid relationId)


/*
* ConstraintIsAForeignKey returns true if the given constraint name
* is a foreign key defined on the relation.
* ConstraintIsAForeignKey is a wrapper around GetForeignKeyOidByName that
* returns true if the given constraint name identifies a foreign key
* constraint defined on relation with relationId.
*/
bool
ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)
{
Oid foreignKeyId = GetForeignKeyOidByName(inputConstaintName, relationId);
return OidIsValid(foreignKeyId);
}


/*
* GetForeignKeyOidByName returns OID of the foreign key with name and defined
* on relation with relationId. If there is no such foreign key constraint, then
* this function returns InvalidOid.
*/
Oid
GetForeignKeyOidByName(char *inputConstaintName, Oid relationId)
{
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);

Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);

return OidIsValid(foreignKeyOid);
Oid foreignKeyId = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
return foreignKeyId;
}


Expand Down Expand Up @@ -945,3 +958,27 @@ GetForeignKeyOids(Oid relationId, int flags)

return foreignKeyOids;
}


/*
* GetReferencedTableId returns OID of the referenced relation for the foreign
* key with foreignKeyId. If there is no such foreign key, then this function
* returns InvalidOid.
*/
Oid
GetReferencedTableId(Oid foreignKeyId)
{
HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyId));
if (!HeapTupleIsValid(heapTuple))
{
/* no such foreign key */
return InvalidOid;
}

Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
Oid referencedTableId = constraintForm->confrelid;

ReleaseSysCache(heapTuple);

return referencedTableId;
}
28 changes: 22 additions & 6 deletions src/backend/distributed/commands/table.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
* only subcommand of ALTER TABLE. It was already checked in
* ErrorIfUnsupportedAlterTableStmt.
*/
Assert(list_length(commandList) <= 1);
Assert(list_length(commandList) == 1);

rightRelationId = RangeVarGetRelid(constraint->pktable, lockmode,
alterTableStatement->missing_ok);
Expand All @@ -449,6 +449,22 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
constraint->skip_validation = true;
}
}
else if (alterTableType == AT_DropConstraint)
{
char *constraintName = command->name;
if (ConstraintIsAForeignKey(constraintName, leftRelationId))
{
/*
* We only support ALTER TABLE DROP CONSTRAINT ... FOREIGN KEY, if it is
* only subcommand of ALTER TABLE. It was already checked in
* ErrorIfUnsupportedAlterTableStmt.
*/
Assert(list_length(commandList) == 1);

Oid foreignKeyId = GetForeignKeyOidByName(constraintName, leftRelationId);
rightRelationId = GetReferencedTableId(foreignKeyId);
}
}
else if (alterTableType == AT_AddColumn)
{
/*
Expand Down Expand Up @@ -1670,8 +1686,8 @@ CreateRightShardListForInterShardDDLTask(Oid rightRelationId, Oid leftRelationId
* in a way that the right shard stays the same since we only have one
* placement per worker.
* If left relation is a citus local table, then we don't need to populate
* reference table shards as we will set foreign key only on reference
* table's coordinator placement.
* reference table shards as we will execute ADD/DROP constraint command
* only for coordinator placement of reference table.
*/
ShardInterval *rightShard = (ShardInterval *) linitial(rightShardList);
int leftShardCount = list_length(leftShardList);
Expand All @@ -1696,9 +1712,9 @@ SetInterShardDDLTaskPlacementList(Task *task, ShardInterval *leftShardInterval,
IsCitusTableType(rightRelationId, CITUS_LOCAL_TABLE))
{
/*
* If we are defining foreign key from a reference table to a citus
* local table, then we will set foreign key only on reference table's
* coordinator placement.
* If we are defining/dropping a foreign key from a reference table
* to a citus local table, then we will execute ADD/DROP constraint
* command only for coordinator placement of reference table.
*/
task->taskPlacementList = GroupShardPlacementsForTableOnGroup(leftRelationId,
COORDINATOR_GROUP_ID);
Expand Down
6 changes: 2 additions & 4 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */
static bool IsCitusTableViaCatalog(Oid relationId);
static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId);
static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry);
static void PrepareWorkerNodeCache(void);
Expand Down Expand Up @@ -954,7 +953,7 @@ GetCitusTableCacheEntry(Oid distributedRelationId)
* passed relationId. For efficiency it caches lookups. This function returns
* NULL if the relation isn't a distributed table.
*/
static CitusTableCacheEntry *
CitusTableCacheEntry *
LookupCitusTableCacheEntry(Oid relationId)
{
bool foundInCache = false;
Expand Down Expand Up @@ -3847,8 +3846,7 @@ ReferenceTableOidList()
Anum_pg_dist_partition_repmodel,
tupleDescriptor, &isNull);

if (partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC)
if (IsReferenceTableByDistParams(partitionMethod, replicationModel))
{
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
Expand Down
8 changes: 8 additions & 0 deletions src/backend/distributed/operations/repair_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ ErrorIfTableCannotBeReplicated(Oid relationId)
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
char *relationName = get_rel_name(relationId);

if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
(errmsg("Table %s is a citus local table. Replicating "
"shard of a citus local table currently is not "
"supported", quote_literal_cstr(relationName)))));
}

/*
* ShouldSyncTableMetadata() returns true also for reference table,
* we don't want to error in that case since reference tables aren't
Expand Down
67 changes: 67 additions & 0 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);


/* Distributed planner hook */
Expand Down Expand Up @@ -2262,3 +2263,69 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
boundParams);
}
}


/*
* GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
* returns RTEListProperties for the rte list retrieved from query.
*/
RTEListProperties *
GetRTEListPropertiesForQuery(Query *query)
{
List *rteList = ExtractRangeTableEntryList(query);
return GetRTEListProperties(rteList);
}


/*
* GetRTEListProperties returns RTEListProperties struct processing the given
* rangeTableList.
*/
static RTEListProperties *
GetRTEListProperties(List *rangeTableList)
{
RTEListProperties *rteListProperties = palloc0(sizeof(RTEListProperties));

RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}

Oid relationId = rangeTableEntry->relid;
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
if (!cacheEntry)
{
rteListProperties->hasPostgresLocalTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
{
rteListProperties->hasReferenceTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_TABLE))
{
rteListProperties->hasCitusLocalTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
{
rteListProperties->hasDistributedTable = true;
}
else
{
/* it's not expected, but let's do a bug catch here */
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("encountered with an unexpected citus "
"table type while processing range table "
"entries of query")));
}
}

rteListProperties->hasCitusTable = (rteListProperties->hasDistributedTable ||
rteListProperties->hasReferenceTable ||
rteListProperties->hasCitusLocalTable);

return rteListProperties;
}
55 changes: 49 additions & 6 deletions src/backend/distributed/planner/insert_select_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,40 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}

RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);
if (subqueryRteListProperties->hasDistributedTable &&
(subqueryRteListProperties->hasCitusLocalTable ||
subqueryRteListProperties->hasPostgresLocalTable))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot select from "
"distributed tables and local tables at the same time",
NULL, NULL);
}

if (subqueryRteListProperties->hasDistributedTable &&
IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot insert into a "
"citus local table",
NULL, NULL);
}

/*
* In some cases, it might be possible to allow postgres local tables
* in distributed insert select. However, we want to behave consistent
* on all cases including Citus MX, and let insert select via coordinator
* to kick-in.
*/
if (subqueryRteListProperties->hasPostgresLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot select from "
"a local table", NULL, NULL);
return NULL;
}

/* we do not expect to see a view in modify target */
foreach(rangeTableCell, queryTree->rtable)
{
Expand Down Expand Up @@ -584,12 +618,19 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
return error;
}

/*
* If we're inserting into a reference table, all participating tables
* should be reference tables as well.
*/
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
if (IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
{
/*
* If we're inserting into a citus local table, it is ok because we've
* checked the non-existence of distributed tables in the subquery.
*/
}
else if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
{
/*
* If we're inserting into a reference table, all participating tables
* should be reference tables as well.
*/
if (!allReferenceTables)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
Expand Down Expand Up @@ -726,7 +767,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
* prevent shard pruning logic (i.e, namely UpdateRelationNames())
* modifies range table entries, which makes hard to add the quals.
*/
if (!allReferenceTables)
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
copiedSubquery);
if (subqueryRteListProperties->hasDistributedTable)
{
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
}
Expand Down
38 changes: 37 additions & 1 deletion src/backend/distributed/planner/multi_logical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ NodeTryGetRteRelid(Node *node)
}

RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
if (rangeTableEntry->rtekind != RTE_RELATION)

if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
return InvalidOid;
}
Expand All @@ -332,6 +334,18 @@ IsCitusTableRTE(Node *node)
}


/*
* IsPostgresLocalTableRte gets a node and returns true if the node is a
* range table relation entry that points to a postgres local table.
*/
bool
IsPostgresLocalTableRte(Node *node)
{
Oid relationId = NodeTryGetRteRelid(node);
return OidIsValid(relationId) && !IsCitusTable(relationId);
}


/*
* IsDistributedTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a distributed relation,
Expand All @@ -357,6 +371,18 @@ IsReferenceTableRTE(Node *node)
}


/*
* IsCitusLocalTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a citus local table.
*/
bool
IsCitusLocalTableRTE(Node *node)
{
Oid relationId = NodeTryGetRteRelid(node);
return OidIsValid(relationId) && IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
}


/*
* FullCompositeFieldList gets a composite field list, and checks if all fields
* of composite type are used in the list.
Expand Down Expand Up @@ -926,6 +952,16 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
errorMessage = "subquery in OFFSET is not supported in multi-shard queries";
}

RTEListProperties *queryRteListProperties = GetRTEListPropertiesForQuery(queryTree);
if (queryRteListProperties->hasCitusLocalTable ||
queryRteListProperties->hasPostgresLocalTable)
{
preconditionsSatisfied = false;
errorMessage = "direct joins between distributed and local tables are "
"not supported";
errorHint = LOCAL_TABLE_SUBQUERY_CTE_HINT;
}

/* finally check and error out if not satisfied */
if (!preconditionsSatisfied)
{
Expand Down

0 comments on commit 07cbaac

Please sign in to comment.