/
predefined_processes.R
266 lines (218 loc) · 10.6 KB
/
predefined_processes.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#' @include user_defined_processes.R
NULL
# processes endpoint ----
#' List available processes on server
#'
#' List all processes available on the back-end. This returns the R translation of the JSON metadata as lists. This process description
#' is stored internally at the environment package variable `process_list`, which is not directly accessible apart from this function.
#'
#' @param con Connection object (optional) otherwise [active_connection()]
#' is used.
#' @return a list of lists with process_id and description
#' @export
list_processes = function(con=NULL) {
process_list = get(x = "process_list", envir = pkgEnvironment)
# if the list was not called already, then fetch the processes from the back-end, otherwise return the stored data
if (is.null(process_list)) {
tryCatch({
con = .assure_connection(con)
tag = "process_overview"
listOfProcesses = con$request(tag = tag, authorized=con$isLoggedIn(), type = "application/json")
process_list = lapply(listOfProcesses$processes, function(process) {
class(process) = "ProcessInfo"
return(process)
})
names(process_list) = sapply(process_list, function(p) p$id)
class(process_list) = "ProcessList"
assign(x = "process_list", value = process_list, envir = pkgEnvironment)
}, error = .capturedErrorToMessage)
}
return(process_list)
}
#' Describe a process
#'
#' Queries an openEO back-end and retrieves more detailed information about offered processes
#' @param con Authentication object (optional) otherwise [active_connection()]
#' is used.
#' @param process id of a process to be described, the ProcessInfo object or a Process object
#'
#' @return a list of detailed information
#' @export
#' @importFrom rlang is_na
#' @importFrom rlang is_null
describe_process = function(process = NA, con=NULL) {
tryCatch({
process_list = list_processes(con=con)
describeProcess = !missing(process) && !rlang::is_na(process) && !rlang::is_null(process)
if (!describeProcess) {
message("No or invalid process_id(s) or process")
return(invisible(NULL))
}
if (is.null(process_list)) {
message("No processes found or loaded from the back-end")
return(invisible(NULL))
}
if ("function" %in% class(process)) {
message("Parameter process was passed as function. Cannot derive the functions name.")
return(invisible(NULL))
}
if ("Process" %in% class(process)) {
process = process$getId()
}
if ("ProcessInfo" %in% class(process)) {
return(process)
}
if (!is.character(process)) {
message("Cannot derive and interprete the process name.")
return(invisible(NULL))
}
if (!process %in% names(process_list)) {
message(paste("Cannot describe process '", process, "'. Process does not exist.", sep = ""))
invisible(NULL)
}
return(process_list[[process]])
}, error = .capturedErrorToMessage)
}
# ProcessCollection ====
#' Process Collection
#'
#' This object contains template functions for process graph building from the processes offered by an openEO service. This object is
#' an unlocked R6 object, in order to add new functions at runtime.
#'
#' @name ProcessCollection
#'
#' @section Methods:
#' \describe{
#' \item{`$new(con = NULL)`}{The object creator created an openEO connection.}
#' }
#' @section Arguments:
#' \describe{
#' \item{con}{optional an active and authenticated Connection (optional) otherwise [active_connection()]
#' is used.}
#' }
#'
#' @seealso [`processes()`]
NULL
ProcessCollection = R6Class(
"ProcessCollection",
lock_objects = FALSE,
public = list(
# public ====
initialize = function(con=NULL) {
tryCatch({
private$createListOfProcesses(con=con)
if (!is.list(private$processes)) stop("Processes are not provided as list")
for (index in 1:length(private$processes)) {
if (is.null(private$processes[[index]])) {
next
}
pid = private$processes[[index]]$getId()
function_formals = private$processes[[index]]$getFormals()
f = function() {}
formals(f) = function_formals
# probably do a deep copy of the object
# for the body we have the problem that index is addressed as variable in the parent environment. This
# causes a problem at call time, where index is resolve and this means that usually the last element
# of the list will be used as process all the time -> solution: serialize index, gsub on quote, make "{" as.name
# and then as.call
body(f) = quote({
# we can access "private" because the function will be added to the public section of this object
exec_process = private$processes[[index]]$clone(deep=TRUE)
# find new node id:
node_id = .randomNodeId(exec_process$getId(),sep="_")
while (node_id %in% private$getNodeIds()) {
node_id = .randomNodeId(exec_process$getId(),sep="_")
}
private$node_ids = c(private$node_ids,node_id)
#map given parameter of this function to the process parameter / arguments and set value
arguments = exec_process$parameters
# parameter objects should be updated directly, since there is a real object reference
this_param_names = names(formals())
# used match.call before, but it seem that it doesn't resolve the pipe - it is something like data = .
this_arguments = lapply(this_param_names, function(param) get(param))
names(this_arguments) = this_param_names
# special case: value is of type Argument
node = ProcessNode$new(node_id = node_id,process=exec_process,graph=self)
lapply(names(this_arguments), function(param_name, arguments){
call_arg = this_arguments[[param_name]]
arguments[[param_name]]$setProcess(node)
arguments[[param_name]]$setValue(call_arg)
}, arguments = arguments)
return(node)
})
# replace index with the actual number! After creation the index will be gone, so we need to replace the
# index variable with the actually used value of the variable. Only this will enable the correct access
# to the process in the process list once the function is called.
tmp = gsub(body(f),pattern="index",replacement = eval(index))
body(f) = as.call(c(as.name(tmp[1]),parse(text=tmp[2:length(tmp)])))
# register the ProcessNode creator functions on the Graph class
self[[pid]] = f
}
}, error = .capturedErrorToMessage)
}
),
private = list(
# private ====
node_ids = character(),
processes = list(),
createListOfProcesses = function(con) {
process_list = list_processes(con=con)
private$processes = lapply(process_list, function(process_description) {
process_description$process_graph = NULL #remove the optional process_graph part as it is confusing here
return(processFromJson(process_description))
})
},
getNodeIds = function() {private$node_ids}
)
)
#' Get a process graph builder / process collection from the connection
#'
#' Queries the connected back-end for all available processes and collection names and registers them via R functions on
#' a [`ProcessCollection`] object to build a process graph in R. The current [`ProcessCollection`] is stored internally at the package environment
#' variable `process_collection`, which can be fetched and set with `active_process_collection`.
#'
#' @param con a connection to an openEO back-end (optional) otherwise [active_connection()]
#' is used.
#' @param processes the [`ProcessCollection`] that is obtained from `processes()` to be set as the active process collection or left empty to fetch the `ProcessCollection` from the package variable.
#'
#' @return a [`ProcessCollection`] object with the offered processes of the back-end
#'
#' @rdname processes
#' @export
processes = function(con = NULL) {
process_collection = active_process_collection()
if (rlang::is_null(process_collection)) {
tryCatch({
process_collection = ProcessCollection$new(con = con)
void = active_process_collection(processes = process_collection)
}, error = .capturedErrorToMessage)
}
return(process_collection)
}
#' @rdname processes
#' @export
active_process_collection = function(processes=NULL) {
if (is.null(processes)) {
return(get(x = "process_collection", envir = pkgEnvironment))
} else if ("ProcessCollection" %in% class(processes)) {
assign(x = "process_collection", value = processes, envir = pkgEnvironment)
invisible(processes)
} else {
stop(paste0("Cannot set processes collection with object of class '",utils::head(class(processes),1),"'"))
}
}
active_process_list = function(process_list=NULL) {
if (is.null(process_list)) {
return(get(x = "process_list", envir = pkgEnvironment))
} else if ("ProcessList" %in% class(process_list)) {
assign(x = "process_list", value = process_list, envir = pkgEnvironment)
invisible(process_list)
} else {
stop(paste0("Cannot set processes list with object of class '",utils::head(class(process_list),1),"'"))
}
}
demo_processes = function() {
process_list = readRDS(system.file("openeo_processes/openeo_processes_1.2.0.rds",package = "openeo"))
active_process_list(process_list = process_list)
invisible(processes())
}