Skip to content

Commit

Permalink
feat: Executing SQL logging message that shows final SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Mar 27, 2020
1 parent 9e1ee4f commit 26b8758
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 31 deletions.
46 changes: 32 additions & 14 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,38 +389,56 @@ class PreAggregationLoader {
async refreshImplStoreInSourceStrategy(client, newVersionEntry) {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
const targetTableName = this.targetTableName(newVersionEntry);
const query = QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
targetTableName
);
this.logger('Executing Load Pre Aggregation SQL', {
queryKey: this.preAggregation.loadSql,
query,
values: params,
targetTableName,
requestId: this.requestId
});
await client.loadPreAggregationIntoTable(
this.targetTableName(newVersionEntry),
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
this.targetTableName(newVersionEntry)
),
targetTableName,
query,
params
);
await this.createIndexes(client, newVersionEntry);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
await this.dropOrphanedTables(client, targetTableName);
await this.loadCache.reset(this.preAggregation);
}

async refreshImplTempTableExternalStrategy(client, newVersionEntry) {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
const targetTableName = this.targetTableName(newVersionEntry);
const query = QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
targetTableName
);
this.logger('Executing Load Pre Aggregation SQL', {
queryKey: this.preAggregation.loadSql,
query,
values: params,
targetTableName,
requestId: this.requestId
});
await client.loadPreAggregationIntoTable(
this.targetTableName(newVersionEntry),
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
this.targetTableName(newVersionEntry)
),
targetTableName,
query,
params
);
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry);
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
await this.dropOrphanedTables(client, targetTableName);
}

async refreshImplStreamExternalStrategy(client, newVersionEntry) {
Expand Down
18 changes: 15 additions & 3 deletions packages/cubejs-query-orchestrator/orchestrator/QueryCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ class QueryCache {
priority, cacheKey, external, requestId
}) {
const queue = external ? this.getExternalQueue() : this.getQueue();
return queue.executeInQueue('query', cacheKey, { query, values }, priority, {
return queue.executeInQueue('query', cacheKey, {
queryKey: cacheKey, query, values, requestId
}, priority, {
stageQueryKey: cacheKey,
requestId
});
Expand All @@ -142,7 +144,12 @@ class QueryCache {
this.queue = QueryCache.createQueue(
`SQL_QUERY_${this.redisPrefix}`,
this.driverFactory,
(client, q) => client.query(q.query, q.values), {
(client, q) => {
this.logger('Executing SQL', {
...q
});
return client.query(q.query, q.values);
}, {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
Expand All @@ -158,7 +165,12 @@ class QueryCache {
this.externalQueue = QueryCache.createQueue(
`SQL_QUERY_EXT_${this.redisPrefix}`,
this.externalDriverFactory,
(client, q) => client.query(q.query, q.values),
(client, q) => {
this.logger('Executing SQL', {
...q
});
return client.query(q.query, q.values);
},
{
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
Expand Down
24 changes: 10 additions & 14 deletions packages/cubejs-server-core/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,28 @@ const devLogger = (level) => (type, { error, warning, ...message }) => {

const withColor = (str, color = colors.green) => `\u001b[${color}m${str}\u001b[0m`;
const format = ({
queryKey, duration, allSqlLines, ...json
requestId, duration, allSqlLines, query, values, queryKey, showRestParams, ...json
}) => {
const restParams = JSON.stringify(json, null, 2);
// TODO pre-aggregations queryKey format
if (queryKey && queryKey[0] && Array.isArray(queryKey[0]) && typeof queryKey[0][0] === 'string') {
[queryKey] = queryKey;
}
const durationStr = duration ? `(${duration}ms)` : '';
if (queryKey && typeof queryKey[0] === 'string') {
const prefix = `${requestId} ${durationStr}`;
if (query && values) {
const queryMaxLines = 50;
let formatted = SqlString.format(queryKey[0], queryKey[1]).split('\n');
let formatted = SqlString.format(query, values).split('\n');
if (formatted.length > queryMaxLines && !allSqlLines) {
formatted = R.take(queryMaxLines / 2, formatted)
.concat(['.....', '.....', '.....'])
.concat(R.takeLast(queryMaxLines / 2, formatted));
}
return `${durationStr}\n${formatted.join('\n')}\n${restParams}`;
}
if (queryKey) {
return `${durationStr}\n${JSON.stringify(queryKey)}\n${restParams}`; // TODO format
return `${prefix}\n--\n ${formatted.join('\n')}\n--${showRestParams ? `\n${restParams}` : ''}`;
}
return `${durationStr}\n${restParams}`;
return `${prefix}${showRestParams ? `\n${restParams}` : ''}`;
};

const logWarning = () => console.log(
`${withColor(type, colors.yellow)}: ${format({ ...message, allSqlLines: true })} \n${withColor(warning, colors.yellow)}`
`${withColor(type, colors.yellow)}: ${format({ ...message, allSqlLines: true, showRestParams: true })} \n${withColor(warning, colors.yellow)}`
);
const logError = () => console.log(`${withColor(type, colors.red)}: ${format({ ...message, allSqlLines: true })} \n${error}`);
const logError = () => console.log(`${withColor(type, colors.red)}: ${format({ ...message, allSqlLines: true, showRestParams: true })} \n${error}`);
const logDetails = () => console.log(`${withColor(type)}: ${format(message)}`);

if (error) {
Expand All @@ -104,6 +98,8 @@ const devLogger = (level) => (type, { error, warning, ...message }) => {
// eslint-disable-next-line no-fallthrough
case "info": {
if (!error && !warning && [
'Executing SQL',
'Executing Load Pre Aggregation SQL',
'Load Request Success',
'Performing query',
'Performing query completed',
Expand Down

0 comments on commit 26b8758

Please sign in to comment.