Skip to content

Commit

Permalink
Merge pull request #226 from TAMULib/october-dame-sprint-d01211-liste…
Browse files Browse the repository at this point in the history
…ners

schedule file monitor health check and recovery
  • Loading branch information
wwelling committed Oct 10, 2019
2 parents f6ff9cc + d53aff7 commit 98b522d
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>edu.tamu</groupId>
<artifactId>metadatatool</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>

<name>Scanned Document Metadata Tool</name>

Expand Down
39 changes: 16 additions & 23 deletions src/main/java/edu/tamu/app/Initialization.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Component;

import edu.tamu.app.observer.FileMonitorManager;
import edu.tamu.app.observer.FileObserverRegistry;
import edu.tamu.app.observer.MapFileListener;
import edu.tamu.app.observer.ProjectListener;
import edu.tamu.app.service.SyncService;
import edu.tamu.app.utilities.FileSystemUtility;

@Component
Expand All @@ -24,8 +20,12 @@ public class Initialization implements CommandLineRunner {
public static String ASSETS_PATH;

public static String PROJECTS_PATH = "projects";

public static String MAPS_PATH = "maps";

public static int LISTENER_PARALLELISM;
public static int LISTENER_PARALLELISM = 10;

public static long LISTENER_INTERVAL = 1000;

@Value("${app.host}")
private String host;
Expand All @@ -34,43 +34,32 @@ public class Initialization implements CommandLineRunner {
private String assetsPath;

@Value("${app.listener.parallelism:10}")
private int parallelism;
private int listenerParallelism;

@Value("${app.polling.interval:1000}")
private long listenerInterval;

@Value("${app.assets.folders}")
private String[] assetsFolders;

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private FileMonitorManager fileMonitorManager;

@Autowired
private FileObserverRegistry fileObserverRegistry;

@Autowired
private SyncService syncService;

@Override
public void run(String... args) throws Exception {

setHost(host);

setAssetsPath(assetsPath);

setListenerParallelism(parallelism);
setListenerParallelism(listenerParallelism);
setListenerInterval(listenerInterval);

for (String folder : assetsFolders) {
FileSystemUtility.createDirectory(ASSETS_PATH + File.separator + folder);
}

fileObserverRegistry.register(new ProjectListener(ASSETS_PATH, PROJECTS_PATH));
fileObserverRegistry.register(new MapFileListener(ASSETS_PATH, "maps"));

syncService.syncStartup();

// NOTE: this must be last on startup, otherwise it will invoke all file observers
fileMonitorManager.start();
fileObserverRegistry.start();
}

private void setHost(String host) {
Expand All @@ -92,4 +81,8 @@ private void setListenerParallelism(int parallelism) {
LISTENER_PARALLELISM = parallelism;
}

private void setListenerInterval(long interval) {
LISTENER_INTERVAL = interval;
}

}
74 changes: 46 additions & 28 deletions src/main/java/edu/tamu/app/observer/FileMonitorManager.java
Original file line number Diff line number Diff line change
@@ -1,61 +1,57 @@
package edu.tamu.app.observer;

import java.util.Optional;

import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadFactory;

import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class FileMonitorManager {

private static final Logger logger = Logger.getLogger(FileMonitorManager.class);

private static FileAlterationMonitor monitor;
private final FileAlterationMonitor monitor;

@Value("${app.polling.interval}")
private Long interval;
private final SimpleThreadFactory threadFactory;

public FileMonitorManager() {
public FileMonitorManager(long interval) {
monitor = new FileAlterationMonitor(interval);
threadFactory = new SimpleThreadFactory();
monitor.setThreadFactory(threadFactory);
}

public synchronized void start() throws Exception {
logger.info("Starting monitor");
monitor.start();
}

@PostConstruct
public void createMonitor() {
monitor = new FileAlterationMonitor(interval);
public synchronized void stop() throws Exception {
logger.info("Stopping monitor");
monitor.stop();
}

public synchronized boolean isAlive() {
return threadFactory.isMonitorThreadAlive();
}

public void addObserver(FileAlterationObserver observer) {
public synchronized void addObserver(FileAlterationObserver observer) {
logger.info("Adding observer: " + observer.getDirectory());
monitor.addObserver(observer);
}

public void removeObserver(FileAlterationObserver observer) {
public synchronized void removeObserver(FileAlterationObserver observer) {
logger.info("Removing observer: " + observer.getDirectory());
monitor.removeObserver(observer);
}

public void start() throws Exception {
logger.info("Starting monitor");
monitor.start();
public synchronized Iterable<FileAlterationObserver> getObservers() {
return monitor.getObservers();
}

public void stop() throws Exception {
logger.info("Stopping monitor");
monitor.stop();
}

public FileAlterationMonitor getMonitor() {
return monitor;
}

public Optional<FileAlterationObserver> getObserver(String path) {
public synchronized Optional<FileAlterationObserver> getObserver(String path) {
Optional<FileAlterationObserver> observer = Optional.empty();
for (FileAlterationObserver fao : monitor.getObservers()) {
for (FileAlterationObserver fao : getObservers()) {
if (fao.getDirectory().getAbsolutePath().equals(path)) {
observer = Optional.of(fao);
break;
Expand All @@ -64,4 +60,26 @@ public Optional<FileAlterationObserver> getObserver(String path) {
return observer;
}

class SimpleThreadFactory implements ThreadFactory {
private Thread monitorThread;

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
if (r instanceof FileAlterationMonitor) {
monitorThread = thread;
}
return thread;
}

private boolean isMonitorThreadAlive() {
boolean isAlive = false;
if (monitorThread != null) {
isAlive = monitorThread.isAlive();
}
return isAlive;
}

}

}
88 changes: 79 additions & 9 deletions src/main/java/edu/tamu/app/observer/FileObserverRegistry.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,82 @@
package edu.tamu.app.observer;

import static edu.tamu.app.Initialization.ASSETS_PATH;
import static edu.tamu.app.Initialization.LISTENER_INTERVAL;
import static edu.tamu.app.Initialization.MAPS_PATH;
import static edu.tamu.app.Initialization.PROJECTS_PATH;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import edu.tamu.app.service.SyncService;

@Service
public class FileObserverRegistry {

private static final Logger logger = Logger.getLogger(FileObserverRegistry.class);

@Autowired
private FileMonitorManager fileMonitorManager;
private AutowireCapableBeanFactory beanFactory;

@Autowired
private AutowireCapableBeanFactory beanFactory;
private SyncService syncService;

private AtomicBoolean started = new AtomicBoolean(false);

public void register(FileListener listener) {
private FileMonitorManager fileMonitorManager;

List<FileAlterationObserver> getObservers() {
List<FileAlterationObserver> observers = new ArrayList<FileAlterationObserver>();
fileMonitorManager.getObservers().forEach(observers::add);
return observers;
}

public synchronized void start() throws Exception {
if (!started.getAndSet(true)) {
logger.info("Starting registry");
fileMonitorManager = new FileMonitorManager(LISTENER_INTERVAL);
register(new ProjectListener(ASSETS_PATH, PROJECTS_PATH));
register(new MapFileListener(ASSETS_PATH, MAPS_PATH));
syncService.sync();
// NOTE: this must be last, otherwise it will invoke all file observers
fileMonitorManager.start();
} else {
logger.info("Registry is already started");
}
}

public synchronized void stop() throws Exception {
if (started.getAndSet(false)) {
logger.info("Stopping file monitor");
logger.info("Removing all observers");
for (FileAlterationObserver observer : fileMonitorManager.getObservers()) {
fileMonitorManager.removeObserver(observer);
}
fileMonitorManager.stop();
}
}

public synchronized void restart() throws Exception {
logger.info("Restarting file monitor");
stop();
start();
}

public synchronized void register(FileListener listener) {
String path = listener.getPath();
try {
dismiss(listener);
dismiss(path);
} catch (Exception e) {
logger.error("Unable to dismiss listener: " + path);
}
Expand All @@ -40,11 +93,7 @@ public void register(FileListener listener) {
}
}

public void dismiss(FileListener listener) throws Exception {
dismiss(listener.getPath());
}

public void dismiss(String path) throws Exception {
public synchronized void dismiss(String path) throws Exception {
Optional<FileAlterationObserver> observer = fileMonitorManager.getObserver(path);
if (observer.isPresent()) {
logger.info("Dismissing listener: " + path);
Expand All @@ -58,4 +107,25 @@ public void dismiss(String path) throws Exception {
}
}

@Scheduled(fixedDelayString = "${app.monitor.health.interval:900000}", initialDelayString = "${app.monitor.health.initDelay:30000}")
void healthCheck() throws Exception {
logger.info("File monitor health check");
if (!fileMonitorManager.isAlive()) {
logger.warn("File monitor thread has stopped!");
restart();
}
for (FileAlterationObserver observer : fileMonitorManager.getObservers()) {
if (observer.getDirectory().exists()) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Observing %s", observer.getDirectory().getName()));
for (FileAlterationListener listener : observer.getListeners()) {
logger.debug(String.format("\twith %s", listener.getClass().getSimpleName()));
}
}
} else {
logger.warn(String.format("Observer directory %s does not exist!", observer.getDirectory().getParent()));
}
}
}

}
17 changes: 7 additions & 10 deletions src/main/java/edu/tamu/app/service/ProjectFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
import edu.tamu.app.observer.FileObserverRegistry;
import edu.tamu.app.observer.HeadlessDocumentListener;
import edu.tamu.app.observer.StandardDocumentListener;
import edu.tamu.app.service.authority.Authority;
import edu.tamu.app.service.registry.MagpieService;
import edu.tamu.app.service.registry.MagpieServiceRegistry;
import edu.tamu.app.service.repository.Repository;
import edu.tamu.app.service.suggestor.Suggestor;
import edu.tamu.app.utilities.FileSystemUtility;

@Service
Expand Down Expand Up @@ -139,7 +137,7 @@ public Project getOrCreateProject(String projectName) {
return project;
}

public Project createProject(String projectName) {
public synchronized Project createProject(String projectName) {

JsonNode projectNode = getProjectNode(projectName);

Expand Down Expand Up @@ -234,28 +232,27 @@ protected List<ProjectSuggestor> getProjectSuggestors(JsonNode projectNode) {
}

public void registerServiceListeners(Project project) {

// NOTE: ProjectRepository is a Destination, either a Repository or Preservation
project.getRepositories().forEach(repository -> {
Repository registeredRepository = (Repository) projectServiceRegistry.getService(repository.getName());
MagpieService registeredRepository = projectServiceRegistry.getService(repository.getName());
if (registeredRepository == null) {
projectServiceRegistry.register(project, repository);
}
});

project.getAuthorities().forEach(authority -> {
Authority registeredAuthority = (Authority) projectServiceRegistry.getService(authority.getName());
MagpieService registeredAuthority = projectServiceRegistry.getService(authority.getName());
if (registeredAuthority == null) {
projectServiceRegistry.register(project, authority);
}
});

project.getSuggestors().forEach(suggestor -> {
Suggestor registeredSuggestor = (Suggestor) projectServiceRegistry.getService(suggestor.getName());
MagpieService registeredSuggestor = projectServiceRegistry.getService(suggestor.getName());
if (registeredSuggestor == null) {
projectServiceRegistry.register(project, suggestor);
}
});

}

public Map<ServiceType, List<String>> getProjectRepositoryTypes() {
Expand Down Expand Up @@ -326,7 +323,7 @@ public JsonNode getProjectNode(String projectName) {
return profileNode;
}

public List<MetadataFieldGroup> getProjectFields(String projectName) {
public synchronized List<MetadataFieldGroup> getProjectFields(String projectName) {

Instant start = Instant.now();

Expand Down
Loading

0 comments on commit 98b522d

Please sign in to comment.