Merge pull request r-dbi#77 from krlmlr/feature/iter
Implement list_tabledata_iter()
hadley committed Jan 26, 2016
2 parents 3cb19d1 + 8305499 commit 2a8c906
Showing 4 changed files with 69 additions and 29 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
@@ -30,6 +30,7 @@ export(list_datasets)
export(list_projects)
export(list_tabledata)
export(list_tabledata_callback)
export(list_tabledata_iter)
export(list_tables)
export(parse_dataset)
export(parse_table)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,3 +1,5 @@
# Version 0.1.0.9000

* New `list_tabledata_iter()` that allows fetching a table in chunks of varying size. (#77, @krlmlr)

* Add support for API keys via the `BIGRQUERY_API_KEY` environment variable. (#49)
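
A minimal sketch of the new iterator interface described in the entry above, assuming the package is loaded and the caller is authenticated against BigQuery; "myproject", "mydataset", and "mytable" are placeholder identifiers, not part of this commit.

# Sketch only: the table identifiers are placeholders.
iter <- list_tabledata_iter("myproject", "mydataset", "mytable")

# Chunk sizes may differ between calls.
first_chunk <- iter$next_(10)
next_chunk  <- iter$next_(1000)

# Keep fetching until all rows have been retrieved.
while (!iter$is_complete()) {
  chunk <- iter$next_(5000)
  # ... process chunk here ...
}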
87 changes: 58 additions & 29 deletions R/tabledata.r
@@ -1,8 +1,6 @@
#' Retrieve data from a table.
#'
#' \code{list_tabledata} returns a single dataframe.
#' \code{list_tabledata_callback} calls the supplied callback with each page
#' of data.
#'
#' @inheritParams get_table
#' @param callback function called with single argument, the data from the
@@ -47,6 +45,9 @@ list_tabledata <- function(project, dataset, table, page_size = 1e4,
do.call("rbind", rows)
}

#' @description
#' \code{list_tabledata_callback} calls the supplied callback with each page
#' of data.
#' @rdname list_tabledata
#' @export
list_tabledata_callback <- function(project, dataset, table, callback,
@@ -61,50 +62,78 @@ list_tabledata_callback <- function(project, dataset, table, callback,
elapsed <- timer()
is_quiet <- function(x) isTRUE(quiet) || (is.na(quiet) && elapsed() < 2)

if (!is_quiet()) cat("Retrieving data")
table_info <- table_info %||% get_table(project, dataset, table)
schema <- table_info$schema
iter <- list_tabledata_iter(
    project = project, dataset = dataset, table = table,
table_info = table_info)

url <- sprintf("projects/%s/datasets/%s/tables/%s/data", project, dataset,
table)
cur_page <- 1
rows_fetched <- 0

req <- bq_get(url, query = list(maxResults = page_size))
data <- extract_data(req$rows, schema)
callback(data)
if (!is.null(data)) {
rows_fetched <- rows_fetched + nrow(data)
}
is_complete <- function(rows_fetched) rows_fetched >= as.integer(req$totalRows)
cur_page <- 0L

while(cur_page < max_pages && !is_complete(rows_fetched)) {
while(cur_page < max_pages && !iter$is_complete()) {
if (!is_quiet()) {
cat("\rRetrieving data: ", sprintf("%4.1f", elapsed()), "s", sep = "")
if (cur_page >= 1L) {
cat("\rRetrieving data: ", sprintf("%4.1f", elapsed()), "s", sep = "")
} else {
cat("Retrieving data")
}
}

req <- bq_get(url, query = list(
pageToken = req$pageToken,
maxResults = page_size)
)
data <- extract_data(req$rows, schema)
data <- iter$next_(page_size)
callback(data)
if (!is.null(data)) {
rows_fetched <- rows_fetched + nrow(data)
}

cur_page <- cur_page + 1
}

if (!is_quiet()) cat("\n")

if (isTRUE(warn) && !is_complete(rows_fetched)) {
if (isTRUE(warn) && !iter$is_complete()) {
warning("Only first ", max_pages, " pages of size ", page_size,
" retrieved. Use max_pages = Inf to retrieve all.", call. = FALSE)
" retrieved. Use max_pages = Inf to retrieve all.", call. = FALSE)
}

invisible(TRUE)
}

#' @description
#' \code{list_tabledata_iter} returns a named list with components \code{next_}
#' (a function that fetches rows) and \code{is_complete} (a function that checks
#' if all rows have been fetched).
#' @rdname list_tabledata
#' @export
list_tabledata_iter <- function(project, dataset, table, table_info = NULL) {

table_info <- table_info %||% get_table(project, dataset, table)
schema <- table_info$schema

url <- sprintf("projects/%s/datasets/%s/tables/%s/data", project, dataset,
table)

  last_response <- NULL
rows_fetched <- 0L

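  # Fetch up to n more rows, continuing from where the previous call stopped.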
next_ <- function(n) {
query <- list(maxResults = n)
query$pageToken <- last_response$pageToken

response <- bq_get(url, query = query)

data <- extract_data(response$rows, schema)
if (!is.null(data)) {
rows_fetched <<- rows_fetched + nrow(data)
}

# Record only page token and total number of rows to reduce memory consumption
last_response <<- response[c("pageToken", "totalRows")]

data
}

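  # Complete once at least one page has been requested and rows_fetched has reached totalRows.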
is_complete <- function() {
!is.null(last_response) && rows_fetched >= as.integer(last_response$totalRows)
}

list(next_ = next_, is_complete = is_complete)
}

# Types that can be loaded into R; record is not supported yet.
converter <- list(
integer = as.integer,
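
For comparison, a hedged sketch of the callback interface documented above, accumulating each page into a list before binding; the table identifiers and the accumulating callback are illustrative only and not part of this commit.

# Sketch only: identifiers are placeholders; each non-empty page is accumulated.
pages <- list()
list_tabledata_callback(
  "myproject", "mydataset", "mytable",
  callback = function(data) {
    if (!is.null(data)) pages[[length(pages) + 1]] <<- data
  },
  page_size = 5000, max_pages = Inf
)
result <- do.call("rbind", pages)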
8 changes: 8 additions & 0 deletions man/list_tabledata.Rd

