Skip to content

Commit

Permalink
fix: switched to typescript and improved api
Browse files Browse the repository at this point in the history
  • Loading branch information
ernest-okot committed May 3, 2018
1 parent 88f90af commit 76b3d64
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 140 deletions.
3 changes: 0 additions & 3 deletions .babelrc

This file was deleted.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
}
},
"scripts": {
"compile": "babel src/ -d lib/",
"compile": "tsc",
"commit": "git add . && git-cz",
"commitmsg": "commitlint -e $GIT_PARAMS",
"lint": "eslint src",
Expand All @@ -28,14 +28,14 @@
"@commitlint/cli": "^6.1.3",
"@commitlint/config-conventional": "^6.1.3",
"@semantic-release/git": "^4.0.2",
"babel-cli": "^6.26.0",
"babel-preset-es2015": "^6.24.1",
"@types/amqplib": "^0.5.7",
"commitizen": "^2.9.6",
"cz-conventional-changelog": "^2.1.0",
"eslint": "^4.19.1",
"husky": "^0.14.3",
"nsp": "^3.2.1",
"semantic-release": "^15.1.7"
"semantic-release": "^15.1.7",
"typescript": "^2.8.3"
},
"directories": {
"example": "examples",
Expand Down
91 changes: 0 additions & 91 deletions src/topic/consumer.js

This file was deleted.

93 changes: 93 additions & 0 deletions src/topic/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {connect, Channel, Connection, Options} from 'amqplib';

class TopicConsumer {
private connection: Connection;
private channel: Channel;
private exchange: string;
private exchangeOptions: Options.AssertExchange;
private queue: string;
private queueOptions: Options.AssertQueue;
private topics: string[];
private topicFunctions = {};

static create(exchange, queue) {
return new TopicConsumer(exchange, queue);
}

constructor(exchange = null, queue = '') {
this.exchange = exchange;
this.queue = queue;
}

setExchange(exchange: string, options: Options.AssertExchange) {
this.exchange = exchange;
this.exchangeOptions = options;

return this;
}

setQueue(queue: string, options: Options.AssertQueue) {
this.queue = queue;
this.queueOptions = options;

return this;
}

subscribe(topic: string, callback: any) {
this.topics = [...this.topics, topic];
this.topicFunctions[topic] = callback;

return this;
}

async start(amqpUrl: string) {
this.connection = await connect(amqpUrl);

this.channel = await this.connection.createChannel();

await this.channel.assertExchange(this.exchange, 'topic', this.exchangeOptions);

const q = await this.channel.assertQueue(this.queue, this.queueOptions);

const uniqueTopics = [...new Set(this.topics)];

await Promise.all(
uniqueTopics.map(async topic =>
await this.channel.bindQueue(q.queue, this.exchange, topic)
)
);

return this.channel.consume(q.queue, async msg => {
const topic = msg.fields.routingKey;

const [key] = Object.keys(this.topicFunctions).filter(t => {
const regexString = t.replace(/\*/g, '[^.]+').replace(/#/g, '.*');
const regex = new RegExp('^' + regexString + '$');

return topic.match(regex);
});

const callback = this.topicFunctions[key];

if (!callback) {
this.channel.ack(msg);
return;
}

try {
await callback(topic, JSON.parse(msg.content.toString()));
this.channel.ack(msg);
} catch (error) {
this.channel.nack(msg)
}

}, {noAck: false,});
}

stop() {
this.connection.close();
}

}

export default TopicConsumer;
7 changes: 0 additions & 7 deletions src/topic/index.js

This file was deleted.

7 changes: 7 additions & 0 deletions src/topic/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import TopicConsumer from './consumer';
import TopicProducer from './producer';

module.exports = {
TopicProducer,
TopicConsumer,
};
35 changes: 0 additions & 35 deletions src/topic/producer.js

This file was deleted.

45 changes: 45 additions & 0 deletions src/topic/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import {connect, Connection, Channel, Options} from 'amqplib';


class TopicProducer {
private connection: Connection;
private channel: Channel;
private exchange: string;
private exchangeOptions: Options.AssertExchange;

static create(connection) {
return new TopicProducer(connection);
}

constructor(connection: Connection) {
this.connection = connection;
}

setExchange(name: string, options: Options.AssertExchange) {
this.exchange = name;
this.exchangeOptions = options;
}

async start(amqpUrl) {
if (!this.connection) {
this.connection = await connect(amqpUrl);
}

this.channel = await this.connection.createChannel();
await this.channel.assertExchange('events', 'topic', {durable: true});

return this;
}

publish(topic, data) {
const serialised = JSON.stringify(data);
this.channel.publish(this.exchange, topic, new Buffer(serialised));
}

stop() {
this.connection.close();
}

}

export default TopicProducer;
12 changes: 12 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"lib": [ "es2015" ],
"downlevelIteration": true,
"outDir": "./lib",
"allowJs": true,
"target": "es5"
},
"include": [
"./src/**/*"
]
}
24 changes: 24 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,26 @@
into-stream "^3.1.0"
lodash "^4.17.4"

"@types/amqplib@^0.5.7":
version "0.5.7"
resolved "https://registry.yarnpkg.com/@types/amqplib/-/amqplib-0.5.7.tgz#06e569f367546399b2bd96f393a9cc6c20eb794f"
dependencies:
"@types/bluebird" "*"
"@types/events" "*"
"@types/node" "*"

"@types/bluebird@*":
version "3.5.20"
resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.20.tgz#f6363172add6f4eabb8cada53ca9af2781e8d6a1"

"@types/events@*":
version "1.2.0"
resolved "https://registry.yarnpkg.com/@types/events/-/events-1.2.0.tgz#81a6731ce4df43619e5c8c945383b3e62a89ea86"

"@types/node@*":
version "10.0.3"
resolved "https://registry.yarnpkg.com/@types/node/-/node-10.0.3.tgz#1f89840c7aac2406cc43a2ecad98fc02a8e130e4"

JSONStream@^1.0.4:
version "1.3.2"
resolved "https://registry.yarnpkg.com/JSONStream/-/JSONStream-1.3.2.tgz#c102371b6ec3a7cf3b847ca00c20bb0fce4c6dea"
Expand Down Expand Up @@ -4156,6 +4176,10 @@ typedarray@^0.0.6:
version "0.0.6"
resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"

typescript@^2.8.3:
version "2.8.3"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.8.3.tgz#5d817f9b6f31bb871835f4edf0089f21abe6c170"

uglify-js@^2.6:
version "2.8.29"
resolved "https://registry.yarnpkg.com/uglify-js/-/uglify-js-2.8.29.tgz#29c5733148057bb4e1f75df35b7a9cb72e6a59dd"
Expand Down

0 comments on commit 76b3d64

Please sign in to comment.