/
batches.R
128 lines (118 loc) · 4.52 KB
/
batches.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
## Append a batch to a dataset by POSTing to the dataset's batches catalog.
##
## Arguments:
## * ds: the dataset entity to append to
## * ...: named fields that make up the default `body`
## * strict: if FALSE, "?strict=0" is added to the batches URL
## * first_batch: if TRUE, `ds` is deleted when the POST fails
## * body: the batch payload; it is passed through wrapEntity() and toJSON()
##
## Returns, invisibly, the result of refresh(ds).
addBatch <- function(ds, ..., strict = TRUE, first_batch = FALSE,
                     body = list(...)) {
    batches_url <- shojiURL(ds, "catalogs", "batches")
    if (!strict) {
        ## This is apparently deprecated in favor of passing in "strict"
        ## differently
        batches_url <- paste0(batches_url, "?strict=0")
    }
    if (first_batch) {
        ## If this is the first batch, it's an "import", so delete the dataset
        ## if it fails--no need to keep a worthless dataset entity around
        do_it <- function(expr) {
            ## `expr` arrives as an unevaluated promise, so forcing it here
            ## runs the POST under the error handler. force() is used instead
            ## of eval() so that the value the promise produces is not
            ## evaluated a second time.
            tryCatch(force(expr), error = function(e) {
                ## We failed to add the batch successfully, so we don't really
                ## have a useful dataset. Delete the entity that was created
                ## initially, then re-signal the original error.
                with_consent(delete(ds))
                stop(e)
            })
        }
    } else {
        ## Don't print "Result URL" if the job fails because the dataset will
        ## be rolled back and that URL won't exist
        do_it <- suppressMessages
    }
    do_it({
        body <- wrapEntity(body = body)
        suppressMessages(crPOST(batches_url, body = toJSON(body)))
    })
    invisible(refresh(ds))
}
## Append the contents of a file to a dataset, choosing how to hand the file
## to addBatch() based on where the file lives.
##
## Arguments:
## * dataset: the dataset to append to
## * file: a local path, or a URL of the form "scheme://..."
## * ...: passed on to addBatch()
##
## Returns whatever addBatch() returns.
addBatchFile <- function(dataset, file, ...) {
    looks_like_url <- grepl("^[a-z0-9]+://", file)
    if (!looks_like_url) {
        ## Local file. Send it as file upload
        return(addBatch(dataset, source = createSource(file), ...))
    }
    if (startsWith(file, "s3")) {
        ## We can post s3 URLs directly
        return(addBatch(dataset, url = file, ...))
    }
    ## Any other file on the web: we have to create a source first
    return(addBatch(dataset, source = createSource(url = file), ...))
}
## Create a source by POSTing to the session's "sources" URL, either by
## uploading a local file or by pointing at a URL.
##
## Arguments:
## * file: path to a local file to upload; it must exist
## * url: location of a remote file; only used when `file` is not supplied
## * ...: passed to crPOST() for a file upload, or to wrapEntity() for a URL
##
## Returns the value of the crPOST() call.
#' @importFrom httr upload_file
#' @importFrom tools file_ext
createSource <- function(file, url, ...) {
    endpoint <- sessionURL("sources")
    if (missing(file) && missing(url)) {
        halt("Must provide a file or url to createSource")
    }
    if (missing(file)) {
        created <- crPOST(endpoint,
            body = toJSON(wrapEntity(location = url, ...)),
            ## TODO: all JSON POSTs/PATCHes/PUTs should declare their
            ## content-type; this is just the first place where the backend
            ## has rejected without it
            config = add_headers(`Content-Type` = "application/json")
        )
        return(created)
    }
    if (!file.exists(file)) {
        halt("File not found")
    }
    # TODO: remove this special casing for Triple-S metadata files when
    # the backend does the needful
    mime_type <- NULL
    if (file_ext(file) %in% c("sss", "xml")) {
        mime_type <- "text/xml"
    }
    created <- crPOST(endpoint,
        body = list(uploaded_file = upload_file(file, type = mime_type)), ...
    )
    return(created)
}
#' Get the catalog of batches that have been appended to a dataset
#'
#' @param x a `CrunchDataset`
#' @return a `BatchCatalog`
#' @export
batches <- function(x) {
    catalog_url <- shojiURL(x, "catalogs", "batches")
    BatchCatalog(crGET(catalog_url))
}
#' @importFrom methods initialize
setMethod("initialize", "BatchCatalog", function(.Object, ...) {
    ## Construct the object with the inherited initializer, then reorder the
    ## index so that its entries are sorted by name
    .Object <- callNextMethod(.Object, ...)
    entries <- .Object@index
    .Object@index <- entries[order(names(entries))]
    .Object
})
setMethod("imported", "BatchCatalog", function(x) {
    ## Keep only the batches whose status is "imported". isTRUE() makes a
    ## batch with no status count as not imported.
    is_imported <- function(batch) isTRUE(batch$status == "imported")
    index(x) <- Filter(is_imported, index(x))
    x
})
setMethod("pending", "BatchCatalog", function(x) {
    ## Keep every batch that is not "imported", including any batch that has
    ## no status at all.
    is_imported <- function(batch) isTRUE(batch$status == "imported")
    index(x) <- Filter(Negate(is_imported), index(x))
    x
})
#' @rdname describe-catalog
#' @export
setMethod("names", "BatchCatalog", function(x) {
    ## The names of a batch catalog are the URLs of its entries
    urls(x)
})
#' Remove batches from a dataset
#'
#' Sometimes append operations do not succeed, whether due to conflicts between
#' the two datasets or other server-side issues. Failed appends can leave behind
#' "error" status batch records, which can cause confusion. This function lets
#' you delete batches that don't match the status or statuses you want to keep.
#' A batch that reports no status at all is treated as not matching `keep`.
#' @param dataset CrunchDataset
#' @param keep character the statuses that you want to keep. By default, batches
#' that don't have either "imported" or "appended" status will be deleted.
#' @return `dataset` with the specified batches removed.
#' @export
cleanseBatches <- function(dataset, keep = c("imported", "appended")) {
    bat.cat <- batches(dataset)
    ## The predicate must return exactly one TRUE/FALSE per batch. Filter()
    ## unlist()s the results, so a zero-length answer (which `%in%` gives for a
    ## missing status) would be dropped and shift the selection onto the wrong
    ## batches. isTRUE() guards against that.
    unwanted <- function(a) !isTRUE(a$status %in% keep)
    to.delete <- names(Filter(unwanted, index(bat.cat)))
    for (u in to.delete) {
        ## Best effort: try() reports a failed DELETE but lets the loop go on
        try(crDELETE(u))
    }
    if (length(to.delete) > 0) {
        ## Bust cache
        dropOnly(self(bat.cat))
    }
    return(dataset)
}