Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add support for COPY and partition tuple routing
This makes it possible to:
- populate a Firebird foreign table using the COPY command
- use Firebird foreign tables as part of a partitioned table

This commit provides initial support for this functionality.
There are various corner-cases which still need to be handled.
  • Loading branch information
ibarwick committed Aug 15, 2020
1 parent 4194415 commit bc569d0
Show file tree
Hide file tree
Showing 9 changed files with 486 additions and 90 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG
Expand Up @@ -7,10 +7,10 @@ Revision history for firebird_fdw
- Add function firebird_fdw_server_options()
- Add function firebird_version()
- Apply CREATE SERVER's "port" option, if provided
- Add support for COPY and partition tuple routing (PostgreSQL 11 and later)
- Add support for generated columns (PostgreSQL 12 and later)
- Push down boolean tests in WHERE clauses where possible
- Prevent tables defined as queries from being set as "updatable = 'true'"
- Prevent crash on attempted COPY operation (PostgreSQL 11 and later)

1.1.0 2019-05-31
- Add utility function "firebird_fdw_close_connections()"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -40,7 +40,7 @@ Features
- Connection caching
- Supports triggers on foreign tables (PostgreSQL 9.4 and later)
- Supports `IMPORT FOREIGN SCHEMA` (PostgreSQL 9.5 and later)

- Supports `COPY` and partition tuple routing (PostgreSQL 11 and later)

Supported platforms
-------------------
Expand Down
262 changes: 188 additions & 74 deletions src/firebird_fdw.c
Expand Up @@ -227,13 +227,27 @@ static List *firebirdImportForeignSchema(ImportForeignSchemaStmt *stmt,
static void
firebirdBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo);
static void
firebirdEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo);
#endif

/* Internal functions */

static void exitHook(int code, Datum arg);
static FirebirdFdwState *getFdwState(Oid foreigntableid);

static FirebirdFdwModifyState *
create_foreign_modify(ResultRelInfo *resultRelInfo,
EState *estate,
CmdType operation,
Plan *subplan,
char *query,
List *target_attrs,
bool has_returning,
List *retrieved_attrs);


static void firebirdEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);

static const char **convert_prep_stmt_params(FirebirdFdwModifyState *fmstate,
Expand Down Expand Up @@ -726,7 +740,7 @@ firebird_fdw_handler(PG_FUNCTION_ARGS)
#if (PG_VERSION_NUM >= 110000)
/* Handle COPY */
fdwroutine->BeginForeignInsert = firebirdBeginForeignInsert;
fdwroutine->EndForeignInsert = NULL;
fdwroutine->EndForeignInsert = firebirdEndForeignInsert;
#endif

PG_RETURN_POINTER(fdwroutine);
Expand Down Expand Up @@ -1757,6 +1771,8 @@ firebirdIsForeignRelUpdatable(Relation rel)
table,
&table_options);

elog(DEBUG2, "exiting function %s", __func__);

return updatable ?
(1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
}
Expand Down Expand Up @@ -2024,83 +2040,48 @@ firebirdPlanForeignModify(PlannerInfo *root,


/**
* firebirdBeginForeignModify()
* create_foreign_modify()
*
* Preparation for executing a foreign table modification operation.
* Called during executor startup. One of ExecForeignInsert(),
* ExecForeignUpdate() or ExecForeignDelete() will subsequently be called
* for each tuple to be processed.
*
* Parameters:
* (ModifyTableState *) mtstate
* overall state of the ModifyTable plan node being executed;
* provides global data about the plan and execution state
*
* (ResultRelInfo) *resultRelInfo
* The ResultRelInfo struct describing the target foreign table.
* The ri_FdwState field of ResultRelInfo can be used to store
* the FDW's private state.
*
* (List *)fdw_private
* contains private data generated by firebirdPlanForeignModify(), if any.
*
* (int) subplan_index
* identifies which target of the ModifyTable plan node this is.
*
* (int) eflags
* contains flag bits describing the executor's operating mode for
* this plan node. See also comment about (eflags & EXEC_FLAG_EXPLAIN_ONLY)
* in function body.
*
* Returns:
* void
*/

static void
firebirdBeginForeignModify(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
List *fdw_private,
int subplan_index,
int eflags)
static FirebirdFdwModifyState *
create_foreign_modify(ResultRelInfo *resultRelInfo,
EState *estate,
CmdType operation,
Plan *subplan,
char *query,
List *target_attrs,
bool has_returning,
List *retrieved_attrs)
{
FirebirdFdwModifyState *fmstate;
EState *estate = mtstate->ps.state;

Relation rel = resultRelInfo->ri_RelationDesc;
RangeTblEntry *rte;
Relation rel = resultRelInfo->ri_RelationDesc;

#if (PG_VERSION_NUM >= 110000)
TupleDesc tupdesc = RelationGetDescr(rel);
#endif

CmdType operation = mtstate->operation;
RangeTblEntry *rte;
Oid userid;

ForeignTable *table;
ForeignServer *server;

UserMapping *user;

AttrNumber n_params;

Oid typefnoid;
bool isvarlena;

AttrNumber n_params;
ListCell *lc;

elog(DEBUG2, "entering function %s", __func__);

/*
* Do nothing in EXPLAIN (no ANALYZE) case.
* resultRelInfo->ri_FdwState stays NULL.
*/
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;

/* Begin constructing FirebirdFdwModifyState. */
fmstate = (FirebirdFdwModifyState *) palloc0(sizeof(FirebirdFdwModifyState));

fmstate->rel = rel;

rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);

userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();

elog(DEBUG2, "userid resolved to: %i", (int)userid);
Expand All @@ -2116,21 +2097,13 @@ firebirdBeginForeignModify(ModifyTableState *mtstate,
(errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION),
errmsg("unable to connect to foreign server")));


fmstate->conn->autocommit = true;
fmstate->conn->client_min_messages = DEBUG1;

/* Deconstruct fdw_private data. */
/* this is the list returned by firebirdPlanForeignModify() */
fmstate->query = strVal(list_nth(fdw_private,
FdwModifyPrivateUpdateSql));

fmstate->target_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateTargetAttnums);
fmstate->has_returning = intVal(list_nth(fdw_private,
FdwModifyPrivateHasReturning));
fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateRetrievedAttrs);
fmstate->query = query;
fmstate->target_attrs = target_attrs;
fmstate->has_returning = has_returning;
fmstate->retrieved_attrs = retrieved_attrs;


/* Create context for per-tuple temp workspace */
Expand Down Expand Up @@ -2183,7 +2156,6 @@ firebirdBeginForeignModify(ModifyTableState *mtstate,
halves of the 8-byte RDB$DB_KEY value so update and delete
operations can locate the correct row
*/
Plan *subplan = mtstate->mt_plans[subplan_index]->plan;

fmstate->db_keyAttno_CtidPart = ExecFindJunkAttributeInTlist(
subplan->targetlist,
Expand All @@ -2192,7 +2164,6 @@ firebirdBeginForeignModify(ModifyTableState *mtstate,
if (!AttributeNumberIsValid(fmstate->db_keyAttno_CtidPart))
{
elog(ERROR, "Resjunk column \"db_key_ctidpart\" not found");
return;
}

elog(DEBUG2, "Found resjunk db_key_ctidpart, attno %i", (int)fmstate->db_keyAttno_CtidPart);
Expand All @@ -2205,7 +2176,6 @@ firebirdBeginForeignModify(ModifyTableState *mtstate,
if (!AttributeNumberIsValid(fmstate->db_keyAttno_XmaxPart))
{
elog(ERROR, "Resjunk column \"db_key_xmaxpart\" not found");
return;
}

elog(DEBUG2, "Found resjunk \"db_key_xmaxpart\", attno %i", (int)fmstate->db_keyAttno_XmaxPart);
Expand All @@ -2219,6 +2189,82 @@ firebirdBeginForeignModify(ModifyTableState *mtstate,
elog(DEBUG2, " p_nums %i; n_params: %i", fmstate->p_nums, n_params);
Assert(fmstate->p_nums <= n_params);

return fmstate;
}


/**
* firebirdBeginForeignModify()
*
* Preparation for executing a foreign table modification operation.
* Called during executor startup. One of ExecForeignInsert(),
* ExecForeignUpdate() or ExecForeignDelete() will subsequently be called
* for each tuple to be processed.
*
* Parameters:
* (ModifyTableState *) mtstate
* overall state of the ModifyTable plan node being executed;
* provides global data about the plan and execution state
*
* (ResultRelInfo) *resultRelInfo
* The ResultRelInfo struct describing the target foreign table.
* The ri_FdwState field of ResultRelInfo can be used to store
* the FDW's private state.
*
* (List *)fdw_private
* contains private data generated by firebirdPlanForeignModify(), if any.
*
* (int) subplan_index
* identifies which target of the ModifyTable plan node this is.
*
* (int) eflags
* contains flag bits describing the executor's operating mode for
* this plan node. See also comment about (eflags & EXEC_FLAG_EXPLAIN_ONLY)
* in function body.
*
* Returns:
* void
*/

static void
firebirdBeginForeignModify(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
List *fdw_private,
int subplan_index,
int eflags)
{
FirebirdFdwModifyState *fmstate;
EState *estate = mtstate->ps.state;

CmdType operation = mtstate->operation;

elog(DEBUG2, "entering function %s", __func__);

/*
* Do nothing in EXPLAIN (no ANALYZE) case.
* resultRelInfo->ri_FdwState stays NULL.
*/
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;

fmstate = create_foreign_modify(resultRelInfo,
estate,
operation,
mtstate->mt_plans[subplan_index]->plan,
strVal(list_nth(fdw_private,
FdwModifyPrivateUpdateSql)),
(List *) list_nth(fdw_private,
FdwModifyPrivateTargetAttnums),
intVal(list_nth(fdw_private,
FdwModifyPrivateHasReturning)),
(List *) list_nth(fdw_private,
FdwModifyPrivateRetrievedAttrs));


/* Deconstruct fdw_private data. */
/* this is the list returned by firebirdPlanForeignModify() */


resultRelInfo->ri_FdwState = fmstate;
}

Expand Down Expand Up @@ -2587,21 +2633,89 @@ firebirdExplainForeignModify(ModifyTableState *mtstate,
#endif

#if (PG_VERSION_NUM >= 110000)
/*
* firebidBeginForeignInsert
/**
* firebidBeginForeignInsert()
*
* Begin an insert operation on a foreign table.
* This is not yet supported, so raise an error.
* Initialize the FDW state for COPY to a foreign table.
*
* Note we do not yet support the case where the table is the partition
* chosen for tuple routing.
*/

static void
firebirdBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo)
{
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("COPY and foreign partition routing not supported")));
FirebirdFdwModifyState *fmstate;

Index resultRelation = resultRelInfo->ri_RangeTableIndex;
EState *estate = mtstate->ps.state;
Relation rel = resultRelInfo->ri_RelationDesc;

TupleDesc tupdesc = RelationGetDescr(rel);
RangeTblEntry *rte;

int attnum;

List *targetAttrs = NIL;
List *retrieved_attrs = NIL;

StringInfoData sql;

FirebirdFdwState *fdw_state = getFdwState(RelationGetRelid(rel));

elog(DEBUG2, "%s: begin foreign table insert on %s",
__func__,
RelationGetRelationName(rel));

rte = exec_rt_fetch(resultRelation, estate);

/* Transmit all columns that are defined in the foreign table. */
for (attnum = 1; attnum <= tupdesc->natts; attnum++)
{
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

if (!attr->attisdropped)
{
elog(DEBUG3, "attribute is: %s", attr->attname.data);
targetAttrs = lappend_int(targetAttrs, attnum);
}
}

initStringInfo(&sql);

buildInsertSql(&sql,
rte,
fdw_state,
resultRelation,
rel,
targetAttrs,
resultRelInfo->ri_returningList,
&retrieved_attrs);

elog(DEBUG2, "%s", sql.data);

fmstate = create_foreign_modify(resultRelInfo,
estate,
mtstate->operation,
NULL,
sql.data,
targetAttrs,
retrieved_attrs != NIL,
retrieved_attrs);

resultRelInfo->ri_FdwState = fmstate;
}

static void
firebirdEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo)
{
FirebirdFdwModifyState *fm_state = (FirebirdFdwModifyState *)resultRelInfo->ri_FdwState;

MemoryContextDelete(fm_state->temp_cxt);
}

#endif


Expand Down

0 comments on commit bc569d0

Please sign in to comment.