Skip to content

Commit a2eb9b2

Browse files
committed
fix: Redis query queue locking redesign
Usage of time as sorting active queries list leads to random flipping of active keys. Such behavior leads to plural execution of single query in queue under load. Replace active keys time sorting with unique `processingId` locks for queries. Fixes #459
1 parent 5634b62 commit a2eb9b2

File tree

7 files changed

+291
-143
lines changed

7 files changed

+291
-143
lines changed

packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ class LocalQueueDriverConnection {
1515
this.toProcess = driver.toProcess;
1616
this.recent = driver.recent;
1717
this.active = driver.active;
18+
this.heartBeat = driver.heartBeat;
19+
this.processingCounter = driver.processingCounter;
20+
this.processingLocks = driver.processingLocks;
1821
}
1922

2023
getResultPromise(resultListKey) {
@@ -30,12 +33,16 @@ class LocalQueueDriverConnection {
3033

3134
async getResultBlocking(queryKey) {
3235
const resultListKey = this.resultListKey(queryKey);
36+
if (!this.queryDef[this.redisHash(queryKey)] && !this.resultPromises[resultListKey]) {
37+
return null;
38+
}
3339
const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));
3440

3541
const res = await Promise.race([
3642
this.getResultPromise(resultListKey),
3743
timeoutPromise(this.continueWaitTimeout * 1000),
3844
]);
45+
3946
if (res) {
4047
delete this.resultPromises[resultListKey];
4148
}
@@ -98,36 +105,41 @@ class LocalQueueDriverConnection {
98105
const key = this.redisHash(queryKey);
99106
const query = this.queryDef[key];
100107
delete this.active[key];
108+
delete this.heartBeat[key];
101109
delete this.toProcess[key];
102110
delete this.recent[key];
103111
delete this.queryDef[key];
112+
delete this.processingLocks[key];
104113
return [query];
105114
}
106115

107-
setResultAndRemoveQuery(queryKey, executionResult) {
116+
async setResultAndRemoveQuery(queryKey, executionResult, processingId) {
108117
const key = this.redisHash(queryKey);
118+
if (this.processingLocks[key] !== processingId) {
119+
return false;
120+
}
109121
const promise = this.getResultPromise(this.resultListKey(queryKey));
110122
delete this.active[key];
123+
delete this.heartBeat[key];
111124
delete this.toProcess[key];
112125
delete this.recent[key];
113126
delete this.queryDef[key];
127+
delete this.processingLocks[key];
114128
promise.resolve(executionResult);
129+
return true;
115130
}
116131

117-
removeQuery(queryKey) {
118-
const key = this.redisHash(queryKey);
119-
delete this.active[key];
120-
delete this.toProcess[key];
121-
delete this.recent[key];
122-
delete this.queryDef[key];
132+
getNextProcessingId() {
133+
this.processingCounter.counter = this.processingCounter.counter ? this.processingCounter.counter + 1 : 1;
134+
return this.processingCounter.counter;
123135
}
124136

125137
getOrphanedQueries() {
126138
return this.queueArray(this.recent, new Date().getTime() - this.orphanedTimeout * 1000);
127139
}
128140

129141
getStalledQueries() {
130-
return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000);
142+
return this.queueArray(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000);
131143
}
132144

133145
async getQueryStageState(onlyKeys) {
@@ -140,24 +152,43 @@ class LocalQueueDriverConnection {
140152

141153
updateHeartBeat(queryKey) {
142154
const key = this.redisHash(queryKey);
143-
if (this.active[key]) {
144-
this.active[key] = { key, order: new Date().getTime() };
155+
if (this.heartBeat[key]) {
156+
this.heartBeat[key] = { key, order: new Date().getTime() };
145157
}
146158
}
147159

148-
retrieveForProcessing(queryKey) {
160+
retrieveForProcessing(queryKey, processingId) {
149161
const key = this.redisHash(queryKey);
162+
let lockAcquired = false;
163+
if (!this.processingLocks[key]) {
164+
this.processingLocks[key] = processingId;
165+
lockAcquired = true;
166+
}
150167
let added = 0;
151168
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
152-
this.active[key] = { key, order: new Date().getTime() };
169+
this.active[key] = { key, order: processingId };
153170
added = 1;
154171
}
155-
return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls
172+
this.heartBeat[key] = { key, order: new Date().getTime() };
173+
return [
174+
added, null, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], lockAcquired
175+
]; // TODO nulls
156176
}
157177

158-
async optimisticQueryUpdate(queryKey, toUpdate) {
178+
freeProcessingLock(queryKey, processingId) {
159179
const key = this.redisHash(queryKey);
180+
if (this.processingLocks[key] === processingId) {
181+
delete this.processingLocks[key];
182+
}
183+
}
184+
185+
async optimisticQueryUpdate(queryKey, toUpdate, processingId) {
186+
const key = this.redisHash(queryKey);
187+
if (this.processingLocks[key] !== processingId) {
188+
return false;
189+
}
160190
this.queryDef[key] = { ...this.queryDef[key], ...toUpdate };
191+
return true;
161192
}
162193

163194
release() {
@@ -182,6 +213,9 @@ const queryDef = {};
182213
const toProcess = {};
183214
const recent = {};
184215
const active = {};
216+
const heartBeat = {};
217+
const processingCounters = {};
218+
const processingLocks = {};
185219

186220
class LocalQueueDriver extends BaseQueueDriver {
187221
constructor(options) {
@@ -193,12 +227,18 @@ class LocalQueueDriver extends BaseQueueDriver {
193227
toProcess[options.redisQueuePrefix] = toProcess[options.redisQueuePrefix] || {};
194228
recent[options.redisQueuePrefix] = recent[options.redisQueuePrefix] || {};
195229
active[options.redisQueuePrefix] = active[options.redisQueuePrefix] || {};
230+
heartBeat[options.redisQueuePrefix] = heartBeat[options.redisQueuePrefix] || {};
231+
processingCounters[options.redisQueuePrefix] = processingCounters[options.redisQueuePrefix] || {};
232+
processingLocks[options.redisQueuePrefix] = processingLocks[options.redisQueuePrefix] || {};
196233
this.results = results[options.redisQueuePrefix];
197234
this.resultPromises = resultPromises[options.redisQueuePrefix];
198235
this.queryDef = queryDef[options.redisQueuePrefix];
199236
this.toProcess = toProcess[options.redisQueuePrefix];
200237
this.recent = recent[options.redisQueuePrefix];
201238
this.active = active[options.redisQueuePrefix];
239+
this.heartBeat = heartBeat[options.redisQueuePrefix];
240+
this.processingCounter = processingCounters[options.redisQueuePrefix];
241+
this.processingLocks = processingLocks[options.redisQueuePrefix];
202242
}
203243

204244
createConnection() {

packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ class QueryOrchestrator {
99
this.redisPrefix = redisPrefix;
1010
this.driverFactory = driverFactory;
1111
this.logger = logger;
12-
const redisPool = new RedisPool();
1312
const { externalDriverFactory } = options;
1413
const cacheAndQueueDriver = options.cacheAndQueueDriver || process.env.CUBEJS_CACHE_AND_QUEUE_DRIVER || (
1514
process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? 'redis' : 'memory'
1615
);
1716
if (cacheAndQueueDriver !== 'redis' && cacheAndQueueDriver !== 'memory') {
1817
throw new Error(`Only 'redis' or 'memory' are supported for cacheAndQueueDriver option`);
1918
}
19+
const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool() : undefined;
2020

2121
this.redisPool = redisPool;
2222
this.queryCache = new QueryCache(

0 commit comments

Comments
 (0)