Skip to content

Commit

Permalink
feat(athena-driver): Use AWS-SDK v3 (modular)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jul 20, 2021
1 parent 915d7f4 commit f14b7c1
Show file tree
Hide file tree
Showing 3 changed files with 787 additions and 740 deletions.
29 changes: 11 additions & 18 deletions packages/cubejs-athena-driver/driver/AthenaDriver.js
@@ -1,22 +1,19 @@
const AWS = require('aws-sdk');
const { promisify } = require('util');
const AWS = require('@aws-sdk/client-athena');
const { BaseDriver } = require('@cubejs-backend/query-orchestrator');
const { getEnv } = require('@cubejs-backend/shared');
const { getEnv, pausePromise } = require('@cubejs-backend/shared');
const SqlString = require('sqlstring');

function pause(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

const applyParams = (query, params) => SqlString.format(query, params);

class AthenaDriver extends BaseDriver {
constructor(config = {}) {
super();

this.config = {
accessKeyId: process.env.CUBEJS_AWS_KEY,
secretAccessKey: process.env.CUBEJS_AWS_SECRET,
credentials: {
accessKeyId: config.accessKeyId || process.env.CUBEJS_AWS_KEY,
secretAccessKey: config.secretAccessKey || process.env.CUBEJS_AWS_SECRET,
},
region: process.env.CUBEJS_AWS_REGION,
S3OutputLocation: process.env.CUBEJS_AWS_S3_OUTPUT_LOCATION,
...config,
Expand All @@ -25,10 +22,6 @@ class AthenaDriver extends BaseDriver {
};

this.athena = new AWS.Athena(this.config);
this.athena.startQueryExecutionAsync = promisify(this.athena.startQueryExecution.bind(this.athena));
this.athena.stopQueryExecutionAsync = promisify(this.athena.stopQueryExecution.bind(this.athena));
this.athena.getQueryResultsAsync = promisify(this.athena.getQueryResults.bind(this.athena));
this.athena.getQueryExecutionAsync = promisify(this.athena.getQueryExecution.bind(this.athena));
}

readOnly() {
Expand All @@ -40,7 +33,7 @@ class AthenaDriver extends BaseDriver {
}

async awaitForJobStatus(QueryExecutionId, query, options) {
const queryExecution = await this.athena.getQueryExecutionAsync({
const queryExecution = await this.athena.getQueryExecution({
QueryExecutionId
});

Expand All @@ -64,9 +57,9 @@ class AthenaDriver extends BaseDriver {
}, options);

for (
let results = await this.athena.getQueryResultsAsync({ QueryExecutionId });
let results = await this.athena.getQueryResults({ QueryExecutionId });
results;
results = results.NextToken && (await this.athena.getQueryResultsAsync({
results = results.NextToken && (await this.athena.getQueryResults({
QueryExecutionId, NextToken: results.NextToken
}))
) {
Expand Down Expand Up @@ -95,7 +88,7 @@ class AthenaDriver extends BaseDriver {
} : s))
);

const { QueryExecutionId } = await this.athena.startQueryExecutionAsync({
const { QueryExecutionId } = await this.athena.startQueryExecution({
QueryString: queryString,
ResultConfiguration: {
OutputLocation: this.config.S3OutputLocation
Expand All @@ -110,7 +103,7 @@ class AthenaDriver extends BaseDriver {
return result;
}

await pause(
await pausePromise(
Math.min(this.config.pollMaxInterval, 500 * i)
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-athena-driver/package.json
Expand Up @@ -17,9 +17,9 @@
"main": "driver/AthenaDriver.js",
"types": "driver/index.d.ts",
"dependencies": {
"@aws-sdk/client-athena": "^3.22.0",
"@cubejs-backend/query-orchestrator": "^0.28.3",
"@cubejs-backend/shared": "^0.28.2",
"aws-sdk": "^2.403.0",
"sqlstring": "^2.3.1"
},
"devDependencies": {
Expand Down

0 comments on commit f14b7c1

Please sign in to comment.