Skip to content

Commit 472a0c3

Browse files
committed
feat: Scheduled Refresh REST API
1 parent 71d07e6 commit 472a0c3

File tree

4 files changed

+30
-6
lines changed

4 files changed

+30
-6
lines changed

packages/cubejs-api-gateway/index.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ class ApiGateway {
212212
this.apiSecret = apiSecret;
213213
this.compilerApi = compilerApi;
214214
this.adapterApi = adapterApi;
215+
this.refreshScheduler = options.refreshScheduler;
215216
this.logger = logger;
216217
this.checkAuthMiddleware = options.checkAuthMiddleware || this.checkAuth.bind(this);
217218
this.checkAuthFn = options.checkAuth || this.defaultCheckAuth.bind(this);
@@ -254,6 +255,14 @@ class ApiGateway {
254255
res: this.resToResultFn(res)
255256
});
256257
}));
258+
259+
app.get(`${this.basePath}/v1/run-scheduled-refresh`, this.checkAuthMiddleware, (async (req, res) => {
260+
await this.runScheduledRefresh({
261+
queryingOptions: req.query.queryingOptions,
262+
context: await this.contextByReq(req, req.authInfo, this.requestIdByReq(req)),
263+
res: this.resToResultFn(res)
264+
});
265+
}));
257266
}
258267

259268
initSubscriptionServer(sendMessage) {
@@ -264,6 +273,19 @@ class ApiGateway {
264273
return requestStarted && (new Date().getTime() - requestStarted.getTime());
265274
}
266275

276+
async runScheduledRefresh({ context, res, queryingOptions }) {
277+
const requestStarted = new Date();
278+
try {
279+
const refreshScheduler = this.refreshScheduler();
280+
await refreshScheduler.runScheduledRefresh(context, this.parseQueryParam(queryingOptions || {}));
281+
res({}); // TODO status
282+
} catch (e) {
283+
this.handleError({
284+
e, context, res, requestStarted
285+
});
286+
}
287+
}
288+
267289
async meta({ context, res }) {
268290
const requestStarted = new Date();
269291
try {

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class PreAggregationLoadCache {
109109
return this.versionEntries;
110110
}
111111

112-
async keyQueryResult(keyQuery, waitForRenew) {
112+
async keyQueryResult(keyQuery, waitForRenew, priority) {
113113
if (!this.queryResults[this.queryCache.queryRedisKey(keyQuery)]) {
114114
this.queryResults[this.queryCache.queryRedisKey(keyQuery)] = await this.queryCache.cacheQueryResult(
115115
Array.isArray(keyQuery) ? keyQuery[0] : keyQuery,
@@ -119,7 +119,8 @@ class PreAggregationLoadCache {
119119
{
120120
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold || 2 * 60,
121121
renewalKey: keyQuery,
122-
waitForRenew
122+
waitForRenew,
123+
priority
123124
}
124125
);
125126
}
@@ -308,7 +309,7 @@ class PreAggregationLoader {
308309
getInvalidationKeyValues() {
309310
return Promise.all(
310311
(this.preAggregation.invalidateKeyQueries || [])
311-
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery, this.waitForRenew))
312+
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery, this.waitForRenew, this.priority(10)))
312313
);
313314
}
314315

packages/cubejs-server-core/core/RefreshScheduler.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class RefreshScheduler {
7373
}
7474
}
7575

76-
async ensurePreAggregationsRefreshed(context, queryingOptions) {
76+
async runScheduledRefresh(context, queryingOptions) {
7777
queryingOptions = { timezone: 'UTC', ...queryingOptions };
7878
context = { requestId: `scheduler-${uuid()}`, ...context };
7979
this.serverCore.logger('Refresh Scheduler Run', {

packages/cubejs-server-core/core/index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ class CubejsServerCore {
281281
checkAuthMiddleware: this.options.checkAuthMiddleware,
282282
checkAuth: this.options.checkAuth,
283283
queryTransformer: this.options.queryTransformer,
284-
extendContext: this.options.extendContext
284+
extendContext: this.options.extendContext,
285+
refreshScheduler: () => new RefreshScheduler(this)
285286
}
286287
);
287288
}
@@ -368,7 +369,7 @@ class CubejsServerCore {
368369

369370
async runScheduledRefresh(context, queryingOptions) {
370371
const scheduler = new RefreshScheduler(this);
371-
await scheduler.ensurePreAggregationsRefreshed(context, queryingOptions);
372+
await scheduler.runScheduledRefresh(context, queryingOptions);
372373
}
373374

374375
async getDriver() {

0 commit comments

Comments
 (0)