Skip to content
75 changes: 13 additions & 62 deletions src/main/java/edu/tamu/app/observer/AbstractDocumentListener.java
Original file line number Diff line number Diff line change
@@ -1,59 +1,42 @@
package edu.tamu.app.observer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import edu.tamu.app.exception.DocumentNotFoundException;
import edu.tamu.app.model.Document;
import edu.tamu.app.model.IngestType;
import edu.tamu.app.model.Project;
import edu.tamu.app.model.ProjectRepository;
import edu.tamu.app.model.repo.DocumentRepo;
import edu.tamu.app.model.repo.ProjectRepo;
import edu.tamu.app.service.DocumentFactory;
import edu.tamu.app.service.registry.MagpieServiceRegistry;
import edu.tamu.app.service.repository.Repository;

public abstract class AbstractDocumentListener extends AbstractFileListener {

private static final Logger logger = Logger.getLogger(AbstractDocumentListener.class);

protected static final Map<String, List<String>> pendingResources = new ConcurrentHashMap<String, List<String>>();

protected static final ExecutorService executor = Executors.newFixedThreadPool(10);

private static final List<Document> publishQueue = new CopyOnWriteArrayList<Document>();

private static final AtomicInteger publishing = new AtomicInteger(0);

@Value("${app.document.publish.concurrency:5}")
private int publishConcurrency;
protected static final ExecutorService executor = Executors.newFixedThreadPool(1);

@Autowired
private ProjectRepo projectRepo;

@Autowired
protected DocumentFactory documentFactory;

@Autowired
protected DocumentRepo documentRepo;

@Autowired
private MagpieServiceRegistry projectServiceRegistry;
protected DocumentFactory documentFactory;

public AbstractDocumentListener(String root, String folder) {
super(root, folder);
Expand All @@ -69,16 +52,10 @@ public void onDirectoryCreate(File directory) {
Document existingDocument = documentRepo.findByProjectNameAndName(directory.getParentFile().getName(), directory.getName());
if (existingDocument == null) {
initializePendingResources(directory.getName());
createDocument(directory).thenAccept(newDocument -> {
if (newDocument != null) {
logger.info("Document created: " + newDocument.getName());
publishToRepositories(newDocument);
} else {
logger.warn("Unable to create document!");
}
createDocument(directory).thenAccept(document -> {
createdDocumentCallback(document);
});
}

}

@Override
Expand Down Expand Up @@ -133,7 +110,7 @@ protected void initializePendingResources(String documentName) {
pendingResources.put(documentName, new ArrayList<String>());
}

protected Document processPendingResources(Document document) {
protected Document processResources(Document document, File directory) {
String documentName = document.getName();
for (String resourcePath : pendingResources.get(documentName)) {
document = documentFactory.addResource(document, new File(resourcePath));
Expand All @@ -147,50 +124,24 @@ protected CompletableFuture<Document> createDocument(File directory) {
Document document = null;
try {
document = documentFactory.createDocument(directory);
document = processPendingResources(document);
document = processResources(document, directory);
} catch (Exception e) {
e.printStackTrace();
}
return document;
}, executor);
}

protected void addResource(File file) throws DocumentNotFoundException {
documentFactory.addResource(file);
}

private void publishToRepositories(Document document) {

logger.info("Attempting to publish documnet: " + document.getName());

logger.info("Current concurrency: " + publishing.get());

if (publishing.get() <= publishConcurrency) {
publishing.incrementAndGet();

for (ProjectRepository repository : document.getProject().getRepositories()) {
try {
document = ((Repository) projectServiceRegistry.getService(repository.getName())).push(document);
} catch (IOException e) {
logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e);
e.printStackTrace();
}
}

publishing.decrementAndGet();

if (publishQueue.size() > 0) {
Document queuedDocument = publishQueue.get(0);
publishQueue.remove(0);
logger.info("Remaining in queue: " + publishQueue.size());
publishToRepositories(queuedDocument);
}

protected void createdDocumentCallback(Document document) {
if (document != null) {
logger.info("Document created: " + document.getName());
} else {
logger.info("Queueing document: " + document.getName());
publishQueue.add(document);
logger.warn("Unable to create document!");
}
}

protected void addResource(File file) throws DocumentNotFoundException {
documentFactory.addResource(file);
}

}
62 changes: 53 additions & 9 deletions src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java
Original file line number Diff line number Diff line change
@@ -1,42 +1,56 @@
package edu.tamu.app.observer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import edu.tamu.app.exception.DocumentNotFoundException;
import edu.tamu.app.model.Document;
import edu.tamu.app.model.MetadataFieldGroup;
import edu.tamu.app.model.MetadataFieldValue;
import edu.tamu.app.model.ProjectRepository;
import edu.tamu.app.model.repo.DocumentRepo;
import edu.tamu.app.service.registry.MagpieServiceRegistry;
import edu.tamu.app.service.repository.Repository;

public class HeadlessDocumentListener extends AbstractDocumentListener {

private static final Logger logger = Logger.getLogger(HeadlessDocumentListener.class);

@Value("${app.document.create.wait}")
private int documentCreationWait;

@Autowired
private DocumentRepo documentRepo;

@Autowired
private MagpieServiceRegistry projectServiceRegistry;

@Value("${app.document.create.wait}")
private int documentCreationWait;

public HeadlessDocumentListener(String root, String folder) {
super(root, folder);
}

@Override
public void onFileCreate(File file) {
logger.debug("File listener not invoking resource creation for file " + file.getName() + " because we're waiting for directory quiescence in Headless mode.");
}

@Override
protected CompletableFuture<Document> createDocument(File directory) {
return CompletableFuture.supplyAsync(() -> {
Document document = null;
try {
initializePendingResources(directory.getName());
waitOnDirectory(directory);
document = documentFactory.createDocument(directory);
document = processPendingResources(document);
document = processResources(document, directory);
document = applyDefaultValues(document);
logger.info("Document created: " + document.getName());
} catch (Exception e) {
e.printStackTrace();
}
Expand All @@ -45,10 +59,28 @@ protected CompletableFuture<Document> createDocument(File directory) {
}

@Override
protected void addResource(File file) {
String documentName = file.getParentFile().getName();
List<String> documentPendingResources = pendingResources.get(documentName);
documentPendingResources.add(file.getAbsolutePath());
protected Document processResources(Document document, File directory) {
// For the Headless use case, we can simply list the files in the document directory after it has become quiescent
for (File resourceFile : directory.listFiles()) {
try {
if (!resourceFile.isHidden() && !resourceFile.isDirectory()) {
addResource(resourceFile);
}
} catch (DocumentNotFoundException e) {
logger.error("Couldn't find a newly created file " + resourceFile.getName());
e.printStackTrace();
}
}
return document;
}

@Override
protected void createdDocumentCallback(Document document) {
if (document != null) {
publishToRepositories(document);
} else {
logger.warn("Unable to create document!");
}
}

// this is a blocking sleep operation of this listener
Expand Down Expand Up @@ -76,4 +108,16 @@ private Document applyDefaultValues(Document document) {
return documentRepo.save(document);
}

private void publishToRepositories(Document document) {
logger.info("Attempting to publish document: " + document.getName());
for (ProjectRepository repository : document.getProject().getRepositories()) {
try {
((Repository) projectServiceRegistry.getService(repository.getName())).push(document);
} catch (IOException e) {
logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e);
e.printStackTrace();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private void cacheRecords(String path) {
logger.info("Preparing to process CSV records using identifier field " + getIdentifier());
csvParser.getRecords().forEach(record -> {
String filename = record.get(getIdentifier());
logger.info("Processing record " + filename);
logger.debug("Processing record " + filename);
if (filename != null) {
currentRecords.put(filename, record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ protected HttpURLConnection getResource(String uri, Map<String, String> requestP

protected String startTransaction() throws IOException {
HttpURLConnection connection = buildFedoraConnection(String.join("/", getRepoUrl(), getRestPath(), "fcr:tx"), "POST");

for (String header : connection.getHeaderFields().keySet()) {
logger.debug("HTTP connection to open Fedora transaction got header \"" + header + "\" with value \"" + connection.getHeaderFields().get(header) + "\".");
}

String transactionalUrl = connection.getHeaderField("Location");

return transactionalUrl.substring(transactionalUrl.lastIndexOf("/") + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ public class FedoraPCDMRepository extends AbstractFedoraRepository {
private Tika tika;

// @formatter:off
private List<String> jpeg2000MimeTypes = Arrays.asList(new String[] { "image/jp2", "image/j2k", "image/jpx", "image/jpm", "image/jpeg2000" });
private List<String> jpeg2000MimeTypes = Arrays.asList(new String[] {
"image/jp2",
"image/j2k",
"image/jpx",
"image/jpm",
"image/jpeg2000"
});
// @formatter:on

private String membersEndpoint = "members";
Expand Down