Lightweight RabbitMQ wrapper on amqplib with declarative topology, auto reconnect/resubscribe, health helpers, metrics/logging hooks, and ergonomic retries/dead-lettering. Defaults to amqp://localhost so you can start with zero config and layer on options only when you need them.
npm install @cryptoaaron/amqp-js
import { createClient } from '@cryptoaaron/amqp-js';
const client = createClient({
  // url is optional; defaults to amqp://localhost
  url: process.env.AMQP_URL
});
await client.publish('user.created', { id: '123', email: 'hi@example.com' });
await client.subscribe('user.created', async (payload, meta) => {
  console.log('received', payload, meta.fields.routingKey);
});
await client.ready; // waits for connection + subscriptions
const client = createClient({
  url: process.env.AMQP_URL,
  topology: {
    events: {
      'user.created': { exchange: 'app.events', routingKey: 'user.created', queue: 'app.events.user.created' },
      'user.deleted': { exchange: 'app.events', routingKey: 'user.deleted' }
    }
  }
});
await client.publish('user.created', { id: '123' }); // validated against registry
await client.subscribe({
  event: 'user.created',
  queue: 'app.events.user.created',
  prefetch: 20,
  maxAttempts: 5,
  backoff: (attempt) => attempt * 250, // ms
  deadLetterExchange: 'app.events.dlq',
  before: (payload) => console.log('before', payload),
  after: (payload) => console.log('after', payload),
  handler: async (payload) => {
    // your work here
  }
});
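To see what a backoff function yields, the sketch below evaluates the custom function from the example above alongside the default documented for subscribe, Math.min(1000 * attempt, 5000). It assumes attempts are numbered from 1, which this README does not state, and backoffSchedule is a hypothetical helper rather than part of the library.

```javascript
// Hypothetical helper (not part of @cryptoaaron/amqp-js): list the delay a
// backoff function returns for each attempt number.
// Assumption: attempts are numbered starting at 1.
function backoffSchedule(backoff, maxAttempts) {
  return Array.from({ length: maxAttempts }, (_, i) => backoff(i + 1));
}

// Custom backoff from the subscribe example: linear, 250 ms per attempt.
const linear = (attempt) => attempt * 250;

// Default documented for subscribe: 1000 ms per attempt, capped at 5000 ms.
const documentedDefault = (attempt) => Math.min(1000 * attempt, 5000);

console.log(backoffSchedule(linear, 5)); // [ 250, 500, 750, 1000, 1250 ]
console.log(backoffSchedule(documentedDefault, 6)); // [ 1000, 2000, 3000, 4000, 5000, 5000 ]
```

The cap in the default keeps a long retry chain from waiting longer than five seconds between attempts; the linear variant has no cap, so it relies on maxAttempts to bound the total wait.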
const client = createClient({
  url: process.env.AMQP_URL,
  logger: ({ level, message, context }) => console.log(level, message, context),
  metrics: ({ type, totals }) => console.log(type, totals)
});
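Next to these hooks, the ping() and getMetrics() helpers described in the API list can back a simple health check. The following is a minimal sketch, assuming ping() yields a boolean as documented. checkHealth and the shape of its return value are hypothetical, and the client is passed in so the function can be exercised with a stub.

```javascript
// Hypothetical helper (not part of the library): turn ping() into an
// HTTP-style health result. Assumes `client` exposes ping() and getMetrics()
// as described in the API list; ping() may be sync or async here.
async function checkHealth(client) {
  let ok = false;
  try {
    ok = (await client.ping()) === true;
  } catch {
    ok = false; // treat a thrown error the same as a failed ping
  }
  return {
    status: ok ? 200 : 503,
    body: { ok, metrics: client.getMetrics() },
  };
}
```

With a real client the result could be returned from any HTTP framework's health route. For a unit test, a stub such as { ping: async () => true, getMetrics: () => ({}) } is enough.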
- createClient(options) / new AmqpClient(options)
  - url (optional): AMQP connection string (defaults to amqp://localhost; no env loading done internally)
  - name: connection name (default amqp-js)
  - topology.events: map { [eventName]: { exchange, routingKey, queue?, exchangeType?, queueOptions?, exchangeOptions?, prefetch?, deadLetterExchange?, deadLetterRoutingKey?, deadLetterExchangeType?, validate?, headers? } }
  - defaultExchange: defaults to amqp-js.events (topic)
  - defaultQueueOptions: defaults to { durable: true }
  - prefetch: default consumer prefetch (10)
  - publishDefaults: { persistent?: boolean } (defaults to true)
  - reconnect: { initialDelay?, maxDelay?, factor? } (defaults 500/5000/2)
  - logger(entry) and metrics(metric) hooks
  - onError(err): error sink for handler/connection errors
- publish(eventOrOptions, payload?)
  - If eventOrOptions is a string, uses topology entry (exchange, routingKey, validate) and sends payload.
  - If object, you can pass event, exchange, routingKey, message/payload, headers, serializeJson, persistent, contentType, validate.
- subscribe(eventOrOptions, handler?)
  - If string, uses topology; otherwise options: event, queue, exchange, routingKey, prefetch, queueOptions, parseJson (default true), maxAttempts (default 3), backoff(attempt) (default Math.min(1000 * attempt, 5000)), requeueOnError (default false), deadLetterExchange, deadLetterRoutingKey, deadLetterExchangeType, validate(payload), before/after hooks (single or array), handler.
  - Returns { id, cancel }.
- ready: promise that resolves after connection + topology + subscriptions are ready (resets on reconnect).
- ping(): returns true/false by opening/closing a lightweight channel.
- isConnected(): best-effort connectivity check.
- getMetrics(): counters for publish/consume/ack/nack/reconnect.
- close(): cancels subscriptions, closes channels/connection.
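The reconnect option takes { initialDelay, maxDelay, factor } with defaults 500/5000/2. This README does not spell out how the three values combine, so the sketch below assumes the common capped exponential form, min(initialDelay * factor^n, maxDelay), with values in milliseconds. reconnectDelay is a hypothetical name, and the library's actual schedule may differ (for example by adding jitter).

```javascript
// Assumed formula, not confirmed by the library: capped exponential backoff.
// `retry` is the zero-based number of the reconnect attempt.
function reconnectDelay(retry, { initialDelay = 500, maxDelay = 5000, factor = 2 } = {}) {
  return Math.min(initialDelay * Math.pow(factor, retry), maxDelay);
}

const delays = [0, 1, 2, 3, 4, 5].map((retry) => reconnectDelay(retry));
console.log(delays); // [ 500, 1000, 2000, 4000, 5000, 5000 ]
```

Under this assumption the defaults reach the 5000 ceiling on the fifth attempt, so a broker outage is retried roughly every five seconds after the first few tries.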
- URL: amqp://localhost if you omit url.
- Exchange: durable topic named amqp-js.events.
- Queue naming: <name>.<event>.queue (e.g., amqp-js.user.created.queue) when queue is not specified.
- Messages: JSON-encoded objects, persistent by default.
- Prefetch: 10 per subscription.
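The queue naming rule above can be sketched as a small function. defaultQueueName is a hypothetical helper used only for illustration. It assumes <name> is the client's name option (default amqp-js) and mirrors the documented pattern rather than the library's internal code.

```javascript
// Hypothetical helper: build the default queue name described above.
// Assumption: `name` is the connection name option (default 'amqp-js').
function defaultQueueName(event, name = 'amqp-js') {
  return `${name}.${event}.queue`;
}

console.log(defaultQueueName('user.created')); // amqp-js.user.created.queue
console.log(defaultQueueName('user.created', 'billing')); // billing.user.created.queue
```

If that assumption holds, two services that use different name values would get separate queues for the same event, while instances sharing a name would consume from one queue.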
Type declarations are shipped via src/index.d.ts; imports work with ESM:
import { createClient, AmqpClient } from '@cryptoaaron/amqp-js';
npm test # unit-only (no broker needed)
AMQP_URL=... RUN_AMQP_INTEGRATION=true npm test # runs live round-trip
MIT