Skip to content

Commit

Permalink
feat: Show refreshKey values in Playground
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Nov 25, 2019
1 parent d952aff commit b49e184
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 37 deletions.
2 changes: 2 additions & 0 deletions packages/cubejs-api-gateway/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ class ApiGateway {
res({
query: normalizedQuery,
data: transformData(aliasToMemberNameMap, flattenAnnotation, response.data),
lastRefreshTime: response.lastRefreshTime && response.lastRefreshTime.toISOString(),
refreshKeyValues: process.env.NODE_ENV === 'production' ? undefined : response.refreshKeyValues,
annotation
});
} catch (e) {
Expand Down
59 changes: 58 additions & 1 deletion packages/cubejs-playground/src/ChartContainer.jsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* global navigator */
import React from 'react';
import {
Card, Button, Menu, Dropdown, Icon, notification, Modal
Card, Button, Menu, Dropdown, Icon, notification, Modal, Table
} from 'antd';
import { getParameters } from 'codesandbox-import-utils/lib/api/define';
import { fetch } from 'whatwg-fetch';
Expand Down Expand Up @@ -233,6 +233,18 @@ class ChartContainer extends React.Component {
>
SQL
</Button>
<Button
onClick={() => {
playgroundAction('Show Cache');
this.setState({ showCode: showCode === 'cache' ? null : 'cache' });
}}
icon="sync"
size="small"
type={showCode === 'cache' ? 'primary' : 'default'}
disabled={!!frameworkItem.docsLink}
>
Cache
</Button>
<Button
icon="code-sandbox"
size="small"
Expand Down Expand Up @@ -282,6 +294,51 @@ class ChartContainer extends React.Component {
render={({ sqlQuery }) => <PrismCode code={sqlQuery && sqlFormatter.format(sqlQuery.sql())} />}
/>
);
} else if (showCode === 'cache') {
return (
<QueryRenderer
loadSql
query={{ ...query, renewQuery: true }}
cubejsApi={cubejsApi}
render={
({ sqlQuery, resultSet: rs }) => (
<div>
<h3>
Last Refresh Time:
{rs && rs.loadResponse.lastRefreshTime}
</h3>
<Table
loading={!sqlQuery}
pagination={false}
columns={[
{
title: 'Refresh Key SQL',
key: 'refreshKey',
render: (text, record) => <PrismCode code={sqlFormatter.format(record[0])} />,
},
{
title: 'Value',
key: 'value',
render: (text, record) => (
<PrismCode
code={
rs && rs.loadResponse.refreshKeyValues
&& JSON.stringify(rs.loadResponse.refreshKeyValues[
sqlQuery.sqlQuery.sql.cacheKeyQueries.queries.indexOf(record)
], null, 2)
}
/>
),
}
]}
dataSource={
sqlQuery && sqlQuery.sqlQuery.sql && sqlQuery.sqlQuery.sql.cacheKeyQueries.queries
}
/>
</div>
)}
/>
);
}
return render({ resultSet, error, sandboxId });
};
Expand Down
84 changes: 49 additions & 35 deletions packages/cubejs-query-orchestrator/orchestrator/QueryCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ class QueryCache {
new LocalCacheDriver();
}

cachedQueryResult (queryBody, preAggregationsTablesToTempTables) {
const replacePreAggregationTableNames = (queryAndParams) =>
QueryCache.replacePreAggregationTableNames(queryAndParams, preAggregationsTablesToTempTables);
async cachedQueryResult(queryBody, preAggregationsTablesToTempTables) {
const replacePreAggregationTableNames = (queryAndParams) => QueryCache.replacePreAggregationTableNames(
queryAndParams, preAggregationsTablesToTempTables
);

const query = replacePreAggregationTableNames(queryBody.query);
let queuePriority = 10;
if (Number.isInteger(queryBody.queuePriority)) {
// eslint-disable-next-line prefer-destructuring
queuePriority = queryBody.queuePriority;
}
const forceNoCache = queryBody.forceNoCache || false;
const values = queryBody.values;
const { values } = queryBody;
const cacheKeyQueries =
(
queryBody.cacheKeyQueries && queryBody.cacheKeyQueries.queries ||
Expand All @@ -39,7 +41,7 @@ class QueryCache {
const expireSecs = queryBody.expireSecs || 24 * 3600;

if (!cacheKeyQueries) {
return this.queryWithRetryAndRelease(query, values, queryBody.external);
return { data: await this.queryWithRetryAndRelease(query, values, queryBody.external) };
}
const cacheKey = QueryCache.queryCacheKey(queryBody);

Expand Down Expand Up @@ -67,22 +69,25 @@ class QueryCache {
});
}

return mainPromise;
return {
data: await mainPromise,
lastRefreshTime: await this.lastRefreshTime(cacheKey)
};
}

static queryCacheKey(queryBody) {
return [queryBody.query, queryBody.values, (queryBody.preAggregations || []).map(p => p.loadSql)];
}

static replaceAll(replaceThis, withThis, inThis) {
withThis = withThis.replace(/\$/g,"$$$$");
withThis = withThis.replace(/\$/g, "$$$$");
return inThis.replace(
new RegExp(replaceThis.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g,"\\$&"),"g"),
new RegExp(replaceThis.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&"), "g"),
withThis
);
}

static replacePreAggregationTableNames (queryAndParams, preAggregationsTablesToTempTables) {
static replacePreAggregationTableNames(queryAndParams, preAggregationsTablesToTempTables) {
const [keyQuery, params] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []];
const replacedKeqQuery = preAggregationsTablesToTempTables.reduce(
(query, [tableName, tempTable]) => QueryCache.replaceAll(tableName, tempTable, query),
Expand Down Expand Up @@ -169,9 +174,9 @@ class QueryCache {
query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, options
).catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Error while renew cycle', { query, query_values: values, error: e.stack || e })
this.logger('Error while renew cycle', { query, query_values: values, error: e.stack || e });
}
})
});
}

renewQuery(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, options) {
Expand All @@ -192,30 +197,34 @@ class QueryCache {
this.logger('Error fetching cache key queries', { error: e.stack || e });
return [];
})
.then(cacheKeyQueryResults => {
return this.cacheQueryResult(
query, values,
cacheKey,
expireSecs,
{
renewalThreshold: renewalThreshold || 6 * 60 * 60,
renewalKey: cacheKeyQueryResults && [
cacheKeyQueries, cacheKeyQueryResults, this.queryRedisKey([query, values])
],
waitForRenew: true,
external: options.external
}
);
});
.then(async cacheKeyQueryResults => (
{
data: await this.cacheQueryResult(
query, values,
cacheKey,
expireSecs,
{
renewalThreshold: renewalThreshold || 6 * 60 * 60,
renewalKey: cacheKeyQueryResults && [
cacheKeyQueries, cacheKeyQueryResults, this.queryRedisKey([query, values])
],
waitForRenew: true,
external: options.external
}
),
refreshKeyValues: cacheKeyQueryResults,
lastRefreshTime: await this.lastRefreshTime(cacheKey)
}
));
}

cacheQueryResult(query, values, cacheKey, expiration, options) {
options = options || {};
const renewalThreshold = options.renewalThreshold;
const { renewalThreshold } = options;
const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey);
const redisKey = this.queryRedisKey(cacheKey);
const fetchNew = () => {
return this.queryWithRetryAndRelease(query, values, options.priority, cacheKey, options.external).then(res => {
const fetchNew = () => (
this.queryWithRetryAndRelease(query, values, options.priority, cacheKey, options.external).then(res => {
const result = {
time: (new Date()).getTime(),
result: res,
Expand All @@ -224,17 +233,17 @@ class QueryCache {
return this.cacheDriver.set(redisKey, result, expiration)
.then(() => {
this.logger('Renewed', { cacheKey });
return res
return res;
});
}).catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Dropping Cache', { cacheKey, error: e.stack || e });
this.cacheDriver.remove(redisKey)
.catch(e => this.logger('Error removing key', { cacheKey, error: e.stack || e }));
.catch(err => this.logger('Error removing key', { cacheKey, error: err.stack || err }));
}
throw e;
});
};
})
);

if (options.forceNoCache) {
this.logger('Force no cache for', { cacheKey });
Expand Down Expand Up @@ -268,7 +277,7 @@ class QueryCache {
this.logger('Renewing existing key', { cacheKey, renewalThreshold });
fetchNew().catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Error renewing', {cacheKey, error: e.stack || e})
this.logger('Error renewing', { cacheKey, error: e.stack || e });
}
});
}
Expand All @@ -282,8 +291,13 @@ class QueryCache {
});
}

async lastRefreshTime(cacheKey) {
const cachedValue = await this.cacheDriver.get(this.queryRedisKey(cacheKey));
return cachedValue && new Date(cachedValue.time);
}

queryRedisKey(cacheKey) {
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest("hex")}`
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest("hex")}`;
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-server-core/core/OrchestratorApi.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class OrchestratorApi {
params: query.values
});

return { data };
return data;
} catch (err) {
if ((err instanceof pt.TimeoutError || err instanceof ContinueWaitError)) {
this.logger('Continue wait', {
Expand Down

0 comments on commit b49e184

Please sign in to comment.