Skip to content
Permalink
Browse files
Pre defining download path for parsing to rectify helix context issue
  • Loading branch information
DImuthuUpe committed Sep 3, 2021
1 parent 8d2a5d4 commit d8d08677749b9d3b425c4d74e189ba792d88ac65
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
@@ -122,6 +122,8 @@ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws

ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());

String tempDownloadPath = "/tmp/" + UUID.randomUUID().toString();

Map<String, StringMap> parserInputMappings = new HashMap<>();
List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
@@ -137,7 +139,7 @@ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws
bindings.put("metadata", metadata);
try {
Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
stringMap.put(pji.getDataParserInputInterfaceId(), tempDownloadPath);
match = match && eval;
} catch (ScriptException e) {
logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
@@ -169,6 +171,7 @@ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws
downloadTask.setMftPort(mftPort);
downloadTask.setSourceResourceId(sourceResourceId);
downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
downloadTask.setDownloadPath(tempDownloadPath);

taskMap.put(downloadTask.getTaskId(), downloadTask);

@@ -205,7 +208,8 @@ public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws
mpt.setServiceAccountKey(mftClientId);
mpt.setServiceAccountSecret(mftClientSecret);
mpt.setResourceId(sourceResourceId);
mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
mpt.setJsonFile("$" + dataParsingTask.getTaskId() +
"-" + dataParserOutputInterface.getOutputName());
OutPort dpOut = new OutPort();
dpOut.setNextTaskId(mpt.getTaskId());
dataParsingTask.addOutPort(dpOut);
@@ -72,6 +72,7 @@ public TaskResult runBlockingCode() throws Exception {

String derivedFilePath = getJsonFile();
if (derivedFilePath.startsWith("$")) {
logger.info("Fetching json file path from cotext for key {}", derivedFilePath);
derivedFilePath = getUserContent(derivedFilePath.substring(1), Scope.WORKFLOW);
}

@@ -63,6 +63,9 @@ public class SyncLocalDataDownloadTask extends BlockingTask {
@TaskParam(name = "SourceCredToken")
private final ThreadLocal<String> sourceCredToken = new ThreadLocal<>();

@TaskParam(name = "DownloadPath")
private final ThreadLocal<String> downloadPath = new ThreadLocal<>();

public static void main(String args[]) {


@@ -117,11 +120,10 @@ public TaskResult runBlockingCode() {
}

String downloadUrl = httpDownloadApiResponse.getUrl();
logger.info("Using download URL {}", downloadUrl);
logger.info("Using download URL {} to download file {}", downloadUrl, metadata.getFriendlyName());

String downloadPath = "/tmp/" + metadata.getFriendlyName();
try (BufferedInputStream in = new BufferedInputStream(new URL(downloadUrl).openStream());
FileOutputStream fileOutputStream = new FileOutputStream(downloadPath)) {
FileOutputStream fileOutputStream = new FileOutputStream(getDownloadPath())) {
byte dataBuffer[] = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(dataBuffer, 0, 1024)) != -1) {
@@ -132,9 +134,8 @@ public TaskResult runBlockingCode() {
return new TaskResult(TaskResult.Status.FAILED, "Failed to download file");
}

logger.info("Downloaded to path {}", downloadPath);
logger.info("Downloaded filr {} to path {}", metadata.getFriendlyName(), getDownloadPath());

putUserContent("DOWNLOAD_PATH", downloadPath, Scope.WORKFLOW);
return new TaskResult(TaskResult.Status.COMPLETED, "Success");
}

@@ -201,4 +202,12 @@ public String getTenantId() {
public void setTenantId(String tenantId) {
this.tenantId.set(tenantId);
}

public String getDownloadPath() {
return downloadPath.get();
}

public void setDownloadPath(String downloadPath) {
this.downloadPath.set(downloadPath);
}
}

0 comments on commit d8d0867

Please sign in to comment.