[query] Avoid Class A Operations. (#13885)
CHANGELOG: Hail Query-on-Batch previously used Class A Operations for
all interaction with blobs. This change ensures that QoB only uses Class
A Operations when necessary.

Inspired by @jigold's file system improvement campaign, I pursued eliminating
"list" operations. I anticipate this reduces both flakiness (tracked in
#13351) and cost in Azure.

I aligned hail.fs and the Scala file system code with aiotools.fs terminology:

1. `FileStatus`. Metadata about a blob or file. It does not know whether a
directory exists at this path.

2. `FileListEntry`. Metadata from a list operation. It does know whether a
directory exists at this path. (A sketch of the distinction follows.)
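
To make the distinction concrete, here is a minimal sketch of the two
interfaces. The signatures are illustrative simplifications, not the exact
Hail API:

```scala
// Sketch: metadata obtainable with a single get on the blob at `path`.
trait FileStatus {
  def getPath: String
  def getLen: Long
  def getModificationTime: Long
}

// Sketch: what a list operation can additionally tell us.
trait FileListEntry extends FileStatus {
  def isDirectory: Boolean // in a blob store, only a list can answer this
  def isFile: Boolean
}

trait FS {
  // One get against `path`: a Class B operation in Google Cloud Storage.
  def fileStatus(path: String): FileStatus

  // Needs a list (a Class A operation) to detect a "directory".
  def fileListEntry(path: String): FileListEntry
}
```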

Variable names were updated to reflect this distinction:

1. `fileStatus` / `fileStatuses` for `FileStatus` values.

2. `fle` / `fles` / `fileListEntry` / `fileListEntries` for `FileListEntry`
values.

`listStatus` was renamed to `listDirectory` for clarity.

In both Azure and Google, `fileStatus` does not use a list operation.

`fileListEntry` remains available for when we must know whether a directory
exists. I rewrote it from first principles because:
1. Neither the Google nor the Azure implementation checked whether the path
was both a directory and a file.
2. In Google, if the directory entry was not on the first page of list
results, it was missed. (NB: fifteen non-control ASCII characters sort before
`/`, so with a page size of 15 or fewer, the first entry ending in `/` could
fall off the first page.)
3. In Azure, we issued both a get and a list.

There are now unit tests for this method.
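
To illustrate one correct approach, here is a first-principles sketch built
on the interfaces above. `listOnePrefix` (a prefix listing capped at one
result) and `makeEntry` are hypothetical stand-ins, and unlike the committed
implementation this sketch still pays one get plus one list:

```scala
import java.io.FileNotFoundException

def fileListEntry(path: String): FileListEntry = {
  // One get: succeeds iff a blob exists at exactly `path`.
  val blob: Option[FileStatus] =
    try Some(fileStatus(path))
    catch { case _: FileNotFoundException => None }

  // One list with prefix `path + "/"`, at most one result: any hit means a
  // "directory" exists here. Because the prefix itself ends in `/`, no other
  // key can sort ahead of the directory's contents, so the first-page hazard
  // in (2) cannot arise.
  val dirExists: Boolean = listOnePrefix(path + "/").nonEmpty

  if (blob.isEmpty && !dirExists)
    throw new FileNotFoundException(path)

  // Reports file-ness and directory-ness independently, covering the case in
  // (1) where a path is simultaneously a file and a directory.
  makeEntry(path, blob, dirExists)
}
```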

---

1. `copyMerge` and `concatenateFiles` previously used `O(N_FILES)` list
operations; they now use `O(N_FILES)` get operations.
2. Writers that used `exists` to check for a `_SUCCESS` file now use a get
operation.
3. Index readers, BGEN import, and PLINK import all now check file size with
a get operation.
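
Concretely, each of these is the same substitution of a get-based call for a
list-based one; a composite sketch drawn from the diff below:

```scala
// Writers: check for _SUCCESS with a get-based isFile rather than exists.
if (!fs.isFile(path + "/_SUCCESS"))
  fatal(s"write failed: file not found: $path/_SUCCESS")

// Index readers and BGEN/PLINK import: file size from one get operation.
val nBytes = fs.fileStatus(path).getLen // was: fs.fileListEntry(path).getLen
```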

That said, the bulk of our Class A Operations are probably
writes.
danking committed Jan 9, 2024
1 parent 0cdf992 commit 3ddbf5f
Showing 30 changed files with 619 additions and 232 deletions.
26 changes: 23 additions & 3 deletions hail/python/hail/fs/hadoop_fs.py
@@ -6,7 +6,15 @@
 import dateutil.parser
 
 from hailtop.fs.fs import FS
-from hailtop.fs.stat_result import FileType, FileListEntry
+from hailtop.fs.stat_result import FileType, FileListEntry, FileStatus
+
+
+def _file_status_scala_to_python(file_status: Dict[str, Any]) -> FileStatus:
+    dt = dateutil.parser.isoparse(file_status['modification_time'])
+    mtime = time.mktime(dt.timetuple())
+    return FileStatus(
+        path=file_status['path'], owner=file_status['owner'], size=file_status['size'], modification_time=mtime
+    )
 
 
 def _file_list_entry_scala_to_python(file_list_entry: Dict[str, Any]) -> FileListEntry:
@@ -69,9 +77,21 @@ def is_file(self, path: str) -> bool:
     def is_dir(self, path: str) -> bool:
         return self._jfs.isDir(path)
 
+    def fast_stat(self, path: str) -> FileStatus:
+        '''Get information about a path other than its file/directory status.
+
+        In the cloud, determining if a given path is a file, a directory, or both is expensive. This
+        method simply returns file metadata if there is a file at this path. If there is no file at
+        this path, this operation will fail. The presence or absence of a directory at this path
+        does not affect the behaviors of this method.
+        '''
+        file_status_dict = json.loads(self._utils_package_object.fileStatus(self._jfs, path))
+        return _file_status_scala_to_python(file_status_dict)
+
     def stat(self, path: str) -> FileListEntry:
-        stat_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path))
-        return _file_list_entry_scala_to_python(stat_dict)
+        file_list_entry_dict = json.loads(self._utils_package_object.fileListEntry(self._jfs, path))
+        return _file_list_entry_scala_to_python(file_list_entry_dict)
 
     def ls(self, path: str) -> List[FileListEntry]:
         return [
17 changes: 17 additions & 0 deletions hail/python/hailtop/fs/stat_result.py
@@ -10,6 +10,23 @@ class FileType(Enum):
     SYMLINK = auto()
 
 
+class FileStatus(NamedTuple):
+    path: str
+    owner: Union[None, str, int]
+    size: int
+    # common point between unix, google, and hadoop filesystems, represented as a unix timestamp
+    modification_time: Optional[float]
+
+    def to_legacy_dict(self) -> Dict[str, Any]:
+        return {
+            'path': self.path,
+            'owner': self.owner,
+            'size_bytes': self.size,
+            'size': filesize(self.size),
+            'modification_time': self.modification_time,
+        }
+
+
 class FileListEntry(NamedTuple):
     path: str
     owner: Union[None, str, int]
8 changes: 6 additions & 2 deletions hail/python/test/hail/methods/test_impex.py
@@ -1,4 +1,5 @@
 import json
+import re
 import os
 import pytest
 import shutil
@@ -1511,14 +1512,17 @@ def test_old_index_file_throws_error(self):
 
         with hl.TemporaryFilename() as f:
             hl.current_backend().fs.copy(bgen_file, f)
-            with pytest.raises(FatalError, match='have no .idx2 index file'):
+
+            expected_missing_idx2_error_message = re.compile(f'have no .idx2 index file.*{f}.*', re.DOTALL)
+
+            with pytest.raises(FatalError, match=expected_missing_idx2_error_message):
                 hl.import_bgen(f, ['GT', 'GP'], sample_file, n_partitions=3)
 
             try:
                 with hl.current_backend().fs.open(f + '.idx', 'wb') as fobj:
                     fobj.write(b'')
 
-                with pytest.raises(FatalError, match='have no .idx2 index file'):
+                with pytest.raises(FatalError, match=expected_missing_idx2_error_message):
                     hl.import_bgen(f, ['GT', 'GP'], sample_file)
             finally:
                 hl.current_backend().fs.remove(f + '.idx')
2 changes: 1 addition & 1 deletion hail/python/test/hail/utils/test_utils.py
@@ -172,7 +172,7 @@ def test_hadoop_ls_file_that_does_not_exist(self):
         except FileNotFoundError:
             pass
         except FatalError as err:
-            assert 'FileNotFoundException: a_file_that_does_not_exist' in err.args[0]
+            assert 'FileNotFoundException: file:/io/a_file_that_does_not_exist' in err.args[0]
         else:
             assert False
25 changes: 16 additions & 9 deletions hail/src/main/scala/is/hail/expr/ir/AbstractMatrixTableSpec.scala
@@ -10,7 +10,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods
 import org.json4s.jackson.JsonMethods.parse
 
-import java.io.OutputStreamWriter
+import java.io.{FileNotFoundException, OutputStreamWriter}
 import scala.collection.mutable
 import scala.language.{existentials, implicitConversions}
 
@@ -36,15 +36,22 @@ object RelationalSpec {
     new MatrixTypeSerializer
 
   def readMetadata(fs: FS, path: String): JValue = {
-    if (!fs.isDir(path)) {
-      if (!fs.exists(path)) {
-        fatal(s"No file or directory found at $path")
-      } else {
-        fatal(s"MatrixTable and Table files are directories; path '$path' is not a directory")
-      }
-    }
     val metadataFile = path + "/metadata.json.gz"
-    val jv = using(fs.open(metadataFile))(in => parse(in))
+    val jv =
+      try
+        using(fs.open(metadataFile))(in => parse(in))
+      catch {
+        case exc: FileNotFoundException =>
+          if (fs.isFile(path)) {
+            fatal(s"MatrixTable and Table files are directories; path '$path' is a file.")
+          } else {
+            if (fs.isDir(path)) {
+              fatal(s"MatrixTable is corrupted: $path/metadata.json.gz is missing.")
+            } else {
+              fatal(s"No file or directory found at $path.")
+            }
+          }
+      }
 
     val fileVersion = jv \ "file_version" match {
       case JInt(rep) => SemanticVersion(rep.toInt)
12 changes: 7 additions & 5 deletions hail/src/main/scala/is/hail/expr/ir/GenericLines.scala
@@ -2,7 +2,9 @@ package is.hail.expr.ir
 
 import is.hail.backend.spark.SparkBackend
 import is.hail.io.compress.BGzipInputStream
-import is.hail.io.fs.{BGZipCompressionCodec, FileListEntry, FS, Positioned, PositionedInputStream}
+import is.hail.io.fs.{
+  BGZipCompressionCodec, FileListEntry, FileStatus, FS, Positioned, PositionedInputStream,
+}
 import is.hail.io.tabix.{TabixLineIterator, TabixReader}
 import is.hail.types.virtual.{TBoolean, TInt32, TInt64, TString, TStruct, Type}
 import is.hail.utils._
@@ -258,16 +260,16 @@ object GenericLines {
 
   def read(
     fs: FS,
-    fileListEntries0: IndexedSeq[FileListEntry],
+    fileStatuses0: IndexedSeq[_ <: FileStatus],
     nPartitions: Option[Int],
     blockSizeInMB: Option[Int],
     minPartitions: Option[Int],
     gzAsBGZ: Boolean,
     allowSerialRead: Boolean,
     filePerPartition: Boolean = false,
   ): GenericLines = {
-    val fileListEntries = fileListEntries0.zipWithIndex.filter(_._1.getLen > 0)
-    val totalSize = fileListEntries.map(_._1.getLen).sum
+    val fileStatuses = fileStatuses0.zipWithIndex.filter(_._1.getLen > 0)
+    val totalSize = fileStatuses.map(_._1.getLen).sum
 
     var totalPartitions = nPartitions match {
       case Some(nPartitions) => nPartitions
@@ -282,7 +284,7 @@
       case None =>
     }
 
-    val contexts = fileListEntries.flatMap { case (fileListEntry, fileNum) =>
+    val contexts = fileStatuses.flatMap { case (fileListEntry, fileNum) =>
       val size = fileListEntry.getLen
       val codec = fs.getCodecFromPath(fileListEntry.getPath, gzAsBGZ)
 
4 changes: 3 additions & 1 deletion hail/src/main/scala/is/hail/expr/ir/StringTableReader.scala
@@ -8,7 +8,9 @@ import is.hail.expr.ir.lowering.{LowererUnsupportedOperation, TableStage, TableS
 import is.hail.expr.ir.streams.StreamProducer
 import is.hail.io.fs.{FileListEntry, FS}
 import is.hail.rvd.RVDPartitioner
-import is.hail.types.{BaseTypeWithRequiredness, RStruct, TableType, VirtualTypeWithReq}
+import is.hail.types.{
+  BaseTypeWithRequiredness, RStruct, TableType, TypeWithRequiredness, VirtualTypeWithReq,
+}
 import is.hail.types.physical._
 import is.hail.types.physical.stypes.EmitType
 import is.hail.types.physical.stypes.concrete.{SJavaString, SStackStruct, SStackStructValue}
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/expr/ir/TableIR.scala
@@ -42,7 +42,7 @@ object TableIR {
   def read(fs: FS, path: String, dropRows: Boolean = false, requestedType: Option[TableType] = None)
     : TableRead = {
     val successFile = path + "/_SUCCESS"
-    if (!fs.exists(path + "/_SUCCESS"))
+    if (!fs.isFile(path + "/_SUCCESS"))
       fatal(s"write failed: file not found: $successFile")
 
     val tr = TableNativeReader.read(fs, path, None)
@@ -396,7 +396,7 @@ case object SemanticHash extends Logging {
       case Some(etag) =>
         etag.getBytes
       case None =>
-        path.getBytes ++ Bytes.fromLong(fs.fileListEntry(path).getModificationTime)
+        path.getBytes ++ Bytes.fromLong(fs.fileStatus(path).getModificationTime)
     }
 
   def levelOrder(root: BaseIR): Iterator[(BaseIR, Int)] = {
53 changes: 30 additions & 23 deletions hail/src/main/scala/is/hail/io/bgen/LoadBgen.scala
@@ -165,11 +165,11 @@
       badFiles += file
 
     matches.flatMap { fileListEntry =>
-      val file = fileListEntry.getPath.toString
+      val file = fileListEntry.getPath
       if (!file.endsWith(".bgen"))
         warn(s"input file does not have .bgen extension: $file")
 
-      if (fs.isDir(file))
+      if (fileListEntry.isDirectory)
         fs.listDirectory(file)
           .filter(fileListEntry =>
             ".*part-[0-9]+(-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?".r.matches(
@@ -193,22 +193,25 @@
   def getAllFilePaths(fs: FS, files: Array[String]): Array[String] =
     getAllFileListEntries(fs, files).map(_.getPath.toString)
 
-  def getBgenFileMetadata(ctx: ExecuteContext, files: Array[String], indexFiles: Array[String])
-    : Array[BgenFileMetadata] = {
+  def getBgenFileMetadata(
+    ctx: ExecuteContext,
+    files: Array[FileListEntry],
+    indexFilePaths: Array[String],
+  ): Array[BgenFileMetadata] = {
     val fs = ctx.fs
-    require(files.length == indexFiles.length)
-    val headers = getFileHeaders(fs, files)
+    require(files.length == indexFilePaths.length)
+    val headers = getFileHeaders(fs, files.map(_.getPath))
 
     val cacheByRG: mutable.Map[Option[String], (String, Array[Long]) => Array[AnyRef]] =
       mutable.Map.empty
 
-    headers.zip(indexFiles).map { case (h, indexFile) =>
-      val (keyType, annotationType) = IndexReader.readTypes(fs, indexFile)
+    headers.zip(indexFilePaths).map { case (h, indexFilePath) =>
+      val (keyType, annotationType) = IndexReader.readTypes(fs, indexFilePath)
       val rg = keyType.asInstanceOf[TStruct].field("locus").typ match {
         case TLocus(rg) => Some(rg)
         case _ => None
       }
-      val metadata = IndexReader.readMetadata(fs, indexFile, keyType, annotationType)
+      val metadata = IndexReader.readMetadata(fs, indexFilePath, keyType, annotationType)
       val indexVersion = SemanticVersion(metadata.fileVersion)
       val (leafSpec, internalSpec) = BgenSettings.indexCodecSpecs(indexVersion, rg)
 
@@ -226,12 +229,12 @@
       val nVariants = metadata.nKeys
 
       val rangeBounds = if (nVariants > 0) {
-        val Array(start, end) = getKeys(indexFile, Array[Long](0L, nVariants - 1))
+        val Array(start, end) = getKeys(indexFilePath, Array[Long](0L, nVariants - 1))
         Interval(start, end, includesStart = true, includesEnd = true)
       } else null
 
       BgenFileMetadata(
-        indexFile,
+        indexFilePath,
         indexVersion,
         h,
         rg,
@@ -245,9 +248,9 @@
     }
   }
 
-  def getIndexFileNames(fs: FS, files: Array[String], indexFileMap: Map[String, String])
+  def getIndexFileNames(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String])
     : Array[String] = {
-    def absolutePath(rel: String): String = fs.fileListEntry(rel).getPath.toString
+    def absolutePath(rel: String): String = fs.fileStatus(rel).getPath
 
     val fileMapping = Option(indexFileMap)
       .getOrElse(Map.empty[String, String])
@@ -260,19 +263,23 @@
           |  ${badExtensions.mkString("\n  ")})""".stripMargin
       )
 
-    files.map(absolutePath).map(f => fileMapping.getOrElse(f, f + ".idx2"))
+    files.map(f => fileMapping.getOrElse(f.getPath, f.getPath + ".idx2"))
   }
 
-  def getIndexFiles(fs: FS, files: Array[String], indexFileMap: Map[String, String])
+  def getIndexFiles(fs: FS, files: Array[FileListEntry], indexFileMap: Map[String, String])
    : Array[String] = {
     val indexFiles = getIndexFileNames(fs, files, indexFileMap)
-    val missingIdxFiles = files.zip(indexFiles).filterNot { case (f, index) =>
-      fs.exists(index) && index.endsWith("idx2")
-    }.map(_._1)
-    if (missingIdxFiles.nonEmpty)
+
+    val bgenFilesWhichAreMissingIdx2Files = files.zip(indexFiles).filterNot {
+      case (f, index) => index.endsWith("idx2") && fs.isFile(index + "/index") && fs.isFile(
+          index + "/metadata.json.gz"
+        )
+    }.map(_._1.getPath)
+
+    if (bgenFilesWhichAreMissingIdx2Files.nonEmpty)
       fatal(
         s"""The following BGEN files have no .idx2 index file. Use 'index_bgen' to create the index file once before calling 'import_bgen':
-           |  ${missingIdxFiles.mkString("\n  ")})""".stripMargin
+           |  ${bgenFilesWhichAreMissingIdx2Files.mkString("\n  ")}""".stripMargin
       )
     indexFiles
  }
@@ -376,9 +383,9 @@ object MatrixBGENReader {
   def apply(ctx: ExecuteContext, params: MatrixBGENReaderParameters): MatrixBGENReader = {
     val fs = ctx.fs
 
-    val allFiles = LoadBgen.getAllFilePaths(fs, params.files.toArray)
-    val indexFiles = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap)
-    val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFiles)
+    val allFiles = LoadBgen.getAllFileListEntries(fs, params.files.toArray)
+    val indexFilePaths = LoadBgen.getIndexFiles(fs, allFiles, params.indexFileMap)
+    val fileMetadata = LoadBgen.getBgenFileMetadata(ctx, allFiles, indexFilePaths)
     assert(fileMetadata.nonEmpty)
     if (fileMetadata.exists(md => md.indexVersion != fileMetadata.head.indexVersion)) {
       fatal(
