Skip to content

Commit

Permalink
Support foreign tables in MX (#5461)
Browse files Browse the repository at this point in the history
  • Loading branch information
agedemenli committed Jan 6, 2022
1 parent 5305aa4 commit 45e4231
Show file tree
Hide file tree
Showing 30 changed files with 883 additions and 127 deletions.
12 changes: 6 additions & 6 deletions src/backend/distributed/commands/alter_table.c
Expand Up @@ -368,7 +368,7 @@ UndistributeTable(TableConversionParameters *params)
EnsureTableNotReferencing(params->relationId, UNDISTRIBUTE_TABLE);
EnsureTableNotReferenced(params->relationId, UNDISTRIBUTE_TABLE);
}
EnsureTableNotForeign(params->relationId);

EnsureTableNotPartition(params->relationId);

if (PartitionedTable(params->relationId))
Expand Down Expand Up @@ -994,8 +994,7 @@ EnsureTableNotReferenced(Oid relationId, char conversionType)
void
EnsureTableNotForeign(Oid relationId)
{
char relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(relationId))
{
ereport(ERROR, (errmsg("cannot complete operation "
"because it is a foreign table")));
Expand Down Expand Up @@ -1063,7 +1062,7 @@ CreateTableConversion(TableConversionParameters *params)
BuildDistributionKeyFromColumnName(relation, con->distributionColumn);

con->originalAccessMethod = NULL;
if (!PartitionedTable(con->relationId))
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
{
HeapTuple amTuple = SearchSysCache1(AMOID, ObjectIdGetDatum(
relation->rd_rel->relam));
Expand Down Expand Up @@ -1305,7 +1304,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,

StringInfo query = makeStringInfo();

if (!PartitionedTable(sourceId))
if (!PartitionedTable(sourceId) && !IsForeignTable(sourceId))
{
if (!suppressNoticeMessages)
{
Expand Down Expand Up @@ -1402,7 +1401,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}

resetStringInfo(query);
appendStringInfo(query, "DROP TABLE %s CASCADE",
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);

Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/commands/distribute_object_ops.c
Expand Up @@ -797,6 +797,7 @@ GetDistributeObjectOps(Node *node)
return &Statistics_AlterObjectSchema;
}

case OBJECT_FOREIGN_TABLE:
case OBJECT_TABLE:
{
return &Table_AlterObjectSchema;
Expand Down
19 changes: 15 additions & 4 deletions src/backend/distributed/commands/table.c
Expand Up @@ -648,7 +648,7 @@ List *
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

/*
* We will let Postgres deal with missing_ok
Expand Down Expand Up @@ -1054,7 +1054,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
*/
Assert(IsCitusTable(rightRelationId));
}
else if (attachedRelationKind == RELKIND_RELATION)
else if (attachedRelationKind == RELKIND_RELATION ||
attachedRelationKind == RELKIND_FOREIGN_TABLE)
{
Assert(list_length(commandList) <= 1);

Expand Down Expand Up @@ -1761,7 +1762,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

if (stmt->relation == NULL)
{
Expand Down Expand Up @@ -2951,6 +2952,16 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
break;
}

case AT_GenericOptions:
{
if (IsForeignTable(relationId))
{
break;
}
}

/* fallthrough */

default:
{
ereport(ERROR,
Expand Down Expand Up @@ -3326,7 +3337,7 @@ ObjectAddress
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

const char *tableName = stmt->relation->relname;
Oid tableOid = InvalidOid;
Expand Down
8 changes: 3 additions & 5 deletions src/backend/distributed/commands/truncate.c
Expand Up @@ -267,15 +267,13 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)

ErrorIfIllegallyChangingKnownShard(relationId);

char relationKind = get_rel_relkind(relationId);
if (IsCitusTable(relationId) &&
relationKind == RELKIND_FOREIGN_TABLE)
if (IsCitusTable(relationId) && IsForeignTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("truncating distributed foreign tables is "
"currently unsupported"),
errhint("Use citus_drop_all_shards to remove "
"foreign table's shards.")));
errhint("Consider undistributing table before TRUNCATE, "
"and then distribute or add to metadata again")));
}
}
}
Expand Down
88 changes: 84 additions & 4 deletions src/backend/distributed/commands/utility_hook.c
Expand Up @@ -63,6 +63,7 @@
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
Expand Down Expand Up @@ -98,6 +99,8 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static bool ShouldAddNewTableToMetadata(Node *parsetree);
static bool ServerUsesPostgresFDW(char *serverName);
static void ErrorIfOptionListHasNoTableName(List *optionList);


/*
Expand Down Expand Up @@ -662,6 +665,29 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PostprocessCreateTableStmt(createStatement, queryString);
}

if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;

CreateStmt *createTableStmt = (CreateStmt *) (&createForeignTableStmt->base);

/*
* Error out with a hint if the foreign table is using postgres_fdw and
* the option table_name is not provided.
* Citus relays all the Citus local foreign table logic to the placement of the
* Citus local table. If table_name is NOT provided, Citus would try to talk to
* the foreign postgres table over the shard's table name, which would not exist
* on the remote server.
*/
if (ServerUsesPostgresFDW(createForeignTableStmt->servername))
{
ErrorIfOptionListHasNoTableName(createForeignTableStmt->options);
}

PostprocessCreateTableStmt(createTableStmt, queryString);
}

/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL)
{
Expand Down Expand Up @@ -891,14 +917,24 @@ ShouldCheckUndistributeCitusLocalTables(void)
static bool
ShouldAddNewTableToMetadata(Node *parsetree)
{
if (!IsA(parsetree, CreateStmt))
CreateStmt *createTableStmt;

if (IsA(parsetree, CreateStmt))
{
createTableStmt = (CreateStmt *) parsetree;
}
else if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;
createTableStmt = (CreateStmt *) &(createForeignTableStmt->base);
}
else
{
/* if the command is not CREATE TABLE, we can early return false */
/* if the command is not CREATE [FOREIGN] TABLE, we can early return false */
return false;
}

CreateStmt *createTableStmt = (CreateStmt *) parsetree;

if (createTableStmt->relation->relpersistence == RELPERSISTENCE_TEMP ||
createTableStmt->partbound != NULL)
{
Expand All @@ -924,6 +960,50 @@ ShouldAddNewTableToMetadata(Node *parsetree)
}


/*
* ServerUsesPostgresFDW gets a foreign server name and returns true if the FDW that
* the server depends on is postgres_fdw. Returns false otherwise.
*/
static bool
ServerUsesPostgresFDW(char *serverName)
{
ForeignServer *server = GetForeignServerByName(serverName, false);
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);

if (strcmp(fdw->fdwname, "postgres_fdw") == 0)
{
return true;
}

return false;
}


/*
* ErrorIfOptionListHasNoTableName gets an option list (DefElem) and errors out
* if the list does not contain a table_name element.
*/
static void
ErrorIfOptionListHasNoTableName(List *optionList)
{
char *table_nameString = "table_name";
DefElem *option = NULL;
foreach_ptr(option, optionList)
{
char *optionName = option->defname;
if (strcmp(optionName, table_nameString) == 0)
{
return;
}
}

ereport(ERROR, (errmsg(
"table_name option must be provided when using postgres_fdw with Citus"),
errhint("Provide the option \"table_name\" with value target table's"
" name")));
}


/*
* NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
* last command dropped a table constraint.
Expand Down
8 changes: 5 additions & 3 deletions src/backend/distributed/deparser/deparse_table_stmts.c
Expand Up @@ -30,7 +30,7 @@ DeparseAlterTableSchemaStmt(Node *node)
StringInfoData str = { 0 };
initStringInfo(&str);

Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

AppendAlterTableSchemaStmt(&str, stmt);
return str.data;
Expand All @@ -40,8 +40,10 @@ DeparseAlterTableSchemaStmt(Node *node)
static void
AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
Assert(stmt->objectType == OBJECT_TABLE);
appendStringInfo(buf, "ALTER TABLE ");
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

bool isForeignTable = stmt->objectType == OBJECT_FOREIGN_TABLE;
appendStringInfo(buf, "ALTER %sTABLE ", isForeignTable ? "FOREIGN " : "");
if (stmt->missing_ok)
{
appendStringInfo(buf, "IF EXISTS ");
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/deparser/qualify_table_stmt.c
Expand Up @@ -29,7 +29,7 @@ void
QualifyAlterTableSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

if (stmt->relation->schemaname == NULL)
{
Expand Down
24 changes: 15 additions & 9 deletions src/backend/distributed/metadata/metadata_sync.c
Expand Up @@ -679,21 +679,24 @@ MetadataCreateCommands(void)
/* after all tables are created, create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
Oid relationId = cacheEntry->relationId;

/* add the table metadata command first*/
char *metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);

/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
}

/* add the pg_dist_shard{,placement} entries */
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);

metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
Expand Down Expand Up @@ -844,8 +847,11 @@ GetDistributedTableDDLEvents(Oid relationId)
commandList = lappend(commandList, metadataCommand);

/* commands to create the truncate trigger of the table */
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
}

/* commands to insert pg_dist_shard & pg_dist_placement entries */
List *shardIntervalList = LoadShardIntervalList(relationId);
Expand Down
11 changes: 11 additions & 0 deletions src/backend/distributed/metadata/metadata_utility.c
Expand Up @@ -2162,3 +2162,14 @@ TableOwner(Oid relationId)

return GetUserNameFromId(userId, false);
}


/*
* IsForeignTable takes a relation id and returns true if it's a foreign table.
* Returns false otherwise.
*/
bool
IsForeignTable(Oid relationId)
{
return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE;
}
16 changes: 0 additions & 16 deletions src/backend/distributed/operations/node_protocol.c
Expand Up @@ -576,22 +576,6 @@ GetPreLoadTableCreationCommands(Oid relationId,

PushOverrideEmptySearchPath(CurrentMemoryContext);

/* if foreign table, fetch extension and server definitions */
char tableType = get_rel_relkind(relationId);
if (tableType == RELKIND_FOREIGN_TABLE)
{
char *extensionDef = pg_get_extensiondef_string(relationId);
char *serverDef = pg_get_serverdef_string(relationId);

if (extensionDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(extensionDef));
}
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(serverDef));
}

/* fetch table schema and column option definitions */
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
includeSequenceDefaults,
Expand Down

0 comments on commit 45e4231

Please sign in to comment.