Skip to content
Wen Lin edited this page Oct 24, 2019 · 5 revisions

MPP FDW

CREATE SERVER server_name [ TYPE 'server_type' ] [ VERSION 'server_version' ]
    FOREIGN DATA WRAPPER fdw_name
    [ OPTIONS ( option 'value' [, ... ] ) ]
CREATE FOREIGN TABLE [ IF NOT EXISTS ] table_name ( [
  { column_name data_type [ OPTIONS ( option 'value' [, ... ] ) ] [ COLLATE collation ] [ column_constraint [ ... ] ]
    | table_constraint }
    [, ... ]
] )
[ INHERITS ( parent_table [, ... ] ) ]
  SERVER server_name
[ OPTIONS ( option 'value' [, ... ] ) ]
[ MPP_OPTIONS ( option 'value' [, ... ] ) ]

Examples:

file_fdw

Environment

name host segid csv_file_path
seg0 host0 1 /var/file/1/file.csv
seg1 host0 2 /var/file/2/file.csv
seg2 host2 3 /var/file/3/file.csv
-- file_fdw example: one CSV file per segment, addressed through a <segid>
-- placeholder in the filename option.
CREATE EXTENSION file_fdw;
CREATE SERVER fs FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE external_tsv
    (id char(3), i integer, name text)
    SERVER fs
    -- Every option is written as: option_name 'value' (no '=' and no double quotes).
    -- NOTE(review): option_mapper is listed both here and in MPP_OPTIONS; confirm which one is intended.
    OPTIONS (filename '/path/to/<segid>/file.csv', encoding 'gb2312', option_mapper 'mapper_callback')
    MPP_OPTIONS(option_mapper 'mapper_callback');

/*
 * default_mapper_callback
 *
 * Maps a single option (key/value pair) to the value to use on one segment.
 *
 *   key    name of the option being mapped
 *   value  value of the option as given by the user
 *   segid  id of the segment the value is being mapped for
 *
 * Returns the result of strreplace() for the "filename" option and NULL for
 * every other option.
 */
char*
default_mapper_callback(char* key, char* value, int segid)
{
    /* Only the "filename" option is rewritten; compare the option name itself. */
    if (strcmp(key, "filename") == 0)
    {
        /*
         * Substitute into the option's value, not into a string literal.
         * NOTE(review): strreplace is not defined in this page; it is assumed
         * to replace the "<segid>" placeholder with the given segment id.
         * Confirm its signature (segid is passed as an int).
         */
        return strreplace(value, "<segid>", segid);
    }
    return NULL;
}

kafka

-- Kafka example: foreign server, public user mapping, then the foreign table.
CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (
    brokers 'localhost:9092'
);

CREATE USER MAPPING
FOR PUBLIC
SERVER kafka_server;

-- part and offs carry the column-level partition / offset options;
-- the remaining columns are plain data columns.
CREATE FOREIGN TABLE kafka_test (
    part      int       OPTIONS (partition 'true'),
    offs      bigint    OPTIONS (offset 'true'),
    some_int  int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    topic 'contrib_regress',
    batch_size '30',
    buffer_delay '100'
);

How to parallelize a FDW?

Define a C struct to represent the information provided by the user; this information is needed before the plan is dispatched to the QEs. Once the QD gets this structure, it can fill the information into the Slice structure and dispatch it to the QEs.

The user may create UDFs to provide this information; a UDF may or may not need to talk to the foreign server to obtain it.

For example, if the foreign server is a Kafka server, the UDF can ask the broker for the information; if the foreign table uses file_fdw, there is no server to ask, so the UDF itself must implement the logic that provides the information.


/*
 * MPPInfo
 *
 * Information supplied for a foreign table that is needed before the plan
 * is dispatched to the QEs.
 */
typedef struct MPPInfo {
    /* operation type: read or write? */
    OpType opType;

    /* how many partitions or segments? */
    int count;

    /* distribution type: is data distributed by key or randomly in this server? */
    DistType distType;

    /* a list of segments for QEs to connect. */
    List *segments;
} MPPInfo;

Clone this wiki locally