Integrating data parsing workflow
DImuthuUpe committed Aug 7, 2021
1 parent 59fb3df commit cdbea1266872de8c83d0e592750b1a2fc77e2675
Showing 26 changed files with 980 additions and 62 deletions.
@@ -18,6 +18,7 @@
firewalld: port="{{ item }}/tcp"
zone=public permanent=true state=enabled immediate=yes
with_items:
- "{{ mft_api_service_grpc_port }}"
- "{{ mft_default_agent_port }}"
- "{{ mft_consul_port }}"
- "{{ mft_resource_service_grpc_port }}"
@@ -1,7 +1,7 @@
[
{
"type": "SCP",
"secretId": "ssh_storage_preference_emc_source",
"secretId": "ed1af924-2a91-412c-bedf-d9321252456d",
"user": "ubuntu",
"privateKey": "{{ vault_mft_agent_default_ssh_private_key }}",
"publicKey": "{{ vault_mft_agent_default_ssh_public_key }}",
@@ -41,7 +41,7 @@ case $1 in
echo "Starting $SERVICE_NAME ..."
if [ ! -f $PID_PATH_NAME ]; then
nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
echo $! > $PID_PATH_NAME
echo "$SERVICE_NAME started ..."
else
@@ -90,7 +90,7 @@ case $1 in
rm $PID_PATH_NAME
echo "$SERVICE_NAME starting ..."
nohup java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $* > $LOG_FILE 2>&1 &
echo $! > $PID_PATH_NAME
echo "$SERVICE_NAME started ..."
else
@@ -67,5 +67,5 @@ do
done

java ${JAVA_OPTS} -classpath "${AIRAVATA_CLASSPATH}" \
- org.apache.airavata.datalake.orchestrator.APIServerInitializer ${AIRAVATA_COMMAND} $*
+ org.apache.airavata.datalake.orchestrator.DataOrchestratorAPIRunner ${AIRAVATA_COMMAND} $*

@@ -43,8 +43,8 @@
@EnableJpaAuditing
@EnableJpaRepositories("org.apache.airavata.datalake")
@EntityScan("org.apache.airavata.datalake")
- public class APIServerInitializer implements CommandLineRunner {
- private static final Logger LOGGER = LoggerFactory.getLogger(APIServerInitializer.class);
+ public class DataOrchestratorAPIRunner implements CommandLineRunner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataOrchestratorAPIRunner.class);

@Autowired
private OrchestratorEventHandler orchestratorEventHandler;
@@ -53,7 +53,7 @@ public class APIServerInitializer implements CommandLineRunner {
private String configPath;

public static void main(String[] args) {
- SpringApplication.run(APIServerInitializer.class, args);
+ SpringApplication.run(DataOrchestratorAPIRunner.class, args);
}

@Override
@@ -28,6 +28,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
+ import java.util.Optional;

@GRpcService
public class DataParserApiHandler extends DataParserServiceGrpc.DataParserServiceImplBase {
@@ -50,32 +51,51 @@ public void registerParser(ParserRegisterRequest request, StreamObserver<ParserR

@Override
public void listParsers(ParserListRequest request, StreamObserver<ParserListResponse> responseObserver) {
DozerBeanMapper mapper = new DozerBeanMapper();

ParserListResponse.Builder response = ParserListResponse.newBuilder();

List<DataParserEntity> allParsers = parserRepo.findAll();
allParsers.forEach(dataParserEntity -> {
- DataParser.Builder parserBuilder = DataParser.newBuilder();
- mapper.map(dataParserEntity, parserBuilder);
- dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
- DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
- mapper.map(dataParserInputInterfaceEntity, inputBuilder);
- parserBuilder.addInputInterfaces(inputBuilder);
- });
-
- dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
- DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
- mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
- parserBuilder.addOutputInterfaces(outputBuilder);
- });
- response.addParsers(parserBuilder);
+ response.addParsers(mapParser(dataParserEntity));
});

responseObserver.onNext(response.build());
responseObserver.onCompleted();
}
+
+ @Override
+ public void fetchParser(ParserFetchRequest request, StreamObserver<ParserFetchResponse> responseObserver) {
+ Optional<DataParserEntity> entityOp = this.parserRepo.findById(request.getParserId());
+ if (entityOp.isPresent()) {
+ responseObserver.onNext(ParserFetchResponse.newBuilder().setParser(mapParser(entityOp.get())).build());
+ responseObserver.onCompleted();
+
+ } else {
+ responseObserver.onError(new Exception("Couldn't find a parser with id " + request.getParserId()));
+ }
+ }
+
+ private DataParser.Builder mapParser(DataParserEntity dataParserEntity) {
+ DozerBeanMapper mapper = new DozerBeanMapper();
+
+ DataParser.Builder parserBuilder = DataParser.newBuilder();
+ mapper.map(dataParserEntity, parserBuilder);
+ dataParserEntity.getInputInterfacesList().forEach(dataParserInputInterfaceEntity -> {
+ DataParserInputInterface.Builder inputBuilder = DataParserInputInterface.newBuilder();
+ mapper.map(dataParserInputInterfaceEntity, inputBuilder);
+ parserBuilder.addInputInterfaces(inputBuilder);
+ });
+
+ dataParserEntity.getOutputInterfacesList().forEach(dataParserOutputInterfaceEntity -> {
+ DataParserOutputInterface.Builder outputBuilder = DataParserOutputInterface.newBuilder();
+ mapper.map(dataParserOutputInterfaceEntity, outputBuilder);
+ parserBuilder.addOutputInterfaces(outputBuilder);
+ });
+
+ return parserBuilder;
+ }

@Override
public void registerParsingJob(ParsingJobRegisterRequest request, StreamObserver<ParsingJobRegisterResponse> responseObserver) {
DozerBeanMapper mapper = new DozerBeanMapper();
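
A note on the error path in fetchParser above: signaling failure with a plain Exception reaches gRPC clients as Status.UNKNOWN. A common grpc-java idiom, sketched here and not part of this commit, is to return a typed status instead:

import io.grpc.Status;

// Sketch only: surface the missing parser as NOT_FOUND so callers can
// distinguish "no such parser" from an internal server failure.
responseObserver.onError(Status.NOT_FOUND
        .withDescription("Couldn't find a parser with id " + request.getParserId())
        .asRuntimeException());
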
@@ -77,6 +77,14 @@ message ParserRegisterResponse {
string parserId = 1;
}

+ message ParserFetchRequest {
+ string parserId = 1;
+ }
+
+ message ParserFetchResponse {
+ DataParser parser = 1;
+ }
+
message ParserListRequest {

}
@@ -103,6 +111,10 @@ message ParsingJobListResponse {


service DataParserService {

+ rpc fetchParser (ParserFetchRequest) returns (ParserFetchResponse) {
+ }
+
rpc registerParser (ParserRegisterRequest) returns (ParserRegisterResponse) {
}
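
The new fetchParser RPC can be exercised with the generated blocking stub. This sketch mirrors the call DataParsingWorkflowManager makes further down; the localhost:6566 endpoint is the same assumption that class hard-codes, and the parser id is a placeholder:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Fetch a registered parser, including its input and output interfaces.
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
DataParserServiceGrpc.DataParserServiceBlockingStub client = DataParserServiceGrpc.newBlockingStub(channel);
ParserFetchResponse response = client.fetchParser(
        ParserFetchRequest.newBuilder().setParserId("example-parser-id").build()); // placeholder id
System.out.println(response.getParser());
channel.shutdown();
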

@@ -33,6 +33,11 @@ public DataSyncWorkflowManager dataSyncWorkflowManager() {
return new DataSyncWorkflowManager();
}

+ @Bean(initMethod = "init")
+ public DataParsingWorkflowManager dataParsingWorkflowManager() {
+ return new DataParsingWorkflowManager();
+ }
+
@Bean
public CallbackWorkflowStore callbackWorkflowStore() {
return new CallbackWorkflowStore();
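
The @Bean(initMethod = "init") attribute added above tells Spring to call the manager's init() only after its @Value and @Autowired members are populated. A minimal sketch of the equivalent in-class wiring, for illustration only (the commit keeps the plain init() method):

import javax.annotation.PostConstruct;

public class InitLifecycleSketch {
    // Annotating init() directly is equivalent, for a singleton bean, to
    // declaring @Bean(initMethod = "init") in the configuration class.
    @PostConstruct
    public void init() {
        // connect ZooKeeper and the workflow operator here, as below
    }
}
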
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.workflow.engine.wm.datasync;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.data.orchestrator.api.stub.parsing.*;
import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
import org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.GenericDataParsingTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.MetadataPersistTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.SyncLocalDataDownloadTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.types.StringMap;
import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.CallbackWorkflowStore;
import org.apache.airavata.datalake.orchestrator.workflow.engine.wm.WorkflowOperator;
import org.apache.airavata.mft.api.client.MFTApiClient;
import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
import org.apache.airavata.mft.api.service.FileMetadataResponse;
import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.common.DelegateAuth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import javax.script.*;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class DataParsingWorkflowManager {
private final static Logger logger = LoggerFactory.getLogger(DataParsingWorkflowManager.class);

@org.springframework.beans.factory.annotation.Value("${cluster.name}")
private String clusterName;

@org.springframework.beans.factory.annotation.Value("${parsing.wm.name}")
private String workflowManagerName;

@org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
private String zkAddress;

@org.springframework.beans.factory.annotation.Value("${mft.host}")
private String mftHost;

@org.springframework.beans.factory.annotation.Value("${mft.port}")
private int mftPort;

@org.springframework.beans.factory.annotation.Value("${drms.host}")
private String drmsHost;

@org.springframework.beans.factory.annotation.Value("${drms.port}")
private int drmsPort;

private String mftClientId = "mft-agent";

private String mftClientSecret = "kHqH27BloDCbLvwUA8ZYRlHcJxXZyby9PB90bTdU";


@Autowired
private CallbackWorkflowStore callbackWorkflowStore;

private WorkflowOperator workflowOperator;

public void init() throws Exception {
workflowOperator = new WorkflowOperator();
workflowOperator.init(clusterName, workflowManagerName, zkAddress, callbackWorkflowStore);
logger.info("Successfully initialized Data Parsing Workflow Manager");
}

public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {

WorkflowMessage workflowMessage = request.getMessage();
logger.info("Processing parsing workflow for resource {}", workflowMessage.getSourceResourceId());

MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(mftHost, mftPort);

DelegateAuth delegateAuth = DelegateAuth.newBuilder()
.setUserId(workflowMessage.getUsername())
.setClientId(mftClientId)
.setClientSecret(mftClientSecret)
.putProperties("TENANT_ID", workflowMessage.getTenantId()).build();

FileMetadataResponse metadata = mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
.setResourceType("SCP")
.setResourceId(workflowMessage.getSourceResourceId())
.setResourceToken(workflowMessage.getSourceCredentialToken())
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);

ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());

Map<String, StringMap> parserInputMappings = new HashMap<>();
List<DataParsingJob> selectedPJs = parsingJobs.getParsersList().stream().filter(pj -> {
List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();

boolean match = true;
StringMap stringMap = new StringMap();
for (DataParsingJobInput pji : pjis) {

ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
bindings.put("polyglot.js.allowHostAccess", true);
bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
bindings.put("metadata", metadata);
try {
Boolean eval = (Boolean) engine.eval(pji.getSelectionQuery());
stringMap.put(pji.getDataParserInputInterfaceId(), "$DOWNLOAD_PATH");
match = match && eval;
} catch (ScriptException e) {
logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId());
match = false;
}
}

if (match) {
parserInputMappings.put(pj.getParserId(), stringMap);
}
return match;
}).collect(Collectors.toList());

Map<String, AbstractTask> taskMap = new HashMap<>();

SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
downloadTask.setMftClientId(mftClientId);
downloadTask.setMftClientSecret(mftClientSecret);
downloadTask.setUserId(workflowMessage.getUsername());
downloadTask.setTenantId(workflowMessage.getTenantId());
downloadTask.setMftHost(mftHost);
downloadTask.setMftPort(mftPort);
downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());

taskMap.put(downloadTask.getTaskId(), downloadTask);

for(String parserId: parserInputMappings.keySet()) {

GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
dataParsingTask.setParserId(parserId);
dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);

OutPort outPort = new OutPort();
outPort.setNextTaskId(dataParsingTask.getTaskId());
downloadTask.addOutPort(outPort);

DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj -> pj.getParserId().equals(parserId)).findFirst().get();
ParserFetchResponse parser = parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());

for (DataParserOutputInterface dataParserOutputInterface: parser.getParser().getOutputInterfacesList()) {

Optional<DataParsingJobOutput> dataParsingJobOutput = dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
.findFirst();

if (dataParsingJobOutput.isPresent() && dataParsingJobOutput.get().getOutputType().equals("JSON")) {
MetadataPersistTask mpt = new MetadataPersistTask();
mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
mpt.setDrmsHost(drmsHost);
mpt.setDrmsPort(drmsPort);
mpt.setTenant(workflowMessage.getTenantId());
mpt.setUser(workflowMessage.getUsername());
mpt.setServiceAccountKey(mftClientId);
mpt.setServiceAccountSecret(mftClientSecret);
mpt.setResourceId(workflowMessage.getSourceResourceId());
mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" + dataParserOutputInterface.getOutputName());
OutPort dpOut = new OutPort();
dpOut.setNextTaskId(mpt.getTaskId());
dataParsingTask.addOutPort(dpOut);
taskMap.put(mpt.getTaskId(), mpt);
}
}

}

String[] startTaskIds = {downloadTask.getTaskId()};
String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);

logger.info("Submitted workflow {} to parse resource {}", workflowId, workflowMessage.getSourceResourceId());
}
}
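
The manager above selects parsing jobs by evaluating each job's JavaScript selection query against the MFT file metadata; a job runs only if every one of its queries returns true. The following self-contained sketch shows that evaluation with a hypothetical metadata stand-in (the real binding is the FileMetadataResponse above), assuming a JavaScript engine such as GraalJS is on the classpath:

import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class SelectionQueryDemo {

    // Hypothetical stand-in for the MFT FileMetadataResponse bound above.
    public static class Metadata {
        public String getFriendlyName() { return "sample.csv"; }
    }

    public static void main(String[] args) throws Exception {
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
        Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
        // GraalJS-specific host-access switches, matching the manager above.
        bindings.put("polyglot.js.allowHostAccess", true);
        bindings.put("metadata", new Metadata());
        // A parsing job is selected only if all of its selection queries
        // evaluate to true against the file metadata.
        Boolean match = (Boolean) engine.eval("metadata.getFriendlyName().endsWith('.csv')");
        System.out.println("parsing job selected: " + match);
    }
}
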
@@ -40,11 +40,16 @@ public class WorkflowEngineAPIHandler extends WorkflowServiceGrpc.WorkflowServic
@Autowired
private DataSyncWorkflowManager dataSyncWorkflowManager;

+ @Autowired
+ private DataParsingWorkflowManager dataParsingWorkflowManager;
+
@Override
public void invokeWorkflow(WorkflowInvocationRequest request,
StreamObserver<WorkflowInvocationResponse> responseObserver) {
try {
- dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+ logger.info("Invoking workflow executor for resource {}", request.getMessage().getSourceResourceId());
+ //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
+ dataParsingWorkflowManager.submitDataParsingWorkflow(request);
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
responseObserver.onCompleted();
} catch (Exception ex) {

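End to end, the new path is driven through the engine's invokeWorkflow RPC shown above. A sketch of a caller follows; the setter names are inferred from the getters this commit uses, and the endpoint and ids are placeholders:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Submit a parsing workflow for a source resource.
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565).usePlaintext().build();
WorkflowServiceGrpc.WorkflowServiceBlockingStub stub = WorkflowServiceGrpc.newBlockingStub(channel);
WorkflowInvocationResponse response = stub.invokeWorkflow(WorkflowInvocationRequest.newBuilder()
        .setMessage(WorkflowMessage.newBuilder()
                .setSourceResourceId("source-resource-id")      // placeholder
                .setSourceCredentialToken("credential-token")   // placeholder
                .setUsername("demo-user")
                .setTenantId("demo-tenant")
                .build())
        .build());
System.out.println("accepted: " + response.getStatus());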