Skip to content

Commit

Permalink
Propagate DROP FOREIGN DATA WRAPPER commands
Browse files Browse the repository at this point in the history
  • Loading branch information
agedemenli committed Nov 17, 2021
1 parent 943f069 commit 5a408f1
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/backend/distributed/commands/distribute_object_ops.c
Expand Up @@ -274,6 +274,13 @@ static DistributeObjectOps Extension_Drop = {
.qualify = NULL,
.preprocess = PreprocessDropExtensionStmt,
.postprocess = NULL,
.address = NULL,PreprocessDropFdwStmt
};
static DistributeObjectOps Fdw_Drop = {
.deparse = DeparseDropFdwStmt,
.qualify = NULL,
.preprocess = PreprocessDropFdwStmt,
.postprocess = NULL,
.address = NULL,
};
static DistributeObjectOps ForeignServer_Drop = {
Expand Down Expand Up @@ -925,6 +932,11 @@ GetDistributeObjectOps(Node *node)
return &Extension_Drop;
}

case OBJECT_FDW:
{
return &Fdw_Drop;
}

case OBJECT_FUNCTION:
{
return &Function_Drop;
Expand Down
64 changes: 64 additions & 0 deletions src/backend/distributed/commands/foreign_data_wrapper.c
Expand Up @@ -36,6 +36,70 @@ PreprocessCreateFdwStmt(Node *node, const char *queryString,
}


List *
PreprocessDropFdwStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_FDW);

List *allFdwNamesToDrop = stmt->objects;
List *distributedFdwAddresses = NIL;
List *distributedFdwNames = NIL;
Value *fdwValue = NULL;
foreach_ptr(fdwValue, stmt->objects)
{
char *fdwString = strVal(fdwValue);
ForeignDataWrapper *fdw = GetForeignDataWrapperByName(fdwString, false);

ObjectAddress address = { 0 };
ObjectAddressSet(address, ForeignDataWrapperRelationId, fdw->fdwid);

/* filter distributed fdws */
if (IsObjectDistributed(&address))
{
distributedFdwAddresses = lappend(distributedFdwAddresses, &address);
distributedFdwNames = lappend(distributedFdwNames, fdwValue);
}
}

if (list_length(distributedFdwNames) <= 0)
{
return NIL;
}

EnsureCoordinator();

/* unmark each distributed fdw */
ObjectAddress *address = NULL;
foreach_ptr(address, distributedFdwAddresses)
{
UnmarkObjectDistributed(address);
}

/*
* Temporary swap the lists of objects to delete with the distributed
* objects and deparse to an sql statement for the workers.
* Then switch back to allFdwNamesToDrop to drop all specified
* servers in coordinator after PreprocessDropForeignServerStmt completes
* its execution.
*/
stmt->objects = distributedFdwNames;
const char *deparsedStmt = DeparseTreeNode((Node *) stmt);
stmt->objects = allFdwNamesToDrop;

/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}


List *
PostprocessCreateFdwStmt(Node *node, const char *queryString)
{
Expand Down
84 changes: 84 additions & 0 deletions src/backend/distributed/deparser/deparse_fdw_stmts.c
@@ -0,0 +1,84 @@
/*-------------------------------------------------------------------------
*
* deparse_fdw_stmts.c
* All routines to deparse foreign data wrapper statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "utils/builtins.h"

static void AppendDropFdwStmt(StringInfo buf, DropStmt *stmt);
static void AppendFdwNames(StringInfo buf, DropStmt *stmt);
static void AppendBehavior(StringInfo buf, DropStmt *stmt);

char *
DeparseDropFdwStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);

Assert(stmt->removeType == OBJECT_FDW);

StringInfoData str;
initStringInfo(&str);

AppendDropFdwStmt(&str, stmt);

return str.data;
}


static void
AppendDropFdwStmt(StringInfo buf, DropStmt *stmt)
{
appendStringInfoString(buf, "DROP FOREIGN DATA WRAPPER ");

if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}

AppendFdwNames(buf, stmt);

AppendBehavior(buf, stmt);
}


static void
AppendFdwNames(StringInfo buf, DropStmt *stmt)
{
Value *fdwValue = NULL;
foreach_ptr(fdwValue, stmt->objects)
{
char *fdwString = strVal(fdwValue);
appendStringInfo(buf, "%s", fdwString);

if (fdwValue != llast(stmt->objects))
{
appendStringInfoString(buf, ", ");
}
}
}


static void
AppendBehavior(StringInfo buf, DropStmt *stmt)
{
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
else if (stmt->behavior == DROP_RESTRICT)
{
appendStringInfoString(buf, " RESTRICT");
}
}
2 changes: 2 additions & 0 deletions src/include/distributed/commands.h
Expand Up @@ -226,6 +226,8 @@ extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
/* foreign_data_wrapper.c - forward declarations */
extern List * PreprocessCreateFdwStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessDropFdwStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateFdwStmt(Node *node, const char *queryString);
extern ObjectAddress CreateFdwStmtObjectAddress(Node *node, bool missing_ok);

Expand Down
3 changes: 3 additions & 0 deletions src/include/distributed/deparser.h
Expand Up @@ -46,6 +46,9 @@ extern void QualifyRenameCollationStmt(Node *stmt);
extern void QualifyAlterCollationSchemaStmt(Node *stmt);
extern void QualifyAlterCollationOwnerStmt(Node *stmt);

/* forward declarations for deparse_fdw_stmts.c */
extern char * DeparseDropFdwStmt(Node *node);

/* forward declarations for deparse_foreign_server_stmts.c */
extern char * DeparseDropForeignServerStmt(Node *node);

Expand Down

0 comments on commit 5a408f1

Please sign in to comment.