forked from ga4gh/ga4gh-starter-kit-wes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SubmitRunRequestHandler.java
248 lines (218 loc) · 9.95 KB
/
SubmitRunRequestHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package org.ga4gh.starterkit.wes.utils.requesthandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.ga4gh.starterkit.common.exception.BadRequestException;
import org.ga4gh.starterkit.common.exception.ConflictException;
import org.ga4gh.starterkit.common.hibernate.exception.EntityExistsException;
import org.ga4gh.starterkit.common.requesthandler.RequestHandler;
import org.ga4gh.starterkit.wes.config.WesServiceProps;
import org.ga4gh.starterkit.wes.model.WesServiceInfo;
import org.ga4gh.starterkit.wes.model.WorkflowEngine;
import org.ga4gh.starterkit.wes.model.RunId;
import org.ga4gh.starterkit.wes.model.WesRun;
import org.ga4gh.starterkit.wes.model.WorkflowType;
import org.ga4gh.starterkit.wes.utils.DrsUrlResolver;
import org.ga4gh.starterkit.wes.utils.hibernate.WesHibernateUtil;
import org.ga4gh.starterkit.wes.utils.runmanager.RunManager;
import org.ga4gh.starterkit.wes.utils.runmanager.RunManagerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Request handling logic for submitting a new run request.
 *
 * <p>Usage: obtain an instance, call {@link #prepare} with the raw request
 * parameters, then call {@link #handleRequest()} to validate, persist and
 * launch the run.
 *
 * @see org.ga4gh.starterkit.wes.controller.Runs#createRun createRun
 */
public class SubmitRunRequestHandler implements RequestHandler<RunId> {

    /**
     * Shared JSON mapper. ObjectMapper is safe to share once configured, so a
     * single instance is reused instead of building one per request.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private WesServiceInfo serviceInfo;

    @Autowired
    private WesServiceProps serviceProps;

    @Autowired
    private WesHibernateUtil hibernateUtil;

    @Autowired
    private RunManagerFactory runLauncherFactory;

    private WorkflowType workflowType;

    private String workflowTypeVersion;

    private String workflowUrl;

    private String workflowParams;

    // stored by prepare(), not read anywhere in this class
    private String tags;

    // stored by prepare(), not read anywhere in this class
    private List<String> workflowAttachment;

    /**
     * Instantiates a new SubmitRunRequestHandler object
     */
    public SubmitRunRequestHandler() {
    }

    /**
     * Prepares the request handler with input params from the controller function
     * @param workflowType workflow language specification
     * @param workflowTypeVersion workflow language specification version
     * @param workflowUrl URL to workflow source
     * @param workflowParams raw JSON string of workflow run input parameters
     * @param tags raw JSON string indicating key:value tags
     * @param workflowAttachment string array indicating files to upload
     * @return the prepared request handler
     */
    public SubmitRunRequestHandler prepare(WorkflowType workflowType,
        String workflowTypeVersion, String workflowUrl,
        String workflowParams, String tags, List<String> workflowAttachment
    ) {
        this.workflowType = workflowType;
        this.workflowTypeVersion = workflowTypeVersion;
        this.workflowUrl = workflowUrl;
        this.workflowParams = workflowParams;
        this.tags = tags;
        this.workflowAttachment = workflowAttachment;
        return this;
    }

    /**
     * Submits a new workflow run request and returns the id.
     *
     * <p>Steps, in order: validate the request, resolve DRS URLs in the input
     * parameters, persist a new WesRun, hand the run to a RunManager.
     *
     * @return id of the newly registered run
     * @throws BadRequestException the request parameters failed validation or
     *     the workflow params could not be parsed
     * @throws ConflictException any other failure while registering or
     *     launching the run
     */
    public RunId handleRequest() {
        try {
            validateRunRequest();
            // Resolve the params before persisting anything, so that a
            // params failure does not leave an orphan WesRun record behind.
            String resolvedWorkflowParams = resolveWorkflowParams();
            WesRun wesRun = prepareRun();
            // The resolved params are set on the instance after it has been
            // persisted, exactly as before: they are what the RunManager sees.
            if (resolvedWorkflowParams != null) {
                wesRun.setWorkflowParams(resolvedWorkflowParams);
            }
            launchRun(wesRun);
            return wesRun.toRunId();
        } catch (BadRequestException ex) {
            // A client error must stay a client error rather than being
            // reported as a conflict.
            throw ex;
        } catch (Exception ex) {
            throw new ConflictException("Could not register new WorkflowRun");
        }
    }

    /**
     * Validates that run request parameters are valid
     * @throws BadRequestException an unsupported or malformed parameter was provided
     */
    private void validateRunRequest() throws BadRequestException {
        // Validate workflowType
        // - assert requested workflowType is supported according to ServiceInfo
        if (!serviceInfo.isWorkflowTypeSupported(workflowType)) {
            throw new BadRequestException(
                "Unsupported workflow_type: '" + workflowType
                + "'. Supported workflow types: " + serviceInfo.getWorkflowTypeVersions().keySet()
            );
        }

        // Validate workflowTypeVersion
        // - assert requested version is supported according to ServiceInfo
        if (!serviceInfo.isWorkflowTypeVersionSupported(workflowType, workflowTypeVersion)) {
            throw new BadRequestException(
                "Unsupported workflow_type_version: '"
                + workflowTypeVersion
                + "'. Supported workflow type versions: "
                + serviceInfo.getWorkflowTypeVersions().get(workflowType)
            );
        }

        // Validate workflowParams
        // - assert it is present and valid JSON. The null check is explicit
        //   because the parser rejects a null string with a non-IOException.
        if (workflowParams == null) {
            throw new BadRequestException("Supplied workflow_params not valid JSON");
        }
        try {
            OBJECT_MAPPER.readTree(workflowParams);
        } catch (IOException e) {
            throw new BadRequestException("Supplied workflow_params not valid JSON");
        }

        // TODO add workflow_url validation
        // 'tags' not evaluated, not supported
    }

    /**
     * Performs initial preparation when request is valid
     * @return a persistent WesRun entity to track the workflow run job
     * @throws EntityExistsException a new WesRun instance could not be created
     */
    private WesRun prepareRun() throws EntityExistsException {
        // create the WesRun, and check if a WesRun by the same id already exists
        // in the database. if so, regenerate the WesRun
        WesRun wesRun = createWesRun();
        while (hibernateUtil.readEntityObject(WesRun.class, wesRun.getId(), false) != null) {
            wesRun = createWesRun();
        }

        // persist the WesRun and return the persistent instance
        hibernateUtil.createEntityObject(WesRun.class, wesRun);
        return wesRun;
    }

    /**
     * Launches the prepared workflow run
     * @param wesRun persistent WesRun entity tracking the workflow run job
     * @throws ConflictException the factory did not return a RunManager
     * @throws Exception an exception was encountered
     */
    private void launchRun(WesRun wesRun) throws ConflictException, Exception {
        // create a low-level RunManager instance from the factory, allow
        // the RunManager that has knowledge of the requested workflow type and
        // engine to handle the submission
        RunManager runLauncher = runLauncherFactory.createRunManager(wesRun);
        if (runLauncher == null) {
            throw new ConflictException("Could not setup or launch workflow run");
        }
        runLauncher.setupAndLaunchRun();
    }

    /**
     * Creates a new, non-persistent WesRun object based on input parameters
     * @return non-persistent WesRun object with a freshly generated random UUID as id
     */
    private WesRun createWesRun() {
        WesRun wesRun = new WesRun();
        wesRun.setId(UUID.randomUUID().toString());
        wesRun.setWorkflowType(workflowType);
        wesRun.setWorkflowTypeVersion(workflowTypeVersion);
        wesRun.setWorkflowUrl(workflowUrl);
        wesRun.setWorkflowParams(workflowParams);
        // TODO this is hardcoded to NATIVE workflow engine, parameterize to
        // allow other workflow engines
        wesRun.setWorkflowEngine(WorkflowEngine.NATIVE);
        wesRun.setWorkflowEngineVersion(null);
        return wesRun;
    }

    /**
     * Resolve any DRS URLs in the input parameters to URLs or file paths, leaving any
     * non-DRS URLs untouched.
     *
     * <p>Only top-level string values are inspected. Values of any other JSON
     * type (numbers, booleans, null, arrays, objects) are passed through
     * unchanged.
     *
     * @return raw JSON string of modified input parameters
     * @throws BadRequestException the workflow params could not be read as a JSON object
     */
    private String resolveWorkflowParams() {
        try {
            // read the input params JSON into a map
            @SuppressWarnings("unchecked") // JSON object keys are always strings
            Map<String, Object> params = OBJECT_MAPPER.readValue(workflowParams, Map.class);
            if (params == null) {
                // the JSON literal 'null' parses successfully but yields no map
                throw new BadRequestException("Supplied workflow_params must be a JSON object");
            }

            for (Map.Entry<String, Object> entry : params.entrySet()) {
                Object value = entry.getValue();
                if (!(value instanceof String)) {
                    // non-string values cannot be DRS URLs, leave them as they are
                    continue;
                }

                String text = (String) value;
                if (text.contains(" ")) {
                    // a space-separated list: each individual entry is checked
                    // for a DRS URI and resolved on its own
                    entry.setValue(resolveSpaceSeparated(text));
                } else {
                    // if the value is found to have been a DRS URL, then resolve it
                    // and override the DRS URL with the raw path
                    String resolvedPathOrUrl = DrsUrlResolver.resolveAccessPathOrUrl(
                        value, serviceProps.getDrsDockerContainer());
                    if (resolvedPathOrUrl != null) {
                        entry.setValue(resolvedPathOrUrl);
                    }
                }
            }
            return OBJECT_MAPPER.writeValueAsString(params);
        } catch (IOException ex) {
            throw new BadRequestException("Supplied workflow_params not valid JSON");
        }
    }

    /**
     * Resolves each entry of a space-separated parameter value individually.
     * An entry the resolver returns null for is kept as is; otherwise the
     * resolved path or URL takes its place.
     *
     * @param value parameter value containing at least one space
     * @return the entries, resolved where possible, joined by single spaces
     */
    private String resolveSpaceSeparated(String value) {
        String[] parts = value.split(" ");
        if (parts.length == 0) {
            // the value consisted of spaces only, there is nothing to resolve
            return value;
        }

        StringBuilder inputs = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                inputs.append(' ');
            }
            String resolvedPathOrUrl = DrsUrlResolver.resolveAccessPathOrUrl(
                parts[i], serviceProps.getDrsDockerContainer());
            inputs.append(resolvedPathOrUrl != null ? resolvedPathOrUrl : parts[i]);
        }
        return inputs.toString();
    }
}