User Scripts
All configuration languages eventually become Turing-complete, so let's start with a Turing-complete language for configuration.
Replicator contains an embedded JavaScript / ES5+ runtime that has a TypeScript loader frontend. This allows users to inject arbitrary logic into the data-processing pipeline. Sufficiently complicated scripts may be pre-processed by other build systems that produce CommonJS-compatible output.
The user script imports API bindings from a module named replicator@v1.
TypeScript definitions for the module are baked into the Replicator binary and can be printed by running the following:
replicator userscript --api > 'replicator@v1.d.ts'
Most Replicator subcommands support a --userscript flag, which specifies the entry-point file:
replicator start --userscript ./path/to/main.ts
The following model describes the flow of data. Records from a source are passed to dispatch(), which fans them out to per-destination map() functions; the mapped records are then applied to one or more destination tables.
source -> dispatch() -> map() -> some_table
                     -> map() -> another_table
                     -> map() -> third_table
                     ....
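To make the model concrete, here is a minimal standalone sketch that simulates it in memory. It does not import replicator@v1: the Doc, Dispatch, and Mapper types and the runPipeline function are hypothetical stand-ins, and the rule that a table without a mapper passes documents through unchanged is an assumption of this sketch.

```typescript
// Hypothetical stand-ins; not the replicator@v1 API.
type Doc = Record<string, unknown>;
type Dispatch = (doc: Doc) => Record<string, Doc[]>;
type Mapper = (doc: Doc) => Doc | null;

// Route each source record through dispatch(), then through the
// per-table mapper (if any), collecting the rows bound for each table.
function runPipeline(
  source: Doc[],
  dispatch: Dispatch,
  mappers: Record<string, Mapper>,
): Record<string, Doc[]> {
  const tables: Record<string, Doc[]> = {};
  for (const record of source) {
    const fanOut = dispatch(record);
    for (const table of Object.keys(fanOut)) {
      const mapper: Mapper | undefined = mappers[table];
      for (const doc of fanOut[table] ?? []) {
        // No mapper: pass through. Mapper returns null: drop the doc.
        const mapped = mapper ? mapper(doc) : doc;
        if (mapped === null) continue;
        let rows = tables[table];
        if (rows === undefined) {
          rows = [];
          tables[table] = rows;
        }
        rows.push(mapped);
      }
    }
  }
  return tables;
}

const result = runPipeline(
  [{ id: 1, kind: "a" }, { id: 2, kind: "b" }],
  // Fan each record out to two tables.
  (doc) => ({ some_table: [doc], another_table: [{ id: doc.id }] }),
  // Only another_table has a mapper; it filters out id 2.
  { another_table: (doc) => (doc.id === 2 ? null : doc) },
);
console.log(JSON.stringify(result));
```

Keeping dispatch() and map() as separate stages means routing decisions live in one place, while per-table cleanup or filtering stays local to each destination.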
All userscripts should import the runtime bindings as follows:
import * as api from "replicator@v1";
Running replicator userscript --api will print the current d.ts file for better tooling integration.
The embedded loader supports module-relative import paths as well as imports from arbitrary URLs:
import * as lib from "./lib";
import * as something from "https://some.cdn/package@1";
Non-script resources can also be imported:
import externalData from "./data.txt";
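As one possible use of a non-script import, a table-rename list could be kept in a text file and parsed at startup. This is a minimal sketch, assuming the loader exposes a .txt import as a string holding the file's contents; the "source,destination" line format and the parseRenames helper are hypothetical, and the file contents are inlined so the sketch runs on its own.

```typescript
// Hypothetical stand-in for the contents of ./data.txt. In a userscript
// this value would come from: import externalData from "./data.txt";
// (assuming the loader exposes a text file as a string).
const externalData = `oldTbl1,newTbl1
oldTbl2,newTbl2
`;

// Parse "source,destination" lines into a Map, ignoring blank lines.
function parseRenames(text: string): Map<string, string> {
  const renames = new Map<string, string>();
  for (const line of text.split("\n")) {
    const trimmed = line.trim(); // also strips a trailing "\r"
    if (trimmed === "") continue;
    const comma = trimmed.indexOf(",");
    if (comma < 0) {
      throw new Error(`malformed rename line: ${trimmed}`);
    }
    renames.set(trimmed.slice(0, comma).trim(), trimmed.slice(comma + 1).trim());
  }
  return renames;
}

const renames = parseRenames(externalData);
console.log(renames.get("oldTbl1")); // prints newTbl1
```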
In CDC-based modes, the input schema name is used as the source. That is, if a changefeed is sending data to https://replicator.svc/my_db/public, then one would call api.configureSource("my_db.public"), or api.configureSource("MY_DB") for single-level namespaces.
Similarly, if the pglogical or mylogical frontends are used, the source name is taken from the incoming schema.
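For the CDC case described above, the naming rule can be illustrated with a small helper that turns a changefeed URL into the corresponding source name. The sourceNameFromChangefeedUrl function is hypothetical: it is not part of replicator@v1 and not Replicator's own parser, only a sketch of the "path segments joined with dots" mapping shown in the example.

```typescript
// Hypothetical helper illustrating the CDC source-naming rule.
function sourceNameFromChangefeedUrl(url: string): string {
  const path = url
    // Strip the scheme and host, e.g. "https://replicator.svc".
    .replace(/^[a-z][a-z0-9+.-]*:\/\/[^/]*/i, "")
    // Strip any query string or fragment.
    .replace(/[?#].*$/, "");
  // Join the remaining non-empty path segments with ".".
  return path
    .split("/")
    .filter((segment) => segment.length > 0)
    .join(".");
}

console.log(sourceNameFromChangefeedUrl("https://replicator.svc/my_db/public")); // prints my_db.public
console.log(sourceNameFromChangefeedUrl("https://replicator.svc/MY_DB")); // prints MY_DB
```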
The meta object passed to the dispatch function contains the following fields:
- cdc | mylogical | pglogical: One of these properties is set to true, depending on the frontend.
- logical: The logical component of the mutation's HLC timestamp.
- nanos: The wall-clock component of the mutation's HLC timestamp.
- schema: The destination schema, e.g. my_db.public or MY_DB.
- table: The name of the destination table. This will be needed to produce the dispatch() output as shown below.
import * as api from "replicator@v1";
// configureSource is useful for renaming tables or adjusting schemas on the fly.
api.configureSource("my_db.public", {
  dispatch: (doc: api.Document, meta: api.Document): Record<api.Table, api.Document[]> => {
    console.log(JSON.stringify(doc), JSON.stringify(meta));
    let tbl: api.Table; // Just a string.
    switch (meta.table) {
      case "oldTbl1":
        tbl = "newTbl1";
        break;
      case "oldTbl2":
        tbl = "newTbl2";
        break;
      default:
        tbl = meta.table;
        break;
    }
    // Other schema fixups could be performed on the document.
    // Multiple documents can be dispatched to multiple
    // tables as well to create a fan-out effect.
    const ret: Record<api.Table, api.Document[]> = {};
    ret[tbl] = [doc];
    return ret;
  },
});
// Data behaviors are configured using fully-qualified table names.
// Unqualified table names are acceptable, but are ambiguous if
// a single instance of Replicator is processing multiple schemas.
api.configureTable("my_db.public.my_table", {
  map: (doc: api.Document): api.Document => {
    console.log("map", JSON.stringify(doc));
    return doc;
  },
  // See other configuration options in the d.ts file.
});
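When the number of renamed tables grows, the switch statement in the dispatch example above can be replaced by a lookup table. The following standalone sketch shows the same routing with a Map; Doc and RENAMES are hypothetical stand-ins for api.Document and a rename list, and the function is written to run outside Replicator.

```typescript
// Hypothetical stand-in for api.Document.
type Doc = Record<string, unknown>;

// Source table name -> destination table name.
const RENAMES = new Map<string, string>([
  ["oldTbl1", "newTbl1"],
  ["oldTbl2", "newTbl2"],
]);

// Same routing as the switch: renamed tables go to their new name,
// every other table keeps its original name.
function renameDispatch(doc: Doc, meta: { table: string }): Record<string, Doc[]> {
  const tbl = RENAMES.get(meta.table) ?? meta.table;
  const ret: Record<string, Doc[]> = {};
  ret[tbl] = [doc];
  return ret;
}

console.log(JSON.stringify(renameDispatch({ id: 1 }, { table: "oldTbl2" }))); // prints {"newTbl2":[{"id":1}]}
```

Keeping the renames as data rather than code makes the list easy to extend, or to load from an imported resource.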
If Replicator is using a document store as its source, a mapping of source collections to destination table(s) must be established.
This shows a configuration where a source collection some_collection is passed directly through to some_table in the destination:
import * as api from "replicator@v1";
api.configureSource("some_collection", {
  target: "some_table"
});
Source documents often have nested structures that require non-trivial mapping to multiple destination tables:
import * as api from "replicator@v1";
api.configureSource("complex", {
  dispatch: (doc, meta) => {
    console.log(JSON.stringify(doc), JSON.stringify(meta));
    return {
      // "destination table" => array of column/value pairs
      "parent": [{"foo": doc.foo, "bar": doc.bar}],
      "child": [
        {"column": "value"},
        {"column": "value"},
      ],
    };
  },
  // Route deletes to the parent table; allows ON DELETE CASCADE
  // to remove the child rows.
  deletesTo: "parent",
});
The dispatch function can be arbitrarily complex; often, the return value is built entirely dynamically.
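As an illustration of a dynamically built result, the following standalone sketch emits one parent row plus one child row per nested item. The input shape (id, foo, bar, items[]) and the child columns (parent_id, idx, sku, qty) are hypothetical, and the function runs outside Replicator; in a userscript it would be passed as the dispatch option of api.configureSource.

```typescript
// Hypothetical stand-in for api.Document.
type Doc = Record<string, unknown>;

// Hypothetical shape of a nested item in the source document.
interface Item {
  sku: string;
  qty: number;
}

function dispatchNested(doc: Doc): Record<string, Doc[]> {
  const ret: Record<string, Doc[]> = {
    parent: [{ id: doc.id, foo: doc.foo, bar: doc.bar }],
  };
  // One child row per nested item, carrying the parent's key and the
  // item's position so that the child rows stay distinct.
  const items = Array.isArray(doc.items) ? (doc.items as Item[]) : [];
  if (items.length > 0) {
    ret.child = items.map((item, i) => ({
      parent_id: doc.id,
      idx: i,
      sku: item.sku,
      qty: item.qty,
    }));
  }
  return ret;
}

const out = dispatchNested({
  id: 7, foo: "f", bar: "b",
  items: [{ sku: "x", qty: 1 }, { sku: "y", qty: 2 }],
});
console.log(JSON.stringify(out));
```

This sketch omits the child key entirely when there are no items, so the set of destination tables varies per document.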
Each destination table may have a map() function associated with it that has an opportunity to inspect or modify all records bound for that table. The mapper function can also act as a filter by returning null instead of an object.
import * as api from "replicator@v1";
api.configureTable("has_mapper", {
  // Final document fixups, or return null to drop it.
  map: doc => {
    doc.msg = doc.msg.trim();
    doc.num = 42;
    console.log("Hello debug", JSON.stringify(doc));
    return doc;
  }
});
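The filtering behavior can be sketched as a standalone function that trims a message like the has_mapper example, but returns null to drop records whose msg is missing or blank. The mapMessage name and the drop rule are hypothetical; unlike the example, this version returns a copy instead of mutating the input.

```typescript
// Hypothetical stand-in for api.Document.
type Doc = Record<string, unknown>;

// Trim msg, or return null to drop the record when msg is missing,
// not a string, or blank after trimming.
function mapMessage(doc: Doc): Doc | null {
  const raw = doc.msg;
  if (typeof raw !== "string") return null;
  const msg = raw.trim();
  if (msg === "") return null;
  // Return a copy rather than mutating the input document.
  return { ...doc, msg };
}

const kept = [{ msg: "  hello " }, { msg: "   " }, { num: 42 }]
  .map(mapMessage)
  .filter((doc): doc is Doc => doc !== null);
console.log(JSON.stringify(kept)); // prints [{"msg":"hello"}]
```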
Similarly, a deleteKey() function may be defined for a table to alter the primary-key values used to process incoming deletes, or to elide deletions altogether. The function receives an array of values representing the primary-key columns and may return a revised key or null. This supports use cases where the target schema does not exactly match the source schema.
import * as api from "replicator@v1";
// Elide all deletes for the table, e.g.: for archival use cases.
api.configureTable("delete_elide", {
  deleteKey: () => null,
});
// Swap the order of PK elements.
api.configureTable("delete_swap", {
  deleteKey: key => [key[1], key[0]],
});
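The effect of the two hooks above on a batch of incoming deletes can be modeled in isolation. This is a sketch only: the Key and DeleteKeyFn types and the reviseDeletes function are hypothetical, and it assumes that a null result simply removes that delete from the batch.

```typescript
// Hypothetical types: a key is the array of primary-key values.
type Key = unknown[];
type DeleteKeyFn = (key: Key) => Key | null;

// The two hooks from the example above, typed explicitly.
const elideAll: DeleteKeyFn = () => null;
const swapPk: DeleteKeyFn = (key) => [key[1], key[0]];

// Apply a deleteKey hook to each incoming key, dropping elided deletes.
function reviseDeletes(keys: Key[], deleteKey: DeleteKeyFn): Key[] {
  const revised: Key[] = [];
  for (const key of keys) {
    const next = deleteKey(key);
    if (next !== null) revised.push(next);
  }
  return revised;
}

const incoming: Key[] = [[1, "a"], [2, "b"]];
console.log(JSON.stringify(reviseDeletes(incoming, swapPk))); // prints [["a",1],["b",2]]
console.log(reviseDeletes(incoming, elideAll).length); // prints 0
```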
All data application behaviors can be configured programmatically.
import * as api from "replicator@v1";
// Hypothetical local module providing libraryFn().
import * as lib from "./lib";
api.configureTable("all_features", {
  // Compare-and-set operations.
  cas: ["cas0", "cas1"],
  // Drop old data.
  deadlines: {
    "dl0": "1h",
    "dl1": "1m"
  },
  // Provide SQL expressions that transform incoming column values.
  exprs: {
    "expr0": "fnv32($0::BYTES)",
    "expr1": lib.libraryFn(),
  },
  // Place unmapped properties into a JSONB column.
  extras: "overflow_column",
  // Columns in the target database that may be ignored.
  ignore: {
    "ign0": true,
    "ign1": true,
    "ign2": false
  }
});
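As a rough illustration of compare-and-set, here is a standalone in-memory model. It assumes that an incoming row is applied only when its cas column values, compared in the listed order, are greater than those of the stored row; the handling of ties and of a missing stored row are also assumptions. Replicator performs the real check in the target database; casAllows and Row are hypothetical names.

```typescript
// Hypothetical row shape: cas column name -> comparable value.
type Row = Record<string, number>;

// Decide whether an incoming row should replace the stored one.
function casAllows(casCols: string[], incoming: Row, existing: Row | undefined): boolean {
  // With no stored row there is nothing to compare against.
  if (existing === undefined) return true;
  for (const col of casCols) {
    const a = incoming[col];
    const b = existing[col];
    // Equal values defer the decision to the next cas column.
    if (a !== b) return a > b;
  }
  // Every cas column is equal: keep the stored row.
  return false;
}

const cols = ["cas0", "cas1"];
console.log(casAllows(cols, { cas0: 1, cas1: 5 }, { cas0: 1, cas1: 4 })); // prints true
console.log(casAllows(cols, { cas0: 1, cas1: 3 }, { cas0: 2, cas1: 0 })); // prints false
```

Comparing in column order means the first cas column dominates and later columns only break ties.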
It is possible for the userscript to replace Replicator's interactions with the target database table by defining an apply function for a given table. The apply function will receive a collection of operations to perform on the target table, which it should translate to (arbitrary) SQL commands. The use of an apply handler supplants any other data behavior that may be configured for the table.
The example below is explicitly typed so that readers may refer to the replicator@v1 type definitions.
import * as api from "replicator@v1";
api.configureTable("sql_demo", {
  // The apply function must return a Promise. The runtime supports
  // async functions, which greatly improve readability.
  apply: async (ops: Iterable<api.ApplyOp>): Promise<void> => {
    // Access to the target database is provided via an already-established
    // transaction. The api.getTX() function will return the same value
    // in the apply callback and in any asynchronous (Promise) callbacks
    // that the apply function may trigger.
    const tx: api.TargetTX = api.getTX();
    // The userscript can access Replicator's introspection of the target
    // table.
    const cols: api.TargetColumn[] = tx.columns();
    // We can perform arbitrary queries against the database. The query()
    // function returns a Promise, which we await in the following
    // for loop. The value returned by the Promise implements the
    // JavaScript Iterable protocol; this allows it to be used with
    // the for..of construction. The returned arrays have elements that
    // correspond to the columns of the result set.
    const rows: Promise<Iterable<any[]>> = tx.query(
      `SELECT *
         FROM (VALUES (1, 2), (3, 4))`);
    for (const row of await rows) {
      console.log("rows query", JSON.stringify(row));
    }
    // This is a not-entirely-contrived example where one might
    // interact with a database that can't update, but can delete
    // and then insert.
    for (const op of ops) {
      // The transaction object has a table() method which will
      // return a quoted, SQL-safe identifier for the configured
      // table. The script is free to execute SQL that affects
      // any table. Substitution parameters are supported as
      // varargs. The syntax for substitution points depends
      // on the target database.
      await tx.exec(
        `DELETE
           FROM ${tx.table()}
          WHERE pk = $1`, 2 * +op.pk[0]);
      // The operation is a union type, distinguished by the action
      // property, which contains either "upsert" or "delete".
      if (op.action === "upsert") {
        await tx.exec(
          `INSERT INTO ${tx.table()} (pk, val)
           VALUES ($1, $2)`,
          2 * +op.data.pk, -2 * +op.data.val);
      }
    }
  },
});
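Because tx.exec() needs a live target database, the delete-then-insert strategy from the apply example can also be sketched as a pure planning step that turns operations into (sql, args) pairs. Op, Stmt, and planDeleteInsert are hypothetical simplifications of api.ApplyOp; in a real handler the table name would come from tx.table(), and unlike the example, this sketch passes values through unchanged instead of doubling or negating them.

```typescript
// Hypothetical simplification of api.ApplyOp.
type Op =
  | { action: "delete"; pk: unknown[] }
  | { action: "upsert"; pk: unknown[]; data: { pk: unknown; val: unknown } };

// A statement and its positional arguments ($1, $2, ...).
interface Stmt {
  sql: string;
  args: unknown[];
}

function planDeleteInsert(table: string, ops: readonly Op[]): Stmt[] {
  const stmts: Stmt[] = [];
  for (const op of ops) {
    // Every operation first removes any existing row for its key.
    stmts.push({ sql: `DELETE FROM ${table} WHERE pk = $1`, args: [op.pk[0]] });
    // Upserts then insert the new row; deletes stop after the DELETE.
    if (op.action === "upsert") {
      stmts.push({
        sql: `INSERT INTO ${table} (pk, val) VALUES ($1, $2)`,
        args: [op.data.pk, op.data.val],
      });
    }
  }
  return stmts;
}

const plan = planDeleteInsert('"sql_demo"', [
  { action: "delete", pk: [1] },
  { action: "upsert", pk: [2], data: { pk: 2, val: "x" } },
]);
console.log(JSON.stringify(plan));
```

Separating planning from execution makes the translation easy to check without a database; an apply handler would then await tx.exec(stmt.sql, ...stmt.args) for each planned statement.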
Most CLI flags can be set or overridden by the user script.
import * as api from "replicator@v1";
api.setOptions({
  "immediate": "true"
});