Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tsuyoi committed Oct 10, 2018
1 parent 7d84fe3 commit cca7cd7
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 28 deletions.
Expand Up @@ -7,13 +7,8 @@
import com.researchworx.cresco.plugins.gobjectIngestion.folderprocessor.ObjectFS;
import com.researchworx.cresco.plugins.gobjectIngestion.folderprocessor.WatchDirectory;
import com.researchworx.cresco.plugins.gobjectIngestion.objectstorage.Encapsulation;
import com.researchworx.cresco.plugins.gobjectIngestion.objectstorage.LargeBagVerifier;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -83,6 +78,7 @@ public void start() {
genomicControllerAgent = getConfig().getStringParam("genomic_controller_agent",getAgent());
genomicControllerPlugin = getConfig().getStringParam("genomic_controller_plugin");
Encapsulation.setLogger(this);
LargeBagVerifier.setLogger(this);

logger.debug("[pathStage] == {}", pathStage);
logger.info("Building Stage [{}]", pathStage);
Expand Down
Expand Up @@ -150,6 +150,8 @@ public void run() {
}

private String transferStatus(Path dir, String statusString) {
if (!Files.exists(dir))
return null;
logger.debug("Call to transferStatus [dir = {}, statusString = {}]", dir.toString(), statusString);
String status = "no";
try {
Expand Down Expand Up @@ -285,11 +287,10 @@ private void processDir(Path dir) {
String seqId = outDir;
logger.debug("[outDir = {}]", outDir);

File seqStageDir = Paths.get(staging_folder, seqId).toFile();
//File seqStageDir = Paths.get(staging_folder, seqId).toFile();
Path seqStageDir = Paths.get(staging_folder, seqId);

logger.info("Start processing directory {}", outDir);
sendUpdateInfoMessage(seqId, null, null, 1,
"Discovered for upload");

String status = transferStatus(dir, "transfer_complete_status");
List<String> filterList = new ArrayList<>();
Expand All @@ -298,13 +299,15 @@ private void processDir(Path dir) {


if (status.equals("no")) {
sendUpdateInfoMessage(seqId, null, null, 1,
"Discovered for upload");
try {
logger.info("Copying sequence to staging folder [{}] -> [{}]",
inDir, seqStageDir);
if (seqStageDir.exists()) {
if (Files.exists(seqStageDir)) {
sendUpdateInfoMessage(seqId, null, null, 1,
"Deleting existing file(s) from staging directory");
deleteFolder(seqStageDir.toPath());
deleteFolder(seqStageDir);
}
sendUpdateInfoMessage(seqId, null, null, 1,
"Moving files from watch directory to staging directory");
Expand All @@ -331,9 +334,9 @@ private void processDir(Path dir) {

logger.debug("[status = \"no\"]");
ObjectEngine oe = new ObjectEngine(plugin);
if (oe.uploadBaggedDirectory(bucket_name, seqStageDir.getAbsolutePath(), "", outDir,
if (oe.uploadBaggedDirectory(bucket_name, seqStageDir.toString(), "", outDir,
null,null, "1")) {
if (setTransferFile(seqStageDir.toPath().resolve(transfer_status_file))) {
if (setTransferFile(seqStageDir.resolve(transfer_status_file))) {
/*if (new File(inDir).exists()) {
try {
//logger.info("Cleaning up uploaded sequence [{}]", inDir);
Expand Down
@@ -1,6 +1,5 @@
package com.researchworx.cresco.plugins.gobjectIngestion.objectstorage;

import com.researchworx.cresco.library.messaging.MsgEvent;
import com.researchworx.cresco.library.plugin.core.CPlugin;
import com.researchworx.cresco.library.utilities.CLogger;
import gov.loc.repository.bagit.domain.Bag;
Expand All @@ -10,32 +9,35 @@
import gov.loc.repository.bagit.hash.StandardBagitAlgorithmNameToSupportedAlgorithmMapping;
import gov.loc.repository.bagit.verify.CheckManifestHashesTask;
import gov.loc.repository.bagit.verify.MandatoryVerifier;
import gov.loc.repository.bagit.verify.PayloadVerifier;
import gov.loc.repository.bagit.verify.QuickVerifier;

import java.io.IOException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class LargeBagVerifier implements AutoCloseable {
private static CLogger logger = new CLogger(Encapsulation.class, new LinkedBlockingQueue<>(), "", "", "");
private static CLogger logger = new CLogger(LargeBagVerifier.class, new LinkedBlockingQueue<>(), "", "", "");

public static void setLogger(CPlugin plugin) {
logger = new CLogger(ObjectEngine.class, plugin.getMsgOutQueue(), plugin.getRegion(), plugin.getAgent(), plugin.getPluginID(), CLogger.Level.Trace);
logger = new CLogger(LargeBagVerifier.class, plugin.getMsgOutQueue(), plugin.getRegion(), plugin.getAgent(), plugin.getPluginID(), CLogger.Level.Trace);
}

private static final ResourceBundle messages = ResourceBundle.getBundle("MessageBundle");
private static final int max_batch_size = 100;

private final PayloadVerifier manifestVerifier;
private final ManifestVerifier manifestVerifier;
private final ExecutorService executor;

/**
* Create a LargeBagVerifier with a fixed thread pool (one thread per available processor) and a
* {@link StandardBagitAlgorithmNameToSupportedAlgorithmMapping}
*/
public LargeBagVerifier(){
this(Executors.newCachedThreadPool(), new StandardBagitAlgorithmNameToSupportedAlgorithmMapping());
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()), new StandardBagitAlgorithmNameToSupportedAlgorithmMapping());
}

/**
Expand All @@ -44,7 +46,7 @@ public LargeBagVerifier(){
* @param nameMapping the mapping between BagIt algorithm name and the java supported algorithm
*/
public LargeBagVerifier(final BagitAlgorithmNameToSupportedAlgorithmMapping nameMapping){
this(Executors.newCachedThreadPool(), nameMapping);
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()), nameMapping);
}

/**
Expand All @@ -64,7 +66,7 @@ public LargeBagVerifier(final ExecutorService executor){
* @param executor the thread pool to use when doing work
*/
public LargeBagVerifier(final ExecutorService executor, final BagitAlgorithmNameToSupportedAlgorithmMapping nameMapping){
manifestVerifier = new PayloadVerifier(nameMapping, executor);
manifestVerifier = new ManifestVerifier(nameMapping, executor);
this.executor = executor;
}

Expand Down Expand Up @@ -94,7 +96,7 @@ public static boolean canQuickVerify(final Bag bag){
* @throws InvalidPayloadOxumException if either the total bytes or the number of files
* calculated for the payload directory of the bag is different than the supplied values
* @throws PayloadOxumDoesNotExistException if the bag does not contain a payload-oxum.
* To check, run {@link BagVerifier#canQuickVerify}
* To check, run {@link LargeBagVerifier#canQuickVerify}
*/
public static void quicklyVerify(final Bag bag) throws IOException, InvalidPayloadOxumException{
QuickVerifier.quicklyVerify(bag);
Expand Down Expand Up @@ -209,14 +211,14 @@ public void isComplete(final Bag bag, final boolean ignoreHiddenFiles) throws

MandatoryVerifier.checkIfAtLeastOnePayloadManifestsExist(bag.getRootDir(), bag.getVersion());

manifestVerifier.verifyPayload(bag, ignoreHiddenFiles);
manifestVerifier.verifyManifests(bag, ignoreHiddenFiles);
}

public ExecutorService getExecutor() {
return executor;
}

public PayloadVerifier getManifestVerifier() {
public ManifestVerifier getManifestVerifier() {
return manifestVerifier;
}
}
@@ -0,0 +1,180 @@
package com.researchworx.cresco.plugins.gobjectIngestion.objectstorage;

import com.researchworx.cresco.library.plugin.core.CPlugin;
import com.researchworx.cresco.library.utilities.CLogger;
import gov.loc.repository.bagit.domain.Bag;
import gov.loc.repository.bagit.domain.Manifest;
import gov.loc.repository.bagit.domain.Version;
import gov.loc.repository.bagit.exceptions.FileNotInPayloadDirectoryException;
import gov.loc.repository.bagit.exceptions.InvalidBagitFileFormatException;
import gov.loc.repository.bagit.exceptions.MaliciousPathException;
import gov.loc.repository.bagit.exceptions.UnsupportedAlgorithmException;
import gov.loc.repository.bagit.hash.BagitAlgorithmNameToSupportedAlgorithmMapping;
import gov.loc.repository.bagit.hash.StandardBagitAlgorithmNameToSupportedAlgorithmMapping;
import gov.loc.repository.bagit.reader.ManifestReader;
import gov.loc.repository.bagit.util.PathUtils;
import gov.loc.repository.bagit.verify.CheckIfFileExistsTask;
import gov.loc.repository.bagit.verify.PayloadFileExistsInAllManifestsVistor;
import gov.loc.repository.bagit.verify.PayloadFileExistsInAtLeastOneManifestVistor;
import org.slf4j.helpers.MessageFormatter;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Checks that the files named in a bag's manifests and the files present in its
 * payload directory agree with each other.
 *
 * <p>Existence checks for manifest entries are dispatched to the {@link ExecutorService}
 * held by this instance; {@link #close()} shuts that executor down.
 */
public class ManifestVerifier implements AutoCloseable {
    private static CLogger logger = new CLogger(ManifestVerifier.class, new LinkedBlockingQueue<>(), "", "", "");

    /**
     * Replace the class-wide logger with one that routes through the given plugin's
     * outgoing message queue at trace level.
     *
     * @param plugin the plugin supplying the message queue and region/agent/plugin identifiers
     */
    public static void setLogger(CPlugin plugin) {
        logger = new CLogger(ManifestVerifier.class, plugin.getMsgOutQueue(), plugin.getRegion(), plugin.getAgent(), plugin.getPluginID(), CLogger.Level.Trace);
    }

    private static final ResourceBundle messages = ResourceBundle.getBundle("MessageBundle");

    private transient final BagitAlgorithmNameToSupportedAlgorithmMapping nameMapping;
    private transient final ExecutorService executor;

    /**
     * Create a ManifestVerifier using a fixed thread pool (one thread per available
     * processor) and the {@link StandardBagitAlgorithmNameToSupportedAlgorithmMapping} mapping
     */
    public ManifestVerifier() {
        this(new StandardBagitAlgorithmNameToSupportedAlgorithmMapping(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Create a ManifestVerifier using a fixed thread pool (one thread per available
     * processor) and a custom mapping
     *
     * @param nameMapping the mapping between BagIt algorithm name and the java supported algorithm
     */
    public ManifestVerifier(final BagitAlgorithmNameToSupportedAlgorithmMapping nameMapping) {
        this(nameMapping, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Create a ManifestVerifier using a custom thread pool and the
     * {@link StandardBagitAlgorithmNameToSupportedAlgorithmMapping} mapping
     *
     * @param executor the thread pool to use when doing work
     */
    public ManifestVerifier(final ExecutorService executor) {
        this(new StandardBagitAlgorithmNameToSupportedAlgorithmMapping(), executor);
    }

    /**
     * Create a ManifestVerifier using a custom thread pool and a custom mapping
     *
     * @param nameMapping the mapping between BagIt algorithm name and the java supported algorithm
     * @param executor the thread pool to use when doing work
     */
    public ManifestVerifier(final BagitAlgorithmNameToSupportedAlgorithmMapping nameMapping, final ExecutorService executor) {
        this.nameMapping = nameMapping;
        this.executor = executor;
    }

    /**
     * Shut down the executor held by this verifier. This applies to an executor passed
     * in through a constructor as well, so callers sharing a pool should be aware that
     * closing this verifier stops that pool from accepting new work.
     */
    @Override
    public void close() throws SecurityException {
        //shutdown the thread pool so the resource isn't leaked
        executor.shutdown();
    }

    /**
     * Verify that all the files in the payload directory are listed in the payload manifest and
     * all files listed in all manifests exist.
     *
     * <p>For bag versions older than 1.0 every payload file must appear in at least one
     * manifest; for 1.0 and later every payload file must appear in all payload manifests.
     *
     * @param bag the bag to check
     * @param ignoreHiddenFiles to ignore hidden files unless they are specifically listed in a manifest
     *
     * @throws IOException if there is a problem reading a file
     * @throws MaliciousPathException the path in the manifest was specifically crafted to cause harm
     * @throws UnsupportedAlgorithmException if the algorithm used for the manifest is unsupported
     * @throws InvalidBagitFileFormatException if any of the manifests don't conform to the bagit specification
     * @throws FileNotInPayloadDirectoryException if a file is listed in a manifest but doesn't exist in the payload directory
     * @throws InterruptedException if a thread is interrupted while doing work
     */
    public void verifyManifests(final Bag bag, final boolean ignoreHiddenFiles)
            throws IOException, MaliciousPathException, UnsupportedAlgorithmException,
            InvalidBagitFileFormatException, FileNotInPayloadDirectoryException, InterruptedException {

        final Set<Path> allFilesListedInManifests = getAllFilesListedInManifests(bag);
        checkAllFilesListedInManifestExist(allFilesListedInManifests);

        if (bag.getVersion().isOlder(new Version(1, 0))) {
            checkAllFilesInPayloadDirAreListedInAtLeastOneAManifest(allFilesListedInManifests, PathUtils.getDataDir(bag), ignoreHiddenFiles);
        } else {
            checkAllFilesInPayloadDirAreListedInAllManifests(bag.getPayLoadManifests(), PathUtils.getDataDir(bag), ignoreHiddenFiles);
        }
    }

    /*
     * get all the files listed in all the manifests (both "manifest-*" and
     * "tagmanifest-*" files found in the bagit directory)
     */
    private Set<Path> getAllFilesListedInManifests(final Bag bag)
            throws IOException, MaliciousPathException, UnsupportedAlgorithmException, InvalidBagitFileFormatException {
        logger.debug(messages.getString("all_files_in_manifests"));
        final Set<Path> filesListedInManifests = new HashSet<>();

        try (DirectoryStream<Path> directoryStream =
                     Files.newDirectoryStream(PathUtils.getBagitDir(bag.getVersion(), bag.getRootDir()))) {
            for (final Path path : directoryStream) {
                final String filename = PathUtils.getFilename(path);
                if (filename.startsWith("tagmanifest-") || filename.startsWith("manifest-")) {
                    logger.debug(messages.getString("get_listing_in_manifest"), path);
                    final Manifest manifest = ManifestReader.readManifest(nameMapping, path, bag.getRootDir(),
                            bag.getFileEncoding());
                    filesListedInManifests.addAll(manifest.getFileToChecksumMap().keySet());
                }
            }
        }

        return filesListedInManifests;
    }

    /*
     * Make sure all the listed files actually exist. One task per file is handed to the
     * executor; the latch is sized to the number of files and this method blocks until
     * it reaches zero (presumably each CheckIfFileExistsTask counts it down and records
     * absent files in missingFiles - confirm against the bagit library).
     */
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    private void checkAllFilesListedInManifestExist(final Set<Path> files) throws FileNotInPayloadDirectoryException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(files.size());
        final Set<Path> missingFiles = new ConcurrentSkipListSet<>();

        logger.info(messages.getString("check_all_files_in_manifests_exist"));
        for (final Path file : files) {
            executor.execute(new CheckIfFileExistsTask(file, missingFiles, latch));
        }

        latch.await();

        if (!missingFiles.isEmpty()) {
            final String formattedMessage = messages.getString("missing_payload_files_error");
            throw new FileNotInPayloadDirectoryException(MessageFormatter.format(formattedMessage, missingFiles).getMessage());
        }
    }

    /*
     * Make sure all files in the directory are in at least 1 manifest.
     * A payload directory that does not exist is skipped without error.
     */
    private static void checkAllFilesInPayloadDirAreListedInAtLeastOneAManifest(final Set<Path> filesListedInManifests,
            final Path payloadDir, final boolean ignoreHiddenFiles) throws IOException {
        logger.debug(messages.getString("checking_file_in_at_least_one_manifest"), payloadDir);
        if (Files.exists(payloadDir)) {
            Files.walkFileTree(payloadDir,
                    new PayloadFileExistsInAtLeastOneManifestVistor(filesListedInManifests, ignoreHiddenFiles));
        }
    }

    /*
     * as per the bagit-spec 1.0+ all files have to be listed in all manifests.
     * A payload directory that does not exist is skipped without error.
     */
    private static void checkAllFilesInPayloadDirAreListedInAllManifests(final Set<Manifest> payLoadManifests,
            final Path payloadDir, final boolean ignoreHiddenFiles) throws IOException {
        logger.debug(messages.getString("checking_file_in_all_manifests"), payloadDir);
        if (Files.exists(payloadDir)) {
            Files.walkFileTree(payloadDir, new PayloadFileExistsInAllManifestsVistor(payLoadManifests, ignoreHiddenFiles));
        }
    }
}
Expand Up @@ -179,11 +179,11 @@ public boolean uploadBaggedDirectory(String bucket, String inPath, String s3Pref
return false;
}
long freeSpace = inFile.getUsableSpace();
logger.trace("freeSpace: {}", freeSpace);
logger.trace("freeSpace: {}", humanReadableByteCount(freeSpace, true));
long uncompressedSize = FileUtils.sizeOfDirectory(inFile);
logger.trace("uncompressedSize: {}", uncompressedSize);
logger.trace("uncompressedSize: {}", humanReadableByteCount(uncompressedSize, true));
long requiredSpace = uncompressedSize + (1024 * 1024 * 1024);
logger.trace("requiredSpace: {}", requiredSpace);
logger.trace("requiredSpace: {}", humanReadableByteCount(requiredSpace, true));
if (requiredSpace > freeSpace) {
sendUpdateErrorMessage(seqId, sampleId, reqId, step,
String.format("Not enough free space to bag up [%s], needs [%s] has [%s]",
Expand Down

0 comments on commit cca7cd7

Please sign in to comment.