[compiler] Execute TableWriter via lowering pipeline (#12879)
Execute TableWriter and MatrixWriter via the lowering pipeline instead of
Spark execution.
Removed support for checkpoint files for now; the plan is to implement
something more general-purpose, akin to call-caching.
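
For illustration, a write now round-trips through the lowered path with no user-facing change (a minimal sketch; the path and dimensions are illustrative):

```python
import hail as hl

# Minimal sketch: the write below now executes via the lowering pipeline
# rather than Spark execution, but the user-facing API and the on-disk
# format are unchanged.
mt = hl.utils.range_matrix_table(n_rows=100, n_cols=10, n_partitions=4)
mt.write('/tmp/example.mt', overwrite=True)

mt2 = hl.read_matrix_table('/tmp/example.mt')
assert mt._same(mt2)  # the same equivalence check the test suite uses
```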


Plot of the top 20 affected benchmarks; none of them, interestingly,
involves writing:

![image](https://user-images.githubusercontent.com/8223952/231855633-84ddbe64-1dba-4e62-bfa0-b9e2b041d588.png)
You can view these results yourself in
[benchmarks.zip](https://github.com/hail-is/hail/files/11225644/benchmarks.zip)
ehigham committed Apr 21, 2023
1 parent cc8d364 commit 53d8052
Showing 10 changed files with 47 additions and 543 deletions.
12 changes: 4 additions & 8 deletions hail/python/hail/ir/matrix_writer.py
@@ -22,16 +22,14 @@ class MatrixNativeWriter(MatrixWriter):
                       stage_locally=bool,
                       codec_spec=nullable(str),
                       partitions=nullable(str),
-                      partitions_type=nullable(hail_type),
-                      checkpoint_file=nullable(str))
-    def __init__(self, path, overwrite, stage_locally, codec_spec, partitions, partitions_type, checkpoint_file):
+                      partitions_type=nullable(hail_type))
+    def __init__(self, path, overwrite, stage_locally, codec_spec, partitions, partitions_type):
         self.path = path
         self.overwrite = overwrite
         self.stage_locally = stage_locally
         self.codec_spec = codec_spec
         self.partitions = partitions
         self.partitions_type = partitions_type
-        self.checkpoint_file = checkpoint_file
 
     def render(self):
         writer = {'name': 'MatrixNativeWriter',
@@ -40,8 +38,7 @@ def render(self):
                   'stageLocally': self.stage_locally,
                   'codecSpecJSONStr': self.codec_spec,
                   'partitions': self.partitions,
-                  'partitionsTypeStr': self.partitions_type._parsable_string() if self.partitions_type is not None else None,
-                  'checkpointFile': self.checkpoint_file
+                  'partitionsTypeStr': self.partitions_type._parsable_string() if self.partitions_type is not None else None
                   }
         return escape_str(json.dumps(writer))
 
@@ -52,8 +49,7 @@ def __eq__(self, other):
             other.stage_locally == self.stage_locally and \
             other.codec_spec == self.codec_spec and \
             other.partitions == self.partitions and \
-            other.partitions_type == self.partitions_type and \
-            other.checkpoint_file == self.checkpoint_file
+            other.partitions_type == self.partitions_type
 
 
 class MatrixVCFWriter(MatrixWriter):
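
As a hedged sketch of the narrowed interface above (the writer is normally constructed for you by `MatrixTable.write`; the path here is illustrative), the constructor now takes six arguments and the rendered JSON no longer carries a checkpointFile field:

```python
from hail import ir

# Sketch: the six-argument constructor after this change; MatrixTable.write
# normally builds this for you. The rendered JSON payload no longer contains
# a 'checkpointFile' entry.
writer = ir.MatrixNativeWriter(
    '/tmp/example.mt',  # path (illustrative)
    False,              # overwrite
    False,              # stage_locally
    None,               # codec_spec
    None,               # partitions
    None)               # partitions_type
print(writer.render())  # escaped JSON consumed by the Scala backend
```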
7 changes: 3 additions & 4 deletions hail/python/hail/matrixtable.py
@@ -2701,10 +2701,9 @@ def checkpoint(self, output: str, overwrite: bool = False, stage_locally: bool = False,
                       overwrite=bool,
                       stage_locally=bool,
                       _codec_spec=nullable(str),
-                      _partitions=nullable(expr_any),
-                      _checkpoint_file=nullable(str))
+                      _partitions=nullable(expr_any))
     def write(self, output: str, overwrite: bool = False, stage_locally: bool = False,
-              _codec_spec: Optional[str] = None, _partitions=None, _checkpoint_file=None):
+              _codec_spec: Optional[str] = None, _partitions=None):
         """Write to disk.
 
         Examples
@@ -2736,7 +2735,7 @@ def write(self, output: str, overwrite: bool = False, stage_locally: bool = False,
         else:
             _partitions_type = None
 
-        writer = ir.MatrixNativeWriter(output, overwrite, stage_locally, _codec_spec, _partitions, _partitions_type, _checkpoint_file)
+        writer = ir.MatrixNativeWriter(output, overwrite, stage_locally, _codec_spec, _partitions, _partitions_type)
         Env.backend().execute(ir.MatrixWrite(self._mir, writer))
 
     class _Show:
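
The private `_partitions` hook survives the change (`_checkpoint_file` does not); a hedged sketch of its use, with illustrative interval bounds mirroring the updated test IR below:

```python
import hail as hl

# Sketch (illustrative bounds and path): pinning the output partitioning
# via the private _partitions argument, which remains after this change.
mt = hl.utils.range_matrix_table(10, 10, n_partitions=2)
parts = hl.literal(
    [hl.Interval(hl.Struct(row_idx=0), hl.Struct(row_idx=10),
                 includes_start=True, includes_end=False)],
    dtype='array<interval<struct{row_idx:int32}>>')
mt.write('/tmp/parts.mt', overwrite=True, _partitions=parts)
```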
9 changes: 0 additions & 9 deletions hail/python/test/hail/matrixtable/test_matrix_table.py
@@ -1295,15 +1295,6 @@ def test_write_stage_locally(self):
         mt2 = hl.read_matrix_table(f)
         self.assertTrue(mt._same(mt2))
 
-    def test_write_checkpoint_file(self):
-        mt = self.get_mt()
-        f = new_temp_file(extension='mt')
-        cp = new_temp_file()
-        mt.write(f, _checkpoint_file=cp)
-
-        mt2 = hl.read_matrix_table(f)
-        self.assertTrue(mt._same(mt2))
-
     def test_write_no_parts(self):
         mt = hl.utils.range_matrix_table(10, 10, 2).filter_rows(False)
         path = new_temp_file(extension='mt')
5 changes: 2 additions & 3 deletions hail/python/test/hail/test_ir.py
@@ -126,11 +126,10 @@ def aggregate(x):
         ir.TableWrite(table, ir.TableNativeWriter(new_temp_file(), False, True, "fake_codec_spec$$")),
         ir.TableWrite(table, ir.TableTextWriter(new_temp_file(), None, True, "concatenated", ",")),
         ir.MatrixAggregate(matrix_read, ir.MakeStruct([('foo', ir.ApplyAggOp('Collect', [], [ir.I32(0)]))])),
-        ir.MatrixWrite(matrix_read, ir.MatrixNativeWriter(new_temp_file(), False, False, "", None, None, None)),
+        ir.MatrixWrite(matrix_read, ir.MatrixNativeWriter(new_temp_file(), False, False, "", None, None)),
         ir.MatrixWrite(matrix_read, ir.MatrixNativeWriter(new_temp_file(), False, False, "",
                                                           '[{"start":{"row_idx":0},"end":{"row_idx": 10},"includeStart":true,"includeEnd":false}]',
-                                                          hl.dtype('array<interval<struct{row_idx:int32}>>'),
-                                                          'some_file')),
+                                                          hl.dtype('array<interval<struct{row_idx:int32}>>'))),
         ir.MatrixWrite(matrix_read, ir.MatrixVCFWriter(new_temp_file(), None, ir.ExportType.CONCATENATED, None, False)),
         ir.MatrixWrite(matrix_read, ir.MatrixGENWriter(new_temp_file(), 4)),
         ir.MatrixWrite(matrix_read, ir.MatrixPLINKWriter(new_temp_file())),
69 changes: 3 additions & 66 deletions hail/src/main/scala/is/hail/expr/ir/MatrixValue.scala
@@ -3,23 +3,16 @@ package is.hail.expr.ir
 import is.hail.HailContext
 import is.hail.annotations._
 import is.hail.backend.{ExecuteContext, HailStateManager}
-import is.hail.backend.spark.SparkBackend
-import is.hail.expr.JSONAnnotationImpex
+import is.hail.io.{BufferSpec, FileWriteMetadata}
+import is.hail.linalg.RowMatrix
+import is.hail.rvd.{AbstractRVDSpec, RVD}
 import is.hail.types.physical.{PArray, PCanonicalStruct, PStruct, PType}
 import is.hail.types.virtual._
 import is.hail.types.{MatrixType, TableType}
-import is.hail.io.{BufferSpec, FileWriteMetadata, MatrixWriteCheckpoint}
-import is.hail.io.fs.FS
-import is.hail.linalg.RowMatrix
-import is.hail.rvd.{AbstractRVDSpec, RVD, _}
-import is.hail.sparkextras.ContextRDD
 import is.hail.utils._
 import is.hail.variant._
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.Row
-import org.json4s.jackson.JsonMethods
-import org.json4s.jackson.JsonMethods.parse
 
 case class MatrixValue(
   typ: MatrixType,
@@ -83,9 +76,6 @@ case class MatrixValue(
       s"\n  @1", dups.sortBy(-_._2).map { case (id, count) => s"""($count) "$id"""" }.truncatable("\n  "))
   }
 
-  def colsTableValue(ctx: ExecuteContext): TableValue =
-    TableValue(ctx, typ.colsTableType, globals, colsRVD(ctx))
-
   private def writeCols(ctx: ExecuteContext, path: String, bufferSpec: BufferSpec): Long = {
     val fs = ctx.fs
     val fileData = AbstractRVDSpec.writeSingle(ctx, path + "/rows", colValues.t.elementType.asInstanceOf[PStruct], bufferSpec, colValues.javaValue)
@@ -216,59 +206,6 @@ case class MatrixValue(
       s"\n  * Largest partition: $largestStr")
   }
 
-  def write(ctx: ExecuteContext,
-    path: String,
-    overwrite: Boolean,
-    stageLocally: Boolean,
-    codecSpecJSON: String,
-    partitions: String,
-    partitionsTypeStr: String,
-    checkpointFile: String): Unit = {
-    assert(typ.isCanonical)
-    val fs = ctx.fs
-
-    val bufferSpec = BufferSpec.parseOrDefault(codecSpecJSON)
-
-    if (overwrite) {
-      if (checkpointFile != null)
-        fatal(s"cannot currently use a checkpoint file with overwrite=True")
-      fs.delete(path, recursive = true)
-    } else if (fs.exists(path))
-      if (checkpointFile == null || fs.exists(path + "/_SUCCESS"))
-        fatal(s"file already exists: $path")
-
-    fs.mkDir(path)
-
-    val targetPartitioner =
-      if (partitions != null) {
-        if (checkpointFile != null)
-          fatal(s"cannot currently use a checkpoint file with `partitions` argument")
-        val partitionsType = IRParser.parseType(partitionsTypeStr)
-        val jv = JsonMethods.parse(partitions)
-        val rangeBounds = JSONAnnotationImpex.importAnnotation(jv, partitionsType)
-          .asInstanceOf[IndexedSeq[Interval]]
-        new RVDPartitioner(ctx.stateManager, typ.rowKey.toArray, typ.rowKeyStruct, rangeBounds)
-      } else
-        null
-
-    val checkpoint = Option(checkpointFile).map(path => MatrixWriteCheckpoint.read(fs, path, path, rvd.getNumPartitions))
-
-    val fileData = rvd.writeRowsSplit(ctx, path, bufferSpec, stageLocally, targetPartitioner, checkpoint)
-
-    finalizeWrite(ctx, path, bufferSpec, fileData, consoleInfo = true)
-  }
-
-  def colsRVD(ctx: ExecuteContext): RVD = {
-    // only used in exportPlink
-    assert(typ.colKey.isEmpty)
-    val colPType = PType.canonical(typ.colType).setRequired(true).asInstanceOf[PStruct]
-
-    RVD.coerce(ctx,
-      typ.colsTableType.canonicalRVDType,
-      ContextRDD.parallelize(colValues.safeJavaValue)
-        .cmapPartitions { (ctx, it) => it.copyToRegion(ctx.region, colPType) })
-  }
-
   def toRowMatrix(entryField: String): RowMatrix = {
     val partCounts: Array[Long] = rvd.countPerPartition()
     val partStarts = partCounts.scanLeft(0L)(_ + _)
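
The overwrite and already-exists checks in the deleted method are presumably preserved on the lowered path (not verified against this exact commit), so user-visible behavior should stay as sketched below (illustrative path):

```python
import hail as hl

mt = hl.utils.range_matrix_table(5, 5)
mt.write('/tmp/demo.mt')  # fresh write succeeds

try:
    mt.write('/tmp/demo.mt')  # refuses to clobber an existing dataset
except Exception as err:
    print(err)  # expected: 'file already exists: /tmp/demo.mt'

mt.write('/tmp/demo.mt', overwrite=True)  # explicit overwrite required
```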
71 changes: 12 additions & 59 deletions hail/src/main/scala/is/hail/expr/ir/MatrixWriter.scala
@@ -50,21 +50,24 @@ case class WrappedMatrixWriter(writer: MatrixWriter,
   entriesFieldName: String,
   colKey: IndexedSeq[String]) extends TableWriter {
   def path: String = writer.path
-  def apply(ctx: ExecuteContext, tv: TableValue): Unit = writer(ctx, tv.toMatrixValue(colKey, colsFieldName, entriesFieldName))
 
   override def lower(ctx: ExecuteContext, ts: TableStage, r: RTable): IR =
     writer.lower(colsFieldName, entriesFieldName, colKey, ctx, ts, r)
-
-  override def canLowerEfficiently: Boolean = writer.canLowerEfficiently
 }
 
 abstract class MatrixWriter {
   def path: String
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit
-  def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
-    ctx: ExecuteContext, ts: TableStage, r: RTable): IR =
-    throw new LowererUnsupportedOperation(s"${ this.getClass } does not have defined lowering!")
 
-  def canLowerEfficiently: Boolean
+  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = {
+    val tv = mv.toTableValue
+    val ts = TableExecuteIntermediate(tv).asTableStage(ctx)
+    CompileAndEvaluate(ctx, lower(LowerMatrixIR.colsFieldName, MatrixType.entriesIdentifier,
+      mv.typ.colKey, ctx, ts, BaseTypeWithRequiredness(tv.typ).asInstanceOf[RTable]
+    ))
+  }
+
+  def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
+    ctx: ExecuteContext, ts: TableStage, r: RTable): IR
 }
 
 case class MatrixNativeWriter(
@@ -73,12 +76,8 @@ case class MatrixNativeWriter(
   stageLocally: Boolean = false,
   codecSpecJSONStr: String = null,
   partitions: String = null,
-  partitionsTypeStr: String = null,
-  checkpointFile: String = null
+  partitionsTypeStr: String = null
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = mv.write(ctx, path, overwrite, stageLocally, codecSpecJSONStr, partitions, partitionsTypeStr, checkpointFile)
-
-  def canLowerEfficiently: Boolean = !stageLocally && checkpointFile == null
 
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, tablestage: TableStage, r: RTable): IR = {
@@ -95,10 +94,6 @@ case class MatrixNativeWriter(
       tablestage.repartitionNoShuffle(ctx, new RVDPartitioner(ctx.stateManager, tm.rowKey.toArray, tm.rowKeyStruct, rangeBounds))
     } else tablestage
 
-    if (checkpointFile != null) {
-      warn(s"lowered execution does not support checkpoint files")
-    }
-
     val rowSpec = TypedCodecSpec(EType.fromTypeAndAnalysis(tm.rowType, rm.rowType), tm.rowType, bufferSpec)
     val entrySpec = TypedCodecSpec(EType.fromTypeAndAnalysis(tm.entriesRVType, rm.entriesRVType), tm.entriesRVType, bufferSpec)
     val colSpec = TypedCodecSpec(EType.fromTypeAndAnalysis(tm.colType, rm.colType), tm.colType, bufferSpec)
@@ -423,16 +418,6 @@ case class MatrixVCFWriter(
   metadata: Option[VCFMetadata] = None,
   tabix: Boolean = false
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = {
-    val tv = mv.toTableValue
-    val ts = TableExecuteIntermediate(tv).asTableStage(ctx)
-    CompileAndEvaluate(ctx,
-      lower(LowerMatrixIR.colsFieldName, MatrixType.entriesIdentifier, mv.typ.colKey,
-        ctx, ts, BaseTypeWithRequiredness(tv.typ).asInstanceOf[RTable]))
-  }
-
-  def canLowerEfficiently: Boolean = true
-
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, ts: TableStage, r: RTable): IR = {
     require(exportType != ExportType.PARALLEL_COMPOSABLE)
@@ -879,15 +864,6 @@ case class MatrixGENWriter(
   path: String,
   precision: Int = 4
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = {
-    val tv = mv.toTableValue
-    val ts = TableExecuteIntermediate(tv).asTableStage(ctx)
-    CompileAndEvaluate(ctx,
-      lower(LowerMatrixIR.colsFieldName, MatrixType.entriesIdentifier, mv.typ.colKey,
-        ctx, ts, BaseTypeWithRequiredness(tv.typ).asInstanceOf[RTable]))
-  }
-
-  def canLowerEfficiently: Boolean = true
 
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, ts: TableStage, r: RTable): IR = {
@@ -1000,16 +976,6 @@ case class MatrixBGENWriter(
   exportType: String,
   compressionCodec: String
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = {
-    val tv = mv.toTableValue
-    val ts = TableExecuteIntermediate(tv).asTableStage(ctx)
-    CompileAndEvaluate(ctx,
-      lower(LowerMatrixIR.colsFieldName, MatrixType.entriesIdentifier, mv.typ.colKey,
-        ctx, ts, BaseTypeWithRequiredness(tv.typ).asInstanceOf[RTable])
-    )
-  }
-
-  def canLowerEfficiently: Boolean = true
 
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, ts: TableStage, r: RTable): IR = {
@@ -1279,16 +1245,6 @@ case class BGENExportFinalizer(typ: MatrixType, path: String, exportType: String
 case class MatrixPLINKWriter(
   path: String
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = {
-    val tv = mv.toTableValue
-    val ts = TableExecuteIntermediate(tv).asTableStage(ctx)
-    CompileAndEvaluate(ctx,
-      lower(LowerMatrixIR.colsFieldName, MatrixType.entriesIdentifier, mv.typ.colKey,
-        ctx, ts, BaseTypeWithRequiredness(tv.typ).asInstanceOf[RTable])
-    )
-  }
-
-  def canLowerEfficiently: Boolean = true
 
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, ts: TableStage, r: RTable): IR = {
@@ -1446,9 +1402,6 @@ case class MatrixBlockMatrixWriter(
   entryField: String,
   blockSize: Int
 ) extends MatrixWriter {
-  def apply(ctx: ExecuteContext, mv: MatrixValue): Unit = MatrixWriteBlockMatrix(ctx, mv, entryField, path, overwrite, blockSize)
-
-  def canLowerEfficiently: Boolean = true
 
   override def lower(colsFieldName: String, entriesFieldName: String, colKey: IndexedSeq[String],
     ctx: ExecuteContext, ts: TableStage, r: RTable): IR = {
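
With the per-writer apply methods above deleted, every export reaches its writer through the shared lowering entry point; from Python, nothing changes at call sites (a sketch; paths are illustrative):

```python
import hail as hl

# Both exports below now route through MatrixWriter's shared lowering-based
# apply rather than writer-specific execution code paths.
mt = hl.balding_nichols_model(n_populations=2, n_samples=10, n_variants=100)
hl.export_vcf(mt, '/tmp/out.vcf.bgz')
hl.export_plink(mt, '/tmp/out')  # writes /tmp/out.bed, .bim, and .fam
```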
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/expr/ir/TableValue.scala
@@ -37,7 +37,7 @@ sealed trait TableExecuteIntermediate {
   def partitioner: RVDPartitioner
 }
 
-class TableValueIntermediate(tv: TableValue) extends TableExecuteIntermediate {
+case class TableValueIntermediate(tv: TableValue) extends TableExecuteIntermediate {
   def asTableStage(ctx: ExecuteContext): TableStage = {
     RVDToTableStage(tv.rvd, tv.globals.toEncodedLiteral(ctx.theHailClassLoader))
   }
