Skip to content

Commit

Permalink
feat(subscriptions): introducing live query support (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Feb 20, 2019
1 parent 1978ffa commit 86ef40f
Show file tree
Hide file tree
Showing 70 changed files with 12,847 additions and 120 deletions.
1 change: 1 addition & 0 deletions .eslintignore
Expand Up @@ -2,3 +2,4 @@ node_modules
node7minus
node8plus
examples
dist
16 changes: 14 additions & 2 deletions .travis.yml
Expand Up @@ -6,9 +6,13 @@ node_js:

addons:
postgresql: "9.4"
apt:
packages:
- postgresql-server-dev-9.4

env:
TEST_DATABASE_URL: postgres://localhost:5432/travis
PGVERSION: 9.4

cache:
directories:
Expand All @@ -20,6 +24,13 @@ install:
- yarn
- lerna bootstrap

before_script:
- psql -c "ALTER USER travis WITH PASSWORD 'travis';"
- sudo bash -c "echo -e 'wal_level = logical\nmax_replication_slots = 10\nmax_wal_senders = 10' >> /etc/postgresql/$PGVERSION/main/postgresql.conf"
- sudo service postgresql restart
- git clone https://github.com/eulerto/wal2json.git
- sudo bash -c "cd wal2json && USE_PGXS=1 make && USE_PGXS=1 make install"

script:
- scripts/ci

Expand All @@ -30,11 +41,12 @@ matrix:
packages:
- postgresql-10
- postgresql-client-10
- postgresql-server-dev-10
postgresql: 10
env:
- PGPORT=5433
- TEST_DATABASE_URL=postgres://travis:travis@localhost:5433/travis
- LDS_TEST_DATABASE_URL=postgres://travis:travis@localhost:5433/lds_test
- PGVERSION=10
sudo: false
dist: trusty
before_script:
- psql -c "ALTER USER travis WITH PASSWORD 'travis';"
2 changes: 1 addition & 1 deletion lerna.json
Expand Up @@ -5,7 +5,7 @@
"packages": [
"packages/*"
],
"version": "4.3.1",
"version": "4.3.2-beta.0",
"command": {
"publish": {
"allowBranch": [
Expand Down
4 changes: 2 additions & 2 deletions packages/graphile-build-pg/package.json
@@ -1,6 +1,6 @@
{
"name": "graphile-build-pg",
"version": "4.3.1",
"version": "4.3.2-beta.0",
"description": "Build a GraphQL schema by reflection over a PostgreSQL schema. Easy to customize since it's built with plugins on graphile-build",
"main": "node8plus/index.js",
"types": "node8plus/index.d.ts",
Expand Down Expand Up @@ -39,7 +39,7 @@
"dependencies": {
"chalk": "^2.1.0",
"debug": ">=2 <3",
"graphile-build": "4.3.1",
"graphile-build": "4.3.2-beta.0",
"graphql-iso-date": "^3.6.0",
"jsonwebtoken": "^8.1.1",
"lodash": ">=4 <5",
Expand Down
32 changes: 32 additions & 0 deletions packages/graphile-build-pg/src/PgLiveProvider.js
@@ -0,0 +1,32 @@
// @flow
import { LiveProvider } from "graphile-build";
import type { PgClass } from "./plugins/PgIntrospectionPlugin";

export default class PgLiveProvider extends LiveProvider {
// eslint-disable-next-line flowtype/no-weak-types
constructor(...args: any[]) {
super(...args);
this.namespace = "pg";
}

collectionIdentifierIsValid(collectionIdentifier: PgClass) {
return collectionIdentifier && collectionIdentifier.kind === "class";
}

recordIdentifierIsValid(
collectionIdentifier: PgClass,
// eslint-disable-next-line flowtype/no-weak-types
recordIdentifier: Array<any>
) {
if (!Array.isArray(recordIdentifier)) return false;
if (!collectionIdentifier.primaryKeyConstraint) return false;
if (
recordIdentifier.length !==
collectionIdentifier.primaryKeyConstraint.keyAttributes.length
) {
return false;
}
// TODO: more validation would not go amiss
return true;
}
}
157 changes: 143 additions & 14 deletions packages/graphile-build-pg/src/QueryBuilder.js
Expand Up @@ -3,6 +3,10 @@ import * as sql from "pg-sql2";
import type { SQL } from "pg-sql2";
import isSafeInteger from "lodash/isSafeInteger";
import chunk from "lodash/chunk";
import type { PgClass } from "./plugins/PgIntrospectionPlugin";

// eslint-disable-next-line flowtype/no-weak-types
type GraphQLContext = any;

const isDev = process.env.POSTGRAPHILE_ENV === "development";

Expand Down Expand Up @@ -42,11 +46,14 @@ export type QueryBuilderOptions = {
};

class QueryBuilder {
parentQueryBuilder: QueryBuilder | void;
context: GraphQLContext;
supportsJSONB: boolean;
locks: {
[string]: true | string,
};
finalized: boolean;
selectedIdentifiers: boolean;
data: {
cursorPrefix: Array<string>,
select: Array<[SQLGen, RawAlias]>,
Expand All @@ -65,9 +72,13 @@ class QueryBuilder {
first: ?number,
last: ?number,
beforeLock: {
[string]: Array<() => void>,
[string]: Array<() => void> | null,
},
cursorComparator: ?CursorComparator,
liveConditions: Array<
// eslint-disable-next-line flowtype/no-weak-types
[(data: {}) => (record: any) => boolean, { [key: string]: SQL } | void]
>,
};
compiledData: {
cursorPrefix: Array<string>,
Expand All @@ -89,12 +100,14 @@ class QueryBuilder {
cursorComparator: ?CursorComparator,
};

constructor(options: QueryBuilderOptions = {}) {
constructor(options: QueryBuilderOptions = {}, context: GraphQLContext = {}) {
this.context = context || {};
this.supportsJSONB =
options.supportsJSONB == null ? true : !!options.supportsJSONB;

this.locks = {};
this.finalized = false;
this.selectedIdentifiers = false;
this.data = {
// TODO: refactor `cursorPrefix`, it shouldn't be here (or should at least have getters/setters)
cursorPrefix: ["natural"],
Expand All @@ -115,6 +128,7 @@ class QueryBuilder {
last: null,
beforeLock: {},
cursorComparator: null,
liveConditions: [],
};
this.compiledData = {
cursorPrefix: ["natural"],
Expand Down Expand Up @@ -194,9 +208,83 @@ class QueryBuilder {

beforeLock(field: string, fn: () => void) {
this.checkLock(field);
this.data.beforeLock[field] = this.data.beforeLock[field] || [];
if (!this.data.beforeLock[field]) {
this.data.beforeLock[field] = [];
}
// $FlowFixMe
this.data.beforeLock[field].push(fn);
}

makeLiveCollection(
table: PgClass,
// eslint-disable-next-line flowtype/no-weak-types
cb?: (checker: (data: any) => (record: any) => boolean) => void
) {
if (!this.context.liveCollection) return;
if (!this.context.liveConditions) return;
/* the actual condition doesn't matter hugely, 'select' should work */
const liveConditions = this.data.liveConditions;
const checkerGenerator = data => {
// Compute this once.
const checkers = liveConditions.map(([checkerGenerator]) =>
checkerGenerator(data)
);
return record => checkers.every(checker => checker(record));
};
if (this.parentQueryBuilder) {
if (cb) {
throw new Error(
"Either use parentQueryBuilder or pass callback, not both."
);
}
this.parentQueryBuilder.beforeLock("select", () => {
const id = this.context.liveConditions.push(checkerGenerator) - 1;
// BEWARE: it's easy to override others' conditions, and that will cause issues. Be sensible.
const allRequirements = this.data.liveConditions.reduce(
(memo, [_checkerGenerator, requirements]) =>
requirements ? Object.assign(memo, requirements) : memo,
{}
);
// $FlowFixMe
this.parentQueryBuilder.select(
sql.fragment`json_build_object(
'__id', ${sql.value(id)}::int
${sql.join(
Object.keys(allRequirements).map(
key =>
sql.fragment`, ${sql.literal(key)}::text, ${
allRequirements[key]
}`
),
", "
)}
)`,
"__live"
);
});
} else if (cb) {
cb(checkerGenerator);
} else {
throw new Error(
"makeLiveCollection was called without parentQueryBuilder and without callback"
);
}
}

addLiveCondition(
// eslint-disable-next-line flowtype/no-weak-types
checkerGenerator: (data: {}) => (record: any) => boolean,
requirements?: { [key: string]: SQL }
) {
if (requirements && !this.parentQueryBuilder) {
throw new Error(
"There's no parentQueryBuilder, so there cannot be requirements"
);
}
this.data.liveConditions.push([checkerGenerator, requirements]);
}

setCursorComparator(fn: CursorComparator) {
this.checkLock("cursorComparator");
this.data.cursorComparator = fn;
Expand Down Expand Up @@ -230,6 +318,23 @@ class QueryBuilder {
}
this.data.select.push([exprGen, alias]);
}
selectIdentifiers(table: PgClass) {
if (this.selectedIdentifiers) return;
const primaryKey = table.primaryKeyConstraint;
if (!primaryKey) return;
const primaryKeys = primaryKey.keyAttributes;
this.select(
sql.fragment`json_build_array(${sql.join(
primaryKeys.map(
key =>
sql.fragment`${this.getTableAlias()}.${sql.identifier(key.name)}`
),
", "
)})`,
"__identifiers"
);
this.selectedIdentifiers = true;
}
selectCursor(exprGen: SQLGen) {
this.checkLock("selectCursor");
this.data.selectCursor = exprGen;
Expand Down Expand Up @@ -554,11 +659,15 @@ class QueryBuilder {
queryBuilder: this,
});
const beforeLocks = this.data.beforeLock[type];
this.data.beforeLock[type] = [];
for (const fn of beforeLocks || []) {
fn();
if (beforeLocks && beforeLocks.length) {
this.data.beforeLock[type] = null;
for (const fn of beforeLocks) {
fn();
}
}
if (type !== "select") {
this.locks[type] = isDev ? new Error("Initally locked here").stack : true;
}
this.locks[type] = isDev ? new Error("Initally locked here").stack : true;
if (type === "cursorComparator") {
// It's meant to be a function
this.compiledData[type] = this.data[type];
Expand All @@ -574,18 +683,38 @@ class QueryBuilder {
context
);
} else if (type === "select") {
// Assume that duplicate fields must be identical, don't output the same key multiple times
/*
* NOTICE: locking select can cause additional selects to be added, so the
* length of this.data[type] may increase during the operation. This is
* why we handle this.locks[type] separately.
*/

// Assume that duplicate fields must be identical, don't output the same
// key multiple times
const seenFields = {};
const context = getContext();
this.compiledData[type] = this.data[type].reduce((memo, [a, b]) => {
const data = [];
const selects = this.data[type];

// DELIBERATE slow loop, see NOTICE above
for (let i = 0; i < selects.length; i++) {
const [valueOrGenerator, columnName] = selects[i];
// $FlowFixMe
if (!seenFields[b]) {
if (!seenFields[columnName]) {
// $FlowFixMe
seenFields[b] = true;
memo.push([callIfNecessary(a, context), b]);
seenFields[columnName] = true;
data.push([callIfNecessary(valueOrGenerator, context), columnName]);
const newBeforeLocks = this.data.beforeLock[type];
if (newBeforeLocks && newBeforeLocks.length) {
this.data.beforeLock[type] = null;
for (const fn of newBeforeLocks) {
fn();
}
}
}
return memo;
}, []);
}
this.locks[type] = isDev ? new Error("Initally locked here").stack : true;
this.compiledData[type] = data;
} else if (type === "orderBy") {
const context = getContext();
this.compiledData[type] = this.data[type].map(([a, b, c]) => [
Expand Down

0 comments on commit 86ef40f

Please sign in to comment.