Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ Afterwards replace `PubSub` with `PostgresPubSub`:

```js
// Before
import { PubSub } from 'graphql-subscriptions';
import { PubSub } from "graphql-subscriptions";

export const pubsub = new PubSub();
```

```js
// After
import { PostgresPubSub } from 'graphql-postgres-subscriptions';
import { PostgresPubSub } from "graphql-postgres-subscriptions";

export const pubsub = new PostgresPubSub();
```
Expand All @@ -43,7 +43,7 @@ You can also pass [node-postgres connection options](https://node-postgres.com/f
You can instantiate your own `client` and pass it to `PostgresPubSub`. Like this:

```js
import { PostgresPubSub } from 'graphql-postgres-subscriptions';
import { PostgresPubSub } from "graphql-postgres-subscriptions";
import { Client } from "pg";

const client = new Client();
Expand All @@ -53,11 +53,29 @@ const pubsub = new PostgresPubSub({ client });

**Important**: Don't pass clients from `pg`'s `Pool` to `PostgresPubSub`. As [node-postgres creator states in this StackOverflow answer](https://stackoverflow.com/questions/8484404/what-is-the-proper-way-to-use-the-node-js-postgresql-module), the client needs to be around and not shared so pg can properly handle `NOTIFY` messages (which this library uses under the hood)

## Error handling

`PostgresPubSub` instances emit a special event called `"error"`. This event's payload is an instance of Javascript's `Error`. You can get the error's text using `error.message`.

```js
const ps = new PostgresPubSub({ client });

ps.subscribe("error", err => {
console.log(err.message); // -> "payload string too long"
}).then(() => ps.publish("a", "a".repeat(9000)));
```

For example you can log all error messages (including stack traces and friends) using something like this:

```js
ps.subscribe("error", console.error);
```

## Development

This project has an integration test suite that uses [`jest`](https://facebook.github.io/jest/) to make sure everything works correctly.

We use Docker to spin up a PostgreSQL instance before running the tests. To run them, type the following commands:

* `docker-compose build`
* `docker-compose run test`
- `docker-compose build`
- `docker-compose run test`
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "graphql-postgres-subscriptions",
"version": "1.0.2",
"version": "1.0.3",
"main": "index.js",
"license": "MIT",
"engines": {
Expand Down
4 changes: 3 additions & 1 deletion postgres-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class PostgresPubSub extends PubSub {
return true;
}
subscribe(triggerName, onMessage) {
const callback = ({ payload }) => onMessage(payload);
const callback = message => {
onMessage(message instanceof Error ? message : message.payload);
};
this.ee.on(triggerName, callback);
this.subIdCounter = this.subIdCounter + 1;
this.subscriptions[this.subIdCounter] = [triggerName, callback];
Expand Down
75 changes: 45 additions & 30 deletions postgres-pubsub.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,36 @@ describe("PostgresPubSub", () => {
test("PostgresPubSub can subscribe when instantiated without a client", function(done) {
const ps = new PostgresPubSub();
ps.subscribe("a", payload => {
expect(payload).toEqual("test");
done();
expect(payload).toEqual("test");
done();
}).then(() => {
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
});

test("PostgresPubSub can subscribe and is called when events happen", function(done) {
const ps = new PostgresPubSub({ client });
ps.subscribe("a", payload => {
expect(payload).toEqual("test");
done();
expect(payload).toEqual("test");
done();
}).then(() => {
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
});

test("PostgresPubSub can subscribe when instantiated with connection options but without a client", function(done) {
const ps = new PostgresPubSub({
connectionString: process.env.DATABASE_URL
});
ps.subscribe("a", payload => {
expect(payload).toEqual("test");
done();
expect(payload).toEqual("test");
done();
}).then(() => {
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
});

test("should send notification event after calling publish", done => {
Expand All @@ -53,24 +53,39 @@ describe("PostgresPubSub", () => {
done();
});
ps.subscribe("a", payload => {
expect(payload).toEqual("test");
expect(payload).toEqual("test");
}).then(() => {
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true);
});
});

test("PostgresPubSub can unsubscribe", function(done) {
const ps = new PostgresPubSub({ client });
ps.subscribe("a", payload => {
expect(false).toBe(true); // Should not reach this point
expect(false).toBe(true); // Should not reach this point
}).then(subId => {
ps.unsubscribe(subId);
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true); // True because publish success is not
// indicated by trigger having subscriptions
done(); // works because pubsub is synchronous
});
ps.unsubscribe(subId);
const succeed = ps.publish("a", "test");
expect(succeed).toBe(true); // True because publish success is not
// indicated by trigger having subscriptions
done(); // works because pubsub is synchronous
});
});

test("Should emit error when payload exceeds Postgres 8000 character limit", done => {
const ps = new PostgresPubSub({ client });
ps.subscribe("a", () => {
expect(false).toBe(true); // Should not reach this point
done();
});
ps.subscribe("error", err => {
expect(err.message).toEqual("payload string too long");
done();
}).then(() => {
const succeed = ps.publish("a", "a".repeat(9000));
expect(succeed).toBe(true);
});
});

test("AsyncIterator should expose valid asyncIterator for a specific event", () => {
Expand Down Expand Up @@ -130,10 +145,10 @@ describe("PostgresPubSub", () => {
iterator
.next()
.then(result => {
expect(result).not.toBeUndefined();
expect(result.value).not.toBeUndefined();
expect(result.done).toBe(false);
});
expect(result).not.toBeUndefined();
expect(result.value).not.toBeUndefined();
expect(result.done).toBe(false);
});

ps.publish(eventName, { test: true });

Expand Down