Skip to content

Commit

Permalink
feat: Ability to set timeouts for polling in BigQuery/Athena (#1675)
Browse files Browse the repository at this point in the history
* feat(bigquery-driver): Support pollTimeout, pollMaxInterval, fix #1672

* feat(athena-driver): Catch timeouts on pulling query's results

* feat: Introduce CUBEJS_DB_POLL_TIMEOUT/CUBEJS_DB_POLL_MAX_INTERVAL
  • Loading branch information
ovr committed Dec 24, 2020
1 parent 17d5fdb commit dc944b1
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 75 deletions.
2 changes: 2 additions & 0 deletions docs/content/Configuration/Environment-Variables-Reference.md
Expand Up @@ -50,6 +50,8 @@ databases [in this guide][link-connecting-to-db].
| `CUBEJS_AWS_REGION` | AWS Athena, JDBC | The AWS region of the Cube.js deployment | [A valid AWS region][link-aws-regions] |
| `CUBEJS_AWS_S3_OUTPUT_LOCATION` | AWS Athena, JDBC | The S3 path to store query results made by the Cube.js deployment | A valid S3 path |
| `CUBEJS_AWS_SECRET` | AWS Athena, JDBC | The AWS Secret Access Key to use for database connections | A valid AWS Secret Access Key |
| `CUBEJS_DB_PULL_TIMEOUT` | BigQuery, AWS Athena | Timeout for query polling | A number in seconds or a string that specify time (`1s`, `5m`) |

This comment has been minimized.

Copy link
@dvins

dvins Feb 20, 2021

There is a typo in these two environment variable names, they say pUll instead of pOll.

| `CUBEJS_DB_PULL_MAX_INTERVAL` | BigQuery, AWS Athena | Max interval on retries for query polling | A number in seconds or a string that specify time (`5s`, `1m`) |
| `CUBEJS_DB_BQ_CREDENTIALS` | BigQuery | A Base64 encoded JSON key file for connecting to Google BigQuery | A valid Google BigQuery JSON key file encoded as a Base64 string |
| `CUBEJS_DB_BQ_KEY_FILE` | BigQuery | The path to a JSON key file for connecting to Google BigQuery | A valid Google BigQuery JSON key file |
| `CUBEJS_DB_BQ_PROJECT_ID` | BigQuery | The Google BigQuery project ID to connect to | A valid Google BigQuery Project ID |
Expand Down
2 changes: 1 addition & 1 deletion examples/web-analytics/dashboard-app/package.json
Expand Up @@ -35,7 +35,7 @@
"yup": "^0.28.1"
},
"scripts": {
"start": "react-scripts start",
"start": "SKIP_PREFLIGHT_CHECK=true react-scripts start",
"build": "SKIP_PREFLIGHT_CHECK=true react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject",
Expand Down
114 changes: 71 additions & 43 deletions packages/cubejs-athena-driver/driver/AthenaDriver.js
@@ -1,20 +1,29 @@
const AWS = require('aws-sdk');
const { promisify } = require('util');
const { BaseDriver } = require('@cubejs-backend/query-orchestrator');
const { getEnv } = require('@cubejs-backend/shared');
const SqlString = require('sqlstring');

function pause(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

const applyParams = (query, params) => SqlString.format(query, params);

class AthenaDriver extends BaseDriver {
constructor(config) {
constructor(config = {}) {
super();

this.config = {
accessKeyId: process.env.CUBEJS_AWS_KEY,
secretAccessKey: process.env.CUBEJS_AWS_SECRET,
region: process.env.CUBEJS_AWS_REGION,
S3OutputLocation: process.env.CUBEJS_AWS_S3_OUTPUT_LOCATION,
...config
...config,
pollTimeout: (config.pollTimeout || getEnv('dbPollTimeout')) * 1000,
pollMaxInterval: (config.pollMaxInterval || getEnv('dbPollMaxInterval')) * 1000,
};

this.athena = new AWS.Athena(this.config);
this.athena.startQueryExecutionAsync = promisify(this.athena.startQueryExecution.bind(this.athena));
this.athena.stopQueryExecutionAsync = promisify(this.athena.stopQueryExecution.bind(this.athena));
Expand All @@ -30,10 +39,52 @@ class AthenaDriver extends BaseDriver {
return this.query('SELECT 1', []);
}

sleep(ms) {
return new Promise((resolve) => {
setTimeout(() => resolve(), ms);
async awaitForJobStatus(QueryExecutionId, query, options) {
const queryExecution = await this.athena.getQueryExecutionAsync({
QueryExecutionId
});

const status = queryExecution.QueryExecution.Status.State;
if (status === 'FAILED') {
throw new Error(queryExecution.QueryExecution.Status.StateChangeReason);
}

if (status === 'CANCELLED') {
throw new Error('Query has been cancelled');
}

if (
status === 'SUCCEEDED'
) {
const allRows = [];
let columnInfo;

this.reportQueryUsage({
dataScannedInBytes: queryExecution.QueryExecution.Statistics.DataScannedInBytes
}, options);

for (
let results = await this.athena.getQueryResultsAsync({ QueryExecutionId });
results;
results = results.NextToken && (await this.athena.getQueryResultsAsync({
QueryExecutionId, NextToken: results.NextToken
}))
) {
const [header, ...tableRows] = results.ResultSet.Rows;
allRows.push(...(allRows.length ? results.ResultSet.Rows : tableRows));
if (!columnInfo) {
columnInfo = /SHOW COLUMNS/.test(query) // Fix for getColumns method
? [{ Name: 'column' }]
: results.ResultSet.ResultSetMetadata.ColumnInfo;
}
}

return allRows.map(r => columnInfo
.map((c, i) => ({ [c.Name]: r.Data[i].VarCharValue }))
.reduce((a, b) => ({ ...a, ...b }), {}));
}

return null;
}

async query(query, values, options) {
Expand All @@ -43,53 +94,30 @@ class AthenaDriver extends BaseDriver {
toSqlString: () => SqlString.escape(s).replace(/\\\\([_%])/g, '\\$1').replace(/\\'/g, '\'\'')
} : s))
);

const { QueryExecutionId } = await this.athena.startQueryExecutionAsync({
QueryString: queryString,
ResultConfiguration: {
OutputLocation: this.config.S3OutputLocation
}
});
while (true) {
const queryExecution = await this.athena.getQueryExecutionAsync({
QueryExecutionId
});
const status = queryExecution.QueryExecution.Status.State;
if (status === 'FAILED') {
throw new Error(queryExecution.QueryExecution.Status.StateChangeReason);
}
if (status === 'CANCELLED') {
throw new Error('Query has been cancelled');
}
if (
status === 'SUCCEEDED'
) {
const allRows = [];
let columnInfo;
this.reportQueryUsage({
dataScannedInBytes: queryExecution.QueryExecution.Statistics.DataScannedInBytes
}, options);
for (
let results = await this.athena.getQueryResultsAsync({ QueryExecutionId });
results;
results = results.NextToken && (await this.athena.getQueryResultsAsync({
QueryExecutionId, NextToken: results.NextToken
}))
) {
const [header, ...tableRows] = results.ResultSet.Rows;
allRows.push(...(allRows.length ? results.ResultSet.Rows : tableRows));
if (!columnInfo) {
columnInfo = /SHOW COLUMNS/.test(query) // Fix for getColumns method
? [{ Name: 'column' }]
: results.ResultSet.ResultSetMetadata.ColumnInfo;
}
}

return allRows.map(r => columnInfo
.map((c, i) => ({ [c.Name]: r.Data[i].VarCharValue }))
.reduce((a, b) => ({ ...a, ...b }), {}));
const startedTime = Date.now();

for (let i = 0; Date.now() - startedTime <= this.config.pollTimeout; i++) {
const result = await this.awaitForJobStatus(QueryExecutionId, query, options);
if (result) {
return result;
}
await this.sleep(500);

await pause(
Math.min(this.config.pollMaxInterval, 500 * i)
);
}

throw new Error(
`Athena job timeout reached ${this.config.pollTimeout}ms`
);
}

async tablesSchema() {
Expand Down
4 changes: 3 additions & 1 deletion packages/cubejs-athena-driver/driver/index.d.ts
Expand Up @@ -2,7 +2,9 @@ import { ClientConfiguration } from "aws-sdk/clients/athena";

declare module "@cubejs-backend/athena-driver" {
interface AthenaDriverOptions extends ClientConfiguration {
readOnly?: boolean
readOnly?: boolean,
pollTimeout?: number,
pollMaxInterval?: number,
}

export default class AthenaDriver {
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-athena-driver/package.json
Expand Up @@ -18,6 +18,7 @@
"types": "driver/index.d.ts",
"dependencies": {
"@cubejs-backend/query-orchestrator": "^0.25.1",
"@cubejs-backend/shared": "^0.25.0",
"aws-sdk": "^2.403.0",
"sqlstring": "^2.3.1"
},
Expand Down
76 changes: 76 additions & 0 deletions packages/cubejs-athena-driver/yarn.lock
Expand Up @@ -23,6 +23,25 @@
chalk "^2.0.0"
js-tokens "^4.0.0"

"@cubejs-backend/query-orchestrator@^0.25.0":
version "0.25.0"
resolved "https://registry.yarnpkg.com/@cubejs-backend/query-orchestrator/-/query-orchestrator-0.25.0.tgz#76a964dbef1d3c812327724a561ff9a7e6a1cf70"
integrity sha512-2WzU7JA8Qwd4HsZeoz4WiQ+H40DH/1yAQUB28sLa2pVQizzpCwbgBmupvdIjbVY2cX+lrIMIrjEXq6tloRkOUA==
dependencies:
"@cubejs-backend/shared" "^0.25.0"
generic-pool "^3.7.1"
ramda "^0.27.0"
redis "^3.0.2"

"@cubejs-backend/shared@^0.25.0":
version "0.25.0"
resolved "https://registry.yarnpkg.com/@cubejs-backend/shared/-/shared-0.25.0.tgz#2b969988c40c276314ba82216f643dbb9df6d092"
integrity sha512-Th9KxIuCIILqnR/dnmCH32BVXLe/TmYX8NU8osS2Wf06FzF4G361U+ZK/U5O5e7sPvwA/Tg9MhDe/QDBvCDM8A==
dependencies:
env-var "^6.3.0"
node-fetch "^2.6.1"
node-machine-id "^1.1.12"

"@types/json5@^0.0.29":
version "0.0.29"
resolved "https://registry.yarnpkg.com/@types/json5/-/json5-0.0.29.tgz#ee28707ae94e11d2b827bcbe5270bcea7f3e71ee"
Expand Down Expand Up @@ -273,6 +292,11 @@ define-properties@^1.1.3:
dependencies:
object-keys "^1.0.12"

denque@^1.4.1:
version "1.4.1"
resolved "https://registry.yarnpkg.com/denque/-/denque-1.4.1.tgz#6744ff7641c148c3f8a69c307e51235c1f4a37cf"
integrity sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==

doctrine@1.5.0:
version "1.5.0"
resolved "https://registry.yarnpkg.com/doctrine/-/doctrine-1.5.0.tgz#379dce730f6166f76cefa4e6707a159b02c5a6fa"
Expand All @@ -298,6 +322,11 @@ emoji-regex@^8.0.0:
resolved "https://registry.yarnpkg.com/emoji-regex/-/emoji-regex-8.0.0.tgz#e818fd69ce5ccfcb404594f842963bf53164cc37"
integrity sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==

env-var@^6.3.0:
version "6.3.0"
resolved "https://registry.yarnpkg.com/env-var/-/env-var-6.3.0.tgz#b4ace5bcd1d293629a2c509ae7b46f8add2f8892"
integrity sha512-gaNzDZuVaJQJlP2SigAZLu/FieZN5MzdN7lgHNehESwlRanHwGQ/WUtJ7q//dhrj3aGBZM45yEaKOuvSJaf4mA==

error-ex@^1.2.0:
version "1.3.2"
resolved "https://registry.yarnpkg.com/error-ex/-/error-ex-1.3.2.tgz#b4ac40648107fdcdcfae242f428bea8a14d4f1bf"
Expand Down Expand Up @@ -593,6 +622,11 @@ functional-red-black-tree@^1.0.1:
resolved "https://registry.yarnpkg.com/functional-red-black-tree/-/functional-red-black-tree-1.0.1.tgz#1b0ab3bd553b2a0d6399d29c0e3ea0b252078327"
integrity sha1-GwqzvVU7Kg1jmdKcDj6gslIHgyc=

generic-pool@^3.7.1:
version "3.7.1"
resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.7.1.tgz#36fe5bb83e7e0e032e5d32cd05dc00f5ff119aa8"
integrity sha512-ug6DAZoNgWm6q5KhPFA+hzXfBLFQu5sTXxPpv44DmE0A2g+CiHoq9LTVdkXpZMkYVMoGw83F6W+WT0h0MFMK/w==

get-intrinsic@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/get-intrinsic/-/get-intrinsic-1.0.1.tgz#94a9768fcbdd0595a1c9273aacf4c89d075631be"
Expand Down Expand Up @@ -920,6 +954,16 @@ nice-try@^1.0.4:
resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366"
integrity sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==

node-fetch@^2.6.1:
version "2.6.1"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052"
integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw==

node-machine-id@^1.1.12:
version "1.1.12"
resolved "https://registry.yarnpkg.com/node-machine-id/-/node-machine-id-1.1.12.tgz#37904eee1e59b320bb9c5d6c0a59f3b469cb6267"
integrity sha512-QNABxbrPa3qEIfrE6GOJ7BYIuignnJw7iQ2YPbc3Nla1HzRJjXzZOiikfF8m7eAMfichLt3M4VgLOetqgDmgGQ==

normalize-package-data@^2.3.2:
version "2.5.0"
resolved "https://registry.yarnpkg.com/normalize-package-data/-/normalize-package-data-2.5.0.tgz#e66db1838b200c1dfc233225d12cb36520e234a8"
Expand Down Expand Up @@ -1097,6 +1141,11 @@ querystring@0.2.0:
resolved "https://registry.yarnpkg.com/querystring/-/querystring-0.2.0.tgz#b209849203bb25df820da756e747005878521620"
integrity sha1-sgmEkgO7Jd+CDadW50cAWHhSFiA=

ramda@^0.27.0:
version "0.27.1"
resolved "https://registry.yarnpkg.com/ramda/-/ramda-0.27.1.tgz#66fc2df3ef873874ffc2da6aa8984658abacf5c9"
integrity sha512-PgIdVpn5y5Yns8vqb8FzBUEYn98V3xcPgawAkkgj0YJ0qDsnHCiNmZYfOGMgOvoB0eWFLpYbhxUR3mxfDIMvpw==

read-pkg-up@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/read-pkg-up/-/read-pkg-up-2.0.0.tgz#6b72a8048984e0c41e79510fd5e9fa99b3b549be"
Expand All @@ -1114,6 +1163,33 @@ read-pkg@^2.0.0:
normalize-package-data "^2.3.2"
path-type "^2.0.0"

redis-commands@^1.5.0:
version "1.6.0"
resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.6.0.tgz#36d4ca42ae9ed29815cdb30ad9f97982eba1ce23"
integrity sha512-2jnZ0IkjZxvguITjFTrGiLyzQZcTvaw8DAaCXxZq/dsHXz7KfMQ3OUJy7Tz9vnRtZRVz6VRCPDvruvU8Ts44wQ==

redis-errors@^1.0.0, redis-errors@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad"
integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60=

redis-parser@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4"
integrity sha1-tm2CjNyv5rS4pCin3vTGvKwxyLQ=
dependencies:
redis-errors "^1.0.0"

redis@^3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/redis/-/redis-3.0.2.tgz#bd47067b8a4a3e6a2e556e57f71cc82c7360150a"
integrity sha512-PNhLCrjU6vKVuMOyFu7oSP296mwBkcE6lrAjruBYG5LgdSqtRBoVQIylrMyVZD/lkF24RSNNatzvYag6HRBHjQ==
dependencies:
denque "^1.4.1"
redis-commands "^1.5.0"
redis-errors "^1.2.0"
redis-parser "^3.0.0"

regexpp@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/regexpp/-/regexpp-2.0.1.tgz#8d19d31cf632482b589049f8281f93dbcba4d07f"
Expand Down
32 changes: 31 additions & 1 deletion packages/cubejs-backend-shared/src/env.ts
@@ -1,5 +1,27 @@
import { get } from 'env-var';

export function convertTimeStrToMs(input: string, envName: string) {
if (/^\d+$/.test(input)) {
return parseInt(input, 10);
}

if (input.length > 1) {
// eslint-disable-next-line default-case
switch (input.substr(-1).toLowerCase()) {
case 'h':
return parseInt(input.slice(0, -1), 10) * 60 * 60;
case 'm':
return parseInt(input.slice(0, -1), 10) * 60;
case 's':
return parseInt(input.slice(0, -1), 10);
}
}

throw new Error(
`Unsupported time format in ${envName}`
);
}

const variables = {
devMode: () => get('CUBEJS_DEV_MODE')
.default('false')
Expand All @@ -23,7 +45,15 @@ const variables = {
// It's only excepted for CI, nothing else.
internalExceptions: () => get('INTERNAL_EXCEPTIONS_YOU_WILL_BE_FIRED')
.default('false')
.asEnum(['exit', 'log', 'false'])
.asEnum(['exit', 'log', 'false']),
dbPollTimeout: () => {
const value = process.env.CUBEJS_DB_POLL_TIMEOUT || '15m';
return convertTimeStrToMs(value, 'CUBEJS_DB_POLL_TIMEOUT');
},
dbPollMaxInterval: () => {
const value = process.env.CUBEJS_DB_POLL_MAX_INTERVAL || '5s';
return convertTimeStrToMs(value, 'CUBEJS_DB_POLL_MAX_INTERVAL');
}
};

type Vars = typeof variables;
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-backend-shared/src/index.ts
@@ -1,4 +1,4 @@
export * from './env';
export { getEnv, isDockerImage } from './env';
export * from './package';
export * from './track';
export * from './errors';

0 comments on commit dc944b1

Please sign in to comment.