Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support foreign tables in MX #5461

Merged
merged 62 commits into from Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
eef9d76
Fix issues preventing creation of foreign Citus local tables
marcocitus Sep 16, 2021
d986697
Fix undistribute_table for foreign tables
agedemenli Nov 11, 2021
82cd83a
Fix test output
agedemenli Nov 11, 2021
30f372d
Add foreign table support to GUC citus.use_citus_managed_tables
agedemenli Nov 11, 2021
2390e71
Move test to mx schedule
agedemenli Nov 12, 2021
0e283e2
Add user mapping to test
agedemenli Nov 12, 2021
1be73ff
Create shard foreign servers when converting table
agedemenli Nov 15, 2021
7950c8c
Style
agedemenli Nov 15, 2021
e8009e4
Remove rename server part
agedemenli Dec 24, 2021
d94a70a
Fix test output
agedemenli Dec 24, 2021
7a1259d
Move IsForeignTable to metadata utility
agedemenli Dec 24, 2021
5141a61
Use IsForeignTable all places
agedemenli Dec 24, 2021
e77c0fa
Support alter foreign table schema stmts
agedemenli Dec 27, 2021
9aa2ef8
Style
agedemenli Dec 27, 2021
eda8d47
Fix creating partitioned foreign tables
agedemenli Dec 28, 2021
4ecd727
Add test for foreign table partitions
agedemenli Dec 28, 2021
94ec5fc
Fix undistribute_table bug
agedemenli Dec 28, 2021
c12f53a
Fix test output
agedemenli Dec 28, 2021
392031e
Fix citus_add_node failure
agedemenli Dec 29, 2021
6ea1fc8
Add test for alter user mapping
agedemenli Dec 29, 2021
e484bad
Fix test output
agedemenli Dec 29, 2021
3d1290e
Fix alter schema issue
agedemenli Dec 30, 2021
ee41210
Update hint for truncate
agedemenli Dec 30, 2021
615bbea
Add test for truncate
agedemenli Dec 30, 2021
4d3eda3
Fix comment
agedemenli Dec 30, 2021
ae5f118
Do not drop the server when dropping the foreign table
agedemenli Dec 30, 2021
43256e1
Add test for undistribute
agedemenli Dec 30, 2021
63e3abc
Fix test
agedemenli Dec 30, 2021
a10fa7c
Add test local foreign tbl JOIN dist&ref tbl
agedemenli Dec 31, 2021
8a87195
Add test with bigserial
agedemenli Dec 31, 2021
d82dd14
Add test for rename foreign table
agedemenli Jan 3, 2022
15feb7a
Add test for rename column
agedemenli Jan 3, 2022
ce4427c
Add if exists to rename test
agedemenli Jan 3, 2022
4432541
Add test for add column
agedemenli Jan 3, 2022
0e963c5
Add test for alter column type
agedemenli Jan 3, 2022
dd44b11
Style
agedemenli Jan 3, 2022
26d1edf
Add tests with ADD/SET DEFAULT
agedemenli Jan 4, 2022
87d9577
Add alter owner test
agedemenli Jan 4, 2022
e46e2df
Fix test
agedemenli Jan 4, 2022
775fe2c
Support command: ALTER FOREIGN TABLE OPTIONS
agedemenli Jan 4, 2022
5c63575
Minor improvements
agedemenli Jan 4, 2022
8acc644
Unify alter schema logic
agedemenli Jan 5, 2022
8e01ad9
Style
agedemenli Jan 5, 2022
43286ab
Add test with citus_add_node
agedemenli Jan 5, 2022
df1bf49
Add separate test for partitioned tables
agedemenli Jan 5, 2022
b35413f
Add join testw on the worker
agedemenli Jan 5, 2022
b055f77
Add DROP FOREIGN TABLE command to test
agedemenli Jan 6, 2022
b592589
Add FOREIGN clause to alter table
agedemenli Jan 6, 2022
e6a71db
Move tests to a separate file
agedemenli Jan 6, 2022
fae934d
Add set verbosity
agedemenli Jan 6, 2022
cfd3533
Style
agedemenli Jan 6, 2022
65dfa67
Add test querying partitioned foreign tables
agedemenli Jan 6, 2022
44747be
Add constraint tests
agedemenli Jan 6, 2022
11c0a3a
Add trigger test
agedemenli Jan 6, 2022
1f3d68c
Style
agedemenli Jan 6, 2022
6deef85
Error out if table_name is missing with postgres_fdw
agedemenli Jan 6, 2022
68b3610
Style
agedemenli Jan 6, 2022
399d6ca
Add order by to select queries
agedemenli Jan 6, 2022
d7f7954
Cleanup at exit
agedemenli Jan 6, 2022
e524408
Improve comments
agedemenli Jan 6, 2022
afcd212
Style
agedemenli Jan 6, 2022
20d8b04
Undo config.h changes
agedemenli Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/backend/distributed/commands/alter_table.c
Expand Up @@ -368,7 +368,7 @@ UndistributeTable(TableConversionParameters *params)
EnsureTableNotReferencing(params->relationId, UNDISTRIBUTE_TABLE);
EnsureTableNotReferenced(params->relationId, UNDISTRIBUTE_TABLE);
}
EnsureTableNotForeign(params->relationId);
onderkalaci marked this conversation as resolved.
Show resolved Hide resolved

EnsureTableNotPartition(params->relationId);

if (PartitionedTable(params->relationId))
Expand Down Expand Up @@ -994,8 +994,7 @@ EnsureTableNotReferenced(Oid relationId, char conversionType)
void
EnsureTableNotForeign(Oid relationId)
{
char relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_FOREIGN_TABLE)
if (IsForeignTable(relationId))
{
ereport(ERROR, (errmsg("cannot complete operation "
"because it is a foreign table")));
Expand Down Expand Up @@ -1063,7 +1062,7 @@ CreateTableConversion(TableConversionParameters *params)
BuildDistributionKeyFromColumnName(relation, con->distributionColumn);

con->originalAccessMethod = NULL;
if (!PartitionedTable(con->relationId))
if (!PartitionedTable(con->relationId) && !IsForeignTable(con->relationId))
{
HeapTuple amTuple = SearchSysCache1(AMOID, ObjectIdGetDatum(
relation->rd_rel->relam));
Expand Down Expand Up @@ -1305,7 +1304,7 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,

StringInfo query = makeStringInfo();

if (!PartitionedTable(sourceId))
if (!PartitionedTable(sourceId) && !IsForeignTable(sourceId))
{
if (!suppressNoticeMessages)
{
Expand Down Expand Up @@ -1402,7 +1401,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}

resetStringInfo(query);
appendStringInfo(query, "DROP TABLE %s CASCADE",
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
quote_qualified_identifier(schemaName, sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);

Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/commands/distribute_object_ops.c
Expand Up @@ -797,6 +797,7 @@ GetDistributeObjectOps(Node *node)
return &Statistics_AlterObjectSchema;
}

case OBJECT_FOREIGN_TABLE:
case OBJECT_TABLE:
{
return &Table_AlterObjectSchema;
Expand Down
19 changes: 15 additions & 4 deletions src/backend/distributed/commands/table.c
Expand Up @@ -648,7 +648,7 @@ List *
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

/*
* We will let Postgres deal with missing_ok
Expand Down Expand Up @@ -1054,7 +1054,8 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
*/
Assert(IsCitusTable(rightRelationId));
}
else if (attachedRelationKind == RELKIND_RELATION)
else if (attachedRelationKind == RELKIND_RELATION ||
attachedRelationKind == RELKIND_FOREIGN_TABLE)
{
Assert(list_length(commandList) <= 1);

Expand Down Expand Up @@ -1761,7 +1762,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

if (stmt->relation == NULL)
{
Expand Down Expand Up @@ -2951,6 +2952,16 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
break;
}

case AT_GenericOptions:
{
if (IsForeignTable(relationId))
{
break;
}
}

/* fallthrough */

default:
{
ereport(ERROR,
Expand Down Expand Up @@ -3326,7 +3337,7 @@ ObjectAddress
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

const char *tableName = stmt->relation->relname;
Oid tableOid = InvalidOid;
Expand Down
8 changes: 3 additions & 5 deletions src/backend/distributed/commands/truncate.c
Expand Up @@ -267,15 +267,13 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)

ErrorIfIllegallyChangingKnownShard(relationId);

char relationKind = get_rel_relkind(relationId);
if (IsCitusTable(relationId) &&
relationKind == RELKIND_FOREIGN_TABLE)
if (IsCitusTable(relationId) && IsForeignTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
onderkalaci marked this conversation as resolved.
Show resolved Hide resolved
errmsg("truncating distributed foreign tables is "
"currently unsupported"),
errhint("Use citus_drop_all_shards to remove "
"foreign table's shards.")));
errhint("Consider undistributing table before TRUNCATE, "
"and then distribute or add to metadata again")));
}
}
}
Expand Down
88 changes: 84 additions & 4 deletions src/backend/distributed/commands/utility_hook.c
Expand Up @@ -63,6 +63,7 @@
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
Expand Down Expand Up @@ -98,6 +99,8 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static bool ShouldAddNewTableToMetadata(Node *parsetree);
static bool ServerUsesPostgresFDW(char *serverName);
static void ErrorIfOptionListHasNoTableName(List *optionList);


/*
Expand Down Expand Up @@ -662,6 +665,29 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PostprocessCreateTableStmt(createStatement, queryString);
}

if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;

CreateStmt *createTableStmt = (CreateStmt *) (&createForeignTableStmt->base);

/*
* Error out with a hint if the foreign table is using postgres_fdw and
* the option table_name is not provided.
* Citus relays all the Citus local foreign table logic to the placement of the
* Citus local table. If table_name is NOT provided, Citus would try to talk to
* the foreign postgres table over the shard's table name, which would not exist
* on the remote server.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? Maybe mention in the comment:

		 * Error out with a hint if the foreign table is using postgres_fdw and
		 * the option table_name is not provided.
           * Citus relays all the Citus local foreign table logic to the placement of the
           *  Citus local table. If table_name is NOT provided, Citus would try to talk to the
           * foreign postgres table over the shard's table name, which would not exist on the
           * remote server.  It is better to error on the creation time.

if (ServerUsesPostgresFDW(createForeignTableStmt->servername))
{
ErrorIfOptionListHasNoTableName(createForeignTableStmt->options);
}

PostprocessCreateTableStmt(createTableStmt, queryString);
}

/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL)
{
Expand Down Expand Up @@ -891,14 +917,24 @@ ShouldCheckUndistributeCitusLocalTables(void)
static bool
ShouldAddNewTableToMetadata(Node *parsetree)
{
if (!IsA(parsetree, CreateStmt))
CreateStmt *createTableStmt;
onderkalaci marked this conversation as resolved.
Show resolved Hide resolved

if (IsA(parsetree, CreateStmt))
{
createTableStmt = (CreateStmt *) parsetree;
}
else if (IsA(parsetree, CreateForeignTableStmt))
{
CreateForeignTableStmt *createForeignTableStmt =
(CreateForeignTableStmt *) parsetree;
createTableStmt = (CreateStmt *) &(createForeignTableStmt->base);
}
else
{
/* if the command is not CREATE TABLE, we can early return false */
/* if the command is not CREATE [FOREIGN] TABLE, we can early return false */
return false;
}

CreateStmt *createTableStmt = (CreateStmt *) parsetree;

if (createTableStmt->relation->relpersistence == RELPERSISTENCE_TEMP ||
createTableStmt->partbound != NULL)
{
Expand All @@ -924,6 +960,50 @@ ShouldAddNewTableToMetadata(Node *parsetree)
}


/*
* ServerUsesPostgresFDW gets a foreign server name and returns true if the FDW that
* the server depends on is postgres_fdw. Returns false otherwise.
*/
static bool
ServerUsesPostgresFDW(char *serverName)
{
ForeignServer *server = GetForeignServerByName(serverName, false);
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);

if (strcmp(fdw->fdwname, "postgres_fdw") == 0)
{
return true;
}

return false;
}


/*
* ErrorIfOptionListHasNoTableName gets an option list (DefElem) and errors out
* if the list does not contain a table_name element.
*/
static void
ErrorIfOptionListHasNoTableName(List *optionList)
{
char *table_nameString = "table_name";
DefElem *option = NULL;
foreach_ptr(option, optionList)
{
char *optionName = option->defname;
if (strcmp(optionName, table_nameString) == 0)
{
return;
}
}

onderkalaci marked this conversation as resolved.
Show resolved Hide resolved
ereport(ERROR, (errmsg(
"table_name option must be provided when using postgres_fdw with Citus"),
errhint("Provide the option \"table_name\" with value target table's"
" name")));
}


/*
* NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
* last command dropped a table constraint.
Expand Down
8 changes: 5 additions & 3 deletions src/backend/distributed/deparser/deparse_table_stmts.c
Expand Up @@ -30,7 +30,7 @@ DeparseAlterTableSchemaStmt(Node *node)
StringInfoData str = { 0 };
initStringInfo(&str);

Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

AppendAlterTableSchemaStmt(&str, stmt);
return str.data;
Expand All @@ -40,8 +40,10 @@ DeparseAlterTableSchemaStmt(Node *node)
static void
AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
Assert(stmt->objectType == OBJECT_TABLE);
appendStringInfo(buf, "ALTER TABLE ");
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

bool isForeignTable = stmt->objectType == OBJECT_FOREIGN_TABLE;
appendStringInfo(buf, "ALTER %sTABLE ", isForeignTable ? "FOREIGN " : "");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very minor: I think it'd be easier to read ALTER %s TABLE ", isForeignTable ? "FOREIGN" : ". Is that breaking other tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no that wouldn't break anything but it just annoys me with two space characters together, between ALTER and TABLE.

if (stmt->missing_ok)
{
appendStringInfo(buf, "IF EXISTS ");
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/deparser/qualify_table_stmt.c
Expand Up @@ -29,7 +29,7 @@ void
QualifyAlterTableSchemaStmt(Node *node)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TABLE);
Assert(stmt->objectType == OBJECT_TABLE || stmt->objectType == OBJECT_FOREIGN_TABLE);

if (stmt->relation->schemaname == NULL)
{
Expand Down
24 changes: 15 additions & 9 deletions src/backend/distributed/metadata/metadata_sync.c
Expand Up @@ -679,21 +679,24 @@ MetadataCreateCommands(void)
/* after all tables are created, create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
Oid relationId = cacheEntry->relationId;
onderkalaci marked this conversation as resolved.
Show resolved Hide resolved

/* add the table metadata command first*/
char *metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);

/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
}

/* add the pg_dist_shard{,placement} entries */
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);

metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
Expand Down Expand Up @@ -844,8 +847,11 @@ GetDistributedTableDDLEvents(Oid relationId)
commandList = lappend(commandList, metadataCommand);

/* commands to create the truncate trigger of the table */
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
if (!IsForeignTable(relationId))
{
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
}

/* commands to insert pg_dist_shard & pg_dist_placement entries */
List *shardIntervalList = LoadShardIntervalList(relationId);
Expand Down
11 changes: 11 additions & 0 deletions src/backend/distributed/metadata/metadata_utility.c
Expand Up @@ -2162,3 +2162,14 @@ TableOwner(Oid relationId)

return GetUserNameFromId(userId, false);
}


/*
* IsForeignTable takes a relation id and returns true if it's a foreign table.
* Returns false otherwise.
*/
bool
IsForeignTable(Oid relationId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One another question I have is regarding this: https://www.postgresql.org/docs/14/sql-importforeignschema.html

How would IMPORT FOREIGN SCHEMA work with Citus when SET citus.use_citus_managed_tables TO ON;?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping on this, it seems like a common pattern

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we decided that we'll do this later on with a separate PR

{
return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE;
}
16 changes: 0 additions & 16 deletions src/backend/distributed/operations/node_protocol.c
Expand Up @@ -576,22 +576,6 @@ GetPreLoadTableCreationCommands(Oid relationId,

PushOverrideEmptySearchPath(CurrentMemoryContext);

/* if foreign table, fetch extension and server definitions */
char tableType = get_rel_relkind(relationId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting for future reference. Our reason to remove these here has two aspects:

(a) we do not find dependency between the server and the extension as we expect all extensions to be already distributed and exists on the workers
(b) The server has already been created and exists on the workers

if (tableType == RELKIND_FOREIGN_TABLE)
{
char *extensionDef = pg_get_extensiondef_string(relationId);
char *serverDef = pg_get_serverdef_string(relationId);

if (extensionDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(extensionDef));
}
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(serverDef));
}

/* fetch table schema and column option definitions */
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
includeSequenceDefaults,
Expand Down