Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cloud input/output for IntervalListTools #1852

Merged
merged 28 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/main/java/picard/nio/DeleteRecursive.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package picard.nio;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;

/**
*
* Copied from GATK; to be removed once the original GATK code is ported to htsjdk
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should push this to htsjdk as you say here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in The Futuure

*
* Class to hold a set of {@link Path}s to be deleted on JVM exit through a shutdown hook.
*
* <p>This class is a modification of {@link htsjdk.samtools.util.nio.DeleteOnExitPathHook}.
*
* This class should be considered an implementation detail of {@link GATKIOUtils#deleteOnExit(Path)} and not used directly.
*/
class DeleteRecursivelyOnExitPathHook {
    private static final Logger LOG = LogManager.getLogger(DeleteRecursivelyOnExitPathHook.class);

    // Paths registered for deletion, kept in registration order. The field is set to null once
    // runHooks() has started; add() uses that null as the "too late to register" signal.
    // All reads and writes happen while holding the class monitor.
    private static LinkedHashSet<Path> paths = new LinkedHashSet<>();

    static {
        // Register the deletion pass once, when this class is first loaded.
        Runtime.getRuntime().addShutdownHook(new Thread(DeleteRecursivelyOnExitPathHook::runHooks));
    }

    private DeleteRecursivelyOnExitPathHook() {}

    /**
     * Adds a {@link Path} for deletion on JVM exit.
     *
     * @param path path to be deleted. This path may be a non-empty directory and the entire directory structure will
     *             be deleted.
     *
     * @throws IllegalStateException if the shutdown hook is in progress.
     */
    public static synchronized void add(Path path) {
        if (paths == null) {
            // The deletion pass is running (or has run). Too late to add a file.
            throw new IllegalStateException("Shutdown in progress");
        }

        paths.add(path);
    }

    /**
     * Deletes every registered path, most recently registered first. Failures are logged at debug level
     * and otherwise ignored, because this runs as a shutdown hook.
     *
     * Calling this method again after the first run is a no-op.
     */
    static void runHooks() {
        final LinkedHashSet<Path> thePaths;

        // Take ownership of the registered set and mark the hook as started in one atomic step.
        synchronized (DeleteRecursivelyOnExitPathHook.class) {
            thePaths = paths;
            paths = null;
        }

        if (thePaths == null) {
            // A previous invocation already consumed the set; nothing is left to delete.
            return;
        }

        final ArrayList<Path> toBeDeleted = new ArrayList<>(thePaths);

        // reverse the list to maintain previous jdk deletion order.
        // Last in first deleted.
        Collections.reverse(toBeDeleted);
        for (final Path path : toBeDeleted) {
            try {
                GATKIOUtils.deleteRecursively(path);
            } catch (final Exception e) {
                // do nothing if cannot be deleted, because it is a shutdown hook
                LOG.debug(() -> "Could not recursively delete " + path.toString() + " during JVM shutdown because we encountered the following exception:", e);
            }
        }
    }
}
160 changes: 160 additions & 0 deletions src/main/java/picard/nio/GATKBucketUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package picard.nio;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmnbroad @droazen @lbergelson these classes were "ported" from GATK in order to make temporaryFiles for tests


import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import htsjdk.samtools.util.FileExtensions;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;


/**
* Copied from BucketUtils.java in GATK
* To be replaced once the original GATK BucketUtils.java is ported to htsjdk
*/
public class GATKBucketUtils {
// URI schemes. In GATK these are accessed as e.g. GoogleCloudStorageFileSystem.SCHEME
public static final String GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME = "gs";
public static final String HTTP_FILESYSTEM_PROVIDER_SCHEME = "http";
public static final String HTTPS_FILESYSTEM_PROVIDER_SCHEME = "https";
public static final String HDFS_SCHEME = "hdfs";

// URL prefixes: each one is the matching scheme followed by "://"
public static final String GCS_PREFIX = GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME + "://";
public static final String HTTP_PREFIX = HTTP_FILESYSTEM_PROVIDER_SCHEME + "://";
public static final String HTTPS_PREFIX = HTTPS_FILESYSTEM_PROVIDER_SCHEME + "://";
public static final String HDFS_PREFIX = HDFS_SCHEME + "://";

// slashes omitted since hdfs paths seem to only have 1 slash which would be weirder to include than no slashes
public static final String FILE_PREFIX = "file:";

/** Static utility holder; private so that no one will instantiate this class. */
private GATKBucketUtils() {
}

/**
 * Get a temporary file path based on the prefix and extension provided.
 * This file (and possible indexes associated with it) will be scheduled for deletion on shutdown.
 *
 * @param prefix a prefix for the file name
 *               for remote paths this should be a valid URI to root the temporary file in (e.g. gs://hellbender/staging/)
 *               there is no guarantee that this will be used as the root of the tmp file name, a local prefix may be placed in the tmp folder for example
 * @param extension an extension for the temporary file path, the resulting path will end in this
 * @return a path to use as a temporary file, on remote file systems which don't support an atomic tmp file reservation a path is chosen with a long randomized name
 * @throws IllegalArgumentException if {@code prefix} is an hdfs:// URL; the remote-path helper only accepts GCS locations
 */
public static String getTempFilePath(String prefix, String extension){
    if (isGcsUrl(prefix) || isHadoopUrl(prefix)) {
        final String path = randomRemotePath(prefix, "", extension);
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path));

        // Mark auxiliary files to be deleted
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + FileExtensions.TRIBBLE_INDEX));
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + FileExtensions.TABIX_INDEX));
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + ".bai"));
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + ".md5"));

        // If path ends with the extension, swap it for ".bai". The match is literal on purpose:
        // treating the extension as a regular expression would let '.', '+', '(' etc. change the
        // match or throw PatternSyntaxException.
        // String.valueOf mirrors how a null extension is rendered by string concatenation.
        final String suffix = String.valueOf(extension);
        final String baiSibling = path.endsWith(suffix)
                ? path.substring(0, path.length() - suffix.length()) + ".bai"
                : path;
        GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(baiSibling));
        return path;
    }

    // Local case: the temp file is created (and registered for deletion) on the local file system.
    return GATKIOUtils.createTempFile(prefix, extension).getAbsolutePath();
}



/**
 * Picks a random name by putting a random UUID between {@code prefix} and {@code suffix}.
 *
 * @param stagingLocation The folder where you want the file to be. Must start with "gs://";
 *                        Hadoop ("hdfs://") locations are rejected because Hadoop support is disabled in Picard.
 * @param prefix The beginning of the file name
 * @param suffix The end of the file name, e.g. ".tmp"
 * @return the URI string of the randomly named path under {@code stagingLocation}
 * @throws IllegalArgumentException if {@code stagingLocation} is not a GCS URL
 */
private static String randomRemotePath(String stagingLocation, String prefix, String suffix) {
    if (isGcsUrl(stagingLocation)) {
        // Go through URI because Path.toString isn't guaranteed to include the "gs://" prefix.
        return getPathOnGcs(stagingLocation).resolve(prefix + UUID.randomUUID() + suffix).toUri().toString();
    }
    if (isHadoopUrl(stagingLocation)) {
        // HDFS is remote, but support for it is disabled in Picard; say so rather than "not remote".
        throw new IllegalArgumentException("Hadoop (hdfs://) staging locations are not supported in Picard: " + stagingLocation);
    }
    throw new IllegalArgumentException("Staging location is not remote: " + stagingLocation);
}

/**
 * String -> Path. This *should* not be necessary (use Paths.get(URI.create(...)) instead) , but it currently is
 * on Spark because using the fat, shaded jar breaks the registration of the GCS FilesystemProvider.
 * To transform other types of string URLs into Paths, use IOUtils.getPath instead.
 *
 * @param gcsUrl a URL of the form {@code gs://bucket/path/inside/bucket}
 * @return a path on the GCS file system of the bucket named in {@code gcsUrl}
 * @throws IllegalArgumentException if {@code gcsUrl} has no {@code scheme://bucket} part
 */
public static java.nio.file.Path getPathOnGcs(String gcsUrl) {
    // use a split limit of -1 to preserve empty split tokens, especially trailing slashes on directory names
    final String[] split = gcsUrl.split("/", -1);

    // "gs://bucket/..." splits into ["gs:", "", "bucket", ...]; anything shorter has no bucket token
    // and would otherwise fail with an ArrayIndexOutOfBoundsException.
    if (split.length < 3) {
        throw new IllegalArgumentException("Not a valid GCS URL (expected gs://bucket/path): " + gcsUrl);
    }

    final String bucket = split[2];
    final String pathWithoutBucket = String.join("/", Arrays.copyOfRange(split, 3, split.length));
    return CloudStorageFileSystem.forBucket(bucket).getPath(pathWithoutBucket);
}

/**
* Tests whether a path string is a GCS URL, i.e. starts with {@link #GCS_PREFIX}.
*
* @param path path to inspect; validated with {@code GATKUtils.nonNull} before use
* @return true if this path represents a gcs location
*/
public static boolean isGcsUrl(final String path) {
GATKUtils.nonNull(path);
return path.startsWith(GCS_PREFIX);
}

/**
 * Return true if this {@code PicardHtsPath} represents a gcs URI.
 *
 * The GATK code modified to use PicardHtsPath rather than GATKPath.
 *
 * @param pathSpec specifier to inspect; validated with {@code GATKUtils.nonNull} before use
 * @return true if the scheme of {@code pathSpec} equals {@link #GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME};
 *         false otherwise, including when the scheme is null
 */
public static boolean isGcsUrl(final PicardHtsPath pathSpec) {
    GATKUtils.nonNull(pathSpec);
    // Constant-first comparison so that a null scheme yields false instead of a NullPointerException.
    return GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME.equals(pathSpec.getScheme());
}

/**
 * Checks whether the storage behind a {@code PicardHtsPath} may benefit from prefetching.
 *
 * @param pathSpec specifier to inspect
 * @return true if the scheme of {@code pathSpec} is gcs or http(s)
 */
public static boolean isEligibleForPrefetching(final PicardHtsPath pathSpec) {
    GATKUtils.nonNull(pathSpec);
    final String scheme = pathSpec.getScheme();
    return isEligibleForPrefetching(scheme);
}

/**
 * Checks whether the storage behind a {@code Path} may benefit from prefetching.
 *
 * @param path path to inspect
 * @return true if the URI scheme of {@code path} is gcs or http(s)
 */
public static boolean isEligibleForPrefetching(final java.nio.file.Path path) {
    GATKUtils.nonNull(path);
    final String scheme = path.toUri().getScheme();
    return isEligibleForPrefetching(scheme);
}

// Shared scheme check behind the public isEligibleForPrefetching overloads:
// a null scheme is never eligible; otherwise the scheme must be gs, http or https.
private static boolean isEligibleForPrefetching(final String scheme){
    if (scheme == null) {
        return false;
    }
    if (scheme.equals(GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME)) {
        return true;
    }
    if (scheme.equals(HTTP_FILESYSTEM_PROVIDER_SCHEME)) {
        return true;
    }
    return scheme.equals(HTTPS_FILESYSTEM_PROVIDER_SCHEME);
}

/**
 * Tests whether a path string starts with {@link #HTTP_PREFIX} or {@link #HTTPS_PREFIX}.
 *
 * @param path path to inspect; validated with {@code GATKUtils.nonNull}, matching {@link #isGcsUrl(String)}
 * @return true if the given path is an http or https Url.
 */
public static boolean isHttpUrl(String path){
    GATKUtils.nonNull(path);
    return path.startsWith(HTTP_PREFIX) || path.startsWith(HTTPS_PREFIX);
}

/**
 * Tests whether a path string starts with {@link #HDFS_PREFIX}.
 *
 * @param path path to inspect; validated with {@code GATKUtils.nonNull}, matching {@link #isGcsUrl(String)}
 * @return true if the given path is a HDFS (Hadoop filesystem) URL.
 */
public static boolean isHadoopUrl(String path) {
    GATKUtils.nonNull(path);
    return path.startsWith(HDFS_PREFIX);
}

/**
 * Tests whether a path string points at remote storage.
 *
 * @param path path to inspect
 * @return true if the given path is a GCS, HDFS (Hadoop filesystem), or Http(s) URL.
 */
public static boolean isRemoteStorageUrl(String path) {
    // Checked in the order GCS, HDFS, http(s); the first match wins.
    if (isGcsUrl(path)) {
        return true;
    }
    if (isHadoopUrl(path)) {
        return true;
    }
    return isHttpUrl(path);
}
}
128 changes: 128 additions & 0 deletions src/main/java/picard/nio/GATKIOUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package picard.nio;

import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import htsjdk.samtools.util.FileExtensions;
import htsjdk.samtools.util.IOUtil;
import picard.PicardException;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.*;
import java.util.HashMap;

public class GATKIOUtils {
/**
* Schedule a file or directory to be deleted on JVM shutdown.
*
* The path is registered with {@link DeleteRecursivelyOnExitPathHook}, whose shutdown hook calls
* {@link GATKIOUtils#deleteRecursively(Path)} on {@code fileToDelete}.
*
* @param fileToDelete file or directory to be deleted recursively at JVM shutdown.
* @throws IllegalStateException if the shutdown hook is already in progress.
*/
public static void deleteOnExit(final Path fileToDelete){
DeleteRecursivelyOnExitPathHook.add(fileToDelete);
}

/**
* Converts the given URI to a {@link Path} object. If the filesystem cannot be found in the usual way, then attempt
* to load the filesystem provider using the thread context classloader. This is needed when the filesystem
* provider is loaded using a URL classloader (e.g. in spark-submit).
*
* Also makes an attempt to interpret the argument as a file name if it's not a URI.
*
* @param uriString the URI to convert.
* @return the resulting {@code Path}
* @throws PicardException if an I/O error occurs when creating the file system
* @throws FileSystemNotFoundException if no file system is installed for the scheme and the current thread has
*         no context classloader to retry with
*/
public static Path getPath(String uriString) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't really thought about it when we talked before, but bringing these files in adds hard references to the google libraries which will cause runtime errors if these methods are invoked in the non-cloud jar. Are we sure we want to do that?

We either have to remove the references to CloudStorageFileSystem, make the gcloud dependency a hard one instead of optional, or handle the cases where it's missing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We make it mandatory is the decision.

GATKUtils.nonNull(uriString);
URI uri;
try {
uri = URI.create(uriString);
} catch (IllegalArgumentException x) {
// not a valid URI. Caller probably just gave us a file name.
return Paths.get(uriString);
}
try {
// special case GCS, in case the filesystem provider wasn't installed properly but is available.
if (CloudStorageFileSystem.URI_SCHEME.equals(uri.getScheme())) {
return GATKBucketUtils.getPathOnGcs(uriString);
}
// Paths.get(String) assumes the default file system
// Paths.get(URI) uses the scheme
return uri.getScheme() == null ? Paths.get(uriString) : Paths.get(uri);
} catch (FileSystemNotFoundException e) {
// No installed file system matches the scheme: retry by creating one through the thread context classloader.
try {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if ( cl == null ) {
// Nothing to retry with; surface the original lookup failure unchanged.
throw e;
}
return FileSystems.newFileSystem(uri, new HashMap<>(), cl).provider().getPath(uri);
}
catch (ProviderNotFoundException x) {
// TODO: this creates bogus Path on the current file system for schemes such as gendb, nonexistent, gcs
// TODO: we depend on this code path to allow IntervalUtils to call getPath on a string that may be either
// a literal interval or a feature file containing intervals
// not a valid URI. Caller probably just gave us a file name or "chr1:1-2".
return Paths.get(uriString);
}
catch ( IOException io ) {
// UserException in GATK but Picard does not differentiate between e.g. PicardException vs UserException
// Might be useful to add the UserException class in the long term
throw new PicardException(uriString + " is not a supported path", io);
}
}
}

/**
* Creates a temp file that will be deleted on exit.
*
* Delegates to {@link #createTempFileInDirectory(String, String, File)} with a null target directory,
* which also marks the corresponding Tribble/Tabix/BAM indices matching the temp file for deletion.
*
* @param name Prefix of the file; {@link File#createTempFile(String, String, File)} requires that this be >= 3 characters
* @param extension Extension to concat to the end of the file.
* @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits.
*/
public static File createTempFile(String name, String extension) {
return createTempFileInDirectory(name, extension, null);
}

/**
 * Creates a temp file in a target directory that will be deleted on exit.
 *
 * This will also mark the corresponding Tribble/Tabix/BAM indices matching the temp file for deletion.
 *
 * @param name Prefix of the file; {@link File#createTempFile(String, String, File)} requires that this be >= 3 characters
 * @param extension Extension to concat to the end of the file name; a leading "." is added when missing.
 * @param targetDir Directory in which to create the temp file. If null, the default temp directory is used.
 * @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits.
 * @throws PicardException if the temp file cannot be created.
 */
public static File createTempFileInDirectory(final String name, String extension, final File targetDir) {
    try {
        if (!extension.startsWith(".")) {
            extension = "." + extension;
        }

        final File file = File.createTempFile(name, extension, targetDir);
        file.deleteOnExit();

        // Mark corresponding indices for deletion on exit as well just in case an index is created for the temp file:
        final String absolutePath = file.getAbsolutePath();
        new File(absolutePath + FileExtensions.TRIBBLE_INDEX).deleteOnExit();
        new File(absolutePath + FileExtensions.TABIX_INDEX).deleteOnExit();
        new File(absolutePath + ".bai").deleteOnExit();
        new File(absolutePath + ".md5").deleteOnExit();

        // Sibling index named by swapping the extension for ".bai". The suffix is matched literally:
        // using the extension as a regular expression would misread '.', '+', '(' etc. and could throw
        // PatternSyntaxException after the temp file had already been created.
        final String baiSibling = absolutePath.endsWith(extension)
                ? absolutePath.substring(0, absolutePath.length() - extension.length()) + ".bai"
                : absolutePath;
        new File(baiSibling).deleteOnExit();

        return file;
    } catch (IOException ex) {
        throw new PicardException("Cannot create temp file: " + ex.getMessage(), ex);
    }
}

/**
* Delete rootPath recursively by delegating to htsjdk's {@code IOUtil.recursiveDelete}.
*
* @param rootPath is the file/directory to be deleted
*/
public static void deleteRecursively(final Path rootPath) {
IOUtil.recursiveDelete(rootPath);
}

}