Skip to content

Commit

Permalink
[fs] consistent use of the term FileListEntry (#13594)
Browse files Browse the repository at this point in the history
In our asyncio FSes, we use the term FileListEntry for the package of
information you receive from a cloud object store list operation. We
reserve the term FileStatus for the metadata you get about a blob
without a list operation. A major difference is that FileListEntries
(FLEs) know whether they refer to files or folders. In contrast, a
FileStatus knows that it is a file *but does not know whether a directory
with the same name exists*.
  • Loading branch information
danking committed Sep 11, 2023
1 parent f96d24e commit 29b2afc
Show file tree
Hide file tree
Showing 20 changed files with 192 additions and 187 deletions.
29 changes: 16 additions & 13 deletions hail/python/hail/fs/hadoop_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,26 @@
import time
from typing import Dict, List, Union, Any

import dateutil
import dateutil.parser

from hailtop.fs.fs import FS
from hailtop.fs.stat_result import FileType, StatResult
from hailtop.fs.stat_result import FileType, FileListEntry


def _stat_dict_to_stat_result(stat: Dict[str, Any]) -> StatResult:
dt = dateutil.parser.isoparse(stat['modification_time'])
def _file_list_entry_scala_to_python(file_list_entry: Dict[str, Any]) -> FileListEntry:
dt = dateutil.parser.isoparse(file_list_entry['modification_time'])
mtime = time.mktime(dt.timetuple())
if stat['is_dir']:
if file_list_entry['is_dir']:
typ = FileType.DIRECTORY
elif stat['is_link']:
elif file_list_entry['is_link']:
typ = FileType.SYMLINK
else:
typ = FileType.FILE
return StatResult(path=stat['path'], owner=stat['owner'], size=stat['size'],
typ=typ, modification_time=mtime)
return FileListEntry(path=file_list_entry['path'],
owner=file_list_entry['owner'],
size=file_list_entry['size'],
typ=typ,
modification_time=mtime)


class HadoopFS(FS):
Expand Down Expand Up @@ -60,12 +63,12 @@ def is_file(self, path: str) -> bool:
def is_dir(self, path: str) -> bool:
return self._jfs.isDir(path)

def stat(self, path: str) -> StatResult:
stat_dict = json.loads(self._utils_package_object.stat(self._jfs, path))
return _stat_dict_to_stat_result(stat_dict)
def stat(self, path: str) -> FileListEntry:
stat_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path))
return _file_list_entry_scala_to_python(stat_dict)

def ls(self, path: str) -> List[StatResult]:
return [_stat_dict_to_stat_result(st)
def ls(self, path: str) -> List[FileListEntry]:
return [_file_list_entry_scala_to_python(st)
for st in json.loads(self._utils_package_object.ls(self._jfs, path))]

def mkdir(self, path: str) -> None:
Expand Down
6 changes: 3 additions & 3 deletions hail/python/hailtop/fs/fs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
from typing import IO, List

from .stat_result import StatResult
from .stat_result import FileListEntry


class FS(abc.ABC):
Expand All @@ -26,11 +26,11 @@ def is_dir(self, path: str) -> bool:
raise NotImplementedError

@abc.abstractmethod
def stat(self, path: str) -> StatResult:
def stat(self, path: str) -> FileListEntry:
raise NotImplementedError

@abc.abstractmethod
def ls(self, path: str) -> List[StatResult]:
def ls(self, path: str) -> List[FileListEntry]:
raise NotImplementedError

@abc.abstractmethod
Expand Down
6 changes: 3 additions & 3 deletions hail/python/hailtop/fs/fs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List

from .router_fs import RouterFS
from .stat_result import StatResult
from .stat_result import FileListEntry

_router_fs = None

Expand Down Expand Up @@ -150,7 +150,7 @@ def is_dir(path: str) -> bool:
return _fs().is_dir(path)


def stat(path: str) -> StatResult:
def stat(path: str) -> FileListEntry:
"""Returns information about the file or directory at a given path.
Notes
Expand All @@ -177,7 +177,7 @@ def stat(path: str) -> StatResult:
return _fs().stat(path)


def ls(path: str) -> List[StatResult]:
def ls(path: str) -> List[FileListEntry]:
"""Returns information about files at `path`.
Notes
Expand Down
26 changes: 13 additions & 13 deletions hail/python/hailtop/fs/router_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import glob
import fnmatch

from hailtop.aiotools.fs import Copier, Transfer, FileListEntry, ReadableStream, WritableStream
from hailtop.aiotools.fs import Copier, Transfer, FileListEntry as AIOFileListEntry, ReadableStream, WritableStream
from hailtop.aiotools.local_fs import LocalAsyncFS
from hailtop.aiotools.router_fs import RouterAsyncFS
from hailtop.utils import bounded_gather2, async_to_blocking

from .fs import FS
from .stat_result import FileType, StatResult
from .stat_result import FileType, FileListEntry


class SyncReadableStream(io.RawIOBase, BinaryIO): # type: ignore # https://github.com/python/typeshed/blob/a40d79a4e63c4e750a8d3a8012305da942251eb4/stdlib/http/client.pyi#L81
Expand Down Expand Up @@ -152,14 +152,14 @@ def write(self, b):
return async_to_blocking(self.aws.write(b))


def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> StatResult:
def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> FileListEntry:
if size_bytes_and_time_modified:
size_bytes, time_modified = size_bytes_and_time_modified
else:
size_bytes = 0
time_modified = None

return StatResult(
return FileListEntry(
path=path.rstrip('/'),
size=size_bytes,
typ=FileType.DIRECTORY if is_dir else FileType.FILE,
Expand Down Expand Up @@ -238,7 +238,7 @@ async def _async_is_dir(self, path: str) -> bool:
def is_dir(self, path: str) -> bool:
return async_to_blocking(self._async_is_dir(path))

def stat(self, path: str) -> StatResult:
def stat(self, path: str) -> FileListEntry:
maybe_sb_and_t, is_dir = async_to_blocking(asyncio.gather(
self._size_bytes_and_time_modified_or_none(path), self._async_is_dir(path)))
if maybe_sb_and_t is None:
Expand All @@ -255,7 +255,7 @@ async def _size_bytes_and_time_modified_or_none(self, path: str) -> Optional[Tup
except FileNotFoundError:
return None

async def _fle_to_dict(self, fle: FileListEntry) -> StatResult:
async def _aiofle_to_fle(self, fle: AIOFileListEntry) -> FileListEntry:
async def maybe_status() -> Optional[Tuple[int, float]]:
try:
file_status = await fle.status()
Expand All @@ -269,7 +269,7 @@ def ls(self,
path: str,
*,
error_when_file_and_directory: bool = True,
_max_simultaneous_files: int = 50) -> List[StatResult]:
_max_simultaneous_files: int = 50) -> List[FileListEntry]:
return async_to_blocking(self._async_ls(
path,
error_when_file_and_directory=error_when_file_and_directory,
Expand All @@ -279,10 +279,10 @@ async def _async_ls(self,
path: str,
*,
error_when_file_and_directory: bool = True,
_max_simultaneous_files: int = 50) -> List[StatResult]:
_max_simultaneous_files: int = 50) -> List[FileListEntry]:
sema = asyncio.Semaphore(_max_simultaneous_files)

async def ls_no_glob(path) -> List[StatResult]:
async def ls_no_glob(path) -> List[FileListEntry]:
try:
return await self._ls_no_glob(path,
error_when_file_and_directory=error_when_file_and_directory,
Expand Down Expand Up @@ -320,7 +320,7 @@ async def ls_no_glob(path) -> List[StatResult]:
else:
first_prefix = []

cached_stats_for_each_cumulative_prefix: Optional[List[StatResult]] = None
cached_stats_for_each_cumulative_prefix: Optional[List[FileListEntry]] = None
cumulative_prefixes = [first_prefix]

for intervening_components, single_component_glob_pattern in glob_components:
Expand Down Expand Up @@ -370,12 +370,12 @@ async def _ls_no_glob(self,
path: str,
*,
error_when_file_and_directory: bool = True,
sema: asyncio.Semaphore) -> List[StatResult]:
async def ls_as_dir() -> Optional[List[StatResult]]:
sema: asyncio.Semaphore) -> List[FileListEntry]:
async def ls_as_dir() -> Optional[List[FileListEntry]]:
try:
return await bounded_gather2(
sema,
*[functools.partial(self._fle_to_dict, fle)
*[functools.partial(self._aiofle_to_fle, fle)
async for fle in await self.afs.listfiles(path)],
cancel_on_error=True
)
Expand Down
7 changes: 5 additions & 2 deletions hail/python/hailtop/fs/stat_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class FileType(Enum):
SYMLINK = auto()


class StatResult(NamedTuple):
class FileListEntry(NamedTuple):
path: str
owner: Union[None, str, int]
size: int
Expand All @@ -25,8 +25,11 @@ def to_legacy_dict(self) -> Dict[str, Any]:
return {
'path': self.path,
'owner': self.owner,
'is_dir': self.is_dir(),
'size_bytes': self.size,
'size': filesize(self.size),
'modification_time': self.modification_time,
'is_dir': self.is_dir(),
}


StatResult = FileListEntry # backwards compatibility
20 changes: 10 additions & 10 deletions hail/src/main/scala/is/hail/expr/ir/GenericLines.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package is.hail.expr.ir

import is.hail.backend.spark.SparkBackend
import is.hail.io.compress.BGzipInputStream
import is.hail.io.fs.{FS, FileStatus, Positioned, PositionedInputStream, BGZipCompressionCodec}
import is.hail.io.fs.{FS, FileListEntry, Positioned, PositionedInputStream, BGZipCompressionCodec}
import is.hail.io.tabix.{TabixReader, TabixLineIterator}
import is.hail.types.virtual.{TBoolean, TInt32, TInt64, TString, TStruct, Type}
import is.hail.utils._
Expand Down Expand Up @@ -252,16 +252,16 @@ object GenericLines {

def read(
fs: FS,
fileStatuses0: IndexedSeq[FileStatus],
fileListEntries0: IndexedSeq[FileListEntry],
nPartitions: Option[Int],
blockSizeInMB: Option[Int],
minPartitions: Option[Int],
gzAsBGZ: Boolean,
allowSerialRead: Boolean,
filePerPartition: Boolean = false
): GenericLines = {
val fileStatuses = fileStatuses0.zipWithIndex.filter(_._1.getLen > 0)
val totalSize = fileStatuses.map(_._1.getLen).sum
val fileListEntries = fileListEntries0.zipWithIndex.filter(_._1.getLen > 0)
val totalSize = fileListEntries.map(_._1.getLen).sum

var totalPartitions = nPartitions match {
case Some(nPartitions) => nPartitions
Expand All @@ -276,9 +276,9 @@ object GenericLines {
case None =>
}

val contexts = fileStatuses.flatMap { case (status, fileNum) =>
val size = status.getLen
val codec = fs.getCodecFromPath(status.getPath, gzAsBGZ)
val contexts = fileListEntries.flatMap { case (fileListEntry, fileNum) =>
val size = fileListEntry.getLen
val codec = fs.getCodecFromPath(fileListEntry.getPath, gzAsBGZ)

val splittable = codec == null || codec == BGZipCompressionCodec
if (splittable && !filePerPartition) {
Expand All @@ -294,14 +294,14 @@ object GenericLines {
var end = partScan(i + 1)
if (codec != null)
end = makeVirtualOffset(end, 0)
Row(i, fileNum, status.getPath, start, end, true)
Row(i, fileNum, fileListEntry.getPath, start, end, true)
}
} else {
if (!allowSerialRead && !filePerPartition)
fatal(s"Cowardly refusing to read file serially: ${ status.getPath }.")
fatal(s"Cowardly refusing to read file serially: ${ fileListEntry.getPath }.")

Iterator.single {
Row(0, fileNum, status.getPath, 0L, size, false)
Row(0, fileNum, fileListEntry.getPath, 0L, size, false)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import is.hail.backend.ExecuteContext
import is.hail.expr.ir.functions.StringFunctions
import is.hail.expr.ir.lowering.{LowererUnsupportedOperation, TableStage, TableStageDependency, TableStageToRVD}
import is.hail.expr.ir.streams.StreamProducer
import is.hail.io.fs.{FS, FileStatus}
import is.hail.io.fs.{FS, FileListEntry}
import is.hail.rvd.RVDPartitioner
import is.hail.types.physical.stypes.EmitType
import is.hail.types.physical.stypes.concrete.{SJavaString, SStackStruct, SStackStructValue}
Expand All @@ -26,16 +26,16 @@ case class StringTableReaderParameters(

object StringTableReader {
def apply(fs: FS, params: StringTableReaderParameters): StringTableReader = {
val fileStatuses = getFileStatuses(fs, params.files, params.forceBGZ, params.forceGZ)
new StringTableReader(params, fileStatuses)
val fileListEntries = getFileListEntries(fs, params.files, params.forceBGZ, params.forceGZ)
new StringTableReader(params, fileListEntries)
}
def fromJValue(fs: FS, jv: JValue): StringTableReader = {
implicit val formats: Formats = TableReader.formats
val params = jv.extract[StringTableReaderParameters]
StringTableReader(fs, params)
}

def getFileStatuses(fs: FS, files: Array[String], forceBGZ: Boolean, forceGZ: Boolean): Array[FileStatus] = {
def getFileListEntries(fs: FS, files: Array[String], forceBGZ: Boolean, forceGZ: Boolean): Array[FileListEntry] = {
val status = fs.globAllStatuses(files)
if (status.isEmpty)
fatal(s"arguments refer to no files: ${files.toIndexedSeq}.")
Expand Down Expand Up @@ -141,7 +141,7 @@ case class StringTablePartitionReader(lines: GenericLines, uidFieldName: String)

class StringTableReader(
val params: StringTableReaderParameters,
fileStatuses: IndexedSeq[FileStatus]
fileListEntries: IndexedSeq[FileListEntry]
) extends TableReaderWithExtraUID {

override def uidType = TTuple(TInt64, TInt64)
Expand All @@ -157,7 +157,7 @@ class StringTableReader(

override def lower(ctx: ExecuteContext, requestedType: TableType): TableStage = {
val fs = ctx.fs
val lines = GenericLines.read(fs, fileStatuses, None, None, params.minPartitions, params.forceBGZ, params.forceGZ,
val lines = GenericLines.read(fs, fileListEntries, None, None, params.minPartitions, params.forceBGZ, params.forceGZ,
params.filePerPartition)
TableStage(globals = MakeStruct(FastSeq()),
partitioner = RVDPartitioner.unkeyed(ctx.stateManager, lines.nPartitions),
Expand Down
20 changes: 10 additions & 10 deletions hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import is.hail.expr.ir.lowering.{TableStage, TableStageDependency}
import is.hail.expr.ir.streams.StreamProducer
import is.hail.expr.ir.{EmitCode, EmitCodeBuilder, EmitMethodBuilder, EmitSettable, EmitValue, IEmitCode, IR, IRParserEnvironment, Literal, LowerMatrixIR, MakeStruct, MatrixHybridReader, MatrixReader, PartitionNativeIntervalReader, PartitionReader, ReadPartition, Ref, StreamTake, TableExecuteIntermediate, TableNativeReader, TableReader, TableValue, ToStream}
import is.hail.io._
import is.hail.io.fs.{FS, FileStatus, SeekableDataInputStream}
import is.hail.io.fs.{FS, FileListEntry, SeekableDataInputStream}
import is.hail.io.index.{IndexReader, StagedIndexReader}
import is.hail.io.vcf.LoadVCF
import is.hail.rvd.RVDPartitioner
Expand Down Expand Up @@ -149,24 +149,24 @@ object LoadBgen {
| ${ notVersionTwo.mkString("\n ") }""".stripMargin)
}

def getAllFileStatuses(fs: FS, files: Array[String]): Array[FileStatus] = {
def getAllFileListEntries(fs: FS, files: Array[String]): Array[FileListEntry] = {
val badFiles = new BoxedArrayBuilder[String]()

val statuses = files.flatMap { file =>
val fileListEntries = files.flatMap { file =>
val matches = fs.glob(file)
if (matches.isEmpty)
badFiles += file

matches.flatMap { status =>
val file = status.getPath.toString
matches.flatMap { fileListEntry =>
val file = fileListEntry.getPath.toString
if (!file.endsWith(".bgen"))
warn(s"input file does not have .bgen extension: $file")

if (fs.isDir(file))
fs.listStatus(file)
.filter(status => ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(status.getPath.toString))
.filter(fileListEntry => ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(fileListEntry.getPath.toString))
else
Array(status)
Array(fileListEntry)
}
}

Expand All @@ -175,11 +175,11 @@ object LoadBgen {
s"""The following paths refer to no files:
| ${ badFiles.result().mkString("\n ") }""".stripMargin)

statuses
fileListEntries
}

def getAllFilePaths(fs: FS, files: Array[String]): Array[String] =
getAllFileStatuses(fs, files).map(_.getPath.toString)
getAllFileListEntries(fs, files).map(_.getPath.toString)


def getBgenFileMetadata(ctx: ExecuteContext, files: Array[String], indexFiles: Array[String]): Array[BgenFileMetadata] = {
Expand Down Expand Up @@ -226,7 +226,7 @@ object LoadBgen {
}

def getIndexFileNames(fs: FS, files: Array[String], indexFileMap: Map[String, String]): Array[String] = {
def absolutePath(rel: String): String = fs.fileStatus(rel).getPath.toString
def absolutePath(rel: String): String = fs.fileListEntry(rel).getPath.toString

val fileMapping = Option(indexFileMap)
.getOrElse(Map.empty[String, String])
Expand Down

0 comments on commit 29b2afc

Please sign in to comment.