Skip to content

Commit 101b85f

Browse files
committed
fix: Orphaned queries in Redis queue during intensive load
1 parent 31c5cc1 commit 101b85f

File tree

4 files changed

+81
-14
lines changed

4 files changed

+81
-14
lines changed

packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ class LocalQueueDriverConnection {
164164
if (!this.processingLocks[key]) {
165165
this.processingLocks[key] = processingId;
166166
lockAcquired = true;
167+
} else {
168+
return null;
167169
}
168170
let added = 0;
169171
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {

packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,14 @@ class QueryQueue {
217217
let processingLockAcquired;
218218
try {
219219
const processingId = await redisClient.getNextProcessingId();
220-
[insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] =
221-
await redisClient.retrieveForProcessing(queryKey, processingId);
222-
const activated = activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
220+
const retrieveResult = await redisClient.retrieveForProcessing(queryKey, processingId);
221+
if (retrieveResult) {
222+
[insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
223+
}
224+
const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
225+
if (!query) {
226+
query = await redisClient.getQueryDef(this.redisHash(queryKey));
227+
}
223228
if (query && insertedCount && activated && processingLockAcquired) {
224229
let executionResult;
225230
const startQueryTime = (new Date()).getTime();

packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,33 @@ class RedisQueueDriverConnection {
134134
}
135135

136136
async retrieveForProcessing(queryKey, processingId) {
137-
const [insertedCount, removedCount, activeKeys, queueSize, queryDef, processingLockAcquired] =
138-
await this.redisClient.multi()
139-
.zadd([this.activeRedisKey(), 'NX', processingId, this.redisHash(queryKey)])
140-
.zremrangebyrank([this.activeRedisKey(), this.concurrency, -1])
141-
.zrange([this.activeRedisKey(), 0, this.concurrency - 1])
142-
.zcard(this.toProcessRedisKey())
143-
.hget(([this.queriesDefKey(), this.redisHash(queryKey)]))
144-
.set(this.queryProcessingLockKey(queryKey), processingId, 'NX')
145-
.zadd([this.heartBeatRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)])
146-
.execAsync();
147-
return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired];
137+
try {
138+
const lockKey = this.queryProcessingLockKey(queryKey);
139+
await this.redisClient.watchAsync(lockKey);
140+
141+
const currentProcessId = await this.redisClient.getAsync(lockKey);
142+
143+
if (currentProcessId) {
144+
return null;
145+
}
146+
147+
const result =
148+
await this.redisClient.multi()
149+
.zadd([this.activeRedisKey(), 'NX', processingId, this.redisHash(queryKey)])
150+
.zremrangebyrank([this.activeRedisKey(), this.concurrency, -1])
151+
.zrange([this.activeRedisKey(), 0, this.concurrency - 1])
152+
.zcard(this.toProcessRedisKey())
153+
.hget(([this.queriesDefKey(), this.redisHash(queryKey)]))
154+
.set(lockKey, processingId, 'NX')
155+
.zadd([this.heartBeatRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)])
156+
.execAsync();
157+
if (result) {
158+
result[4] = JSON.parse(result[4]);
159+
}
160+
return result;
161+
} finally {
162+
await this.redisClient.unwatchAsync();
163+
}
148164
}
149165

150166
async freeProcessingLock(queryKey, processingId, activated) {

packages/cubejs-query-orchestrator/test/QueryQueue.test.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,50 @@ const QueryQueueTest = (name, options) => {
198198
await queue.queueDriver.release(redisClient);
199199
await queue.queueDriver.release(redisClient2);
200200
});
201+
202+
test('activated but lock is not acquired', async () => {
203+
const redisClient = await queue.queueDriver.createConnection();
204+
const redisClient2 = await queue.queueDriver.createConnection();
205+
const priority = 10;
206+
const time = new Date().getTime();
207+
const keyScore = time + (10000 - priority) * 1E14;
208+
209+
await queue.reconcileQueue();
210+
211+
await redisClient.addToQueue(
212+
keyScore, 'activated1', time, 'handler', ['select'], priority, { stageQueryKey: 'race' }
213+
);
214+
215+
await redisClient.addToQueue(
216+
keyScore + 100, 'activated2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2' }
217+
);
218+
219+
const processingId1 = await redisClient.getNextProcessingId();
220+
const processingId2 = await redisClient.getNextProcessingId();
221+
const processingId3 = await redisClient.getNextProcessingId();
222+
223+
const retrieve1 = await redisClient.retrieveForProcessing('activated1', processingId1);
224+
console.log(retrieve1);
225+
const retrieve2 = await redisClient2.retrieveForProcessing('activated2', processingId2);
226+
console.log(retrieve2);
227+
console.log(await redisClient.freeProcessingLock('activated1', processingId1, retrieve1 && retrieve1[2].indexOf('activated1') !== -1));
228+
const retrieve3 = await redisClient.retrieveForProcessing('activated2', processingId3);
229+
console.log(retrieve3);
230+
console.log(await redisClient.freeProcessingLock('activated2', processingId3, retrieve3 && retrieve3[2].indexOf('activated2') !== -1));
231+
console.log(retrieve2[2].indexOf('activated2') !== -1);
232+
console.log(await redisClient2.freeProcessingLock('activated2', processingId2, retrieve2 && retrieve2[2].indexOf('activated2') !== -1));
233+
234+
const retrieve4 = await redisClient.retrieveForProcessing('activated2', await redisClient.getNextProcessingId());
235+
console.log(retrieve4);
236+
expect(retrieve4[0]).toBe(1);
237+
expect(!!retrieve4[5]).toBe(true);
238+
239+
console.log(await redisClient.getQueryAndRemove('activated1'));
240+
console.log(await redisClient.getQueryAndRemove('activated2'));
241+
242+
await queue.queueDriver.release(redisClient);
243+
await queue.queueDriver.release(redisClient2);
244+
});
201245
});
202246
};
203247

0 commit comments

Comments
 (0)