Skip to content

Commit

Permalink
Added support for workgroup and made output location optional with Am…
Browse files Browse the repository at this point in the history
…azon Athena - closes #320
  • Loading branch information
ankane committed Sep 21, 2021
1 parent 65c46f0 commit c44ca45
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
## 2.4.6 (unreleased)

- Added support for workgroup with Amazon Athena
- Added casting for timestamp with time zone columns with Amazon Athena
- Added support for setting credentials in config file with Amazon Athena
- Made output location optional with Amazon Athena
- Fixed casting error for `NULL` values with Amazon Athena

## 2.4.5 (2021-09-15)
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,12 @@ data_sources:
my_source:
adapter: athena
database: database
# optional settings
output_location: s3://some-bucket/
access_key_id: ... # optional [unreleased]
secret_access_key: ... # optional [unreleased]
workgroup: primary
access_key_id: ...
secret_access_key: ...
```

Here’s an example IAM policy:
Expand Down
30 changes: 18 additions & 12 deletions lib/blazer/adapters/athena_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ def run_statement(statement, comment)
rows = []
error = nil

query_options = {
query_string: statement,
# use token so we fetch cached results after query is run
client_request_token: Digest::MD5.hexdigest([statement, data_source.id].join("/")),
query_execution_context: {
database: database,
}
}

if settings["output_location"]
query_options[:result_configuration] = {output_location: settings["output_location"]}
end

if settings["workgroup"]
query_options[:work_group] = settings["workgroup"]
end

begin
resp =
client.start_query_execution(
query_string: statement,
# use token so we fetch cached results after query is run
client_request_token: Digest::MD5.hexdigest([statement,data_source.id].join("/")),
query_execution_context: {
database: database,
},
result_configuration: {
output_location: settings["output_location"]
}
)
resp = client.start_query_execution(**query_options)
query_execution_id = resp.query_execution_id

timeout = data_source.timeout || 300
Expand Down

0 comments on commit c44ca45

Please sign in to comment.