/
Bossbat.js
141 lines (117 loc) · 4.16 KB
/
Bossbat.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import Redlock from 'redlock';
import Redis from 'ioredis';
import timestring from 'timestring';
import { compose } from 'throwback';
import { parseExpression } from 'cron-parser';
// We timeout jobs after 2 seconds:
const JOB_TTL = 2000;
const JOB_PREFIX = 'bossbat';
export default class Bossbat {
constructor({ connection, prefix = JOB_PREFIX, ttl = JOB_TTL, tz, disableRedisConfig } = {}) {
const DB_NUMBER = (connection && connection.db) || 0;
this.prefix = prefix;
this.ttl = ttl;
this.tz = tz;
this.client = new Redis(connection);
this.subscriber = new Redis(connection);
this.redlock = new Redlock([this.client], { retryCount: 0 });
this.jobs = {};
this.qas = [];
if (!disableRedisConfig) {
this.subscriber.config('SET', 'notify-keyspace-events', 'Ex');
}
// Subscribe to expiring keys on the jobs DB:
this.subscriber.subscribe(`__keyevent@${DB_NUMBER}__:expired`);
this.subscriber.on('message', (channel, message) => {
// Check to make sure that the message is a job run request:
if (!message.startsWith(`${this.prefix}:work:`)) return;
const jobName = message.startsWith(`${this.prefix}:work:demand:`)
? message.replace(`${this.prefix}:work:demand:`, '')
: message.replace(`${this.prefix}:work:`, '');
if (this.jobs[jobName]) {
// Attempt to perform the job. Only one worker will end up getting assigned
// the job thanks to distributed locking via redlock.
this.doWork(jobName);
// Schedule the next run. We do this in every instance because it's
// just a simple set command, and is okay to run on top of eachother.
if (this.jobs[jobName].every || this.jobs[jobName].cron) {
this.scheduleRun(jobName, this.jobs[jobName]);
}
}
});
}
quit() {
this.jobs = {};
return Promise.all([
this.subscriber.quit(),
this.client.quit(),
]);
}
hire(name, definition) {
this.jobs[name] = definition;
if (definition.every || definition.cron) {
this.scheduleRun(name, definition);
}
}
fire(name) {
return this.client.del(this.getJobKey(name));
}
qa(fn) {
this.qas.push(fn);
}
demand(name) {
this.scheduleRun(name);
}
// Semi-privates:
getJobKey(name) {
return `${this.prefix}:work:${name}`;
}
getDemandKey(name) {
return `${this.prefix}:work:demand:${name}`;
}
getLockKey(name) {
return `${this.prefix}:lock:${name}`;
}
doWork(name) {
this.redlock.lock(this.getLockKey(name), this.ttl).then((lock) => {
const fn = compose(this.qas);
// Call the QA functions, then finally the job function. We use a copy of
// the job definition to prevent pollution between scheduled runs.
const response = fn(name, { ...this.jobs[name] }, (_, definition) => (
definition.work()
));
const end = () => { lock.unlock(); };
response.then(end, end);
}, () => (
// If we fail to get a lock, that means another instance already processed the job.
// We just ignore these cases:
null
));
}
scheduleRun(name, definition) {
// If there's no definition passed, it's a demand, let's schedule as tight as we can:
if (!definition) {
return this.client.set(this.getDemandKey(name), name, 'PX', 1, 'NX');
}
let timeout;
if (definition.every) {
const typeOfEvery = typeof definition.every;
if (typeOfEvery === 'string') {
// Passed a human interval:
timeout = timestring(definition.every, 'ms');
} else if (typeOfEvery === 'number') {
// Passed a ms interval:
timeout = definition.every;
} else {
throw new Error(`Unknown interval of type "${typeOfEvery}" passed to hire.`);
}
} else if (definition.cron) {
const options = { iterator: false, tz: this.tz };
const iterator = parseExpression(definition.cron, options);
const nextCronTimeout = () => iterator.next().getTime() - Date.now();
const cronTimeout = nextCronTimeout();
timeout = cronTimeout > 0 ? cronTimeout : nextCronTimeout();
}
return this.client.set(this.getJobKey(name), name, 'PX', timeout, 'NX');
}
}