Skip to content

Commit

Permalink
Propagate CREATE FOREIGN DATA WRAPPEER commands
Browse files Browse the repository at this point in the history
  • Loading branch information
agedemenli committed Nov 17, 2021
1 parent 255bd2f commit 943f069
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 25 deletions.
16 changes: 14 additions & 2 deletions src/backend/distributed/commands/distribute_object_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ static DistributeObjectOps Any_CreatePolicy = {
.postprocess = NULL,
.address = NULL,
};
static DistributeObjectOps Any_CreateFdw = {
.deparse = NULL,
.qualify = NULL,
.preprocess = PreprocessCreateFdwStmt,
.postprocess = PostprocessCreateFdwStmt,
.address = CreateFdwStmtObjectAddress,
};
static DistributeObjectOps Any_CreateForeignServer = {
.deparse = NULL,
.qualify = NULL,
Expand Down Expand Up @@ -851,16 +858,21 @@ GetDistributeObjectOps(Node *node)
return &Any_CreateFunction;
}

case T_CreatePolicyStmt:
case T_CreateFdwStmt:
{
return &Any_CreatePolicy;
return &Any_CreateFdw;
}

case T_CreateForeignServerStmt:
{
return &Any_CreateForeignServer;
}

case T_CreatePolicyStmt:
{
return &Any_CreatePolicy;
}

case T_CreateStatsStmt:
{
return &Any_CreateStatistics;
Expand Down
61 changes: 61 additions & 0 deletions src/backend/distributed/commands/foreign_data_wrapper.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*-------------------------------------------------------------------------
*
* foreign_data_wrapper.c
* Commands for FOREIGN DATA WRAPPER statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"

#include "catalog/pg_foreign_data_wrapper.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "nodes/primnodes.h"

List *
PreprocessCreateFdwStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
EnsureCoordinator();

/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) queryString,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}


List *
PostprocessCreateFdwStmt(Node *node, const char *queryString)
{
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
EnsureDependenciesExistOnAllNodes(&typeAddress);

MarkObjectDistributed(&typeAddress);

return NIL;
}


ObjectAddress
CreateFdwStmtObjectAddress(Node *node, bool missing_ok)
{
CreateFdwStmt *stmt = castNode(CreateFdwStmt, node);
ForeignDataWrapper *fdw = GetForeignDataWrapperByName(stmt->fdwname, false);
Oid fdwOid = fdw->fdwid;
ObjectAddress address = { 0 };
ObjectAddressSet(address, ForeignDataWrapperRelationId, fdwOid);

return address;
}
1 change: 0 additions & 1 deletion src/backend/distributed/commands/foreign_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_transaction.h"
#include "distributed/worker_create_or_replace.h"
#include "foreign/foreign.h"
#include "nodes/primnodes.h"

Expand Down
40 changes: 20 additions & 20 deletions src/backend/distributed/deparser/deparse_foreign_server_stmts.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ static void AppendBehavior(StringInfo buf, DropStmt *stmt);
char *
DeparseDropForeignServerStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
DropStmt *stmt = castNode(DropStmt, node);

Assert(stmt->removeType == OBJECT_FOREIGN_SERVER);
Assert(stmt->removeType == OBJECT_FOREIGN_SERVER);

StringInfoData str;
initStringInfo(&str);
Expand All @@ -40,16 +40,16 @@ DeparseDropForeignServerStmt(Node *node)
static void
AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt)
{
appendStringInfoString(buf, "DROP SERVER ");
appendStringInfoString(buf, "DROP SERVER ");

if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}

AppendServerNames(buf, stmt);
AppendServerNames(buf, stmt);

AppendBehavior(buf, stmt);
AppendBehavior(buf, stmt);
}


Expand All @@ -59,26 +59,26 @@ AppendServerNames(StringInfo buf, DropStmt *stmt)
Value *serverValue = NULL;
foreach_ptr(serverValue, stmt->objects)
{
char *serverString = strVal(serverValue);
appendStringInfo(buf, "%s", serverString);
char *serverString = strVal(serverValue);
appendStringInfo(buf, "%s", serverString);

if (serverValue != llast(stmt->objects))
{
appendStringInfoString(buf, ", ");
}
}
}
}


static void
AppendBehavior(StringInfo buf, DropStmt *stmt)
{
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
else if (stmt->behavior == DROP_RESTRICT)
{
appendStringInfoString(buf, " RESTRICT");
}
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
else if (stmt->behavior == DROP_RESTRICT)
{
appendStringInfoString(buf, " RESTRICT");
}
}
9 changes: 7 additions & 2 deletions src/include/distributed/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,19 @@ extern Oid GetReferencedTableId(Oid foreignKeyId);
extern Oid GetReferencingTableId(Oid foreignKeyId);
extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);

/* foreign_data_wrapper.c - forward declarations */
extern List * PreprocessCreateFdwStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateFdwStmt(Node *node, const char *queryString);
extern ObjectAddress CreateFdwStmtObjectAddress(Node *node, bool missing_ok);

/* foreign_server.c - forward declarations */
extern List * PreprocessCreateForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString);
extern ObjectAddress CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok);

Expand Down

0 comments on commit 943f069

Please sign in to comment.