Skip to content

Commit

Permalink
Issue (#118) Add support for push down of WHERE clause with Param nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
gabbasb committed Feb 8, 2017
1 parent c14ee1d commit 13b95af
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 4 deletions.
13 changes: 11 additions & 2 deletions deparse.c
Expand Up @@ -1328,8 +1328,17 @@ foreign_expr_walker(Node *node,
break;
case T_Param:
{
/* We are not supporting param push down*/
return false;
Param *p = (Param *) node;

/*
* Collation rule is same as for Consts and non-foreign Vars.
*/
collation = p->paramcollid;
if (collation == InvalidOid ||
collation == DEFAULT_COLLATION_OID)
state = FDW_COLLATE_NONE;
else
state = FDW_COLLATE_UNSAFE;
}
break;
case T_ArrayRef:
Expand Down
40 changes: 40 additions & 0 deletions expected/mysql_fdw.out
Expand Up @@ -5,7 +5,9 @@ CREATE USER MAPPING FOR postgres SERVER mysql_svr OPTIONS(username 'foo', passwo
CREATE FOREIGN TABLE department(department_id int, department_name text) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'department');
CREATE FOREIGN TABLE employee(emp_id int, emp_name text, emp_dept_id int) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'employee');
CREATE FOREIGN TABLE empdata(emp_id int, emp_dat bytea) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'empdata');
CREATE FOREIGN TABLE numbers(a int, b varchar(255)) SERVER mysql_svr OPTIONS (dbname 'testdb', table_name 'numbers');
SELECT * FROM department LIMIT 10;
INFO: Successfully connected to MySQL database testdb at server 127.0.0.1 via TCP/IP with cipher <none> (server version: 5.7.17, protocol version: 10)
department_id | department_name
---------------+-----------------
(0 rows)
Expand All @@ -23,6 +25,15 @@ SELECT * FROM empdata LIMIT 10;
INSERT INTO department VALUES(generate_series(1,100), 'dept - ' || generate_series(1,100));
INSERT INTO employee VALUES(generate_series(1,100), 'emp - ' || generate_series(1,100), generate_series(1,100));
INSERT INTO empdata VALUES(1, decode ('01234567', 'hex'));
insert into numbers values(1, 'One');
insert into numbers values(2, 'Two');
insert into numbers values(3, 'Three');
insert into numbers values(4, 'Four');
insert into numbers values(5, 'Five');
insert into numbers values(6, 'Six');
insert into numbers values(7, 'Seven');
insert into numbers values(8, 'Eight');
insert into numbers values(9, 'Nine');
SELECT count(*) FROM department;
count
-------
Expand Down Expand Up @@ -317,9 +328,38 @@ SELECT * FROM employee WHERE emp_name NOT IN ('emp - 10') LIMIT 5;
5 | emp - 5 | 5
(5 rows)

create or replace function test_param_where() returns void as $$
DECLARE
n varchar;
BEGIN
FOR x IN 1..9 LOOP
select b into n from numbers where a=x;
raise notice 'Found number %', n;
end loop;
return;
END
$$ LANGUAGE plpgsql;
SELECT test_param_where();
NOTICE: Found number One
NOTICE: Found number Two
NOTICE: Found number Three
NOTICE: Found number Four
NOTICE: Found number Five
NOTICE: Found number Six
NOTICE: Found number Seven
NOTICE: Found number Eight
NOTICE: Found number Nine
test_param_where
------------------

(1 row)

DELETE FROM employee;
DELETE FROM department;
DELETE FROM empdata;
DELETE FROM numbers;
DROP FUNCTION test_param_where();
DROP FOREIGN TABLE numbers;
DROP FOREIGN TABLE department;
DROP FOREIGN TABLE employee;
DROP FOREIGN TABLE empdata;
Expand Down
180 changes: 180 additions & 0 deletions mysql_fdw.c
Expand Up @@ -144,6 +144,23 @@ static List *mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverO

static bool mysql_is_column_unique(Oid foreigntableid);

static void prepare_query_params(PlanState *node,
List *fdw_exprs,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
const char ***param_values,
Oid **param_types);

static void process_query_params(ExprContext *econtext,
FmgrInfo *param_flinfo,
List *param_exprs,
const char **param_values,
MYSQL_BIND **mysql_bind_buf,
Oid *param_types);

static void create_cursor(ForeignScanState *node);

void* mysql_dll_handle = NULL;
static int wait_timeout = WAIT_TIMEOUT;
static int interactive_timeout = INTERACTIVE_TIMEOUT;
Expand Down Expand Up @@ -364,6 +381,7 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
UserMapping *user;
ForeignTable *table;
char timeout[255];
int numParams;

/*
* We'll save private state in node->fdw_state.
Expand Down Expand Up @@ -397,6 +415,7 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
festate->query = strVal(list_nth(fsplan->fdw_private, 0));
festate->retrieved_attrs = list_nth(fsplan->fdw_private, 1);
festate->conn = conn;
festate->cursor_exists = false;

festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
"mysql_fdw temporary data",
Expand Down Expand Up @@ -464,6 +483,25 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
}
}

/* Prepare for output conversion of parameters used in remote query. */
numParams = list_length(fsplan->fdw_exprs);
festate->numParams = numParams;
if (numParams > 0)
prepare_query_params((PlanState *) node,
fsplan->fdw_exprs,
numParams,
&festate->param_flinfo,
&festate->param_exprs,
&festate->param_values,
&festate->param_types);

/*
* If this is the first call after Begin or ReScan, we need to create the
* cursor on the remote side.
*/
if (!festate->cursor_exists)
create_cursor(node);

/* int column_count = mysql_num_fields(festate->meta); */

/* Set the statement as cursor type */
Expand Down Expand Up @@ -991,6 +1029,7 @@ mysqlGetForeignPlan(
Index scan_relid = baserel->relid;
List *fdw_private;
List *local_exprs = NULL;
List *remote_exprs = NULL;
List *params_list = NULL;
List *remote_conds = NIL;

Expand Down Expand Up @@ -1040,11 +1079,17 @@ mysqlGetForeignPlan(
continue;

if (list_member_ptr(fpinfo->remote_conds, rinfo))
{
remote_conds = lappend(remote_conds, rinfo);
remote_exprs = lappend(remote_exprs, rinfo->clause);
}
else if (list_member_ptr(fpinfo->local_conds, rinfo))
local_exprs = lappend(local_exprs, rinfo->clause);
else if (is_foreign_expr(root, baserel, rinfo->clause))
{
remote_conds = lappend(remote_conds, rinfo);
remote_exprs = lappend(remote_exprs, rinfo->clause);
}
else
local_exprs = lappend(local_exprs, rinfo->clause);
}
Expand Down Expand Up @@ -1986,3 +2031,138 @@ mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
return commands;
}
#endif

/*
* Prepare for processing of parameters used in remote query.
*/
static void
prepare_query_params(PlanState *node,
List *fdw_exprs,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
const char ***param_values,
Oid **param_types)
{
int i;
ListCell *lc;

Assert(numParams > 0);

/* Prepare for output conversion of parameters used in remote query. */
*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);

*param_types = (Oid *) palloc0(sizeof(Oid) * numParams);

i = 0;
foreach(lc, fdw_exprs)
{
Node *param_expr = (Node *) lfirst(lc);
Oid typefnoid;
bool isvarlena;

*param_types[i] = exprType(param_expr);

getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
fmgr_info(typefnoid, &(*param_flinfo)[i]);
i++;
}

/*
* Prepare remote-parameter expressions for evaluation. (Note: in
* practice, we expect that all these expressions will be just Params, so
* we could possibly do something more efficient than using the full
* expression-eval machinery for this. But probably there would be little
* benefit, and it'd require postgres_fdw to know more than is desirable
* about Param evaluation.)
*/
*param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);

/* Allocate buffer for text form of query parameters. */
*param_values = (const char **) palloc0(numParams * sizeof(char *));
}

/*
* Construct array of query parameter values in text format.
*/
static void
process_query_params(ExprContext *econtext,
FmgrInfo *param_flinfo,
List *param_exprs,
const char **param_values,
MYSQL_BIND **mysql_bind_buf,
Oid *param_types)
{
// int nestlevel;
int i;
ListCell *lc;

// nestlevel = set_transmission_modes();

i = 0;
foreach(lc, param_exprs)
{
ExprState *expr_state = (ExprState *) lfirst(lc);
Datum expr_value;
bool isNull;

/* Evaluate the parameter expression */
expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);

mysql_bind_sql_var(param_types[i], i, expr_value, *mysql_bind_buf, &isNull);

/*
* Get string representation of each parameter value by invoking
* type-specific output function, unless the value is null.
*/
if (isNull)
param_values[i] = NULL;
else
param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
i++;
}

// reset_transmission_modes(nestlevel);
}

/*
* Create cursor for node's query with current parameter values.
*/
static void
create_cursor(ForeignScanState *node)
{
MySQLFdwExecState *festate = (MySQLFdwExecState *) node->fdw_state;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = festate->numParams;
const char **values = festate->param_values;
MYSQL_BIND *mysql_bind_buffer = NULL;

/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
* memory leak over repeated scans.
*/
if (numParams > 0)
{
MemoryContext oldcontext;

oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

mysql_bind_buffer = (MYSQL_BIND*) palloc0(sizeof(MYSQL_BIND) * numParams);

process_query_params(econtext,
festate->param_flinfo,
festate->param_exprs,
values,
&mysql_bind_buffer,
festate->param_types);

_mysql_stmt_bind_param(festate->stmt, mysql_bind_buffer);

/* Mark the cursor as created, and show no tuples have been retrieved */
festate->cursor_exists = true;

MemoryContextSwitchTo(oldcontext);
}
}

7 changes: 7 additions & 0 deletions mysql_fdw.h
Expand Up @@ -96,6 +96,13 @@ typedef struct MySQLFdwExecState
Relation rel; /* relcache entry for the foreign table */
List *retrieved_attrs; /* list of target attribute numbers */

bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
Oid *param_types; /* type of query parameters */

int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */

Expand Down
3 changes: 1 addition & 2 deletions mysql_init.sh
@@ -1,8 +1,7 @@
#!/bin/sh
export MYSQL_PWD="bar"
mysql -h 127.0.0.1 -u foo -D testdb -e "DROP DATABASE IF EXISTS testdb"
mysql -h 127.0.0.1 -u foo -e "CREATE DATABASE testdb"
mysql -h 127.0.0.1 -u foo -D testdb -e "CREATE TABLE department(department_id int, department_name text, PRIMARY KEY (department_id))"
mysql -h 127.0.0.1 -u foo -D testdb -e "CREATE TABLE employee(emp_id int, emp_name text, emp_dept_id int, PRIMARY KEY (emp_id))"
mysql -h 127.0.0.1 -u foo -D testdb -e "CREATE TABLE empdata (emp_id int, emp_dat blob, PRIMARY KEY (emp_id))"
mysql -h 127.0.0.1 -u foo -D testdb -e "CREATE TABLE numbers (a int PRIMARY KEY, b varchar(255))"

29 changes: 29 additions & 0 deletions sql/mysql_fdw.sql
Expand Up @@ -6,6 +6,7 @@ CREATE USER MAPPING FOR postgres SERVER mysql_svr OPTIONS(username 'foo', passwo
CREATE FOREIGN TABLE department(department_id int, department_name text) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'department');
CREATE FOREIGN TABLE employee(emp_id int, emp_name text, emp_dept_id int) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'employee');
CREATE FOREIGN TABLE empdata(emp_id int, emp_dat bytea) SERVER mysql_svr OPTIONS(dbname 'testdb', table_name 'empdata');
CREATE FOREIGN TABLE numbers(a int, b varchar(255)) SERVER mysql_svr OPTIONS (dbname 'testdb', table_name 'numbers');

SELECT * FROM department LIMIT 10;
SELECT * FROM employee LIMIT 10;
Expand All @@ -15,6 +16,16 @@ INSERT INTO department VALUES(generate_series(1,100), 'dept - ' || generate_seri
INSERT INTO employee VALUES(generate_series(1,100), 'emp - ' || generate_series(1,100), generate_series(1,100));
INSERT INTO empdata VALUES(1, decode ('01234567', 'hex'));

insert into numbers values(1, 'One');
insert into numbers values(2, 'Two');
insert into numbers values(3, 'Three');
insert into numbers values(4, 'Four');
insert into numbers values(5, 'Five');
insert into numbers values(6, 'Six');
insert into numbers values(7, 'Seven');
insert into numbers values(8, 'Eight');
insert into numbers values(9, 'Nine');

SELECT count(*) FROM department;
SELECT count(*) FROM employee;
SELECT count(*) FROM empdata;
Expand Down Expand Up @@ -51,9 +62,27 @@ SELECT * FROM employee WHERE emp_id NOT IN (SELECT emp_id FROM employee WHERE em
SELECT * FROM employee WHERE emp_name NOT IN ('emp - 1', 'emp - 2') LIMIT 5;
SELECT * FROM employee WHERE emp_name NOT IN ('emp - 10') LIMIT 5;

create or replace function test_param_where() returns void as $$
DECLARE
n varchar;
BEGIN
FOR x IN 1..9 LOOP
select b into n from numbers where a=x;
raise notice 'Found number %', n;
end loop;
return;
END
$$ LANGUAGE plpgsql;

SELECT test_param_where();

DELETE FROM employee;
DELETE FROM department;
DELETE FROM empdata;
DELETE FROM numbers;

DROP FUNCTION test_param_where();
DROP FOREIGN TABLE numbers;

DROP FOREIGN TABLE department;
DROP FOREIGN TABLE employee;
Expand Down

0 comments on commit 13b95af

Please sign in to comment.