Skip to content

Commit eca3d0c

Browse files
committed
fix: Dead queries added to queue in serverless
1 parent e242806 commit eca3d0c

File tree

4 files changed

+21
-6
lines changed

4 files changed

+21
-6
lines changed

packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,13 @@ class LocalQueueDriverConnection {
176176
]; // TODO nulls
177177
}
178178

179-
freeProcessingLock(queryKey, processingId) {
179+
freeProcessingLock(queryKey, processingId, activated) {
180180
const key = this.redisHash(queryKey);
181181
if (this.processingLocks[key] === processingId) {
182182
delete this.processingLocks[key];
183+
if (activated) {
184+
delete this.active[key];
185+
}
183186
}
184187
}
185188

packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ class QueryQueue {
219219
const processingId = await redisClient.getNextProcessingId();
220220
[insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] =
221221
await redisClient.retrieveForProcessing(queryKey, processingId);
222-
if (query && insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1 && processingLockAcquired) {
222+
const activated = activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
223+
if (query && insertedCount && activated && processingLockAcquired) {
223224
let executionResult;
224225
const startQueryTime = (new Date()).getTime();
225226
const timeInQueue = (new Date()).getTime() - query.addedToQueueTime;
@@ -319,7 +320,7 @@ class QueryQueue {
319320
insertedCount,
320321
activeKeys
321322
});
322-
await redisClient.freeProcessingLock(queryKey, processingId);
323+
await redisClient.freeProcessingLock(queryKey, processingId, activated);
323324
}
324325
} catch (e) {
325326
this.logger('Queue storage error', {

packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,17 @@ class RedisQueueDriverConnection {
143143
return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired];
144144
}
145145

146-
async freeProcessingLock(queryKey, processingId) {
146+
async freeProcessingLock(queryKey, processingId, activated) {
147147
const lockKey = this.queryProcessingLockKey(queryKey);
148148
await this.redisClient.watchAsync(lockKey);
149149
const currentProcessId = await this.redisClient.getAsync(lockKey);
150150
if (currentProcessId === processingId) {
151-
await this.redisClient.multi()
152-
.del(lockKey)
151+
let removeCommand = this.redisClient.multi()
152+
.del(lockKey);
153+
if (activated) {
154+
removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]);
155+
}
156+
await removeCommand
153157
.execAsync();
154158
}
155159
}

packages/cubejs-query-orchestrator/test/QueryQueue.test.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ const QueryQueueTest = (name, options) => {
136136
expect(cancelledQuery).toBe('114');
137137
await queue.executeInQueue('delay', `114`, { delay: 50, result: '4' }, 0);
138138
});
139+
140+
test('removed before reconciled', async () => {
141+
const query = ['select * from'];
142+
await queue.processQuery(query);
143+
const result = await queue.executeInQueue('foo', query, query);
144+
expect(result).toBe('select * from bar');
145+
});
139146
});
140147
};
141148

0 commit comments

Comments
 (0)