Orchestrate data transfer from foreign tables to local tables with a simple configuration table. Both relations must already exist and should define the same columns; column order does not matter.
fdw-assistant provides:
- a convenient way to build new stages based on configuration, composed of preconfigured jobs
- a report view that aggregates job status per stage
- a variety of options in the central config table
fdw-assistant is designed to be driven by a multi-process
orchestrator, such as xargs or dispatch commands. That way, tables
and their subsets can be processed in parallel.
Download the main file and execute it on a PostgreSQL database. It creates a
dedicated schema (named assistant by default) with relations and routines.
Use the -v flag with the INSTALL variable to change the schema name.
psql -f fdw-assistant.sql -v INSTALL=assistant

To remove the assistant, just drop the schema.

DROP SCHEMA assistant CASCADE;

config
- source (type regclass): Where the data comes from, relative to the current search_path.
- target (type regclass): Where the data goes to, relative to the current search_path.
- pkey (type text): Name of the primary key column. Composite keys are not supported.
- priority (type numeric): Used during stage creation to sort the job list in ascending order.
- parts (type bigint): Number of subsets to split the table into. Used during stage creation to build a modulus condition for each subset.
- trunc (type boolean): If set to true, the target table is truncated the very first time a job starts.
- condition (type text): WHERE condition applied to the SELECT statement used during data transfer.
- batchsize (type integer): If set, the job loops over the source table and transfers data in batches of that many rows, with an intermediate COMMIT at the end of each batch.
Examples:
INSERT INTO config
(source, target, pkey, priority, parts, trunc, condition, batchsize)
VALUES
-- t1 will be copied in a single operation
('source.t1', 'public.t1', 'id', 2, 1, true, null, null),
-- t2 will be dispatched to two jobs, each will insert data with a batch size of 200
('source.t2', 'public.t2', 'id', 1, 2, false, null, 200),
-- t3 will be copied with a condition that filters out negative values
('source.t3', 'public.t3', 'id', 3, 1, true, 'value >= 0', null);

plan(targets text[]) function
- plan() prepares a new stage by creating stage, job and task records and returns a set of CALL copy() statements in order of configured priority.
- The targets parameter is an array of target relation names used as a filter. If empty, all relations in the config table are used as targets.
- Using multiple parts (more than one) generates multiple jobs (and tasks), each with a modulo condition that splits the data into disjoint subsets.
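
As a rough sketch of how a stage might be prepared and run from a single psql session: the first query only lists the generated statements, the second hands them to psql's \gexec so that each one is executed in turn. It assumes the default assistant schema, the public.t1 and public.t2 targets from the configuration example, and that plan() returns a single text column holding the CALL statements. None of these assumptions is confirmed beyond the description above.

```sql
-- Assumption: the assistant is installed in the default "assistant" schema.
-- Prepare a stage limited to two targets and display the generated statements.
-- Note that every call to plan() prepares a new stage.
SELECT * FROM assistant.plan('{public.t1,public.t2}'::text[]);

-- Prepare another stage and let psql run each returned statement in order.
-- \gexec sends every value returned by the query to the server as a statement.
SELECT * FROM assistant.plan('{public.t1,public.t2}'::text[]) \gexec
```

This runs the jobs one after another in the current session. To process subsets in parallel, the same statements would be passed to an external orchestrator instead.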
copy(job_id bigint) procedure
- copy() handles INSERT statement build and execution, and updates its own job record during bulk insertion batches and at the end, with a completed or failed state.
- A table with the trunc option will be truncated before bulk inserts.
- The same copy(job_id) statement can be executed several times without truncating a table with the trunc option again. This behavior is intended to resume the job from the last known sequence in case of unintentional interruption.
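
A minimal sketch of resuming an interrupted job, assuming the default assistant schema and a hypothetical job identifier 42: inspect the job record first, then issue the same CALL again.

```sql
-- Hypothetical job_id; in practice use one returned by plan().
SELECT job_id, state, lastseq, rows
FROM assistant.job
WHERE job_id = 42;

-- Running the same statement again resumes after lastseq
-- and does not truncate the target a second time.
CALL assistant.copy(42);
```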
state enum type
- running: a job is processing the data transfer.
- failed: an error has been raised; the job stopped in the middle of a batch and rolled back its current task.
- pending: a job has been prepared by the plan() function but copy() has not been called yet.
- completed: a job has processed the data transfer successfully.
report view
- stage_id (type bigint): Identifier used to filter a specific stage.
- target (type regclass): Where the data goes to, relative to the current search_path.
- job_start (type timestamp): Start time of the jobs, null if not started yet.
- state (type state): State of the jobs, pending by default.
- rows (type numeric): Number of rows processed by the jobs so far.
- total (type numeric): Number of rows to be processed by the jobs.
- elapsed (type interval): Cumulative elapsed time spent by the jobs.
- rate (type numeric): Calculated rate (rows per second) of the jobs, based on elapsed time and rows processed.
- progress (type numeric): Calculated progress (from 0 to 1), based on processed rows and total rows to process.
- eti (type interval): Estimated time interval before completion.
- eta (type timestamp): Estimated time of completion.
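
The columns above lend themselves to a simple progress monitor. The query below is a sketch that assumes the default assistant schema and a hypothetical stage identifier 1; \watch is a psql meta-command that re-runs the query at the given interval, in seconds.

```sql
-- Hypothetical stage_id; use the identifier of the stage created by plan().
SELECT target, state, rows, total, progress, rate, eti, eta
FROM assistant.report
WHERE stage_id = 1
\watch 5
```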
stage table
- stage_id (type bigint): A unique identifier generated by calling the plan() function.
- ts (type timestamp): Creation time of the stage.
job table
- stage_id (type bigint): Identifier of the stage the job belongs to.
- job_id (type bigint): A unique identifier used to run the job with the copy() procedure.
- part (type integer): Subset identifier, starting at 0.
- lastseq (type bigint): Last maximum value returned by the INSERT statement, based on the pkey, used to resume the data transfer with new rows.
- rows (type bigint): Cumulative number of rows processed.
- elapsed (type interval): Cumulative elapsed time spent by the job.
- ts (type timestamp): Start time of the job, null if not started yet.
- state (type state): State of the job, pending by default.
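
To illustrate how part, lastseq and a batch size could combine, here is a self-contained sketch of a single batch that paginates on the key column. It is not the statement the assistant generates. The temporary tables src and dst, the batch size of 3 and the two-way split are assumptions made for the example.

```sql
-- Hypothetical stand-ins for a foreign source table and a local target table.
CREATE TEMP TABLE src (id bigint PRIMARY KEY, value int);
CREATE TEMP TABLE dst (id bigint PRIMARY KEY, value int);
INSERT INTO src SELECT g, g * 10 FROM generate_series(1, 10) AS g;

-- One batch for part 0 of 2, starting after lastseq = 0, limited to 3 rows.
WITH moved AS (
  INSERT INTO dst (id, value)
  SELECT id, value
  FROM src
  WHERE id > 0        -- resume point (lastseq)
    AND id % 2 = 0    -- modulus condition selecting part 0 of 2
  ORDER BY id
  LIMIT 3             -- batch size
  RETURNING id
)
-- The highest key moved becomes the resume point of the next batch.
SELECT max(id) AS lastseq, count(*) AS rows FROM moved;
```

With this data the batch moves ids 2, 4 and 6, so the final query returns lastseq = 6 and rows = 3. A following batch would use id > 6 as its resume point.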
Feature tests are included in the sql directory. Ensure a local PostgreSQL
server is running and execute the following command:
make test

Each test is isolated in a dedicated database, named after the test
filename without the .sql extension and prefixed by a single underscore
(_). Tests can be launched in parallel with the -j option. For example, to
run tests in parallel with 4 jobs, run the following command:

make test -j 4

To drop transient databases, run the following command:

make clean

Generate oracle_fdw key options for each foreign table
When reading foreign tables through the oracle_fdw extension, the key option should be set on the primary key column of each foreign table. It enforces index usage on the remote database when exporting data in batches.
Source: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
SELECT format('ALTER FOREIGN TABLE fdw.%I ALTER COLUMN %I OPTIONS (ADD key ''true'')', c.relname, a.attname)
FROM pg_catalog.pg_class c
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
INNER JOIN pg_index i ON c.oid = i.indrelid
INNER JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) AND i.indisprimary
WHERE n.nspname = 'public';

Feed the config table from foreign tables definition
INSERT INTO config (target, source, pkey)
SELECT format('public.%I', c.relname), format('%I.%I', n.nspname, c.relname), a.attname
FROM pg_catalog.pg_foreign_table ft
INNER JOIN pg_catalog.pg_class c ON c.oid = ft.ftrelid
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
INNER JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid,
LATERAL pg_catalog.pg_options_to_table(a.attfdwoptions) op
WHERE a.attnum > 0 AND NOT a.attisdropped
AND op.option_name = 'key' AND op.option_value = 'true';

Feed the config table from public tables definition with single column key
INSERT INTO config (target, source, pkey)
SELECT format('%I.%I', n.nspname, c.relname), format('%I.%I', fn.nspname, cf.relname), MIN(a.attname)
FROM pg_catalog.pg_class c
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
INNER JOIN pg_index i ON c.oid = i.indrelid
INNER JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) AND i.indisprimary
INNER JOIN pg_catalog.pg_class cf ON cf.relname = c.relname
INNER JOIN pg_catalog.pg_foreign_table ft ON ft.ftrelid = cf.oid
INNER JOIN pg_catalog.pg_namespace fn ON fn.oid = cf.relnamespace
WHERE n.nspname = 'public' AND c.relkind = 'r' AND cf.relkind = 'f'
GROUP BY 1, 2 HAVING COUNT(1) = 1;

Feed the config table from public tables definition
INSERT INTO assistant.config (target, source)
SELECT DISTINCT format('%I.%I', n.nspname, c.relname), format('%I.%I', fn.nspname, cf.relname)
FROM pg_catalog.pg_class c
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
INNER JOIN pg_catalog.pg_class cf ON cf.relname = c.relname
INNER JOIN pg_catalog.pg_foreign_table ft ON ft.ftrelid = cf.oid
INNER JOIN pg_catalog.pg_namespace fn ON fn.oid = cf.relnamespace
WHERE n.nspname = 'public' AND c.relkind = 'r' AND cf.relkind = 'f';

Add a conversion comment on all boolean columns from public tables definition
SET search_path = source;
SELECT format('COMMENT ON COLUMN %I.%I IS ''bool(%%s)''', c.relname, a.attname)
FROM pg_catalog.pg_class c
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
INNER JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
INNER JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
WHERE n.nspname = 'public'
AND c.relkind = 'r' AND t.typname = 'bool'
AND a.attnum > 0 AND NOT a.attisdropped \gexec