diff --git a/src/main/java/edu/tamu/app/observer/AbstractDocumentListener.java b/src/main/java/edu/tamu/app/observer/AbstractDocumentListener.java index b2537208..56832fe6 100644 --- a/src/main/java/edu/tamu/app/observer/AbstractDocumentListener.java +++ b/src/main/java/edu/tamu/app/observer/AbstractDocumentListener.java @@ -1,32 +1,25 @@ package edu.tamu.app.observer; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.monitor.FileAlterationObserver; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import edu.tamu.app.exception.DocumentNotFoundException; import edu.tamu.app.model.Document; import edu.tamu.app.model.IngestType; import edu.tamu.app.model.Project; -import edu.tamu.app.model.ProjectRepository; import edu.tamu.app.model.repo.DocumentRepo; import edu.tamu.app.model.repo.ProjectRepo; import edu.tamu.app.service.DocumentFactory; -import edu.tamu.app.service.registry.MagpieServiceRegistry; -import edu.tamu.app.service.repository.Repository; public abstract class AbstractDocumentListener extends AbstractFileListener { @@ -34,26 +27,16 @@ public abstract class AbstractDocumentListener extends AbstractFileListener { protected static final Map> pendingResources = new ConcurrentHashMap>(); - protected static final ExecutorService executor = Executors.newFixedThreadPool(10); - - private static final List publishQueue = new CopyOnWriteArrayList(); - - private static final AtomicInteger publishing = new AtomicInteger(0); - - @Value("${app.document.publish.concurrency:5}") - private int publishConcurrency; + protected static final ExecutorService executor = Executors.newFixedThreadPool(1); @Autowired private ProjectRepo projectRepo; - @Autowired - protected DocumentFactory documentFactory; - @Autowired protected DocumentRepo documentRepo; @Autowired - private MagpieServiceRegistry projectServiceRegistry; + protected DocumentFactory documentFactory; public AbstractDocumentListener(String root, String folder) { super(root, folder); @@ -69,16 +52,10 @@ public void onDirectoryCreate(File directory) { Document existingDocument = documentRepo.findByProjectNameAndName(directory.getParentFile().getName(), directory.getName()); if (existingDocument == null) { initializePendingResources(directory.getName()); - createDocument(directory).thenAccept(newDocument -> { - if (newDocument != null) { - logger.info("Document created: " + newDocument.getName()); - publishToRepositories(newDocument); - } else { - logger.warn("Unable to create document!"); - } + createDocument(directory).thenAccept(document -> { + createdDocumentCallback(document); }); } - } @Override @@ -133,7 +110,7 @@ protected void initializePendingResources(String documentName) { pendingResources.put(documentName, new ArrayList()); } - protected Document processPendingResources(Document document) { + protected Document processResources(Document document, File directory) { String documentName = document.getName(); for (String resourcePath : pendingResources.get(documentName)) { document = documentFactory.addResource(document, new File(resourcePath)); @@ -147,7 +124,7 @@ protected CompletableFuture createDocument(File directory) { Document document = null; try { document = documentFactory.createDocument(directory); - document = processPendingResources(document); + document = processResources(document, directory); } catch (Exception e) { e.printStackTrace(); } @@ -155,42 +132,16 @@ protected CompletableFuture createDocument(File directory) { }, executor); } - protected void addResource(File file) throws DocumentNotFoundException { - documentFactory.addResource(file); - } - - private void publishToRepositories(Document document) { - - logger.info("Attempting to publish documnet: " + document.getName()); - - logger.info("Current concurrency: " + publishing.get()); - - if (publishing.get() <= publishConcurrency) { - publishing.incrementAndGet(); - - for (ProjectRepository repository : document.getProject().getRepositories()) { - try { - document = ((Repository) projectServiceRegistry.getService(repository.getName())).push(document); - } catch (IOException e) { - logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e); - e.printStackTrace(); - } - } - - publishing.decrementAndGet(); - - if (publishQueue.size() > 0) { - Document queuedDocument = publishQueue.get(0); - publishQueue.remove(0); - logger.info("Remaining in queue: " + publishQueue.size()); - publishToRepositories(queuedDocument); - } - + protected void createdDocumentCallback(Document document) { + if (document != null) { + logger.info("Document created: " + document.getName()); } else { - logger.info("Queueing document: " + document.getName()); - publishQueue.add(document); + logger.warn("Unable to create document!"); } + } + protected void addResource(File file) throws DocumentNotFoundException { + documentFactory.addResource(file); } } diff --git a/src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java b/src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java index 85ee9853..e7322513 100644 --- a/src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java +++ b/src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java @@ -1,6 +1,8 @@ package edu.tamu.app.observer; import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -8,35 +10,47 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import edu.tamu.app.exception.DocumentNotFoundException; import edu.tamu.app.model.Document; import edu.tamu.app.model.MetadataFieldGroup; import edu.tamu.app.model.MetadataFieldValue; +import edu.tamu.app.model.ProjectRepository; import edu.tamu.app.model.repo.DocumentRepo; +import edu.tamu.app.service.registry.MagpieServiceRegistry; +import edu.tamu.app.service.repository.Repository; public class HeadlessDocumentListener extends AbstractDocumentListener { private static final Logger logger = Logger.getLogger(HeadlessDocumentListener.class); - @Value("${app.document.create.wait}") - private int documentCreationWait; - @Autowired private DocumentRepo documentRepo; + @Autowired + private MagpieServiceRegistry projectServiceRegistry; + + @Value("${app.document.create.wait}") + private int documentCreationWait; + public HeadlessDocumentListener(String root, String folder) { super(root, folder); } + @Override + public void onFileCreate(File file) { + logger.debug("File listener not invoking resource creation for file " + file.getName() + " because we're waiting for directory quiescence in Headless mode."); + } + @Override protected CompletableFuture createDocument(File directory) { return CompletableFuture.supplyAsync(() -> { Document document = null; try { - initializePendingResources(directory.getName()); waitOnDirectory(directory); document = documentFactory.createDocument(directory); - document = processPendingResources(document); + document = processResources(document, directory); document = applyDefaultValues(document); + logger.info("Document created: " + document.getName()); } catch (Exception e) { e.printStackTrace(); } @@ -45,10 +59,28 @@ protected CompletableFuture createDocument(File directory) { } @Override - protected void addResource(File file) { - String documentName = file.getParentFile().getName(); - List documentPendingResources = pendingResources.get(documentName); - documentPendingResources.add(file.getAbsolutePath()); + protected Document processResources(Document document, File directory) { + // For the Headless use case, we can simply list the files in the document directory after it has become quiescent + for (File resourceFile : directory.listFiles()) { + try { + if (!resourceFile.isHidden() && !resourceFile.isDirectory()) { + addResource(resourceFile); + } + } catch (DocumentNotFoundException e) { + logger.error("Couldn't find a newly created file " + resourceFile.getName()); + e.printStackTrace(); + } + } + return document; + } + + @Override + protected void createdDocumentCallback(Document document) { + if (document != null) { + publishToRepositories(document); + } else { + logger.warn("Unable to create document!"); + } } // this is a blocking sleep operation of this listener @@ -76,4 +108,16 @@ private Document applyDefaultValues(Document document) { return documentRepo.save(document); } + private void publishToRepositories(Document document) { + logger.info("Attempting to publish document: " + document.getName()); + for (ProjectRepository repository : document.getProject().getRepositories()) { + try { + ((Repository) projectServiceRegistry.getService(repository.getName())).push(document); + } catch (IOException e) { + logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e); + e.printStackTrace(); + } + } + } + } diff --git a/src/main/java/edu/tamu/app/service/authority/CSVAuthority.java b/src/main/java/edu/tamu/app/service/authority/CSVAuthority.java index a89e5631..b715b58c 100644 --- a/src/main/java/edu/tamu/app/service/authority/CSVAuthority.java +++ b/src/main/java/edu/tamu/app/service/authority/CSVAuthority.java @@ -105,7 +105,7 @@ private void cacheRecords(String path) { logger.info("Preparing to process CSV records using identifier field " + getIdentifier()); csvParser.getRecords().forEach(record -> { String filename = record.get(getIdentifier()); - logger.info("Processing record " + filename); + logger.debug("Processing record " + filename); if (filename != null) { currentRecords.put(filename, record); } else { diff --git a/src/main/java/edu/tamu/app/service/repository/AbstractFedoraRepository.java b/src/main/java/edu/tamu/app/service/repository/AbstractFedoraRepository.java index f5c1b7e0..123f85b8 100644 --- a/src/main/java/edu/tamu/app/service/repository/AbstractFedoraRepository.java +++ b/src/main/java/edu/tamu/app/service/repository/AbstractFedoraRepository.java @@ -243,6 +243,11 @@ protected HttpURLConnection getResource(String uri, Map requestP protected String startTransaction() throws IOException { HttpURLConnection connection = buildFedoraConnection(String.join("/", getRepoUrl(), getRestPath(), "fcr:tx"), "POST"); + + for (String header : connection.getHeaderFields().keySet()) { + logger.debug("HTTP connection to open Fedora transaction got header \"" + header + "\" with value \"" + connection.getHeaderFields().get(header) + "\"."); + } + String transactionalUrl = connection.getHeaderField("Location"); return transactionalUrl.substring(transactionalUrl.lastIndexOf("/") + 1); diff --git a/src/main/java/edu/tamu/app/service/repository/FedoraPCDMRepository.java b/src/main/java/edu/tamu/app/service/repository/FedoraPCDMRepository.java index 9b07210c..f8d939d9 100644 --- a/src/main/java/edu/tamu/app/service/repository/FedoraPCDMRepository.java +++ b/src/main/java/edu/tamu/app/service/repository/FedoraPCDMRepository.java @@ -26,7 +26,13 @@ public class FedoraPCDMRepository extends AbstractFedoraRepository { private Tika tika; // @formatter:off - private List jpeg2000MimeTypes = Arrays.asList(new String[] { "image/jp2", "image/j2k", "image/jpx", "image/jpm", "image/jpeg2000" }); + private List jpeg2000MimeTypes = Arrays.asList(new String[] { + "image/jp2", + "image/j2k", + "image/jpx", + "image/jpm", + "image/jpeg2000" + }); // @formatter:on private String membersEndpoint = "members";