Skip to content

Commit

Permalink
fix: fixed topic actors to pass tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ernest-okot committed May 12, 2018
1 parent b1a9568 commit 3c8653f
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 96 deletions.
137 changes: 72 additions & 65 deletions src/topic/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,92 +1,99 @@
import {connect, Channel, Connection, Options} from 'amqplib';
import {Channel, connect, Connection, Options} from 'amqplib';

class TopicConsumer {
private connection: Connection;
private channel: Channel;
private exchange: string;
private exchangeOptions: Options.AssertExchange;
private queue: string;
private queueOptions: Options.AssertQueue;
private topics: string[];
private topicFunctions = {};

static create(exchange, queue) {
return new TopicConsumer(exchange, queue);
}
public static create(exchange?: string, queue?: string) {
return new TopicConsumer(exchange, queue);
}

constructor(exchange = null, queue = '') {
this.exchange = exchange;
this.queue = queue;
}
public exchange: string;
public exchangeOptions: Options.AssertExchange;
public queue: string;
public queueOptions: Options.AssertQueue;
public topics: string[];
public topicCallbacks: {[topicGlob: string]: (topic: string, payload: any) => any};
private connection: Connection;
private channel: Channel;

setExchange(exchange: string, options: Options.AssertExchange) {
this.exchange = exchange;
this.exchangeOptions = options;
constructor(exchange?: string, queue?: string) {
this.exchange = exchange;
this.queue = queue;
}

return this;
}
public setExchange(exchange: string, options?: Options.AssertExchange) {
this.exchange = exchange;
this.exchangeOptions = options;

setQueue(queue: string, options: Options.AssertQueue) {
this.queue = queue;
this.queueOptions = options;
return this;
}

return this;
}
public setQueue(queue: string, options?: Options.AssertQueue) {
this.queue = queue;
this.queueOptions = options;

subscribe(topic: string, callback: any) {
this.topics = [...this.topics, topic];
this.topicFunctions[topic] = callback;
return this;
}

return this;
}
public subscribe(topic: string, callback: any) {
this.topics = [...this.topics || [], topic];
this.topicCallbacks = {
...this.topicCallbacks || {},
[topic]: callback,
};

async start(amqpUrl: string) {
this.connection = await connect(amqpUrl);
return this;
}

this.channel = await this.connection.createChannel();
public async start(amqpUrl: string) {
this.connection = await connect(amqpUrl);

await this.channel.assertExchange(this.exchange, 'topic', this.exchangeOptions);
this.channel = await this.connection.createChannel();

const q = await this.channel.assertQueue(this.queue, this.queueOptions);
await this.channel.assertExchange(this.exchange, 'topic', this.exchangeOptions);

const uniqueTopics = [...new Set(this.topics)];
const q = await this.channel.assertQueue(this.queue, this.queueOptions);

await Promise.all(
uniqueTopics.map(async topic =>
await this.channel.bindQueue(q.queue, this.exchange, topic)
)
);
const uniqueTopics = [...new Set(this.topics)];

return this.channel.consume(q.queue, async msg => {
const topic = msg.fields.routingKey;
await Promise.all(
uniqueTopics.map(async (topic) =>
await this.channel.bindQueue(q.queue, this.exchange, topic),
),
);

const [key] = Object.keys(this.topicFunctions).filter(t => {
const regexString = t.replace(/\*/g, '[^.]+').replace(/#/g, '.*');
const regex = new RegExp('^' + regexString + '$');
return this.channel.consume(q.queue, async (msg) => {
const topic = msg.fields.routingKey;

return topic.match(regex);
});
const [key] = this.subscribers(topic);

const callback = this.topicFunctions[key];
const callback = this.topicCallbacks[key];

if (!callback) {
this.channel.ack(msg);
return;
}
if (!callback) {
this.channel.ack(msg);
return;
}

try {
await callback(topic, JSON.parse(msg.content.toString()));
this.channel.ack(msg);
} catch (error) {
this.channel.nack(msg)
}
try {
await callback(topic, JSON.parse(msg.content.toString()));
this.channel.ack(msg);
} catch (error) {
this.channel.nack(msg);
}

}, {noAck: false,});
}
}, {noAck: false});
}

stop() {
this.connection.close();
}
public stop() {
this.connection.close();
}

public subscribers(topic) {
return Object.keys(this.topicCallbacks)
.filter((topicGlob) => {
const regexString = topicGlob.replace(/\*/g, '[^.]+').replace(/#/g, '.*');
return topic.match('^' + regexString + '$');
});
}

}

Expand Down
70 changes: 39 additions & 31 deletions src/topic/producer.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,53 @@
import {connect, Connection, Channel, Options} from 'amqplib';

import {Channel, connect, Connection, Options} from 'amqplib';

class TopicProducer {
private connection: Connection;
private channel: Channel;
private exchange: string;
private exchangeOptions: Options.AssertExchange;
public static EXCHANGE_TYPE = 'topic';

static create(connection) {
return new TopicProducer(connection);
}
public static create(exchange?: string) {
return new TopicProducer(exchange);
}

constructor(connection: Connection) {
this.connection = connection;
}
public exchange: string;
public exchangeOptions: Options.AssertExchange;
private connection: Connection;
private channel: Channel;

constructor(exchange?: string) {
this.exchange = exchange;
}

setExchange(name: string, options: Options.AssertExchange) {
this.exchange = name;
this.exchangeOptions = options;
public setExchange(name: string, options?: Options.AssertExchange) {
this.exchange = name;
this.exchangeOptions = options;
}

public async start(url: string) {
if (!this.connection) {
this.connection = await connect(url);
}

async start(amqpUrl) {
if (!this.connection) {
this.connection = await connect(amqpUrl);
}
this.channel = await this.connection.createChannel();

this.channel = await this.connection.createChannel();
await this.channel.assertExchange('events', 'topic', {durable: true});
await this.channel.assertExchange(
this.exchange,
TopicProducer.EXCHANGE_TYPE,
{
durable: true,
},
);

return this;
}
return this;
}

publish(topic, data) {
const serialised = JSON.stringify(data);
this.channel.publish(this.exchange, topic, new Buffer(serialised));
}
public publish(topic, data) {
const content = new Buffer(JSON.stringify(data));
this.channel.publish(this.exchange, topic, content);
}

stop() {
this.connection.close();
}
public async stop() {
return await this.connection.close();
}

}

export default TopicProducer;
export default TopicProducer;

0 comments on commit 3c8653f

Please sign in to comment.