/
bulk.R
163 lines (153 loc) · 6.77 KB
/
bulk.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#' Import a set of documents to an Azure Cosmos DB container
#'
#' @param container A Cosmos DB container object, as obtained by `get_cosmos_container` or `create_cosmos_container`.
#' @param data The data to import. Can be a data frame, or a character string containing either JSON text or the path to a JSON file. JSON input must describe an array of documents, so that it can be converted to a data frame.
#' @param init_chunksize The number of rows to import per chunk. `bulk_import` can adjust this number dynamically based on observed performance.
#' @param verbose Whether to print updates to the console as the import progresses.
#' @param procname The stored procedure name to use for the server-side import code. Change this if, for some reason, the default name is taken.
#' @param ... Optional arguments passed to lower-level functions.
#' @details
#' This is a convenience function to import a dataset into a container. It works by creating a stored procedure and then calling it in a loop, passing the to-be-imported data in chunks. The dataset must include a column for the container's partition key or an error will result. Rows whose partition key value is `NA` are not imported.
#'
#' Note that this function is not meant for production use. In particular, if the import fails midway through, it will not clean up after itself: you should call `bulk_delete` to remove the remnants of a failed import.
#' @return
#' For a container with a partition key, a named list containing the number of rows imported for each value of the partition key. For a container without a partition key, the total number of rows imported. The result is returned invisibly.
#' @seealso
#' [bulk_delete], [cosmos_container]
#' @examples
#' \dontrun{
#'
#' endp <- cosmos_endpoint("https://myaccount.documents.azure.com:443/", key="mykey")
#' db <- get_cosmos_database(endp, "mydatabase")
#' cont <- create_cosmos_container(db, "mycontainer", partition_key="sex")
#'
#' # importing the Star Wars data from dplyr
#' # notice that rows with sex=NA are not imported
#' bulk_import(cont, dplyr::starwars)
#'
#' # importing from a JSON file
#' writeLines(jsonlite::toJSON(dplyr::starwars), "starwars.json")
#' bulk_import(cont, "starwars.json")
#'
#' }
#' @rdname bulk_import
#' @export
bulk_import <- function(container, ...)
{
    UseMethod("bulk_import")
}

#' @rdname bulk_import
#' @export
bulk_import.cosmos_container <- function(container, data, init_chunksize=1000, verbose=TRUE,
                                         procname="_AzureCosmosR_bulkImport", ...)
{
    # create the stored procedure if necessary; an HTTP 409 means it already exists, which is fine
    res <- tryCatch(create_stored_procedure(container, procname,
        readLines(system.file("srcjs/bulkUpload.js", package="AzureCosmosR"))), error=function(e) e)
    if(inherits(res, "error") && !(is.character(res$message) && grepl("HTTP 409", res$message)))
        stop(res)

    # JSON text or a JSON file path: fromJSON() accepts both. Let it simplify to a data frame,
    # since the chunked import below works row-wise (nrow() and row indexing).
    if(is.character(data))
        data <- jsonlite::fromJSON(paste(data, collapse="\n"))
    if(!is.data.frame(data))
        stop("Data must be a data frame, or JSON describing an array of documents", call.=FALSE)

    key <- get_partition_key(container)
    res <- if(is.null(key))
        import_by_key(container, NULL, data, procname, init_chunksize, verbose=verbose, ...)
    else
    {
        if(is.null(data[[key]]))
            stop("Data does not contain partition key", call.=FALSE)
        # one import loop per key value; split() drops rows whose key is NA
        lapply(split(data, data[[key]]), function(partdata)
            import_by_key(container, partdata[[key]][1], partdata, procname, init_chunksize,
                          verbose=verbose, ...))
    }
    invisible(res)
}
# Import the rows of data frame `data` into `container` by repeatedly calling the
# bulk-import stored procedure `procname` on successive chunks of rows.
#
# `key` is the partition key value shared by all rows (NULL for a container without
# a partition key); when non-NULL it is sent as the partition key header.
# The procedure's return value is taken as the number of rows of the chunk it
# actually imported. When that falls short, the remaining rows are resent and the
# chunk size is reduced towards the observed throughput. After a fully imported
# chunk it resets to `init_chunksize`.
# Stops with an error if the procedure cannot import even a single-row chunk.
# Returns the number of rows imported.
import_by_key <- function(container, key, data, procname, init_chunksize, headers=list(), verbose=TRUE, ...)
{
    # chunk sizes are row counts, so keep them whole and at least 1
    init_chunksize <- max(1, floor(init_chunksize))
    rows_imported <- 0
    this_chunksize <- avg_chunksize <- init_chunksize
    nrows <- nrow(data)
    n <- 0
    if(!is.null(key))
        headers$`x-ms-documentdb-partitionkey` <- jsonlite::toJSON(key)

    while(rows_imported < nrows)
    {
        n <- n + 1
        this_chunk <- seq(rows_imported + 1, min(nrows, rows_imported + this_chunksize))
        # drop=FALSE keeps a one-column data frame as a data frame (an array of documents)
        # instead of collapsing it to a bare vector
        this_import <- exec_stored_procedure(container, procname, list(data[this_chunk, , drop=FALSE]),
                                             headers=headers, ...)
        rows_imported <- rows_imported + this_import
        avg_chunksize <- (avg_chunksize * (n-1))/n + this_chunksize/n
        if(verbose)
            message("Rows imported: ", rows_imported, " this chunk: ", length(this_chunk),
                    " average chunksize: ", avg_chunksize)

        if(this_import >= length(this_chunk))
            this_chunksize <- init_chunksize
        else
        {
            # no progress on a single row: shrinking further is impossible, and retrying
            # the identical call would loop forever
            if(this_import == 0 && length(this_chunk) == 1)
                stop("Bulk import stalled: stored procedure imported no rows from a single-row chunk; ",
                     rows_imported, " rows imported before stopping", call.=FALSE)
            # adjust chunksize based on observed import performance per chunk
            this_chunksize <- max(1, floor((this_chunksize + this_import)/2))
        }
    }
    rows_imported
}
#' Delete a set of documents from an Azure Cosmos DB container
#'
#' @param container A Cosmos DB container object, as obtained by `get_cosmos_container` or `create_cosmos_container`.
#' @param query A query specifying which documents to delete. If this is a character vector with more than one element, the elements are joined with newlines.
#' @param partition_key Optionally, limit the deletion only to documents with this key value. If `NULL` (the default), no partition key is sent with the request.
#' @param procname The stored procedure name to use for the server-side delete code. Change this if, for some reason, the default name is taken.
#' @param headers,... Optional arguments passed to lower-level functions.
#' @details
#' This is a convenience function to delete multiple documents from a container. It works by creating a stored procedure and then calling it repeatedly with the supplied query as a parameter, until the procedure reports that there is nothing left to continue with. This function is not meant for production use.
#' @return
#' The number of documents deleted, returned invisibly.
#' @seealso
#' [bulk_import], [cosmos_container]
#' @examples
#' \dontrun{
#'
#' endp <- cosmos_endpoint("https://myaccount.documents.azure.com:443/", key="mykey")
#' db <- get_cosmos_database(endp, "mydatabase")
#' cont <- create_cosmos_container(db, "mycontainer", partition_key="sex")
#'
#' # importing the Star Wars data from dplyr
#' bulk_import(cont, dplyr::starwars)
#'
#' # deleting a subset of documents
#' bulk_delete(cont, "select * from mycontainer c where c.gender = 'masculine'")
#'
#' # deleting documents for a specific partition key value
#' bulk_delete(cont, "select * from mycontainer", partition_key="male")
#'
#' # deleting all documents
#' bulk_delete(cont, "select * from mycontainer")
#'
#' }
#' @rdname bulk_delete
#' @export
bulk_delete <- function(container, ...)
{
    UseMethod("bulk_delete")
}

#' @rdname bulk_delete
#' @export
bulk_delete.cosmos_container <- function(container, query, partition_key=NULL,
                                         procname="_AzureCosmosR_bulkDelete", headers=list(), ...)
{
    # create the stored procedure if necessary; an HTTP 409 means it already exists, which is fine
    res <- tryCatch(create_stored_procedure(container, procname,
        readLines(system.file("srcjs/bulkDelete.js", package="AzureCosmosR"))), error=function(e) e)
    if(inherits(res, "error") && !(is.character(res$message) && grepl("HTTP 409", res$message)))
        stop(res)

    if(!is.null(partition_key))
        headers$`x-ms-documentdb-partitionkey` <- jsonlite::toJSON(partition_key)
    if(length(query) > 1)
        query <- paste0(query, collapse="\n")

    deleted <- 0
    repeat
    {
        res <- exec_stored_procedure(container, procname, list(query), headers=headers, ...)
        deleted <- deleted + res$deleted
        # keep calling while the procedure signals that more documents remain;
        # a missing/NA flag is treated as "done" rather than raising an error
        if(!isTRUE(res$continuation))
            break
    }
    invisible(deleted)
}