Skip to content
Permalink
Browse files
Rewriting data orchestrator logic to support directory scanning throu…
…gh MFT
  • Loading branch information
DImuthuUpe committed Aug 23, 2021
1 parent 068f161 commit ef6a5b0f0d748a459b59c331f33c032632fa0fbb
Show file tree
Hide file tree
Showing 31 changed files with 577 additions and 1,231 deletions.
@@ -42,7 +42,7 @@ custos_repo: "https://github.com/apache/airavata-custos.git"
custos_git_branch: develop

mft_default_agent_id: agent0
mft_default_agent_host: 10.1.0.33
mft_default_agent_host: 10.1.0.42
mft_default_agent_advertised_url: https://beta.iubemcenter.scigap.org:8443/downloads
mft_default_agent_port: 3333

@@ -10,6 +10,8 @@ outboundEventProcessor:
workflowPort: {{ workflow_manager_grpc_port }}
drmsHost: "{{ datalake_drms_host }}"
drmsPort: {{ datalake_drms_grpc_port }}
mftHost: "{{ mft_api_service_host }}"
mftPort: {{ mft_api_service_grpc_port }}
consumer:
brokerURL: "{{ datalake_data_orch_broker_url }}"
consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;

import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,28 +21,28 @@ public AbstractListener(EventPublisher eventPublisher) {

public void onRegistered(NotificationEvent event) throws Exception {
LOGGER.info(" Registration event received for path " + event.getResourcePath());
eventPublisher.publish(event, MessagingEvents.REGISTER);
eventPublisher.publish(event, NotificationEvent.Type.REGISTER);

}

public void onCreated(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
event.getResourcePath() + ":" + event.getResourceName() + " Created");
eventPublisher.publish(event, MessagingEvents.CREATE);
event.getResourcePath() + ":" + event.getResourcePath() + " Created");
eventPublisher.publish(event, NotificationEvent.Type.CREATE);

}

public void onModified(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
event.getResourcePath() + ":" + event.getResourceName() + " Created");
eventPublisher.publish(event, MessagingEvents.MODIFY);
event.getResourcePath() + ":" + event.getResourcePath() + " Created");
eventPublisher.publish(event, NotificationEvent.Type.MODIFY);

}

public void onDeleted(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
event.getResourcePath() + ":" + event.getResourceName() + " Created");
eventPublisher.publish(event, MessagingEvents.DELETE);
event.getResourcePath() + ":" + event.getBasePath() + " Created");
eventPublisher.publish(event, NotificationEvent.Type.DELETE);

}

@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;

import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;

import java.util.concurrent.ExecutionException;
@@ -11,7 +10,7 @@
public interface EventPublisher {


public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws Exception;
public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws Exception;


}
@@ -3,6 +3,7 @@
public class Configuration {
private String listeningPath;
private String hostName;
private int depth = 2;

private Producer producer;

@@ -43,6 +44,14 @@ public void setCustos(Custos custos) {
this.custos = custos;
}

public int getDepth() {
return depth;
}

public void setDepth(int depth) {
this.depth = depth;
}

public static class Producer {
private String brokerURL;
private String publisherId;
@@ -2,7 +2,6 @@

import org.apache.airavata.dataorchestrator.clients.core.EventPublisher;
import org.apache.airavata.dataorchestrator.file.client.model.Configuration;
import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
import org.apache.kafka.clients.producer.Callback;
@@ -25,8 +24,8 @@ public FileEventPublisher(Configuration configuration) {
}

@Override
public void publish(NotificationEvent notificationEvent, MessagingEvents event) throws ExecutionException, InterruptedException {
notificationEvent.getContext().setEvent(event);
public void publish(NotificationEvent notificationEvent, NotificationEvent.Type eventType) throws ExecutionException, InterruptedException {
notificationEvent.setEventType(eventType);
messageProducer.publish(configuration.getProducer().getPublisherTopic(), notificationEvent, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
@@ -29,15 +29,12 @@ public class FileWatcher implements Runnable {

private Configuration configuration;


public FileWatcher(File rootFolder, Configuration configuration) throws IOException {
this.rootFolder = rootFolder;
this.configuration = configuration;
}


@Override

public void run() {

LOGGER.info("Watcher service starting at " + rootFolder.getAbsolutePath());
@@ -51,12 +48,9 @@ public void run() {
} catch (Exception e) {
LOGGER.error("Error occurred while watching folder " + rootFolder.getAbsolutePath(), e);
Thread.currentThread().interrupt();

}

}


protected void pollEvents(WatchService watchService) throws Exception {

WatchKey key = watchService.take();
@@ -68,10 +62,10 @@ protected void pollEvents(WatchService watchService) throws Exception {
if (!key.reset()) {
keyPathMap.remove(key);
}

if (keyPathMap.isEmpty()) {
return;
}

}


@@ -82,12 +76,14 @@ protected void notifyListeners(WatchService watchService, WatchEvent.Kind<?> kin

path = parentPath.resolve(path);
File file = path.toFile();
FileEvent event = getFileEvent(file);
Optional<FileEvent> event = getFileEvent(file);

if (kind == ENTRY_CREATE) {

for (AbstractListener listener : listeners) {
listener.onCreated(event);
if (event.isPresent()) {
for (AbstractListener listener : listeners) {
listener.onCreated(event.get());
}
}

if (file.isDirectory()) {
@@ -96,23 +92,22 @@ protected void notifyListeners(WatchService watchService, WatchEvent.Kind<?> kin

} else if (kind == ENTRY_MODIFY) {

for (AbstractListener listener : listeners) {

listener.onModified(event);

if (event.isPresent()) {
for (AbstractListener listener : listeners) {
listener.onModified(event.get());
}
}

} else if (kind == ENTRY_DELETE) {

for (AbstractListener listener : listeners) {
listener.onDeleted(event);
if (event.isPresent()) {
for (AbstractListener listener : listeners) {
listener.onDeleted(event.get());
}
}

}

}


public FileWatcher addListener(AbstractListener listener) {

listeners.add(listener);
@@ -152,24 +147,44 @@ public FileWatcher setListeners(List<AbstractListener> listeners) {
*/


protected FileEvent getFileEvent(File file) {
protected Optional<FileEvent> getFileEvent(File file) {
FileEvent event = new FileEvent();
if (file.isDirectory()) {


String absolutePath = file.getAbsolutePath();
if (configuration.getDepth() > 0) {
String relativePath = absolutePath.substring(configuration.getListeningPath().length());
if (relativePath.startsWith("/")) {
relativePath = relativePath.substring(1);
}
String[] relativeParts = relativePath.split("/");
if (relativeParts.length >= configuration.getDepth()) {
String beginPath = configuration.getListeningPath();
beginPath = beginPath.endsWith("/") ? beginPath.substring(0, beginPath.length()-1) : beginPath;
for (int step = 0; step < configuration.getDepth(); step++) {
beginPath = beginPath + "/" + relativeParts[step];
}
absolutePath = beginPath;
} else {
LOGGER.warn("Depth of path {} is not greater or equal to required depth {}", absolutePath, configuration.getDepth());
return Optional.empty();
}
}

if (new File(absolutePath).isDirectory()) {
event.setResourceType(Constants.FOLDER);
} else {
event.setResourceType(Constants.FILE);
}
event.setResourceName(file.getName());
event.setResourcePath(file.getAbsolutePath());
NotificationEvent.Context context = new NotificationEvent.Context();
context.setOccuredTime(System.currentTimeMillis());
context.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()

event.setResourcePath(absolutePath);
event.setOccuredTime(System.currentTimeMillis());
event.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+ ":" + configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
context.setBasePath(configuration.getListeningPath());
context.setTenantId(configuration.getCustos().getTenantId());
context.setHostName(configuration.getHostName());
event.setContext(context);
return event;
event.setBasePath(configuration.getListeningPath());
event.setTenantId(configuration.getCustos().getTenantId());
event.setHostName(configuration.getHostName());
return Optional.of(event);
}

private static void registerDir(Path path, WatchService watchService) throws

This file was deleted.

0 comments on commit ef6a5b0

Please sign in to comment.