retry api call if initially failed (#79) #80

Merged · 27 commits · Mar 24, 2020

Commits
4bb7ae3
bumped version to dev
DyfanJones Mar 19, 2020
39a9653
added fail to dbExecute
DyfanJones Mar 19, 2020
d8a698c
added helper function retry_api_call if call initially failed (#79)
DyfanJones Mar 19, 2020
73f03a7
added parameters in noctua_options to configure the retry_api_call (#79)
DyfanJones Mar 19, 2020
4ac14d5
added retry_api_call when sending data to s3 (#79)
DyfanJones Mar 19, 2020
6aa8f67
added retry_api_call when retrieving data from S3 (#79)
DyfanJones Mar 19, 2020
927a375
updated documentations for new parameters
DyfanJones Mar 19, 2020
af26425
reduced retry to 5 and set quiet to FALSE
DyfanJones Mar 19, 2020
cd915f9
wrapped expr with eval.parent(substitute(expr)) due to https://stacko…
DyfanJones Mar 19, 2020
f7f70e8
created unit test for retry_api_call
DyfanJones Mar 19, 2020
3fcc9bc
added retry_api_call
DyfanJones Mar 19, 2020
f13940a
added retry_api_call
DyfanJones Mar 19, 2020
b472ca5
update pkgdown site
DyfanJones Mar 19, 2020
9f42c87
updated description
DyfanJones Mar 19, 2020
4b4103d
revert retry when a quick response is required
DyfanJones Mar 19, 2020
039821d
Set default retry be noisy
DyfanJones Mar 20, 2020
5ca7996
added retry to dbExistsTable
DyfanJones Mar 22, 2020
15779d4
added retry_api_call to AthenaResult
DyfanJones Mar 22, 2020
db46b9f
added retry to all athena calls, enabled next token to be passed to s…
DyfanJones Mar 22, 2020
ce60647
updated comments
DyfanJones Mar 22, 2020
c8d26d3
simplified last chunk
DyfanJones Mar 22, 2020
1edda83
updated unit test with NextToken
DyfanJones Mar 22, 2020
6ddcfc9
added dbFetch new functionality
DyfanJones Mar 22, 2020
a20d420
added retry_api_call to athena call
DyfanJones Mar 22, 2020
dddd3ae
simplified gathering names
DyfanJones Mar 22, 2020
f1664f1
fixed issue of token not updating for next athena api call
DyfanJones Mar 22, 2020
f4a3fdc
updated token check
DyfanJones Mar 22, 2020

DESCRIPTION
@@ -1,7 +1,7 @@
Package: noctua
Type: Package
Title: Connect to 'AWS Athena' using R 'AWS SDK' 'paws' ('DBI' Interface)
-Version: 1.6.0
+Version: 1.6.0.9000
Authors@R: person("Dyfan", "Jones", email="dyfan.r.jones@gmail.com",
role= c("aut", "cre"))
Description: Designed to be compatible with the 'R' package 'DBI' (Database Interface)
NEWS.md
@@ -1,3 +1,21 @@
# noctua 1.6.0.9000
## New Feature
* Functions that collect from or push to AWS S3 now have retry capability, meaning that if an API call fails the call is retried (#79).
* `noctua_options` contains two new parameters to control how `noctua` handles retries (see the example after the code block below).
* `dbFetch` is able to return data from AWS Athena in chunks. This has been achieved by passing `NextToken` to the `AthenaResult` S4 class. This method won't be as fast as `n = -1`, as each chunk has to be processed into data frame format.

```
library(DBI)
con <- dbConnect(noctua::athena())
res <- dbExecute(con, "select * from some_big_table limit 10000")
dbFetch(res, 5000)
```
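
The retry behaviour can be tuned through the two new `noctua_options` parameters; a quick illustration (the values here are arbitrary examples, not the defaults of `retry = 5` and `retry_quiet = FALSE`):

```
library(noctua)

# allow up to 10 attempts per API call and silence the retry messages
noctua_options(retry = 10, retry_quiet = TRUE)
```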

## Bug
* `dbWriteTable` would throw a `throttling error` every now and again; `retry_api_call` has been built to handle the passing of data between R and AWS S3.

# noctua 1.6.0
## New Feature
* Inspired by `pyathena`, `noctua_options` now has a new parameter `cache_size`. This implements local caching in R environments instead of using AWS `list_query_executions`. This is due to `dbClearResult` clearing S3's Athena output when caching isn't disabled
@@ -243,6 +243,11 @@ setMethod(
res <- AthenaResult(conn =conn, statement= statement, s3_staging_dir = s3_staging_dir)
poll_result <- poll(res)

+# if query failed stop
+if(poll_result$QueryExecution$Status$State == "FAILED") {
+stop(poll_result$QueryExecution$Status$StateChangeReason, call. = FALSE)
+}

# cache query metadata if caching is enabled
if (athena_option_env$cache_size > 0) cache_query(poll_result)

@@ -365,7 +370,7 @@ setMethod(
function(conn, schema = NULL, ...){
if (!dbIsValid(conn)) {stop("Connection already closed.", call. = FALSE)}
if(is.null(schema)){
-tryCatch(schema <- sapply(conn@ptr$glue$get_databases()$DatabaseList,function(x) x$Name))}
+retry_api_call(schema <- sapply(conn@ptr$glue$get_databases()$DatabaseList,function(x) x$Name))}
tryCatch(output <- lapply(schema, function (x) conn@ptr$glue$get_tables(DatabaseName = x)$TableList))
unlist(lapply(output, function(x) sapply(x, function(y) y$Name)))
}
@@ -410,7 +415,7 @@ setMethod("dbGetTables", "AthenaConnection",
function(conn, schema = NULL, ...){
if (!dbIsValid(conn)) {stop("Connection already closed.", call. = FALSE)}
if(is.null(schema)){
-tryCatch(schema <- sapply(conn@ptr$glue$get_databases()$DatabaseList,function(x) x$Name))}
+retry_api_call(schema <- sapply(conn@ptr$glue$get_databases()$DatabaseList,function(x) x$Name))}
tryCatch(output <- lapply(schema, function (x) conn@ptr$glue$get_tables(DatabaseName = x)$TableList))
rbindlist(lapply(output, function(x) rbindlist(lapply(x, function(y) data.frame(Schema = y$DatabaseName,
TableName=y$Name,
@@ -516,10 +521,24 @@ setMethod(
} else {dbms.name <- conn@info$dbms.name
Table <- tolower(name)}

-tryerror <- try(conn@ptr$glue$get_table(DatabaseName = dbms.name, Name = Table), silent = TRUE)
-if(inherits(tryerror, "try-error") && !grepl(".*table.*not.*found.*", tryerror[1], ignore.case = T)){
-stop(gsub("^Error : ", "", tryerror[1]), call. = F)}
-!grepl(".*table.*not.*found.*", tryerror[1], ignore.case = T)
+for (i in seq_len(athena_option_env$retry)) {
+resp <- tryCatch(conn@ptr$glue$get_table(DatabaseName = dbms.name, Name = Table),
+error = function(e) e)
+
+# exponential step back if error and not expected error
+if(inherits(resp, "error") && !grepl(".*table.*not.*found.*", resp, ignore.case = T)){
+backoff_len <- runif(n=1, min=0, max=(2^i - 1))
+
+if(!athena_option_env$retry_quiet) message(resp, "Request failed. Retrying in ", round(backoff_len, 1), " seconds...")
+
+Sys.sleep(backoff_len)
+} else {break}
+}
+
+
+if (inherits(resp, "error") && !grepl(".*table.*not.*found.*", resp, ignore.case = T)) stop(resp)
+
+!grepl(".*table.*not.*found.*", resp[1], ignore.case = T)
})
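
For reference, the backoff window above grows exponentially with the attempt number: each wait is drawn from `runif(1, 0, 2^i - 1)`, so with the default `retry = 5` the maximum sleep per attempt is 1, 3, 7, 15 and 31 seconds. A quick sketch:

```
# Maximum backoff window (seconds) for each of the 5 retry attempts
sapply(1:5, function(i) 2^i - 1)
#> [1]  1  3  7 15 31
```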

#' Remove table from Athena
@@ -7,25 +7,29 @@ class(athena_option_env$file_parser) <- "athena_data.table"
cache_dt = data.table("QueryId" = character(), "Query" = character(), "State"= character(),
"StatementType"= character(),"WorkGroup" = character())
athena_option_env$cache_dt <- cache_dt
+athena_option_env$retry <- 5
+athena_option_env$retry_quiet <- FALSE

# ==========================================================================
# Setting file parser method

#' A method to configure noctua backend options.
#'
-#' \code{noctua_options()} provides a method to change the backend. This includes changing the file parser
-#' and whether \code{noctua} should cache query ids locally.
+#' \code{noctua_options()} provides a method to change the backend. This includes changing the file parser,
+#' whether \code{noctua} should cache query ids locally and the number of retries on a failed API call.
#' @param file_parser Method to read and write tables to Athena, currently defaults to data.table
#' @param cache_size Number of queries to be cached. Currently only supports caching up to 100 distinct queries.
#' @param clear_cache Clears all previous cached query metadata
+#' @param retry Maximum number of requests to attempt.
+#' @param retry_quiet If \code{FALSE}, will print a message from retry displaying how long until the next request.
#' @return \code{noctua_options()} returns \code{NULL}, invisibly.
#' @examples
#' library(noctua)
#'
#' # change file parser from default data.table to vroom
#' noctua_options("vroom")
#' @export
-noctua_options <- function(file_parser = c("data.table", "vroom"), cache_size = 0, clear_cache = FALSE) {
+noctua_options <- function(file_parser = c("data.table", "vroom"), cache_size = 0, clear_cache = FALSE, retry = 5, retry_quiet = FALSE) {
file_parser = match.arg(file_parser)
stopifnot(is.logical(clear_cache))

@@ -41,6 +45,8 @@ noctua_options <- function(file_parser = c("data.table", "vroom"), cache_size =
class(athena_option_env$file_parser) <- paste("athena", file_parser, sep = "_")

athena_option_env$cache_size <- cache_size
+athena_option_env$retry <- retry
+athena_option_env$retry_quiet <- retry_quiet

if(clear_cache) athena_option_env$cache_dt <- athena_option_env$cache_dt[0]

@@ -10,12 +10,12 @@ AthenaResult <- function(conn,
response <- list(QueryExecutionId = NULL)
if (athena_option_env$cache_size > 0) response <- list(QueryExecutionId = check_cache(statement, conn@info$work_group))
if (is.null(response$QueryExecutionId)) {
-tryCatch(response <- conn@ptr$Athena$start_query_execution(QueryString = statement,
+retry_api_call(response <- conn@ptr$Athena$start_query_execution(QueryString = statement,
QueryExecutionContext = list(Database = conn@info$dbms.name),
ResultConfiguration = ResultConfiguration(conn),
WorkGroup = conn@info$work_group))}
on.exit(if(!is.null(conn@info$expiration)) time_check(conn@info$expiration))
new("AthenaResult", connection = conn, info = response)
new("AthenaResult", connection = conn, info = c(response, list(NextToken = NULL)))
}

#' @rdname AthenaConnection
@@ -67,11 +67,11 @@ setMethod(
} else {

# checks status of query
-tryCatch(query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId))
+retry_api_call(query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId))

# stops resource if query is still running
if (!(query_execution$QueryExecution$Status$State %in% c("SUCCEEDED", "FAILED", "CANCELLED"))){
-tryCatch(res@connection@ptr$Athena$stop_query_execution(QueryExecutionId = res@info$QueryExecutionId))}
+retry_api_call(res@connection@ptr$Athena$stop_query_execution(QueryExecutionId = res@info$QueryExecutionId))}

# clear s3 athena output
# split s3_uri
@@ -144,16 +144,55 @@ setMethod(
stop(result$QueryExecution$Status$StateChangeReason, call. = FALSE)
}

+# return metadata of athena data types
+retry_api_call(result_class <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId,
+MaxResults = as.integer(1))$ResultSet$ResultSetMetadata$ColumnInfo)
if(n >= 0 && n !=Inf){
n = as.integer(n + 1)
-if (n > 1000){n = 1000L; message("Info: n has been restricted to 1000 due to AWS Athena limitation")}
-tryCatch(result <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId, MaxResults = n))
+chunk = n
+if (n > 1000L) chunk = 1000L
+
+iterate <- 1:ceiling(n/chunk)
+
+# create empty list shell
+dt_list <- list()
+length(dt_list) <- max(iterate)
+
+# assign token from AthenaResult class
+token <- res@info$NextToken
+for (i in iterate){
+if(i == max(iterate)) chunk <- as.integer(n - (i-1) * chunk)
+
+# get chunk with retry api call if call fails
+retry_api_call(result <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId, NextToken = token, MaxResults = chunk))
+
+# process returned list
+output <- lapply(result$ResultSet$Rows, function(x) (sapply(x$Data, function(x) if(length(x) == 0 ) NA else x)))
+suppressWarnings(staging_dt <- rbindlist(output, use.names = FALSE))
+
+# remove colnames from first row
+if (i == 1 && is.null(token)){
+staging_dt <- staging_dt[-1,]
+}
+
+token <- result$NextToken
+# ensure rownames are not set
+rownames(staging_dt) <- NULL
+
+# added staging data.table to list
+dt_list[[i]] <- staging_dt
+}

-output <- lapply(result$ResultSet$Rows, function(x) (sapply(x$Data, function(x) if(length(x) == 0 ) NA else x)))
-dt <- rbindlist(output, fill = TRUE)
-colnames(dt) <- as.character(unname(dt[1,]))
-rownames(dt) <- NULL
-return(dt[-1,])
+# combined all lists together
+dt <- rbindlist(dt_list, use.names = FALSE)
+
+# Update last token in s4 class
+eval.parent(substitute(res@info$NextToken <- result$NextToken))
+
+# replace names with actual names
+Names <- sapply(result_class, function(x) x$Name)
+colnames(dt) <- Names
+return(dt)
}

# Added data scan information when returning data from athena
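
A side note on the `eval.parent(substitute(res@info$NextToken <- result$NextToken))` idiom above (also used by `retry_api_call` further down, and the subject of the Stack Overflow link in commit cd915f9): it evaluates the expression in the caller's frame, so the assignment survives after the function returns. A minimal standalone illustration, independent of this PR:

```
# eval.parent(substitute(expr)) runs expr in the caller's environment,
# so assignments made inside the helper persist for the caller.
set_in_caller <- function(expr) {
  eval.parent(substitute(expr))
}

x <- 1
set_in_caller(x <- x + 41)
x
#> [1] 42
```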
@@ -165,14 +204,10 @@ setMethod(

# connect to s3 and create a bucket object
# download athena output
-tryCatch(obj <- res@connection@ptr$S3$get_object(Bucket = result_info$bucket, Key = result_info$key))
+retry_api_call(obj <- res@connection@ptr$S3$get_object(Bucket = result_info$bucket, Key = result_info$key))

write_bin(obj$Body, File)

-# return metadata of athena data types
-tryCatch(result_class <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId,
-MaxResults = as.integer(1))$ResultSet$ResultSetMetadata$ColumnInfo)

if(grepl("\\.csv$",result_info$key)){
output <- athena_read(athena_option_env$file_parser, File, result_class)
} else{
@@ -224,7 +259,7 @@ setMethod(
"dbHasCompleted", "AthenaResult",
function(res, ...) {
if (!dbIsValid(res)) {stop("Result already cleared", call. = FALSE)}
-tryCatch(query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId))
+retry_api_call(query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId))

if(query_execution$QueryExecution$Status$State %in% c("SUCCEEDED", "FAILED", "CANCELLED")) TRUE
else if (query_execution$QueryExecution$Status$State == "RUNNING") FALSE
@@ -291,7 +326,7 @@ setMethod(
stop(result$QueryExecution$Status$StateChangeReason, call. = FALSE)
}

-tryCatch(result <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId,
+retry_api_call(result <- res@connection@ptr$Athena$get_query_results(QueryExecutionId = res@info$QueryExecutionId,
MaxResults = as.integer(1)))

Name <- sapply(result$ResultSet$ResultSetMetadata$ColumnInfo, function(x) x$Name)
@@ -245,7 +245,7 @@ upload_data <- function(con, x, name, partition = NULL, s3.location= NULL, file

for (i in 1:length(x)){
obj <- readBin(x[i], "raw", n = file.size(x[i]))
-tryCatch(con@ptr$S3$put_object(Body = obj, Bucket = s3_key[[1]], Key = s3_key[[2]][i]))}
+retry_api_call(con@ptr$S3$put_object(Body = obj, Bucket = s3_key[[1]], Key = s3_key[[2]][i]))}

invisible(NULL)
}
@@ -111,7 +111,7 @@ db_save_query.AthenaConnection <- function(con, sql, name ,
sql, ";"))
res <- dbExecute(con, tt_sql)
# check if execution failed
-query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId)
+retry_api_call(query_execution <- res@connection@ptr$Athena$get_query_execution(QueryExecutionId = res@info$QueryExecutionId))
if(query_execution$QueryExecution$Status$State == "FAILED") {
stop(query_execution$QueryExecution$Status$StateChangeReason, call. = FALSE)
}
@@ -216,3 +216,24 @@ check_cache = function(query, work_group){
query_id = athena_option_env$cache_dt[get("Query") == query & get("State") == "SUCCEEDED" & get("StatementType") == "DML" & get("WorkGroup") == work_group, get("QueryId")]
if(length(query_id) == 0) return(NULL) else return(query_id[1])
}

+# If api call fails retry call
+retry_api_call <- function(expr){
+
+for (i in seq_len(athena_option_env$retry)) {
+resp <- tryCatch(eval.parent(substitute(expr)),
+error = function(e) e)
+
+if(inherits(resp, "error")){
+backoff_len <- runif(n=1, min=0, max=(2^i - 1))
+
+if(!athena_option_env$retry_quiet) message(resp, "Request failed. Retrying in ", round(backoff_len, 1), " seconds...")
+
+Sys.sleep(backoff_len)
+} else {break}
+}
+
+if (inherits(resp, "error")) stop(resp)
+
+resp
+}
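
As a usage sketch (not part of the diff): the helper can wrap any `paws` call so that transient throttling errors are retried with exponential backoff before the error is finally re-raised. Here `con` is assumed to be an open `AthenaConnection`, and the bucket/key values are hypothetical placeholders:

```
# Hypothetical example: retry an S3 download up to athena_option_env$retry times.
# Because retry_api_call() uses eval.parent(substitute(expr)), the assignment
# to `obj` lands in this (the caller's) environment.
retry_api_call(
  obj <- con@ptr$S3$get_object(Bucket = "example-bucket", Key = "example/key.csv")
)
```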
