Skip to content

Commit

Permalink
feat: add queue discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielCastro committed Aug 20, 2019
1 parent 41b0aac commit 1afb598
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RUN node_modules/.bin/tsc -p .
RUN yarn install --pure-lockfile --production

FROM node:10-alpine
RUN apk --no-cache add tini
RUN apk --no-cache add tini bash
ENTRYPOINT ["/sbin/tini", "--"]

RUN mkdir -p /src
Expand Down
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ To learn more about how to setup promethues and grafana see: https://eksworkshop
The dashboard pictured above is [available to download from grafana](https://grafana.com/grafana/dashboards/10128).
It will work aslong as EXPORTER_STAT_PREFIX is not changed.

## Queue Discovery
Queues are discovered at start up by running `KEYS bull:*:id`
this can also be triggered manually from the `/discover_queues` endpoint
`curl -xPOST localhost:9538/discover_queues`

## Metrics

| Metric | type | description |
Expand All @@ -47,12 +52,13 @@ It will work aslong as EXPORTER_STAT_PREFIX is not changed.

### Environment variables for default docker image

| variable | default | description |
|----------------------|--------------------------|--------------------------------------------|
| EXPORTER_REDIS_URL | redis://localhost:6379/0 | Redis uri to connect |
| EXPORTER_PREFIX | bull | prefix for queues |
| EXPORTER_STAT_PREFIX | bull_queue_ | prefix for exported metrics |
| EXPORTER_QUEUES | - | a space separated list of queues to check |
| variable | default | description |
|-----------------------|--------------------------|-------------------------------------------------|
| EXPORTER_REDIS_URL | redis://localhost:6379/0 | Redis uri to connect |
| EXPORTER_PREFIX | bull | prefix for queues |
| EXPORTER_STAT_PREFIX | bull_queue_ | prefix for exported metrics |
| EXPORTER_QUEUES | - | a space separated list of queues to check |
| EXPORTER_AUTODISCOVER | - | set to '0' or 'false' to disable queue discovery|


### Example deployment
Expand Down
11 changes: 10 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ import { startServer } from './server';
// tslint:disable:no-console

export async function printOnce(opts: Options): Promise<void> {
const collector = new MetricCollector(opts.metricPrefix, opts._, { redis: opts.url, prefix: opts.prefix });
const collector = new MetricCollector(opts._, {
logger,
metricPrefix: opts.metricPrefix,
redis: opts.url,
prefix: opts.prefix,
autoDiscover: opts.autoDiscover,
});
if (opts.autoDiscover) {
await collector.discoverAll();
}
await collector.updateAll();
await collector.close();

Expand Down
74 changes: 57 additions & 17 deletions src/metricCollector.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import bull from 'bull';
import * as Logger from 'bunyan';
import { EventEmitter } from 'events';
import { Redis } from 'ioredis';
import IoRedis from 'ioredis';
import { register as globalRegister, Registry } from 'prom-client';

import { logger as globalLogger } from './logger';
import { getJobCompleteStats, getStats, makeGuages, QueueGauges } from './queueGauges';

export interface BullOptions extends Pick<bull.QueueOptions, Exclude<keyof bull.QueueOptions, 'redis'>> {
redis?: string | bull.QueueOptions['redis'];
export interface MetricCollectorOptions extends Omit<bull.QueueOptions, 'redis'> {
metricPrefix: string;
redis: string;
autoDiscover: boolean;
logger: Logger;
}

export interface QueueData<T = unknown> {
Expand All @@ -21,27 +24,66 @@ export class MetricCollector {

private readonly logger: Logger;

private readonly queues: QueueData<unknown>[];
private readonly defaultRedisClient: IoRedis.Redis;
private readonly redisUri: string;
private readonly bullOpts: Omit<bull.QueueOptions, 'redis'>;
private readonly queuesByName: Map<string, QueueData<unknown>> = new Map();

private get queues(): QueueData<unknown>[] {
return [...this.queuesByName.values()];
}

private readonly myListeners: Set<(id: string) => Promise<void>> = new Set();

private readonly guages: QueueGauges;

constructor(
statPrefix: string,
queueNames: string[],
opts: BullOptions & { logger?: Logger },
opts: MetricCollectorOptions,
registers: Registry[] = [globalRegister],
) {
const { logger, ...bullOpts } = opts;
const { logger, autoDiscover, redis, metricPrefix, ...bullOpts } = opts;
this.redisUri = redis;
this.defaultRedisClient = new IoRedis(this.redisUri);
this.defaultRedisClient.setMaxListeners(32);
this.bullOpts = bullOpts;
this.logger = logger || globalLogger;
this.queues = queueNames.map(name => ({
name,
queue: new bull(name, bullOpts as bull.QueueOptions),
prefix: opts.prefix || 'bull',
}));
this.addToQueueSet(queueNames);
this.guages = makeGuages(metricPrefix, registers);
}

private createClient(_type: 'client' | 'subscriber' | 'bclient', redisOpts?: IoRedis.RedisOptions): IoRedis.Redis {
if (_type === 'client') {
return this.defaultRedisClient!;
}
return new IoRedis(this.redisUri, redisOpts);
}

private addToQueueSet(names: string[]): void {
for (const name of names) {
if (this.queuesByName.has(name)) {
continue;
}
this.logger.info('added queue', name);
this.queuesByName.set(name, {
name,
queue: new bull(name, {
...this.bullOpts,
createClient: this.createClient.bind(this),
}),
prefix: this.bullOpts.prefix || 'bull',
});
}
}

this.guages = makeGuages(statPrefix, registers);
public async discoverAll(): Promise<void> {
this.logger.info('running queue discovery');
let keys = await this.defaultRedisClient.keys(`${this.bullOpts.prefix}:*:id`);
keys = keys.map(k => k
.replace(new RegExp(`^${this.bullOpts.prefix}:`), '')
.replace(/:id$/, ''),
);
this.addToQueueSet(keys);
}

private async onJobComplete(queue: QueueData, id: string): Promise<void> {
Expand Down Expand Up @@ -71,13 +113,11 @@ export class MetricCollector {
}

public async ping(): Promise<void> {
await Promise.all(this.queues.map(async q => {
const client: Redis = (q.queue as any).client;
await client.ping();
}));
await this.defaultRedisClient.ping();
}

public async close(): Promise<void> {
this.defaultRedisClient.disconnect();
for (const q of this.queues) {
for (const l of this.myListeners) {
(q.queue as any as EventEmitter).removeListener('global:completed', l);
Expand Down
7 changes: 7 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface Options {
once: boolean;
port: number;
bindAddress: string;
autoDiscover: boolean;
_: string[];
}

Expand Down Expand Up @@ -37,11 +38,17 @@ export function getOptionsFromArgs(...args: string[]): Options {
once: {
alias: 'n',
default: false,
type: 'boolean',
description: 'Print stats and exit without starting a server',
},
port: {
default: 9538,
},
autoDiscover: {
default: false,
alias: 'a',
type: 'boolean',
},
bindAddress: {
alias: 'b',
description: 'Address to listen on',
Expand Down
29 changes: 24 additions & 5 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function calcDuration(start: [number, number]): number {
return diff[0] * 1e3 + diff[1] * 1e-6;
}

export function makeServer(opts: Options): express.Application {
export async function makeServer(opts: Options): Promise<express.Application> {
const app = express();
app.disable('x-powered-by');

Expand All @@ -27,7 +27,6 @@ export function makeServer(opts: Options): express.Application {
});

app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {

const start = process.hrtime();
const id = uuid();
const reqLog = logger.child({
Expand Down Expand Up @@ -56,17 +55,37 @@ export function makeServer(opts: Options): express.Application {

});

const collector = new MetricCollector(opts.metricPrefix, opts._, { redis: opts.url, prefix: opts.prefix });
const collector = new MetricCollector(opts._, {
logger,
metricPrefix: opts.metricPrefix,
redis: opts.url,
prefix: opts.prefix,
autoDiscover: opts.autoDiscover,
});
collector.collectJobCompletions();

if (opts.autoDiscover) {
await collector.discoverAll();
}

app.post('/discover_queues', (_req: express.Request, res: express.Response, next: express.NextFunction) => {
collector.discoverAll()
.then(() => {
res.send({
ok: true,
});
})
.catch((err: any) => next(err));
});

app.get('/healthz', (_req: express.Request, res: express.Response, next: express.NextFunction) => {
collector.ping()
.then(() => {
res.send({
ok: true,
});
})
.catch(err => next(err));
.catch((err: any) => next(err));
});

app.get('/metrics', (_req: express.Request, res: express.Response, next: express.NextFunction) => {
Expand All @@ -89,7 +108,7 @@ export function makeServer(opts: Options): express.Application {
}

export async function startServer(opts: Options): Promise<{ done: Promise<void> }> {
const app = makeServer(opts);
const app = await makeServer(opts);

let server: http.Server;
await new Promise((resolve, reject) => {
Expand Down

0 comments on commit 1afb598

Please sign in to comment.