Skip to content

Commit 1fdd8e9

Browse files
committed
feat: Enhanced trace logging
1 parent 2a586d2 commit 1fdd8e9

File tree

3 files changed

+49
-12
lines changed

3 files changed

+49
-12
lines changed

packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,13 @@ class LocalQueueDriverConnection {
6161

6262
addToQueue(keyScore, queryKey, time, queryHandler, query, priority, options) {
6363
const queryQueueObj = {
64-
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority, requestId: options.requestId
64+
queryHandler,
65+
query,
66+
queryKey,
67+
stageQueryKey: options.stageQueryKey,
68+
priority,
69+
requestId: options.requestId,
70+
addedToQueueTime: new Date().getTime()
6571
};
6672
const key = this.redisHash(queryKey);
6773
if (!this.queryDef[key]) {
@@ -124,8 +130,8 @@ class LocalQueueDriverConnection {
124130
return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000);
125131
}
126132

127-
async getQueryStageState() {
128-
return [this.queueArray(this.active), this.queueArray(this.toProcess), R.clone(this.queryDef)];
133+
async getQueryStageState(onlyKeys) {
134+
return [this.queueArray(this.active), this.queueArray(this.toProcess), onlyKeys ? {} : R.clone(this.queryDef)];
129135
}
130136

131137
async getQueryDef(queryKey) {

packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,25 @@ class QueryQueue {
5858
queueSize,
5959
queryKey,
6060
queuePrefix: this.redisQueuePrefix,
61-
requestId: options.requestId
61+
requestId: options.requestId,
6262
});
6363
}
6464

6565
await this.reconcileQueue(redisClient);
66+
67+
const queryDef = await redisClient.getQueryDef(queryKey);
68+
const [active, toProcess] = await redisClient.getQueryStageState(true);
69+
70+
this.logger('Waiting for query', {
71+
queueSize,
72+
queryKey: queryDef.queryKey,
73+
queuePrefix: this.redisQueuePrefix,
74+
requestId: options.requestId,
75+
active: active.indexOf(redisClient.redisHash(queryKey)) !== -1,
76+
queueIndex: toProcess.indexOf(redisClient.redisHash(queryKey)),
77+
waitingForRequestId: queryDef.requestId
78+
});
79+
6680
result = await redisClient.getResultBlocking(queryKey);
6781
if (!result) {
6882
throw new ContinueWaitError();
@@ -178,11 +192,13 @@ class QueryQueue {
178192
if (query) {
179193
let executionResult;
180194
const startQueryTime = (new Date()).getTime();
195+
const timeInQueue = (new Date()).getTime() - query.addedToQueueTime;
181196
this.logger('Performing query', {
182197
queueSize,
183198
queryKey: query.queryKey,
184199
queuePrefix: this.redisQueuePrefix,
185-
requestId: query.requestId
200+
requestId: query.requestId,
201+
timeInQueue
186202
});
187203
await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime });
188204

@@ -216,7 +232,8 @@ class QueryQueue {
216232
duration: ((new Date()).getTime() - startQueryTime),
217233
queryKey: query.queryKey,
218234
queuePrefix: this.redisQueuePrefix,
219-
requestId: query.requestId
235+
requestId: query.requestId,
236+
timeInQueue
220237
});
221238
} catch (e) {
222239
executionResult = {
@@ -255,6 +272,12 @@ class QueryQueue {
255272

256273
await this.reconcileQueue(redisClient);
257274
}
275+
} catch (e) {
276+
this.logger('Queue storage error', {
277+
queryKey,
278+
error: (e.stack || e).toString(),
279+
queuePrefix: this.redisQueuePrefix
280+
});
258281
} finally {
259282
redisClient.release();
260283
}

packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ class RedisQueueDriverConnection {
3434
this.queriesDefKey(),
3535
this.redisHash(queryKey),
3636
JSON.stringify({
37-
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority, requestId: options.requestId
37+
queryHandler,
38+
query,
39+
queryKey,
40+
stageQueryKey: options.stageQueryKey,
41+
priority,
42+
requestId: options.requestId,
43+
addedToQueueTime: new Date().getTime()
3844
})
3945
])
4046
.zcard(this.toProcessRedisKey())
@@ -91,12 +97,14 @@ class RedisQueueDriverConnection {
9197
);
9298
}
9399

94-
async getQueryStageState() {
95-
const [active, toProcess, allQueryDefs] = await this.redisClient.multi()
100+
async getQueryStageState(onlyKeys) {
101+
let request = this.redisClient.multi()
96102
.zrange([this.activeRedisKey(), 0, -1])
97-
.zrange([this.toProcessRedisKey(), 0, -1])
98-
.hgetall(this.queriesDefKey())
99-
.execAsync();
103+
.zrange([this.toProcessRedisKey(), 0, -1]);
104+
if (!onlyKeys) {
105+
request = request.hgetall(this.queriesDefKey());
106+
}
107+
const [active, toProcess, allQueryDefs] = await request.execAsync();
100108
return [active, toProcess, R.map(q => JSON.parse(q), allQueryDefs || {})];
101109
}
102110

0 commit comments

Comments
 (0)