From 29b2afc24b057789540e41a68da91e921b603a82 Mon Sep 17 00:00:00 2001 From: Dan King Date: Mon, 11 Sep 2023 16:30:26 -0400 Subject: [PATCH] [fs] consistent use of the term FileListEntry (#13594) In our asyncio FSes, we use the term FileListEntry for the package of information you receive from a cloud object store list operation. We reserve the term FileStatus for the metadata you get about a blob without a list operation. A major difference is that FLEs know if they refer to files or folders. In contrast, a FileStatus knows that it is a file *but does not know if a directory with the same name exists*. --- hail/python/hail/fs/hadoop_fs.py | 29 +++---- hail/python/hailtop/fs/fs.py | 6 +- hail/python/hailtop/fs/fs_utils.py | 6 +- hail/python/hailtop/fs/router_fs.py | 26 +++---- hail/python/hailtop/fs/stat_result.py | 7 +- .../scala/is/hail/expr/ir/GenericLines.scala | 20 ++--- .../is/hail/expr/ir/StringTableReader.scala | 12 +-- .../main/scala/is/hail/io/bgen/LoadBgen.scala | 20 ++--- .../scala/is/hail/io/fs/AzureStorageFS.scala | 34 ++++---- hail/src/main/scala/is/hail/io/fs/FS.scala | 78 +++++++++---------- .../scala/is/hail/io/fs/GoogleStorageFS.scala | 22 +++--- .../main/scala/is/hail/io/fs/HadoopFS.scala | 16 ++-- .../main/scala/is/hail/io/fs/RouterFS.scala | 6 +- .../main/scala/is/hail/io/vcf/LoadVCF.scala | 22 +++--- .../methods/MatrixExportEntriesByCol.scala | 5 +- .../main/scala/is/hail/utils/Py4jUtils.scala | 16 ++-- .../is/hail/variant/ReferenceGenome.scala | 8 +- hail/src/test/scala/is/hail/fs/FSSuite.scala | 30 +++---- .../is/hail/io/compress/BGzipCodecSuite.scala | 8 +- .../test/scala/is/hail/utils/UtilsSuite.scala | 8 +- 20 files changed, 192 insertions(+), 187 deletions(-) diff --git a/hail/python/hail/fs/hadoop_fs.py b/hail/python/hail/fs/hadoop_fs.py index b8ff0593efd..9927a6b7abb 100644 --- a/hail/python/hail/fs/hadoop_fs.py +++ b/hail/python/hail/fs/hadoop_fs.py @@ -3,23 +3,26 @@ import time from typing import Dict, List, Union, Any -import dateutil +import dateutil.parser from hailtop.fs.fs import FS -from hailtop.fs.stat_result import FileType, StatResult +from hailtop.fs.stat_result import FileType, FileListEntry -def _stat_dict_to_stat_result(stat: Dict[str, Any]) -> StatResult: - dt = dateutil.parser.isoparse(stat['modification_time']) +def _file_list_entry_scala_to_python(file_list_entry: Dict[str, Any]) -> FileListEntry: + dt = dateutil.parser.isoparse(file_list_entry['modification_time']) mtime = time.mktime(dt.timetuple()) - if stat['is_dir']: + if file_list_entry['is_dir']: typ = FileType.DIRECTORY - elif stat['is_link']: + elif file_list_entry['is_link']: typ = FileType.SYMLINK else: typ = FileType.FILE - return StatResult(path=stat['path'], owner=stat['owner'], size=stat['size'], - typ=typ, modification_time=mtime) + return FileListEntry(path=file_list_entry['path'], + owner=file_list_entry['owner'], + size=file_list_entry['size'], + typ=typ, + modification_time=mtime) class HadoopFS(FS): @@ -60,12 +63,12 @@ def is_file(self, path: str) -> bool: def is_dir(self, path: str) -> bool: return self._jfs.isDir(path) - def stat(self, path: str) -> StatResult: - stat_dict = json.loads(self._utils_package_object.stat(self._jfs, path)) - return _stat_dict_to_stat_result(stat_dict) + def stat(self, path: str) -> FileListEntry: + stat_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path)) + return _file_list_entry_scala_to_python(stat_dict) - def ls(self, path: str) -> List[StatResult]: - return [_stat_dict_to_stat_result(st) + def ls(self, path: 
str) -> List[FileListEntry]: + return [_file_list_entry_scala_to_python(st) for st in json.loads(self._utils_package_object.ls(self._jfs, path))] def mkdir(self, path: str) -> None: diff --git a/hail/python/hailtop/fs/fs.py b/hail/python/hailtop/fs/fs.py index 10fc452855d..a11942f582d 100644 --- a/hail/python/hailtop/fs/fs.py +++ b/hail/python/hailtop/fs/fs.py @@ -1,7 +1,7 @@ import abc from typing import IO, List -from .stat_result import StatResult +from .stat_result import FileListEntry class FS(abc.ABC): @@ -26,11 +26,11 @@ def is_dir(self, path: str) -> bool: raise NotImplementedError @abc.abstractmethod - def stat(self, path: str) -> StatResult: + def stat(self, path: str) -> FileListEntry: raise NotImplementedError @abc.abstractmethod - def ls(self, path: str) -> List[StatResult]: + def ls(self, path: str) -> List[FileListEntry]: raise NotImplementedError @abc.abstractmethod diff --git a/hail/python/hailtop/fs/fs_utils.py b/hail/python/hailtop/fs/fs_utils.py index 02588723fc1..1b6c42089ad 100644 --- a/hail/python/hailtop/fs/fs_utils.py +++ b/hail/python/hailtop/fs/fs_utils.py @@ -2,7 +2,7 @@ from typing import List from .router_fs import RouterFS -from .stat_result import StatResult +from .stat_result import FileListEntry _router_fs = None @@ -150,7 +150,7 @@ def is_dir(path: str) -> bool: return _fs().is_dir(path) -def stat(path: str) -> StatResult: +def stat(path: str) -> FileListEntry: """Returns information about the file or directory at a given path. Notes @@ -177,7 +177,7 @@ def stat(path: str) -> StatResult: return _fs().stat(path) -def ls(path: str) -> List[StatResult]: +def ls(path: str) -> List[FileListEntry]: """Returns information about files at `path`. Notes diff --git a/hail/python/hailtop/fs/router_fs.py b/hail/python/hailtop/fs/router_fs.py index 92e31f755d6..217fdb372b3 100644 --- a/hail/python/hailtop/fs/router_fs.py +++ b/hail/python/hailtop/fs/router_fs.py @@ -7,13 +7,13 @@ import glob import fnmatch -from hailtop.aiotools.fs import Copier, Transfer, FileListEntry, ReadableStream, WritableStream +from hailtop.aiotools.fs import Copier, Transfer, FileListEntry as AIOFileListEntry, ReadableStream, WritableStream from hailtop.aiotools.local_fs import LocalAsyncFS from hailtop.aiotools.router_fs import RouterAsyncFS from hailtop.utils import bounded_gather2, async_to_blocking from .fs import FS -from .stat_result import FileType, StatResult +from .stat_result import FileType, FileListEntry class SyncReadableStream(io.RawIOBase, BinaryIO): # type: ignore # https://github.com/python/typeshed/blob/a40d79a4e63c4e750a8d3a8012305da942251eb4/stdlib/http/client.pyi#L81 @@ -152,14 +152,14 @@ def write(self, b): return async_to_blocking(self.aws.write(b)) -def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> StatResult: +def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> FileListEntry: if size_bytes_and_time_modified: size_bytes, time_modified = size_bytes_and_time_modified else: size_bytes = 0 time_modified = None - return StatResult( + return FileListEntry( path=path.rstrip('/'), size=size_bytes, typ=FileType.DIRECTORY if is_dir else FileType.FILE, @@ -238,7 +238,7 @@ async def _async_is_dir(self, path: str) -> bool: def is_dir(self, path: str) -> bool: return async_to_blocking(self._async_is_dir(path)) - def stat(self, path: str) -> StatResult: + def stat(self, path: str) -> FileListEntry: maybe_sb_and_t, is_dir = async_to_blocking(asyncio.gather( 
self._size_bytes_and_time_modified_or_none(path), self._async_is_dir(path))) if maybe_sb_and_t is None: @@ -255,7 +255,7 @@ async def _size_bytes_and_time_modified_or_none(self, path: str) -> Optional[Tup except FileNotFoundError: return None - async def _fle_to_dict(self, fle: FileListEntry) -> StatResult: + async def _aiofle_to_fle(self, fle: AIOFileListEntry) -> FileListEntry: async def maybe_status() -> Optional[Tuple[int, float]]: try: file_status = await fle.status() @@ -269,7 +269,7 @@ def ls(self, path: str, *, error_when_file_and_directory: bool = True, - _max_simultaneous_files: int = 50) -> List[StatResult]: + _max_simultaneous_files: int = 50) -> List[FileListEntry]: return async_to_blocking(self._async_ls( path, error_when_file_and_directory=error_when_file_and_directory, @@ -279,10 +279,10 @@ async def _async_ls(self, path: str, *, error_when_file_and_directory: bool = True, - _max_simultaneous_files: int = 50) -> List[StatResult]: + _max_simultaneous_files: int = 50) -> List[FileListEntry]: sema = asyncio.Semaphore(_max_simultaneous_files) - async def ls_no_glob(path) -> List[StatResult]: + async def ls_no_glob(path) -> List[FileListEntry]: try: return await self._ls_no_glob(path, error_when_file_and_directory=error_when_file_and_directory, @@ -320,7 +320,7 @@ async def ls_no_glob(path) -> List[StatResult]: else: first_prefix = [] - cached_stats_for_each_cumulative_prefix: Optional[List[StatResult]] = None + cached_stats_for_each_cumulative_prefix: Optional[List[FileListEntry]] = None cumulative_prefixes = [first_prefix] for intervening_components, single_component_glob_pattern in glob_components: @@ -370,12 +370,12 @@ async def _ls_no_glob(self, path: str, *, error_when_file_and_directory: bool = True, - sema: asyncio.Semaphore) -> List[StatResult]: - async def ls_as_dir() -> Optional[List[StatResult]]: + sema: asyncio.Semaphore) -> List[FileListEntry]: + async def ls_as_dir() -> Optional[List[FileListEntry]]: try: return await bounded_gather2( sema, - *[functools.partial(self._fle_to_dict, fle) + *[functools.partial(self._aiofle_to_fle, fle) async for fle in await self.afs.listfiles(path)], cancel_on_error=True ) diff --git a/hail/python/hailtop/fs/stat_result.py b/hail/python/hailtop/fs/stat_result.py index a2bd49087e0..4cbb27c6267 100644 --- a/hail/python/hailtop/fs/stat_result.py +++ b/hail/python/hailtop/fs/stat_result.py @@ -10,7 +10,7 @@ class FileType(Enum): SYMLINK = auto() -class StatResult(NamedTuple): +class FileListEntry(NamedTuple): path: str owner: Union[None, str, int] size: int @@ -25,8 +25,11 @@ def to_legacy_dict(self) -> Dict[str, Any]: return { 'path': self.path, 'owner': self.owner, - 'is_dir': self.is_dir(), 'size_bytes': self.size, 'size': filesize(self.size), 'modification_time': self.modification_time, + 'is_dir': self.is_dir(), } + + +StatResult = FileListEntry # backwards compatibility diff --git a/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala b/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala index e5b469f49cd..6fe9b601778 100644 --- a/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala +++ b/hail/src/main/scala/is/hail/expr/ir/GenericLines.scala @@ -2,7 +2,7 @@ package is.hail.expr.ir import is.hail.backend.spark.SparkBackend import is.hail.io.compress.BGzipInputStream -import is.hail.io.fs.{FS, FileStatus, Positioned, PositionedInputStream, BGZipCompressionCodec} +import is.hail.io.fs.{FS, FileListEntry, Positioned, PositionedInputStream, BGZipCompressionCodec} import is.hail.io.tabix.{TabixReader, TabixLineIterator} import 
is.hail.types.virtual.{TBoolean, TInt32, TInt64, TString, TStruct, Type} import is.hail.utils._ @@ -252,7 +252,7 @@ object GenericLines { def read( fs: FS, - fileStatuses0: IndexedSeq[FileStatus], + fileListEntries0: IndexedSeq[FileListEntry], nPartitions: Option[Int], blockSizeInMB: Option[Int], minPartitions: Option[Int], @@ -260,8 +260,8 @@ object GenericLines { allowSerialRead: Boolean, filePerPartition: Boolean = false ): GenericLines = { - val fileStatuses = fileStatuses0.zipWithIndex.filter(_._1.getLen > 0) - val totalSize = fileStatuses.map(_._1.getLen).sum + val fileListEntries = fileListEntries0.zipWithIndex.filter(_._1.getLen > 0) + val totalSize = fileListEntries.map(_._1.getLen).sum var totalPartitions = nPartitions match { case Some(nPartitions) => nPartitions @@ -276,9 +276,9 @@ object GenericLines { case None => } - val contexts = fileStatuses.flatMap { case (status, fileNum) => - val size = status.getLen - val codec = fs.getCodecFromPath(status.getPath, gzAsBGZ) + val contexts = fileListEntries.flatMap { case (fileListEntry, fileNum) => + val size = fileListEntry.getLen + val codec = fs.getCodecFromPath(fileListEntry.getPath, gzAsBGZ) val splittable = codec == null || codec == BGZipCompressionCodec if (splittable && !filePerPartition) { @@ -294,14 +294,14 @@ object GenericLines { var end = partScan(i + 1) if (codec != null) end = makeVirtualOffset(end, 0) - Row(i, fileNum, status.getPath, start, end, true) + Row(i, fileNum, fileListEntry.getPath, start, end, true) } } else { if (!allowSerialRead && !filePerPartition) - fatal(s"Cowardly refusing to read file serially: ${ status.getPath }.") + fatal(s"Cowardly refusing to read file serially: ${ fileListEntry.getPath }.") Iterator.single { - Row(0, fileNum, status.getPath, 0L, size, false) + Row(0, fileNum, fileListEntry.getPath, 0L, size, false) } } } diff --git a/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala b/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala index 61865ed5449..df40d1fe879 100644 --- a/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala +++ b/hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala @@ -5,7 +5,7 @@ import is.hail.backend.ExecuteContext import is.hail.expr.ir.functions.StringFunctions import is.hail.expr.ir.lowering.{LowererUnsupportedOperation, TableStage, TableStageDependency, TableStageToRVD} import is.hail.expr.ir.streams.StreamProducer -import is.hail.io.fs.{FS, FileStatus} +import is.hail.io.fs.{FS, FileListEntry} import is.hail.rvd.RVDPartitioner import is.hail.types.physical.stypes.EmitType import is.hail.types.physical.stypes.concrete.{SJavaString, SStackStruct, SStackStructValue} @@ -26,8 +26,8 @@ case class StringTableReaderParameters( object StringTableReader { def apply(fs: FS, params: StringTableReaderParameters): StringTableReader = { - val fileStatuses = getFileStatuses(fs, params.files, params.forceBGZ, params.forceGZ) - new StringTableReader(params, fileStatuses) + val fileListEntries = getFileListEntries(fs, params.files, params.forceBGZ, params.forceGZ) + new StringTableReader(params, fileListEntries) } def fromJValue(fs: FS, jv: JValue): StringTableReader = { implicit val formats: Formats = TableReader.formats @@ -35,7 +35,7 @@ object StringTableReader { StringTableReader(fs, params) } - def getFileStatuses(fs: FS, files: Array[String], forceBGZ: Boolean, forceGZ: Boolean): Array[FileStatus] = { + def getFileListEntries(fs: FS, files: Array[String], forceBGZ: Boolean, forceGZ: Boolean): Array[FileListEntry] = { val status = 
fs.globAllStatuses(files) if (status.isEmpty) fatal(s"arguments refer to no files: ${files.toIndexedSeq}.") @@ -141,7 +141,7 @@ case class StringTablePartitionReader(lines: GenericLines, uidFieldName: String) class StringTableReader( val params: StringTableReaderParameters, - fileStatuses: IndexedSeq[FileStatus] + fileListEntries: IndexedSeq[FileListEntry] ) extends TableReaderWithExtraUID { override def uidType = TTuple(TInt64, TInt64) @@ -157,7 +157,7 @@ class StringTableReader( override def lower(ctx: ExecuteContext, requestedType: TableType): TableStage = { val fs = ctx.fs - val lines = GenericLines.read(fs, fileStatuses, None, None, params.minPartitions, params.forceBGZ, params.forceGZ, + val lines = GenericLines.read(fs, fileListEntries, None, None, params.minPartitions, params.forceBGZ, params.forceGZ, params.filePerPartition) TableStage(globals = MakeStruct(FastSeq()), partitioner = RVDPartitioner.unkeyed(ctx.stateManager, lines.nPartitions), diff --git a/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala b/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala index 5d76f90f8e9..1954fcd5867 100644 --- a/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala +++ b/hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala @@ -7,7 +7,7 @@ import is.hail.expr.ir.lowering.{TableStage, TableStageDependency} import is.hail.expr.ir.streams.StreamProducer import is.hail.expr.ir.{EmitCode, EmitCodeBuilder, EmitMethodBuilder, EmitSettable, EmitValue, IEmitCode, IR, IRParserEnvironment, Literal, LowerMatrixIR, MakeStruct, MatrixHybridReader, MatrixReader, PartitionNativeIntervalReader, PartitionReader, ReadPartition, Ref, StreamTake, TableExecuteIntermediate, TableNativeReader, TableReader, TableValue, ToStream} import is.hail.io._ -import is.hail.io.fs.{FS, FileStatus, SeekableDataInputStream} +import is.hail.io.fs.{FS, FileListEntry, SeekableDataInputStream} import is.hail.io.index.{IndexReader, StagedIndexReader} import is.hail.io.vcf.LoadVCF import is.hail.rvd.RVDPartitioner @@ -149,24 +149,24 @@ object LoadBgen { | ${ notVersionTwo.mkString("\n ") }""".stripMargin) } - def getAllFileStatuses(fs: FS, files: Array[String]): Array[FileStatus] = { + def getAllFileListEntries(fs: FS, files: Array[String]): Array[FileListEntry] = { val badFiles = new BoxedArrayBuilder[String]() - val statuses = files.flatMap { file => + val fileListEntries = files.flatMap { file => val matches = fs.glob(file) if (matches.isEmpty) badFiles += file - matches.flatMap { status => - val file = status.getPath.toString + matches.flatMap { fileListEntry => + val file = fileListEntry.getPath.toString if (!file.endsWith(".bgen")) warn(s"input file does not have .bgen extension: $file") if (fs.isDir(file)) fs.listStatus(file) - .filter(status => ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(status.getPath.toString)) + .filter(fileListEntry => ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(fileListEntry.getPath.toString)) else - Array(status) + Array(fileListEntry) } } @@ -175,11 +175,11 @@ object LoadBgen { s"""The following paths refer to no files: | ${ badFiles.result().mkString("\n ") }""".stripMargin) - statuses + fileListEntries } def getAllFilePaths(fs: FS, files: Array[String]): Array[String] = - getAllFileStatuses(fs, files).map(_.getPath.toString) + getAllFileListEntries(fs, files).map(_.getPath.toString) def getBgenFileMetadata(ctx: ExecuteContext, files: Array[String], indexFiles: Array[String]): Array[BgenFileMetadata] = { @@ -226,7 
+226,7 @@ object LoadBgen { } def getIndexFileNames(fs: FS, files: Array[String], indexFileMap: Map[String, String]): Array[String] = { - def absolutePath(rel: String): String = fs.fileStatus(rel).getPath.toString + def absolutePath(rel: String): String = fs.fileListEntry(rel).getPath.toString val fileMapping = Option(indexFileMap) .getOrElse(Map.empty[String, String]) diff --git a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala index 066f41a6063..8345ad56a46 100644 --- a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala @@ -140,21 +140,21 @@ object AzureStorageFS { } } -object AzureStorageFileStatus { - def apply(path: String, isDir: Boolean, blobProperties: BlobProperties): BlobStorageFileStatus = { +object AzureStorageFileListEntry { + def apply(path: String, isDir: Boolean, blobProperties: BlobProperties): BlobStorageFileListEntry = { if (isDir) { - new BlobStorageFileStatus(path, null, 0, true) + new BlobStorageFileListEntry(path, null, 0, true) } else { - new BlobStorageFileStatus(path, blobProperties.getLastModified.toEpochSecond, blobProperties.getBlobSize, false) + new BlobStorageFileListEntry(path, blobProperties.getLastModified.toEpochSecond, blobProperties.getBlobSize, false) } } - def apply(blobPath: String, blobItem: BlobItem): BlobStorageFileStatus = { + def apply(blobPath: String, blobItem: BlobItem): BlobStorageFileListEntry = { if (blobItem.isPrefix) { - new BlobStorageFileStatus(blobPath, null, 0, true) + new BlobStorageFileListEntry(blobPath, null, 0, true) } else { val properties = blobItem.getProperties - new BlobStorageFileStatus(blobPath, properties.getLastModified.toEpochSecond, properties.getContentLength, false) + new BlobStorageFileListEntry(blobPath, properties.getLastModified.toEpochSecond, properties.getContentLength, false) } } } @@ -371,7 +371,7 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { }) } else { try { - if (fileStatus(filename).isFile) { + if (fileListEntry(filename).isFile) { blobClient.delete() } } catch { @@ -380,11 +380,11 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { } } - def listStatus(filename: String): Array[FileStatus] = handlePublicAccessError(filename) { + def listStatus(filename: String): Array[FileListEntry] = handlePublicAccessError(filename) { val url = AzureStorageFS.parseUrl(filename) val blobContainerClient: BlobContainerClient = getContainerClient(url) - val statList: ArrayBuffer[FileStatus] = ArrayBuffer() + val statList: ArrayBuffer[FileListEntry] = ArrayBuffer() val prefix = dropTrailingSlash(url.path) + "/" // collect all children of this directory (blobs and subdirectories) @@ -392,20 +392,20 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { prefixMatches.forEach(blobItem => { val blobPath = dropTrailingSlash(url.withPath(blobItem.getName).toString()) - statList += AzureStorageFileStatus(blobPath, blobItem) + statList += AzureStorageFileListEntry(blobPath, blobItem) }) statList.toArray } - def glob(filename: String): Array[FileStatus] = handlePublicAccessError(filename) { + def glob(filename: String): Array[FileListEntry] = handlePublicAccessError(filename) { val url = AzureStorageFS.parseUrl(filename) globWithPrefix(prefix = url.withPath(""), path = dropTrailingSlash(url.path)) } - override def fileStatus(url: AzureStorageFSURL): FileStatus = retryTransientErrors { + override def fileListEntry(url: 
AzureStorageFSURL): FileListEntry = retryTransientErrors { if (url.path == "") { - return new BlobStorageFileStatus(url.toString, null, 0, true) + return new BlobStorageFileListEntry(url.toString, null, 0, true) } val blobClient: BlobClient = getBlobClient(url) @@ -431,11 +431,11 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { } else null - AzureStorageFileStatus(filename, isDir, blobProperties) + AzureStorageFileListEntry(filename, isDir, blobProperties) } - def fileStatus(filename: String): FileStatus = handlePublicAccessError(filename) { - fileStatus(AzureStorageFS.parseUrl(filename)) + def fileListEntry(filename: String): FileListEntry = handlePublicAccessError(filename) { + fileListEntry(AzureStorageFS.parseUrl(filename)) } def makeQualified(filename: String): String = { diff --git a/hail/src/main/scala/is/hail/io/fs/FS.scala b/hail/src/main/scala/is/hail/io/fs/FS.scala index dd5b6c63e1b..f9dcaf69c47 100644 --- a/hail/src/main/scala/is/hail/io/fs/FS.scala +++ b/hail/src/main/scala/is/hail/io/fs/FS.scala @@ -65,7 +65,7 @@ trait FSURL[T <: FSURL[T]] { override def toString(): String } -trait FileStatus { +trait FileListEntry { def getPath: String def getModificationTime: java.lang.Long def getLen: Long @@ -75,7 +75,7 @@ trait FileStatus { def getOwner: String } -class BlobStorageFileStatus(path: String, modificationTime: java.lang.Long, size: Long, isDir: Boolean) extends FileStatus { +class BlobStorageFileListEntry(path: String, modificationTime: java.lang.Long, size: Long, isDir: Boolean) extends FileListEntry { def getPath: String = path def getModificationTime: java.lang.Long = modificationTime def getLen: Long = size @@ -342,11 +342,11 @@ trait FS extends Serializable { def delete(filename: String, recursive: Boolean) - def listStatus(filename: String): Array[FileStatus] + def listStatus(filename: String): Array[FileListEntry] - def listStatus(url: URL): Array[FileStatus] = listStatus(url.toString) + def listStatus(url: URL): Array[FileListEntry] = listStatus(url.toString) - def glob(filename: String): Array[FileStatus] + def glob(filename: String): Array[FileListEntry] def globWithPrefix(prefix: URL, path: String) = { val components = @@ -357,15 +357,15 @@ trait FS extends Serializable { val javaFS = FileSystems.getDefault - val ab = new mutable.ArrayBuffer[FileStatus]() - def f(prefix: URL, fs: FileStatus, i: Int): Unit = { + val ab = new mutable.ArrayBuffer[FileListEntry]() + def f(prefix: URL, fs: FileListEntry, i: Int): Unit = { assert(!prefix.getPath.endsWith("/"), prefix) if (i == components.length) { var t = fs if (t == null) { try { - t = fileStatus(prefix) + t = fileListEntry(prefix) } catch { case _: FileNotFoundException => } @@ -397,11 +397,11 @@ trait FS extends Serializable { def globAll(filenames: Iterable[String]): Array[String] = globAllStatuses(filenames).map(_.getPath) - def globAllStatuses(filenames: Iterable[String]): Array[FileStatus] = filenames.flatMap(glob).toArray + def globAllStatuses(filenames: Iterable[String]): Array[FileListEntry] = filenames.flatMap(glob).toArray - def fileStatus(filename: String): FileStatus + def fileListEntry(filename: String): FileListEntry - def fileStatus(url: URL): FileStatus = fileStatus(url.toString) + def fileListEntry(url: URL): FileListEntry = fileListEntry(url.toString) def makeQualified(path: String): String @@ -441,11 +441,11 @@ trait FS extends Serializable { def writePDOS(filename: String)(writer: PositionedDataOutputStream => Unit) = using(create(filename))(os => 
writer(outputStreamToPositionedDataOutputStream(os))) - def getFileSize(filename: String): Long = fileStatus(filename).getLen + def getFileSize(filename: String): Long = fileListEntry(filename).getLen def isFile(filename: String): Boolean = { try { - fileStatus(filename).isFile + fileListEntry(filename).isFile } catch { case _: FileNotFoundException => false } @@ -453,7 +453,7 @@ trait FS extends Serializable { def isDir(filename: String): Boolean = { try { - fileStatus(filename).isDirectory + fileListEntry(filename).isDirectory } catch { case _: FileNotFoundException => false } @@ -461,7 +461,7 @@ trait FS extends Serializable { def exists(filename: String): Boolean = { try { - fileStatus(filename) + fileListEntry(filename) true } catch { case _: FileNotFoundException => false @@ -529,25 +529,25 @@ trait FS extends Serializable { delete(destinationFile, recursive = true) // overwriting by default - val headerFileStatus = glob(sourceFolder + "/header") + val headerFileListEntry = glob(sourceFolder + "/header") - if (header && headerFileStatus.isEmpty) + if (header && headerFileListEntry.isEmpty) fatal(s"Missing header file") - else if (!header && headerFileStatus.nonEmpty) + else if (!header && headerFileListEntry.nonEmpty) fatal(s"Found unexpected header file") - val partFileStatuses = partFilesOpt match { + val partFileListEntries = partFilesOpt match { case None => glob(sourceFolder + "/part-*") - case Some(files) => files.map(f => fileStatus(sourceFolder + "/" + f)).toArray + case Some(files) => files.map(f => fileListEntry(sourceFolder + "/" + f)).toArray } - val sortedPartFileStatuses = partFileStatuses.sortBy(fs => getPartNumber(new hadoop.fs.Path(fs.getPath).getName)) - if (sortedPartFileStatuses.length != numPartFilesExpected) - fatal(s"Expected $numPartFilesExpected part files but found ${ sortedPartFileStatuses.length }") + val sortedPartFileListEntries = partFileListEntries.sortBy(fs => getPartNumber(new hadoop.fs.Path(fs.getPath).getName)) + if (sortedPartFileListEntries.length != numPartFilesExpected) + fatal(s"Expected $numPartFilesExpected part files but found ${ sortedPartFileListEntries.length }") - val filesToMerge = headerFileStatus ++ sortedPartFileStatuses + val filesToMerge = headerFileListEntry ++ sortedPartFileListEntries info(s"merging ${ filesToMerge.length } files totalling " + - s"${ readableBytes(sortedPartFileStatuses.map(_.getLen).sum) }...") + s"${ readableBytes(sortedPartFileListEntries.map(_.getLen).sum) }...") val (_, dt) = time { copyMergeList(filesToMerge, destinationFile, deleteSource) @@ -562,26 +562,26 @@ trait FS extends Serializable { } } - def copyMergeList(srcFileStatuses: Array[FileStatus], destFilename: String, deleteSource: Boolean = true) { + def copyMergeList(srcFileListEntries: Array[FileListEntry], destFilename: String, deleteSource: Boolean = true) { val codec = Option(getCodecFromPath(destFilename)) val isBGzip = codec.exists(_ == BGZipCompressionCodec) - require(srcFileStatuses.forall { - fileStatus => fileStatus.getPath != destFilename && fileStatus.isFile + require(srcFileListEntries.forall { + fileListEntry => fileListEntry.getPath != destFilename && fileListEntry.isFile }) using(createNoCompression(destFilename)) { os => var i = 0 - while (i < srcFileStatuses.length) { - val fileStatus = srcFileStatuses(i) - val lenAdjust: Long = if (isBGzip && i < srcFileStatuses.length - 1) + while (i < srcFileListEntries.length) { + val fileListEntry = srcFileListEntries(i) + val lenAdjust: Long = if (isBGzip && i < srcFileListEntries.length - 
1) -28 else 0 - using(openNoCompression(fileStatus.getPath)) { is => + using(openNoCompression(fileListEntry.getPath)) { is => hadoop.io.IOUtils.copyBytes(is, os, - fileStatus.getLen + lenAdjust, + fileListEntry.getLen + lenAdjust, false) } i += 1 @@ -589,19 +589,19 @@ trait FS extends Serializable { } if (deleteSource) { - srcFileStatuses.foreach { fileStatus => - delete(fileStatus.getPath.toString, recursive = true) + srcFileListEntries.foreach { fileListEntry => + delete(fileListEntry.getPath.toString, recursive = true) } } } def concatenateFiles(sourceNames: Array[String], destFilename: String): Unit = { - val fileStatuses = sourceNames.map(fileStatus(_)) + val fileListEntries = sourceNames.map(fileListEntry(_)) - info(s"merging ${ fileStatuses.length } files totalling " + - s"${ readableBytes(fileStatuses.map(_.getLen).sum) }...") + info(s"merging ${ fileListEntries.length } files totalling " + + s"${ readableBytes(fileListEntries.map(_.getLen).sum) }...") - val (_, timing) = time(copyMergeList(fileStatuses, destFilename, deleteSource = false)) + val (_, timing) = time(copyMergeList(fileListEntries, destFilename, deleteSource = false)) info(s"while writing:\n $destFilename\n merge time: ${ formatTime(timing) }") } diff --git a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala index 793a43ce559..c330df4a056 100644 --- a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala @@ -64,13 +64,13 @@ object GoogleStorageFS { } } -object GoogleStorageFileStatus { - def apply(blob: Blob): BlobStorageFileStatus = { +object GoogleStorageFileListEntry { + def apply(blob: Blob): BlobStorageFileListEntry = { val isDir = blob.isDirectory val name = dropTrailingSlash(blob.getName) - new BlobStorageFileStatus( + new BlobStorageFileListEntry( s"gs://${ blob.getBucket }/$name", if (isDir) null @@ -448,14 +448,14 @@ class GoogleStorageFS( } } - def glob(filename: String): Array[FileStatus] = retryTransientErrors { + def glob(filename: String): Array[FileListEntry] = retryTransientErrors { val url = parseUrl(filename) globWithPrefix(url.withPath(""), path = dropTrailingSlash(url.path)) } - def listStatus(filename: String): Array[FileStatus] = listStatus(parseUrl(filename)) + def listStatus(filename: String): Array[FileListEntry] = listStatus(parseUrl(filename)) - override def listStatus(url: GoogleStorageFSURL): Array[FileStatus] = retryTransientErrors { + override def listStatus(url: GoogleStorageFSURL): Array[FileListEntry] = retryTransientErrors { val path = if (url.path.endsWith("/")) url.path else url.path + "/" val blobs = retryTransientErrors { @@ -468,17 +468,17 @@ class GoogleStorageFS( blobs.iterateAll().iterator.asScala .filter(b => b.getName != path) // elide directory markers created by Hadoop - .map(b => GoogleStorageFileStatus(b)) + .map(b => GoogleStorageFileListEntry(b)) .toArray } - def fileStatus(filename: String): FileStatus = fileStatus(parseUrl(filename)) + def fileListEntry(filename: String): FileListEntry = fileListEntry(parseUrl(filename)) - override def fileStatus(url: GoogleStorageFSURL): FileStatus = retryTransientErrors { + override def fileListEntry(url: GoogleStorageFSURL): FileListEntry = retryTransientErrors { val path = dropTrailingSlash(url.path) if (url.path == "") - return new BlobStorageFileStatus(s"gs://${url.bucket}", null, 0, true) + return new BlobStorageFileListEntry(s"gs://${url.bucket}", null, 0, true) val blobs = retryTransientErrors { 
handleRequesterPays( @@ -495,7 +495,7 @@ class GoogleStorageFS( while (name.endsWith("/")) name = name.dropRight(1) if (name == path) - return GoogleStorageFileStatus(b) + return GoogleStorageFileListEntry(b) } throw new FileNotFoundException(url.toString) diff --git a/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala b/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala index bc010f887c3..96e50630998 100644 --- a/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/HadoopFS.scala @@ -7,7 +7,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} import java.io._ -class HadoopFileStatus(fs: hadoop.fs.FileStatus) extends FileStatus { +class HadoopFileListEntry(fs: hadoop.fs.FileStatus) extends FileListEntry { val normalizedPath = fs.getPath def getPath: String = fs.getPath.toString @@ -122,7 +122,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends new hadoop.fs.Path(filename).getFileSystem(conf.value) } - def listStatus(filename: String): Array[FileStatus] = { + def listStatus(filename: String): Array[FileListEntry] = { val fs = getFileSystem(filename) val hPath = new hadoop.fs.Path(filename) var statuses = fs.globStatus(hPath) @@ -131,7 +131,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends } else { statuses.par.map(_.getPath) .flatMap(fs.listStatus(_)) - .map(new HadoopFileStatus(_)) + .map(new HadoopFileListEntry(_)) .toArray } } @@ -163,7 +163,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends }.toArray } - override def globAllStatuses(filenames: Iterable[String]): Array[FileStatus] = { + override def globAllStatuses(filenames: Iterable[String]): Array[FileListEntry] = { filenames.flatMap { filename => val statuses = glob(filename) if (statuses.isEmpty) @@ -172,7 +172,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends }.toArray } - def glob(filename: String): Array[FileStatus] = { + def glob(filename: String): Array[FileListEntry] = { val fs = getFileSystem(filename) val path = new hadoop.fs.Path(filename) @@ -180,12 +180,12 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends if (files == null) files = Array.empty log.info(s"globbing path $filename returned ${ files.length } files: ${ files.map(_.getPath.getName).mkString(",") }") - files.map(fileStatus => new HadoopFileStatus(fileStatus)) + files.map(fileListEntry => new HadoopFileListEntry(fileListEntry)) } - def fileStatus(filename: String): FileStatus = { + def fileListEntry(filename: String): FileListEntry = { val p = new hadoop.fs.Path(filename) - new HadoopFileStatus(p.getFileSystem(conf.value).getFileStatus(p)) + new HadoopFileListEntry(p.getFileSystem(conf.value).getFileStatus(p)) } def makeQualified(path: String): String = { diff --git a/hail/src/main/scala/is/hail/io/fs/RouterFS.scala b/hail/src/main/scala/is/hail/io/fs/RouterFS.scala index 363384555bc..40f0fc49033 100644 --- a/hail/src/main/scala/is/hail/io/fs/RouterFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/RouterFS.scala @@ -29,11 +29,11 @@ class RouterFS(fss: IndexedSeq[FS]) extends FS { def delete(filename: String, recursive: Boolean) = lookupFS(filename).delete(filename, recursive) - def listStatus(filename: String): Array[FileStatus] = lookupFS(filename).listStatus(filename) + def listStatus(filename: String): Array[FileListEntry] = lookupFS(filename).listStatus(filename) - def glob(filename: String): Array[FileStatus] = 
lookupFS(filename).glob(filename) + def glob(filename: String): Array[FileListEntry] = lookupFS(filename).glob(filename) - def fileStatus(filename: String): FileStatus = lookupFS(filename).fileStatus(filename) + def fileListEntry(filename: String): FileListEntry = lookupFS(filename).fileListEntry(filename) def makeQualified(path: String): String = lookupFS(path).makeQualified(path) diff --git a/hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala b/hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala index cf5801a3bfb..9b616981cc7 100644 --- a/hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala +++ b/hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala @@ -9,7 +9,7 @@ import is.hail.expr.JSONAnnotationImpex import is.hail.expr.ir.lowering.TableStage import is.hail.expr.ir.streams.StreamProducer import is.hail.expr.ir.{CloseableIterator, EmitCode, EmitCodeBuilder, EmitMethodBuilder, GenericLine, GenericLines, GenericTableValue, IEmitCode, IR, IRParser, Literal, LowerMatrixIR, MatrixHybridReader, MatrixReader, TableExecuteIntermediate, PartitionReader, TableValue} -import is.hail.io.fs.{FS, FileStatus} +import is.hail.io.fs.{FS, FileListEntry} import is.hail.io.tabix._ import is.hail.io.vcf.LoadVCF.{getHeaderLines, parseHeader} import is.hail.io.{VCFAttributes, VCFMetadata} @@ -1257,7 +1257,7 @@ object LoadVCF { def globAllVCFs(arguments: Array[String], fs: FS, forceGZ: Boolean = false, - gzAsBGZ: Boolean = false): Array[FileStatus] = { + gzAsBGZ: Boolean = false): Array[FileListEntry] = { val statuses = fs.globAllStatuses(arguments) if (statuses.isEmpty) @@ -1689,20 +1689,20 @@ object MatrixVCFReader { referenceGenome.foreach(_.validateContigRemap(params.contigRecoding)) - val fileStatuses = LoadVCF.globAllVCFs(fs.globAll(params.files), fs, params.forceGZ, params.gzAsBGZ) + val fileListEntries = LoadVCF.globAllVCFs(fs.globAll(params.files), fs, params.forceGZ, params.gzAsBGZ) val entryFloatType = LoadVCF.getEntryFloatType(params.entryFloatTypeName) - val headerLines1 = getHeaderLines(fs, params.headerFile.getOrElse(fileStatuses.head.getPath), params.filterAndReplace) + val headerLines1 = getHeaderLines(fs, params.headerFile.getOrElse(fileListEntries.head.getPath), params.filterAndReplace) val header1 = parseHeader(headerLines1) - if (fileStatuses.length > 1) { + if (fileListEntries.length > 1) { if (params.headerFile.isEmpty) { val header1Bc = backend.broadcast(header1) val localCallFields = params.callFields val localFloatType = entryFloatType - val files = fileStatuses.map(_.getPath) + val files = fileListEntries.map(_.getPath) val localArrayElementsRequired = params.arrayElementsRequired val localFilterAndReplace = params.filterAndReplace @@ -1757,7 +1757,7 @@ object MatrixVCFReader { LoadVCF.warnDuplicates(sampleIDs) - new MatrixVCFReader(params, fileStatuses, referenceGenome, header1) + new MatrixVCFReader(params, fileListEntries, referenceGenome, header1) } def fromJValue(ctx: ExecuteContext, jv: JValue): MatrixVCFReader = { @@ -1791,11 +1791,11 @@ case class MatrixVCFReaderParameters( class MatrixVCFReader( val params: MatrixVCFReaderParameters, - fileStatuses: IndexedSeq[FileStatus], + fileListEntries: IndexedSeq[FileListEntry], referenceGenome: Option[ReferenceGenome], header: VCFHeaderInfo ) extends MatrixHybridReader { - require(params.partitionsJSON.isEmpty || fileStatuses.length == 1, "reading with partitions can currently only read a single path") + require(params.partitionsJSON.isEmpty || fileListEntries.length == 1, "reading with partitions can currently only read a single path") val 
sampleIDs = params.sampleIDs.map(_.toArray).getOrElse(header.sampleIds) @@ -1887,9 +1887,9 @@ class MatrixVCFReader( val part = partitioner(ctx.stateManager) val lines = part match { case Some(partitioner) => - GenericLines.readTabix(fs, fileStatuses(0).getPath, localContigRecoding, partitioner.rangeBounds) + GenericLines.readTabix(fs, fileListEntries(0).getPath, localContigRecoding, partitioner.rangeBounds) case None => - GenericLines.read(fs, fileStatuses, params.nPartitions, params.blockSizeInMB, params.minPartitions, params.gzAsBGZ, params.forceGZ) + GenericLines.read(fs, fileListEntries, params.nPartitions, params.blockSizeInMB, params.minPartitions, params.gzAsBGZ, params.forceGZ) } val globals = Row(sampleIDs.zipWithIndex.map { case (s, i) => Row(s, i.toLong) }.toFastIndexedSeq) diff --git a/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala b/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala index ba0ac578960..4ad95fe3cb5 100644 --- a/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala +++ b/hail/src/main/scala/is/hail/methods/MatrixExportEntriesByCol.scala @@ -10,7 +10,6 @@ import is.hail.expr.ir.MatrixValue import is.hail.expr.ir.functions.MatrixToValueFunction import is.hail.types.{MatrixType, RTable, TypeWithRequiredness} import is.hail.types.virtual.{TVoid, Type} -import is.hail.io.fs.FileStatus import is.hail.utils._ import org.apache.spark.TaskContext import org.apache.spark.sql.Row @@ -151,8 +150,8 @@ case class MatrixExportEntriesByCol(parallelism: Int, path: String, bgzip: Boole val newFiles = mv.sparkContext.parallelize(0 until ns, numSlices = ns) .map { sampleIdx => val partFilePath = path + "/" + partFile(digitsNeeded(nCols), sampleIdx, TaskContext.get) - val fileStatuses = partFolders.map(pf => fsBc.value.fileStatus(pf + s"/$sampleIdx" + extension)) - fsBc.value.copyMergeList(fileStatuses, partFilePath, deleteSource = false) + val fileListEntries = partFolders.map(pf => fsBc.value.fileListEntry(pf + s"/$sampleIdx" + extension)) + fsBc.value.copyMergeList(fileListEntries, partFilePath, deleteSource = false) partFilePath }.collect() diff --git a/hail/src/main/scala/is/hail/utils/Py4jUtils.scala b/hail/src/main/scala/is/hail/utils/Py4jUtils.scala index eefeacd3168..afb24f8dad9 100644 --- a/hail/src/main/scala/is/hail/utils/Py4jUtils.scala +++ b/hail/src/main/scala/is/hail/utils/Py4jUtils.scala @@ -2,7 +2,7 @@ package is.hail.utils import is.hail.HailContext import is.hail.expr.JSONAnnotationImpex -import is.hail.io.fs.{FS, FileStatus, SeekableDataInputStream} +import is.hail.io.fs.{FS, FileListEntry, SeekableDataInputStream} import is.hail.types.virtual.Type import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods @@ -52,15 +52,15 @@ trait Py4jUtils { def ls(fs: FS, path: String): String = { val statuses = fs.listStatus(path) - JsonMethods.compact(JArray(statuses.map(fs => statusToJson(fs)).toList)) + JsonMethods.compact(JArray(statuses.map(fs => fileListEntryToJson(fs)).toList)) } - def stat(fs: FS, path: String): String = { - val stat = fs.fileStatus(path) - JsonMethods.compact(statusToJson(stat)) + def fileListEntry(fs: FS, path: String): String = { + val stat = fs.fileListEntry(path) + JsonMethods.compact(fileListEntryToJson(stat)) } - private def statusToJson(fs: FileStatus): JObject = { + private def fileListEntryToJson(fs: FileListEntry): JObject = { JObject( "path" -> JString(fs.getPath.toString), "size" -> JInt(fs.getLen), @@ -103,7 +103,7 @@ trait Py4jUtils { } def readFile(fs: FS, path: String, buffSize: 
Int): HadoopSeekablePyReader = - new HadoopSeekablePyReader(fs.fileStatus(path), fs.openNoCompression(path), buffSize) + new HadoopSeekablePyReader(fs.fileListEntry(path), fs.openNoCompression(path), buffSize) def readFileCodec(fs: FS, path: String, buffSize: Int): HadoopPyReader = new HadoopPyReader(fs.open(path), buffSize) @@ -163,7 +163,7 @@ class HadoopPyReader(in: InputStream, buffSize: Int) { } } -class HadoopSeekablePyReader(status: FileStatus, in: SeekableDataInputStream, buffSize: Int) extends HadoopPyReader(in, buffSize) { +class HadoopSeekablePyReader(status: FileListEntry, in: SeekableDataInputStream, buffSize: Int) extends HadoopPyReader(in, buffSize) { def seek(pos: Long, whence: Int): Long = { // whence corresponds to python arguments to seek // it is validated in python diff --git a/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala b/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala index 0284be23ee8..db3868664aa 100644 --- a/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala +++ b/hail/src/main/scala/is/hail/variant/ReferenceGenome.scala @@ -408,7 +408,7 @@ case class ReferenceGenome(name: String, contigs: Array[String], lengths: Map[St if (!fs.exists(chainFile)) fatal(s"Chain file '$chainFile' does not exist.") - val chainFilePath = fs.fileStatus(chainFile).getPath + val chainFilePath = fs.fileListEntry(chainFile).getPath val lo = LiftOver(fs, chainFilePath) val destRG = ctx.getReference(destRGName) lo.checkChainFile(this, destRG) @@ -446,7 +446,7 @@ case class ReferenceGenome(name: String, contigs: Array[String], lengths: Map[St // since removeLiftover updates both maps, so we don't check to see if liftoverMap has // keys that are not in chainFiles for ((destRGName, chainFile) <- chainFiles) { - val chainFilePath = fs.fileStatus(chainFile).getPath + val chainFilePath = fs.fileListEntry(chainFile).getPath liftoverMap.get(destRGName) match { case Some(lo) if lo.chainFile == chainFilePath => // do nothing case _ => liftoverMap += destRGName -> LiftOver(fs, chainFilePath) @@ -455,8 +455,8 @@ case class ReferenceGenome(name: String, contigs: Array[String], lengths: Map[St // add sequence if (fastaFilePath != null) { - val fastaPath = fs.fileStatus(fastaFilePath).getPath - val indexPath = fs.fileStatus(fastaIndexPath).getPath + val fastaPath = fs.fileListEntry(fastaFilePath).getPath + val indexPath = fs.fileListEntry(fastaIndexPath).getPath if (fastaReaderCfg == null || fastaReaderCfg.fastaFile != fastaPath || fastaReaderCfg.indexFile != indexPath) { fastaReaderCfg = FASTAReaderConfig(tmpdir, fs, this, fastaPath, indexPath) } diff --git a/hail/src/test/scala/is/hail/fs/FSSuite.scala b/hail/src/test/scala/is/hail/fs/FSSuite.scala index 628c4981762..d540c4aab10 100644 --- a/hail/src/test/scala/is/hail/fs/FSSuite.scala +++ b/hail/src/test/scala/is/hail/fs/FSSuite.scala @@ -5,7 +5,7 @@ import is.hail.fs.azure.AzureStorageFSSuite import is.hail.HailSuite import is.hail.backend.ExecuteContext import is.hail.io.fs.FSUtil.dropTrailingSlash -import is.hail.io.fs.{FS, FileStatus, GoogleStorageFS, Seekable} +import is.hail.io.fs.{FS, FileListEntry, GoogleStorageFS, Seekable} import is.hail.utils._ import org.apache.commons.codec.binary.Hex import org.apache.commons.io.IOUtils @@ -34,7 +34,7 @@ trait FSSuite extends TestNGSuite { def t(extension: String = null): String = ExecuteContext.createTmpPathNoCleanup(tmpdir, "fs-suite-tmp", extension) - def pathsRelRoot(root: String, statuses: Array[FileStatus]): Set[String] = { + def pathsRelRoot(root: String, statuses: 
Array[FileListEntry]): Set[String] = { statuses.map { status => var p = status.getPath assert(p.startsWith(root), s"$p $root") @@ -42,7 +42,7 @@ trait FSSuite extends TestNGSuite { }.toSet } - def pathsRelResourcesRoot(statuses: Array[FileStatus]): Set[String] = pathsRelRoot(fsResourcesRoot, statuses) + def pathsRelResourcesRoot(statuses: Array[FileListEntry]): Set[String] = pathsRelRoot(fsResourcesRoot, statuses) @Test def testExists(): Unit = { assert(fs.exists(r("/a"))) @@ -57,37 +57,37 @@ trait FSSuite extends TestNGSuite { assert(!fs.exists(r("/does_not_exist_dir/"))) } - @Test def testFileStatusOnFile(): Unit = { + @Test def testFileListEntryOnFile(): Unit = { // file val f = r("/a") - val s = fs.fileStatus(f) + val s = fs.fileListEntry(f) assert(s.getPath == f) assert(s.isFile) assert(!s.isDirectory) assert(s.getLen == 12) } - @Test def testFileStatusOnDir(): Unit = { + @Test def testFileListEntryOnDir(): Unit = { // file val f = r("/dir") - val s = fs.fileStatus(f) + val s = fs.fileListEntry(f) assert(s.getPath == f) assert(!s.isFile) assert(s.isDirectory) } - @Test def testFileStatusOnDirWithSlash(): Unit = { + @Test def testFileListEntryOnDirWithSlash(): Unit = { // file val f = r("/dir/") - val s = fs.fileStatus(f) + val s = fs.fileListEntry(f) assert(s.getPath == f.dropRight(1)) assert(!s.isFile) assert(s.isDirectory) } - @Test def testFileStatusOnMissingFile(): Unit = { + @Test def testFileListEntryOnMissingFile(): Unit = { try { - fs.fileStatus(r("/does_not_exist")) + fs.fileListEntry(r("/does_not_exist")) } catch { case _: FileNotFoundException => return @@ -95,16 +95,16 @@ trait FSSuite extends TestNGSuite { assert(false) } - @Test def testFileStatusRoot(): Unit = { - val s = fs.fileStatus(root) + @Test def testFileListEntryRoot(): Unit = { + val s = fs.fileListEntry(root) assert(s.getPath == root) } - @Test def testFileStatusRootWithSlash(): Unit = { + @Test def testFileListEntryRootWithSlash(): Unit = { if (root.endsWith("/")) return - val s = fs.fileStatus(s"$root/") + val s = fs.fileListEntry(s"$root/") assert(s.getPath == root) } diff --git a/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala b/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala index c8ce681947c..6c91c7c19ce 100644 --- a/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala +++ b/hail/src/test/scala/is/hail/io/compress/BGzipCodecSuite.scala @@ -75,7 +75,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesSimpleUncompressed() { val lines = Source.fromFile(uncompPath).getLines().toFastIndexedSeq - val uncompStatus = fs.fileStatus(uncompPath) + val uncompStatus = fs.fileListEntry(uncompPath) var i = 0 while (i < 16) { val lines2 = GenericLines.collect( @@ -89,7 +89,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesSimpleBGZ() { val lines = Source.fromFile(uncompPath).getLines().toFastIndexedSeq - val compStatus = fs.fileStatus(compPath) + val compStatus = fs.fileListEntry(compPath) var i = 0 while (i < 16) { val lines2 = GenericLines.collect( @@ -104,7 +104,7 @@ class BGzipCodecSuite extends HailSuite { val lines = Source.fromFile(uncompPath).getLines().toFastIndexedSeq // won't split, just run once - val gzStatus = fs.fileStatus(gzPath) + val gzStatus = fs.fileListEntry(gzPath) val lines2 = GenericLines.collect( fs, GenericLines.read(fs, Array(gzStatus), Some(7), None, None, false, true)) @@ -113,7 +113,7 @@ class BGzipCodecSuite extends HailSuite { @Test def testGenericLinesRefuseGZ() { interceptFatal("Cowardly refusing") { - val 
gzStatus = fs.fileStatus(gzPath) + val gzStatus = fs.fileListEntry(gzPath) GenericLines.read(fs, Array(gzStatus), Some(7), None, None, false, false) } } diff --git a/hail/src/test/scala/is/hail/utils/UtilsSuite.scala b/hail/src/test/scala/is/hail/utils/UtilsSuite.scala index 7650de60962..33b59cb5d65 100644 --- a/hail/src/test/scala/is/hail/utils/UtilsSuite.scala +++ b/hail/src/test/scala/is/hail/utils/UtilsSuite.scala @@ -83,13 +83,13 @@ class UtilsSuite extends HailSuite { assert(!rdd1.exists(_ < 0)) } - @Test def testSortFileStatus() { + @Test def testSortFileListEntry() { val fs = new HadoopFS(new SerializableHadoopConfiguration(sc.hadoopConfiguration)) val partFileNames = fs.glob("src/test/resources/part-*") - .map { fileStatus => - (fileStatus, new hadoop.fs.Path(fileStatus.getPath)) - }.sortBy { case (fileStatus, path) => + .map { fileListEntry => + (fileListEntry, new hadoop.fs.Path(fileListEntry.getPath)) + }.sortBy { case (fileListEntry, path) => getPartNumber(path.getName) }.map(_._2.getName)
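
Note for reviewers, separate from the patch itself: the commit message's point about files versus folders is easiest to see in code. Below is a minimal Scala sketch, using only FS members that appear in this diff (listStatus, fileListEntry, and the FileListEntry trait's getPath/getLen/isFile/isDirectory). The concrete FS instance and the gs:// paths are placeholder assumptions, not part of the patch.

    import is.hail.io.fs.{FS, FileListEntry}

    object FileListEntryDemo {
      // A FileListEntry comes from a list operation (listStatus or glob), so it
      // knows whether it names a file or a folder-like key prefix.
      def describe(e: FileListEntry): String =
        if (e.isDirectory) s"${e.getPath}/ (directory)"
        else s"${e.getPath} (${e.getLen} bytes)"

      def main(args: Array[String]): Unit = {
        val fs: FS = ??? // placeholder: obtain a concrete FS (HadoopFS, GoogleStorageFS, ...)

        // Every entry returned by a list knows file vs. directory.
        fs.listStatus("gs://example-bucket/data").foreach(e => println(describe(e)))

        // fileListEntry resolves one path. In a blob store, a blob named
        // .../data/x and keys under .../data/x/ can coexist, so metadata for
        // the blob alone cannot rule out a same-named directory; only a list
        // operation sees both. That asymmetry is what motivates the rename.
        println(describe(fs.fileListEntry("gs://example-bucket/data/x")))
      }
    }

The same split shows up on the Python side of the diff: hailtop.fs now returns FileListEntry from stat and ls, with StatResult kept as an alias for backwards compatibility.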