Skip to content

Commit

Permalink
Continuing testing
Browse files Browse the repository at this point in the history
  • Loading branch information
tsuyoi committed Oct 9, 2018
1 parent b11e365 commit 7d84fe3
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 73 deletions.
Expand Up @@ -158,7 +158,7 @@ public void start() {
logger.trace("Instantiating new [WatchDirectory] from [watchDirectoryName] path");
WatchDirectory wd;
try {
wd = new WatchDirectory(dir, true, this);
wd = new WatchDirectory(dir, false, this);
Thread wdt = new Thread(wd);
wdt.start();
}
Expand Down
Expand Up @@ -16,6 +16,8 @@
import java.util.List;
import java.util.Map;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;

public class FSObject implements Runnable {

private final String transfer_watch_file;
Expand Down Expand Up @@ -286,7 +288,8 @@ private void processDir(Path dir) {
File seqStageDir = Paths.get(staging_folder, seqId).toFile();

logger.info("Start processing directory {}", outDir);

sendUpdateInfoMessage(seqId, null, null, 1,
"Discovered for upload");

String status = transferStatus(dir, "transfer_complete_status");
List<String> filterList = new ArrayList<>();
Expand All @@ -304,11 +307,13 @@ private void processDir(Path dir) {
deleteFolder(seqStageDir.toPath());
}
sendUpdateInfoMessage(seqId, null, null, 1,
"Copying/Renaming file(s) from watch directory to staging directory");
copyFolderContents(new File(inDir), seqStageDir);
sendUpdateInfoMessage(seqId, null, null, 1,
"Deleting leftover folder(s) from watch directory");
deleteFolder(Paths.get(inDir));
"Moving files from watch directory to staging directory");
//copyFolderContents(new File(inDir), seqStageDir);
if (!moveFolder(inDir, seqStageDir.toString()))
return;
//sendUpdateInfoMessage(seqId, null, null, 1,
// "Deleting leftover folder(s) from watch directory");
//deleteFolder(Paths.get(inDir));
} catch (IOException e) {
//logger.error("Failed to move sequence to staging directory [{}] -> [{}]\n" + ExceptionUtils.getStackTrace(e), inDir, seqStageDir);
sendUpdateErrorMessage(seqId, null, null, 1, "Failed to move sequence to staging directory: " + ExceptionUtils.getStackTrace(e));
Expand All @@ -328,8 +333,8 @@ private void processDir(Path dir) {
ObjectEngine oe = new ObjectEngine(plugin);
if (oe.uploadBaggedDirectory(bucket_name, seqStageDir.getAbsolutePath(), "", outDir,
null,null, "1")) {
//if (setTransferFile(dir)) {
if (new File(inDir).exists()) {
if (setTransferFile(seqStageDir.toPath().resolve(transfer_status_file))) {
/*if (new File(inDir).exists()) {
try {
//logger.info("Cleaning up uploaded sequence [{}]", inDir);
sendUpdateInfoMessage(seqId, null, null, 1,
Expand All @@ -341,48 +346,48 @@ private void processDir(Path dir) {
sendUpdateErrorMessage(seqId, null, null, 1,
"Failed to remove some files from watch directory, please clean manually");
}
}
logger.debug("Directory Transferred [inDir = {}, outDir = {}]", inDir, outDir);
me = plugin.genGMessage(MsgEvent.Type.INFO, "Directory Transferred");
me.setParam("indir", inDir);
me.setParam("outdir", outDir);
me.setParam("seq_id", seqId);
me.setParam("transfer_watch_file", transfer_watch_file);
me.setParam("transfer_status_file", transfer_status_file);
me.setParam("bucket_name", bucket_name);
me.setParam("endpoint", plugin.getConfig().getStringParam("endpoint"));
me.setParam("pathstage", pathStage);
//if pathstage 3 we need to submit jobs for processing
logger.trace("pathStage = " + pathStage);
if (pathStage.equals("3")) {
logger.trace("Sample Directory: " + inDir);
String sampleList = getSampleList(inDir);

if (sampleList != null) {
logger.trace("Samples : " + sampleList);
me.setParam("sample_list", sampleList);
} else {
me.setParam("sample_list", "");
}*/
logger.debug("Directory Transferred [inDir = {}, outDir = {}]", inDir, outDir);
me = plugin.genGMessage(MsgEvent.Type.INFO, "Directory Transferred");
me.setParam("indir", inDir);
me.setParam("outdir", outDir);
me.setParam("seq_id", seqId);
me.setParam("transfer_watch_file", transfer_watch_file);
me.setParam("transfer_status_file", transfer_status_file);
me.setParam("bucket_name", bucket_name);
me.setParam("endpoint", plugin.getConfig().getStringParam("endpoint"));
me.setParam("pathstage", pathStage);
//if pathstage 3 we need to submit jobs for processing
logger.trace("pathStage = " + pathStage);
if (pathStage.equals("3")) {
logger.trace("Sample Directory: " + inDir);
String sampleList = getSampleList(inDir);

if (sampleList != null) {
logger.trace("Samples : " + sampleList);
me.setParam("sample_list", sampleList);
} else {
me.setParam("sample_list", "");
}
}
me.setParam("sstep", "2");
plugin.sendMsgEvent(me);
//end
} else {
logger.error("Directory Transfer Failed [inDir = {}, outDir = {}]", inDir, outDir);
me = plugin.genGMessage(MsgEvent.Type.ERROR, "Failed Directory Transfer");
me.setParam("indir", inDir);
me.setParam("outdir", outDir);
me.setParam("seq_id", seqId);
me.setParam("transfer_watch_file", transfer_watch_file);
me.setParam("transfer_status_file", transfer_status_file);
me.setParam("bucket_name", bucket_name);
me.setParam("endpoint", plugin.getConfig().getStringParam("endpoint"));
me.setParam("pathstage", pathStage);
me.setParam("sstep", "2");
plugin.sendMsgEvent(me);
}
me.setParam("sstep", "2");
plugin.sendMsgEvent(me);
//end
} else {
logger.error("Directory Transfer Failed [inDir = {}, outDir = {}]", inDir, outDir);
me = plugin.genGMessage(MsgEvent.Type.ERROR, "Failed Directory Transfer");
me.setParam("indir", inDir);
me.setParam("outdir", outDir);
me.setParam("seq_id", seqId);
me.setParam("transfer_watch_file", transfer_watch_file);
me.setParam("transfer_status_file", transfer_status_file);
me.setParam("bucket_name", bucket_name);
me.setParam("endpoint", plugin.getConfig().getStringParam("endpoint"));
me.setParam("pathstage", pathStage);
me.setParam("sstep", "2");
plugin.sendMsgEvent(me);
}
//}
} /*else if (status.equals("yes")) {
logger.trace("[status = \"yes\"]");
if (oe.isSyncDir(bucket_name, outDir, inDir, filterList)) {
Expand Down Expand Up @@ -426,7 +431,7 @@ private void setTransferFileMD5(Path dir, Map<String, String> md5map) {
}

private boolean setTransferFile(Path dir) {
logger.debug("Call to setTransferFile [dir = {}]");
logger.debug("Call to setTransferFile [dir = {}]", dir.toString().replace("\\", "\\\\"));
boolean isSet = false;
try {
if (dir.toString().toLowerCase().endsWith(transfer_status_file.toLowerCase())) {
Expand Down Expand Up @@ -496,6 +501,25 @@ private void copyFolderContents(File src, File dst) throws IOException {
Files.move(Paths.get(src.toURI()), Paths.get(dst.toURI()));
}

private boolean moveFolder(String srcPathString, String dstPathString) {
try {
Path srcPath = Paths.get(srcPathString);
if (!Files.exists(srcPath)) {
logger.error("Folder to move [{}] does not exist", srcPathString.replace("\\", "\\\\"));
return false;
}
Path dstPath = Paths.get(dstPathString);
Files.deleteIfExists(dstPath);
long started = System.currentTimeMillis();
Files.move(srcPath, dstPath, ATOMIC_MOVE);
logger.trace("Moved folder in {}ms", (System.currentTimeMillis() - started));
return true;
} catch (IOException e) {
logger.error("Failed to move folder : {}", ExceptionUtils.getStackTrace(e).replace("\\", "\\\\"));
return false;
}
}

/**
* Deletes an entire folder structure
* @param folder Path of the folder to delete
Expand Down
Expand Up @@ -38,6 +38,8 @@
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;

public class Encapsulation {
private static CLogger logger = new CLogger(Encapsulation.class, new LinkedBlockingQueue<>(),
"", "", "");
Expand Down Expand Up @@ -271,36 +273,45 @@ public static File bagItUp(File folder, String mode, String hashing, boolean inc
*/
public static void debagify(String src) {
logger.trace("Call to debagify({})", src);
File bag = new File(src);
if (bag.isFile())
Path bag = Paths.get(src);
if (Files.isRegularFile(bag))
return;
File bagIt = new File(src + "/.bagit");
if (bagIt.exists())
Path bagIt = bag.resolve(".bagit");
if (Files.exists(bagIt)) {
try {
deleteFolder(bagIt.toPath());
deleteFolder(bagIt);
} catch (IOException e) {
logger.error("Failed to delete the .bagit directory for bag: {}", src);
}
new File(src + "/bagit.txt").delete();
new File(src + "/bag-info.txt").delete();
new File(src + "/manifest-sha512.txt").delete();
new File(src + "/manifest-sha256.txt").delete();
new File(src + "/manifest-sha1.txt").delete();
new File(src + "/manifest-md5.txt").delete();
new File(src + "/tagmanifest-sha512.txt").delete();
new File(src + "/tagmanifest-sha256.txt").delete();
new File(src + "/tagmanifest-sha1.txt").delete();
new File(src + "/tagmanifest-md5.txt").delete();
File data = new File(src + "/data");
if (data.exists()) {
String tmpDataPath = src + "/" + UUID.randomUUID().toString();
data.renameTo(new File(tmpDataPath));
}
try {
Files.deleteIfExists(bag.resolve("bagit.txt"));
Files.deleteIfExists(bag.resolve("bag-info.txt"));
Files.deleteIfExists(bag.resolve("manifest-sha512.txt"));
Files.deleteIfExists(bag.resolve("manifest-sha256.txt"));
Files.deleteIfExists(bag.resolve("manifest-sha1.txt"));
Files.deleteIfExists(bag.resolve("manifest-md5.txt"));
Files.deleteIfExists(bag.resolve("tagmanifest-sha512.txt"));
Files.deleteIfExists(bag.resolve("tagmanifest-sha256.txt"));
Files.deleteIfExists(bag.resolve("tagmanifest-sha1.txt"));
Files.deleteIfExists(bag.resolve("tagmanifest-md5.txt"));
} catch (IOException e) {
logger.error("Failed to clean up BagIt metadata");
}
Path data = bag.resolve("data");
if (Files.exists(data)) {
try {
copyFolderContents(new File(tmpDataPath), new File(src));
deleteFolder(new File(tmpDataPath).toPath());
new File(src + "/bag-info.txt").delete();
Path tmpDataPath = bag.resolve(UUID.randomUUID().toString());
Files.move(data, tmpDataPath);
if (!moveToFolder(tmpDataPath, bag)) {
logger.error("Failed to move files out of BagIt data directory");
return;
}
Files.deleteIfExists(tmpDataPath);
} catch (IOException e) {
logger.error("Failed to move files from {} to {}", src + "/data", src);
logger.error("Failed to move files from {} to {}",
data.toString().replace("\\", "\\\\"),
bag.toString().replace("\\", "\\\\"));
}
}
}
Expand Down Expand Up @@ -690,6 +701,60 @@ private static void copyFolderContents(File src, File dst) throws IOException {
Files.move(Paths.get(src.toURI()), Paths.get(dst.toURI()));
}

private static boolean movePath(String srcPathString, String dstPathString) {
try {
Path srcPath = Paths.get(srcPathString);
if (!Files.exists(srcPath)) {
logger.error("Folder to move [{}] does not exist", srcPathString.replace("\\", "\\\\"));
return false;
}
Path dstPath = Paths.get(dstPathString);
Files.deleteIfExists(dstPath);
long started = System.currentTimeMillis();
Files.move(srcPath, dstPath, ATOMIC_MOVE);
logger.trace("Moved folder in {}ms", (System.currentTimeMillis() - started));
return true;
} catch (IOException e) {
logger.error("Failed to move [{}] to [{}] : {}",
srcPathString.replace("\\", "\\\\"), dstPathString.replace("\\", "\\\\"),
ExceptionUtils.getStackTrace(e).replace("\\", "\\\\"));
return false;
}
}

private static boolean moveToFolder(Path srcFolder, Path dstFolder) {
try {
//Path srcFolder = Paths.get(srcFolderString);
if (!Files.exists(srcFolder)) {
logger.error("Folder to move [{}] does not exist", srcFolder.toString().replace("\\", "\\\\"));
return false;
}
//Path dstFolder = Paths.get(dstFolderString);
if (!Files.exists(dstFolder)) {
logger.error("Destination folder [{}] does not exist", dstFolder.toString().replace("\\", "\\\\"));
return false;
}
try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(srcFolder)) {
boolean bSuccess = true;
for (Path path : directoryStream) {
if (!movePath(path.toString(), dstFolder.resolve(path.getFileName()).toString()))
bSuccess = false;
}
return bSuccess;
} catch (IOException e) {
logger.error("Failed to move [{}] to folder [{}] : {}",
srcFolder.toString().replace("\\", "\\\\"), dstFolder.toString().replace("\\", "\\\\"),
ExceptionUtils.getStackTrace(e).replace("\\", "\\\\"));
return false;
}
} catch (Exception e) {
logger.error("Failed to move [{}] to folder [{}] : {}",
srcFolder.toString().replace("\\", "\\\\"), dstFolder.toString().replace("\\", "\\\\"),
ExceptionUtils.getStackTrace(e).replace("\\", "\\\\"));
return false;
}
}

/**
* Deletes an entire folder structure
* @param folder Path of the folder to delete
Expand Down
Expand Up @@ -25,7 +25,7 @@ public static void setLogger(CPlugin plugin) {
logger = new CLogger(ObjectEngine.class, plugin.getMsgOutQueue(), plugin.getRegion(), plugin.getAgent(), plugin.getPluginID(), CLogger.Level.Trace);
}
private static final ResourceBundle messages = ResourceBundle.getBundle("MessageBundle");
private static final int max_batch_size = 1000;
private static final int max_batch_size = 100;

private final PayloadVerifier manifestVerifier;
private final ExecutorService executor;
Expand Down

0 comments on commit 7d84fe3

Please sign in to comment.