Skip to content
Browse files

First working version of the MySQL FDW.

  • Loading branch information...
1 parent 688c0fd commit e30604f79cba45b97453866318bf4fe9e1dbea78 @dpage dpage committed
Showing with 180 additions and 181 deletions.
  1. +69 −14 README
  2. +111 −167 mysql_fdw.c
View
83 README
@@ -13,22 +13,25 @@ warned!
Building
--------
-FIXME: Library requirements etc.
+Install MySQL, or just the C client library, and Once that's done, the
+extension can be built with:
-Once that's done, the extension can be built with:
+PATH=/usr/local/pgsql/bin/:/usr/local/mysql/bin:$PATH make USE_PGXS=1
+sudo PATH=/usr/local/pgsql/bin/:/usr/local/mysql/bin:$PATH make USE_PGXS=1 install
-PATH=/usr/local/pgsql91/bin/:$PATH make USE_PGXS=1 make
-sudo PATH=/usr/local/pgsql91/bin/:$PATH make USE_PGXS=1 install
+(assuming you have PostgreSQL 9.1 in /usr/local/pgsql and MySQL in
+/usr/local/mysql).
-(assuming you have PostgreSQL 9.1 in /usr/local/pgsql91).
-
-I've tested on Mac OS X 10.6 only, but other *nix's should also work.
+I've tested on Mac OS X 10.7 only, but other *nix's should also work.
I haven't tested on Windows, but the code should be good on MinGW.
Limitations
-----------
-FIXME
+- No attempt is made to pushdown quals to MySQL.
+
+- No attempt is made to estimate the number of rows etc. when planning
+ the remote query.
Usage
-----
@@ -39,27 +42,79 @@ address: The address or hostname of the MySQL server.
Default: 127.0.0.1
port: The port number on which the MySQL server is listening.
- Default: 6379
+ Default: 3306
The following parameter can be set on a MySQL foreign table:
-database: The numeric ID of the MySQL database to query.
- Default: 0
+database: The name of the MySQL database to query.
+ Default: NULL
+
+query: An SQL query to define the data set on the MySQL server.
+
+table: The name of a table (quoted and qualified as required)
+ on the MySQL table.
+
+Note that the query and table paramters are mutually exclusive. Using
+query can provide either a simple way to push down quals (which of
+course is fixed at definition time), or to base remote tables on
+more complex SQL queries.
The following parameter can be set on a user mapping for a MySQL
foreign server:
username: The username to use when connecting to MySQL
+ Default <none>
password: The password to authenticate to the MySQL server with.
Default: <none>
-FIXME: Add additional options.
-
Example
-------
-FIXME
+-- Create the require functions for the FDW.
+CREATE FUNCTION mysql_fdw_handler()
+ RETURNS fdw_handler
+ AS '$libdir/mysql_fdw'
+ LANGUAGE C STRICT;
+
+CREATE FUNCTION mysql_fdw_validator(text[], oid)
+ RETURNS void
+ AS '$libdir/mysql_fdw'
+ LANGUAGE C STRICT;
+
+-- Create the data wrapper or "transport".
+CREATE FOREIGN DATA WRAPPER mysql_fdw
+ HANDLER mysql_fdw_handler
+ VALIDATOR mysql_fdw_validator;
+
+-- Create the foreign server, a pointer to the MySQL server.
+CREATE SERVER mysql_svr
+ FOREIGN DATA WRAPPER mysql_fdw
+ OPTIONS (address '127.0.0.1', port '3306');
+
+-- Create one or more foreign tables on the MySQL server. The first of
+-- these maps to a remote table, whilst the second uses an SQL query.
+CREATE FOREIGN TABLE employees (
+ id integer,
+ name text,
+ address text)
+ SERVER mysql_svr
+ OPTIONS (table 'hr.employees');
+
+CREATE FOREIGN TABLE overtime_2010 (
+ id integer,
+ employee_id integer,
+ ot_date date,
+ ot_hours integer))
+ SERVER mysql_svr
+ OPTIONS (query 'SELECT * FROM overtime WHERE year = 2010;');
+
+-- Create a user mapping to tell the FDW the username/password to
+-- use to connect to MySQL, for PUBLIC. This could be done on a per-
+-- role basis.
+CREATE USER MAPPING FOR PUBLIC
+ SERVER mysql
+ OPTIONS (username 'dpage', password '');
--
Dave Page
View
278 mysql_fdw.c
@@ -14,9 +14,6 @@
*-------------------------------------------------------------------------
*/
-/* Debug mode */
-/* #define DEBUG */
-
#include "postgres.h"
#include <stdio.h>
@@ -50,8 +47,6 @@
PG_MODULE_MAGIC;
-#define PROCID_TEXTEQ 67
-
/*
* Describes the valid options for objects that use this wrapper.
*/
@@ -74,7 +69,7 @@ static struct MySQLFdwOption valid_options[] =
{ "username", UserMappingRelationId },
{ "password", UserMappingRelationId },
{ "database", ForeignTableRelationId },
- { "schema", ForeignTableRelationId },
+ { "query", ForeignTableRelationId },
{ "table", ForeignTableRelationId },
/* Sentinel */
@@ -88,14 +83,14 @@ static struct MySQLFdwOption valid_options[] =
typedef struct MySQLFdwExecutionState
{
MYSQL *conn;
+ MYSQL_RES *result;
AttInMetadata *attinmeta;
- long long row;
char *address;
int port;
char *username;
char *password;
char *database;
- char *schema;
+ char *query;
char *table;
} MySQLFdwExecutionState;
@@ -122,8 +117,7 @@ static void mysqlEndForeignScan(ForeignScanState *node);
* Helper functions
*/
static bool mysqlIsValidOption(const char *option, Oid context);
-static void mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username, char **password, char **database, char **schema, char **table);
-static void mysqlGetQual(Node *node, TupleDesc tupdesc, char **key, char **value, bool *pushdown);
+static void mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username, char **password, char **database, char **query, char **table);
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -134,10 +128,6 @@ mysql_fdw_handler(PG_FUNCTION_ARGS)
{
FdwRoutine *fdwroutine = makeNode(FdwRoutine);
-#ifdef DEBUG
- elog(NOTICE, "mysql_fdw_handler");
-#endif
-
fdwroutine->PlanForeignScan = mysqlPlanForeignScan;
fdwroutine->ExplainForeignScan = mysqlExplainForeignScan;
fdwroutine->BeginForeignScan = mysqlBeginForeignScan;
@@ -164,14 +154,10 @@ mysql_fdw_validator(PG_FUNCTION_ARGS)
char *svr_username = NULL;
char *svr_password = NULL;
char *svr_database = NULL;
- char *svr_schema = NULL;
+ char *svr_query = NULL;
char *svr_table = NULL;
ListCell *cell;
-#ifdef DEBUG
- elog(NOTICE, "mysql_fdw_validator");
-#endif
-
/*
* Check that only options supported by mysql_fdw,
* and allowed for the current object type, are given.
@@ -251,18 +237,30 @@ mysql_fdw_validator(PG_FUNCTION_ARGS)
svr_database = defGetString(def);
}
- else if (strcmp(def->defname, "schema") == 0)
+ else if (strcmp(def->defname, "query") == 0)
{
- if (svr_schema)
+ if (svr_table)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting options: query cannot be used with table")
+ ));
+
+ if (svr_query)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options: schema (%s)", defGetString(def))
+ errmsg("conflicting or redundant options: query (%s)", defGetString(def))
));
- svr_schema = defGetString(def);
+ svr_query = defGetString(def);
}
else if (strcmp(def->defname, "table") == 0)
{
+ if (svr_query)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting options: table cannot be used with query")
+ ));
+
if (svr_table)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -286,10 +284,6 @@ mysqlIsValidOption(const char *option, Oid context)
{
struct MySQLFdwOption *opt;
-#ifdef DEBUG
- elog(NOTICE, "mysqlIsValidOption");
-#endif
-
for (opt = valid_options; opt->optname; opt++)
{
if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
@@ -302,7 +296,7 @@ mysqlIsValidOption(const char *option, Oid context)
* Fetch the options for a mysql_fdw foreign table.
*/
static void
-mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username, char **password, char **database, char **schema, char **table)
+mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username, char **password, char **database, char **query, char **table)
{
ForeignTable *f_table;
ForeignServer *f_server;
@@ -310,10 +304,6 @@ mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username,
List *options;
ListCell *lc;
-#ifdef DEBUG
- elog(NOTICE, "mysqlGetOptions");
-#endif
-
/*
* Extract options from FDW objects.
*/
@@ -346,8 +336,8 @@ mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username,
if (strcmp(def->defname, "database") == 0)
*database = defGetString(def);
- if (strcmp(def->defname, "schema") == 0)
- *schema = defGetString(def);
+ if (strcmp(def->defname, "query") == 0)
+ *query = defGetString(def);
if (strcmp(def->defname, "table") == 0)
*table = defGetString(def);
@@ -359,6 +349,13 @@ mysqlGetOptions(Oid foreigntableid, char **address, int *port, char **username,
if (!*port)
*port = 3306;
+
+ /* Check we have the options we need to proceed */
+ if (!*table && !*query)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("either a table or a query must be specified")
+ ));
}
/*
@@ -374,19 +371,11 @@ mysqlPlanForeignScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
char *svr_username = NULL;
char *svr_password = NULL;
char *svr_database = NULL;
- char *svr_schema = NULL;
+ char *svr_query = NULL;
char *svr_table = NULL;
-#ifdef DEBUG
- elog(NOTICE, "mysqlPlanForeignScan");
-#endif
-
/* Fetch options */
- mysqlGetOptions(foreigntableid, &svr_address, &svr_port, &svr_username, &svr_password, &svr_database, &svr_schema, &svr_table);
-
- /* Connect to the database */
-
- /* Execute a query to get the database size */
+ mysqlGetOptions(foreigntableid, &svr_address, &svr_port, &svr_username, &svr_password, &svr_database, &svr_query, &svr_table);
/* Construct FdwPlan with cost estimates. */
fdwplan = makeNode(FdwPlan);
@@ -397,6 +386,11 @@ mysqlPlanForeignScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
else
fdwplan->startup_cost = 25;
+ /*
+ * TODO: Currently we assume 10 rows. We need to connect to the remote database and
+ * execute an explain or count to get an idea of the number of rows (and maybe other
+ * costs), without it costing a fortune to do so - Heisenberg's principle people!
+ */
baserel->rows = 10;
fdwplan->total_cost = 10 + fdwplan->startup_cost;
fdwplan->fdw_private = NIL; /* not used */
@@ -411,19 +405,41 @@ mysqlPlanForeignScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
static void
mysqlExplainForeignScan(ForeignScanState *node, ExplainState *es)
{
+ char *query;
+ MYSQL_RES *result;
+ MYSQL_ROW *row;
+ long rows = 0;
+
MySQLFdwExecutionState *festate = (MySQLFdwExecutionState *) node->fdw_state;
-#ifdef DEBUG
- elog(NOTICE, "mysqlExplainForeignScan");
-#endif
+ /*
+ * MySQL seems to have some pretty unhelpful EXPLAIN output, which only
+ * gives a row estimate for each relation in the statement. We'll use the
+ * sum of the rows as our cost estimate.
+ */
+ query = (char *) palloc(strlen(festate->query) + 9);
+ snprintf(query, strlen(festate->query) + 9, "EXPLAIN %s", festate->query);
+
+ if (mysql_query(festate->conn, query) != 0)
+ {
+ char *err = pstrdup(mysql_error(festate->conn));
+ mysql_close(festate->conn);
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
+ errmsg("failed to execute the MySQL query: %s", err)
+ ));
+ }
+
+ result = mysql_store_result(festate->conn);
- /* Execute a query to get the database size */
+ while ((row = mysql_fetch_row(result)))
+ rows += atol(row[8]);
+ mysql_free_result(result);
+
/* Suppress file size if we're not showing cost details */
if (es->costs)
- {
- ExplainPropertyLong("Foreign MySQL Database Size", 10, es);
- }
+ ExplainPropertyLong("Foreign MySQL Data Rows", rows, es);
}
/*
@@ -438,21 +454,14 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
char *svr_username = NULL;
char *svr_password = NULL;
char *svr_database = NULL;
- char *svr_schema = NULL;
+ char *svr_query = NULL;
char *svr_table = NULL;
MYSQL *conn;
- char *qual_key = NULL;
- char *qual_value = NULL;
- bool pushdown = false;
MySQLFdwExecutionState *festate;
- char query[1024];
-
-#ifdef DEBUG
- elog(NOTICE, "BeginForeignScan");
-#endif
+ char *query;
/* Fetch options */
- mysqlGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &svr_address, &svr_port, &svr_username, &svr_password, &svr_database, &svr_schema, &svr_table);
+ mysqlGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &svr_address, &svr_port, &svr_username, &svr_password, &svr_database, &svr_query, &svr_table);
/* Connect to the server */
conn = mysql_init(NULL);
@@ -468,44 +477,43 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
errmsg("failed to connect to MySQL: %s", mysql_error(conn))
));
- /* See if we've got a qual we can push down */
- if (node->ss.ps.plan->qual)
+ /* Build the query */
+ if (svr_query)
+ query = svr_query;
+ else
{
- ListCell *lc;
-
- foreach (lc, node->ss.ps.qual)
- {
- /* Only the first qual can be pushed down to MySQL */
- ExprState *state = lfirst(lc);
+ size_t len = strlen(svr_table) + 15;
- mysqlGetQual((Node *) state->expr, node->ss.ss_currentRelation->rd_att, &qual_key, &qual_value, &pushdown);
- if (pushdown)
- break;
- }
+ query = (char *)palloc(len);
+ snprintf(query, len, "SELECT * FROM %s", svr_table);
}
- /* Stash away the state info we have already */
- festate = (MySQLFdwExecutionState *) palloc(sizeof(MySQLFdwExecutionState));
- node->fdw_state = (void *) festate;
- festate->conn = conn;
- festate->row = 0;
- festate->address = svr_address;
- festate->port = svr_port;
+ /* Stash away the state info we have already */
+ festate = (MySQLFdwExecutionState *) palloc(sizeof(MySQLFdwExecutionState));
+ node->fdw_state = (void *) festate;
+ festate->conn = conn;
+ festate->address = svr_address;
+ festate->port = svr_port;
+ festate->query = query;
- /* OK, we connected. If this is an EXPLAIN, bail out now */
- if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
- return;
+ /* OK, we connected. If this is an EXPLAIN, bail out now */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
/* Execute the query */
- snprintf(query, sizeof(query), "SELECT * FROM `%s`.`%s`", svr_schema, svr_table);
if (mysql_query(conn, query) != 0)
{
+ char *err = pstrdup(mysql_error(conn));
+ mysql_close(conn);
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
- errmsg("failed to execute the MySQL query: %s", mysql_error(conn))
+ errmsg("failed to execute the MySQL query: %s", err)
));
}
+ /* Guess the query succeeded then */
+ festate->result = mysql_store_result(conn);
+
/* Store the additional state info */
festate->attinmeta = TupleDescGetAttInMetadata(node->ss.ss_currentRelation->rd_att);
}
@@ -518,41 +526,30 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
static TupleTableSlot *
mysqlIterateForeignScan(ForeignScanState *node)
{
- bool found = false;
- char *key;
- char *data = 0;
char **values;
HeapTuple tuple;
+ MYSQL_ROW row;
+ int x;
MySQLFdwExecutionState *festate = (MySQLFdwExecutionState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-#ifdef DEBUG
- elog(NOTICE, "mysqlIterateForeignScan");
-#endif
-
/* Cleanup */
ExecClearTuple(slot);
-
/* Get the next tuple */
- if (festate->row < 10)
- found = true;
- festate->row++;
+ if ((row = mysql_fetch_row(festate->result)))
+ {
+ /* Build the tuple */
+ values = (char **) palloc(sizeof(char *) * mysql_num_fields(festate->result));
- /* Build the tuple */
- values = (char **) palloc(sizeof(char *) * 2);
+ for (x = 0; x < mysql_num_fields(festate->result); x++)
+ values[x] = row[x];
- if (found)
- {
- values[0] = "<key>";
- values[1] = "<vaue>";
tuple = BuildTupleFromCStrings(festate->attinmeta, values);
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
}
- /* Cleanup */
-
return slot;
}
@@ -565,12 +562,17 @@ mysqlEndForeignScan(ForeignScanState *node)
{
MySQLFdwExecutionState *festate = (MySQLFdwExecutionState *) node->fdw_state;
-#ifdef DEBUG
- elog(NOTICE, "mysqlEndForeignScan");
-#endif
+ if (festate->result)
+ {
+ mysql_free_result(festate->result);
+ festate->result = NULL;
+ }
- mysql_close(festate->conn);
- festate->conn = NULL;
+ if (festate->result)
+ {
+ mysql_close(festate->conn);
+ festate->conn = NULL;
+ }
}
/*
@@ -582,63 +584,5 @@ mysqlReScanForeignScan(ForeignScanState *node)
{
MySQLFdwExecutionState *festate = (MySQLFdwExecutionState *) node->fdw_state;
-#ifdef DEBUG
- elog(NOTICE, "mysqlReScanForeignScan");
-#endif
-
- festate->row = 0;
}
-static void
-mysqlGetQual(Node *node, TupleDesc tupdesc, char **key, char **value, bool *pushdown)
-{
- *key = NULL;
- *value = NULL;
- *pushdown = false;
-
- if (!node)
- return;
-
- if (IsA(node, OpExpr))
- {
- OpExpr *op = (OpExpr *) node;
- Node *left, *right;
- Index varattno;
-
- if (list_length(op->args) != 2)
- return;
-
- left = list_nth(op->args, 0);
-
- if (!IsA(left, Var))
- return;
-
- varattno = ((Var *) left)->varattno;
-
- right = list_nth(op->args, 1);
-
- if (IsA(right, Const))
- {
- StringInfoData buf;
-
- initStringInfo(&buf);
-
- /* And get the column and value... */
- *key = NameStr(tupdesc->attrs[varattno - 1]->attname);
- *value = TextDatumGetCString(((Const *) right)->constvalue);
-
- /*
- * We can push down this qual if:
- * - The operatory is TEXTEQ
- * - The qual is on the key column
- */
- if (op->opfuncid == PROCID_TEXTEQ && strcmp(*key, "key") == 0)
- *pushdown = true;
-
- elog(NOTICE, "Got qual %s = %s", *key, *value);
- return;
- }
- }
-
- return;
-}

0 comments on commit e30604f

Please sign in to comment.
Something went wrong with that request. Please try again.