Skip to content

Commit

Permalink
exp: add stream endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ivarconr committed Oct 29, 2020
1 parent e7e8605 commit 65734f8
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 0 deletions.
14 changes: 14 additions & 0 deletions lib/db/event-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ class EventStore extends EventEmitter {
constructor(db) {
super();
this.db = db;

const a = async () => {
const client = await db.client.acquireRawConnection();

// Everything
client.on('notification', msg => {
if (msg.channel === 'event') {
const payload = JSON.parse(msg.payload);
this.emit('stream', payload);
}
});
client.query('LISTEN event');
};
a();
}

async store(event) {
Expand Down
2 changes: 2 additions & 0 deletions lib/routes/client-api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const Controller = require('../controller');
const FeatureController = require('./feature.js');
const MetricsController = require('./metrics.js');
const RegisterController = require('./register.js');
const StreamController = require('./stream');
const apiDef = require('./api-def.json');

class ClientApi extends Controller {
Expand All @@ -17,6 +18,7 @@ class ClientApi extends Controller {
this.use('/features', new FeatureController(stores, getLogger).router);
this.use('/metrics', new MetricsController(stores, getLogger).router);
this.use('/register', new RegisterController(stores, getLogger).router);
this.use('/stream', new StreamController(stores, getLogger).router);
}

index(req, res) {
Expand Down
22 changes: 22 additions & 0 deletions lib/routes/client-api/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

const SSE = require('express-sse');

const sse = new SSE();

const Controller = require('../controller');

class StreamController extends Controller {
constructor({ eventStore }) {
super();
this.eventStore = eventStore;

this.get('/', sse.init);

this.eventStore.on('stream', evt => {
sse.send(evt.data, evt.type, evt.id);
});
}
}

module.exports = StreamController;
43 changes: 43 additions & 0 deletions migrations/20201028195256-listen-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';

const async = require('async');

exports.up = function(db, cb) {
async.series(
[
db.runSql.bind(
db,
`
CREATE OR REPLACE FUNCTION notify_event()
RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
BEGIN
PERFORM pg_notify('event', row_to_json(NEW)::text);
RETURN NULL;
END;
$BODY$`,
),
db.runSql.bind(
db,
`
CREATE TRIGGER notify_event
AFTER INSERT
ON "events"
FOR EACH ROW
EXECUTE PROCEDURE notify_event();`,
),
],
cb,
);
};

exports.down = function(db, cb) {
async.series(
[
db.runSql.bind(db, `DROP TRIGGER notify_event ON events`),
db.runSql.bind(db, `DROP FUNCTION notify_event()`),
],
cb,
);
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"deepmerge": "^4.2.2",
"errorhandler": "^1.5.1",
"express": "^4.17.1",
"express-sse": "^0.5.3",
"gravatar-url": "^3.1.0",
"helmet": "^4.1.0",
"joi": "^17.2.0",
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/fake-event-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module.exports = () => {
const events = [];

return {
on: () => {},
store: event => {
events.push(event);
return Promise.resolve();
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,11 @@ expand-tilde@^2.0.0, expand-tilde@^2.0.2:
dependencies:
homedir-polyfill "^1.0.1"

express-sse@^0.5.3:
version "0.5.3"
resolved "https://registry.yarnpkg.com/express-sse/-/express-sse-0.5.3.tgz#6e6cb1a85ef7b6ec1eb658e37e923907c482bd31"
integrity sha512-DJF0nofFGq0IXJLGq95hfrryP3ZprVAVpyZUnmAk6QhHnm7zCzsHBNFP0i4FKFo2XjOf+JiYUKjT7jQhIeljpg==

express@^4.17.1:
version "4.17.1"
resolved "https://registry.yarnpkg.com/express/-/express-4.17.1.tgz#4491fc38605cf51f8629d39c2b5d026f98a4c134"
Expand Down

0 comments on commit 65734f8

Please sign in to comment.