Skip to content
Permalink
Browse files
Workflow engine initial implementation
  • Loading branch information
DImuthuUpe committed May 20, 2021
1 parent 4b43148 commit a49c3d4bb383682616d734b8a23c25c60fbf6260
Show file tree
Hide file tree
Showing 25 changed files with 1,295 additions and 0 deletions.
@@ -20,3 +20,6 @@ data-orchestrator/data-orchestrator-api/target
/metadata-service/db-service/client/client.iml
/metadata-service/db-service/server/server.iml
/metadata-service/db-service/stub/stub.iml
/metadata-service/data-builders/data-builders.iml
/metadata-service/db-service/db-service.iml
/data-orchestrator/workflow-engine/workflow-engine.iml
@@ -35,6 +35,7 @@
<modules>
<module>data-orchestrator-api</module>
<module>data-orchestrator-core</module>
<module>workflow-engine</module>
</modules>


@@ -0,0 +1,13 @@
### Service Execution Order

* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller
* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant
* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.DataSyncWorkflowManager

### Configure the participant with new tasks

* Extend the task class by BlockingTask or NonBlockingTask class
* Implement methods
* Annotate the class with @BlockingTaskDef or @NonBlockingTaskDef annotations. See ExampleBlockingTask and ExampleNonBlockingTask
* Register task in src/main/resources/task-list.yaml

@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>data-orchestrator</artifactId>
<groupId>org.apache.airavata.data.lake</groupId>
<version>0.01-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>workflow-engine</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j.over.slf4j}</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>${yaml.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;

public class AsyncEventMonitor {
}
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller;

import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.CountDownLatch;

@SpringBootApplication()
@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller")
public class Controller implements CommandLineRunner {

private final static Logger logger = LoggerFactory.getLogger(Controller.class);

@org.springframework.beans.factory.annotation.Value("${cluster.name}")
private String clusterName;

@org.springframework.beans.factory.annotation.Value("${controller.name}")
private String controllerName;

@org.springframework.beans.factory.annotation.Value("${zookeeper.connection}")
private String zkAddress;

private org.apache.helix.HelixManager zkHelixManager;

private CountDownLatch startLatch = new CountDownLatch(1);
private CountDownLatch stopLatch = new CountDownLatch(1);

@Override
public void run(String... args) throws Exception {
logger.info("Starting Cluster Controller ......");

try {
ZkClient zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);

// Creates the zk cluster if not available
if (!zkHelixAdmin.getClusters().contains(clusterName)) {
zkHelixAdmin.addCluster(clusterName, true);
}

zkHelixAdmin.close();
zkClient.close();

logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName);
logger.info("Zookeeper connection string " + zkAddress);

zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
controllerName, HelixControllerMain.STANDALONE);
startLatch.countDown();
stopLatch.await();
} catch (Exception ex) {
logger.error("Error in running the Controller: {}", controllerName, ex);
} finally {
disconnect();
}
}

private void disconnect() {
if (zkHelixManager != null) {
logger.info("Controller: {}, has disconnected from cluster: {}", controllerName, clusterName);
zkHelixManager.disconnect();
}
}

public static void main(String args[]) throws Exception {
SpringApplication.run(Controller.class);
}
}

0 comments on commit a49c3d4

Please sign in to comment.