Skip to content

Commit

Permalink
Merge pull request #191 from TAMULib/june-dame-sprint-staging
Browse files Browse the repository at this point in the history
June dame sprint staging
  • Loading branch information
jcreel committed Jun 8, 2018
2 parents a818323 + 64bd9ec commit 7b8001f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public abstract class AbstractDocumentListener extends AbstractFileListener {

protected static final Map<String, List<String>> pendingResources = new ConcurrentHashMap<String, List<String>>();

protected static final ExecutorService executor = Executors.newFixedThreadPool(10);
protected static final ExecutorService executor = Executors.newFixedThreadPool(1);

@Autowired
private ProjectRepo projectRepo;

@Autowired
protected DocumentFactory documentFactory;
protected DocumentRepo documentRepo;

@Autowired
protected DocumentRepo documentRepo;
protected DocumentFactory documentFactory;

public AbstractDocumentListener(String root, String folder) {
super(root, folder);
Expand Down Expand Up @@ -110,7 +110,7 @@ protected void initializePendingResources(String documentName) {
pendingResources.put(documentName, new ArrayList<String>());
}

protected Document processPendingResources(Document document) {
protected Document processResources(Document document, File directory) {
String documentName = document.getName();
for (String resourcePath : pendingResources.get(documentName)) {
document = documentFactory.addResource(document, new File(resourcePath));
Expand All @@ -124,7 +124,7 @@ protected CompletableFuture<Document> createDocument(File directory) {
Document document = null;
try {
document = documentFactory.createDocument(directory);
document = processPendingResources(document);
document = processResources(document, directory);
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
80 changes: 32 additions & 48 deletions src/main/java/edu/tamu/app/observer/HeadlessDocumentListener.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package edu.tamu.app.observer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import edu.tamu.app.exception.DocumentNotFoundException;
import edu.tamu.app.model.Document;
import edu.tamu.app.model.MetadataFieldGroup;
import edu.tamu.app.model.MetadataFieldValue;
Expand All @@ -32,51 +32,57 @@ public class HeadlessDocumentListener extends AbstractDocumentListener {
@Value("${app.document.create.wait}")
private int documentCreationWait;

@Value("${app.document.publish.concurrency:5}")
private int publishConcurrency;

private static final List<Document> publishQueue = new CopyOnWriteArrayList<Document>();

private static final AtomicInteger publishing = new AtomicInteger(0);

public HeadlessDocumentListener(String root, String folder) {
super(root, folder);
}

@Override
public void onFileCreate(File file) {
logger.debug("File listener not invoking resource creation for file " + file.getName() + " because we're waiting for directory quiescence in Headless mode.");
}

@Override
protected CompletableFuture<Document> createDocument(File directory) {
return CompletableFuture.supplyAsync(() -> {
Document document = null;
try {
initializePendingResources(directory.getName());
waitOnDirectory(directory);
document = documentFactory.createDocument(directory);
document = processPendingResources(document);
document = processResources(document, directory);
document = applyDefaultValues(document);
logger.info("Document created: " + document.getName());
} catch (Exception e) {
e.printStackTrace();
}
return document;
}, executor);
}

@Override
protected Document processResources(Document document, File directory) {
// For the Headless use case, we can simply list the files in the document directory after it has become quiescent
for (File resourceFile : directory.listFiles()) {
try {
if (!resourceFile.isHidden() && !resourceFile.isDirectory()) {
addResource(resourceFile);
}
} catch (DocumentNotFoundException e) {
logger.error("Couldn't find a newly created file " + resourceFile.getName());
e.printStackTrace();
}
}
return document;
}

@Override
protected void createdDocumentCallback(Document document) {
if (document != null) {
logger.info("Document created: " + document.getName());
publishToRepositories(document);
} else {
logger.warn("Unable to create document!");
}
}

@Override
protected void addResource(File file) {
String documentName = file.getParentFile().getName();
List<String> documentPendingResources = pendingResources.get(documentName);
documentPendingResources.add(file.getAbsolutePath());
}

// this is a blocking sleep operation of this listener
private void waitOnDirectory(File directory) {
logger.info("Waiting for directory " + directory + " to be quiescent, as it is a Headless project.");
Expand All @@ -103,37 +109,15 @@ private Document applyDefaultValues(Document document) {
}

private void publishToRepositories(Document document) {

logger.info("Attempting to publish documnet: " + document.getName());

logger.info("Current concurrency: " + publishing.get());

if (publishing.get() <= publishConcurrency) {
publishing.incrementAndGet();

for (ProjectRepository repository : document.getProject().getRepositories()) {
try {
document = ((Repository) projectServiceRegistry.getService(repository.getName())).push(document);
} catch (IOException e) {
logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e);
e.printStackTrace();
}
}

publishing.decrementAndGet();

if (publishQueue.size() > 0) {
Document queuedDocument = publishQueue.get(0);
publishQueue.remove(0);
logger.info("Remaining in queue: " + publishQueue.size());
publishToRepositories(queuedDocument);
logger.info("Attempting to publish document: " + document.getName());
for (ProjectRepository repository : document.getProject().getRepositories()) {
try {
((Repository) projectServiceRegistry.getService(repository.getName())).push(document);
} catch (IOException e) {
logger.error("Exception thrown attempting to push to " + repository.getName() + "!", e);
e.printStackTrace();
}

} else {
logger.info("Queueing document: " + document.getName());
publishQueue.add(document);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private void cacheRecords(String path) {
logger.info("Preparing to process CSV records using identifier field " + getIdentifier());
csvParser.getRecords().forEach(record -> {
String filename = record.get(getIdentifier());
logger.info("Processing record " + filename);
logger.debug("Processing record " + filename);
if (filename != null) {
currentRecords.put(filename, record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ public class FedoraPCDMRepository extends AbstractFedoraRepository {
private Tika tika;

// @formatter:off
private List<String> jpeg2000MimeTypes = Arrays.asList(new String[] { "image/jp2", "image/j2k", "image/jpx", "image/jpm", "image/jpeg2000" });
private List<String> jpeg2000MimeTypes = Arrays.asList(new String[] {
"image/jp2",
"image/j2k",
"image/jpx",
"image/jpm",
"image/jpeg2000"
});
// @formatter:on

private String membersEndpoint = "members";
Expand Down

0 comments on commit 7b8001f

Please sign in to comment.