From 83b569480b6ff18322a45039deef9da57408f067 Mon Sep 17 00:00:00 2001 From: Alex Kotlar Date: Thu, 20 Jun 2019 10:21:59 -0400 Subject: [PATCH 1/5] first pass --- hail/src/main/scala/is/hail/methods/IBD.scala | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/hail/src/main/scala/is/hail/methods/IBD.scala b/hail/src/main/scala/is/hail/methods/IBD.scala index 68e2630e89e..678fd1fa50f 100644 --- a/hail/src/main/scala/is/hail/methods/IBD.scala +++ b/hail/src/main/scala/is/hail/methods/IBD.scala @@ -3,7 +3,7 @@ package is.hail.methods import is.hail.HailContext import is.hail.annotations._ import is.hail.expr.ir._ -import is.hail.expr.types.physical.PString +import is.hail.expr.types.physical.{PFloat64, PInt64, PString, PStruct} import is.hail.expr.types.virtual.{TFloat64, TInt64, TString, TStruct} import is.hail.rvd.RVDContext import is.hail.sparkextras.ContextRDD @@ -20,10 +20,8 @@ object IBDInfo { IBDInfo(Z0, Z1, Z2, Z1 / 2 + Z2) } - val signature = - TStruct(("Z0", TFloat64()), ("Z1", TFloat64()), ("Z2", TFloat64()), ("PI_HAT", TFloat64())) - - private val pType = signature.physicalType + val pType = + PStruct(("Z0", PFloat64()), ("Z1", PFloat64()), ("Z2", PFloat64()), ("PI_HAT", PFloat64())) def fromRegionValue(rv: RegionValue): IBDInfo = fromRegionValue(rv.region, rv.offset) @@ -54,10 +52,8 @@ case class IBDInfo(Z0: Double, Z1: Double, Z2: Double, PI_HAT: Double) { } object ExtendedIBDInfo { - val signature = - TStruct(("ibd", IBDInfo.signature), ("ibs0", TInt64()), ("ibs1", TInt64()), ("ibs2", TInt64())) - - private val pType = signature.physicalType + val pType = + PStruct(("ibd", IBDInfo.pType), ("ibs0", PInt64()), ("ibs1", PInt64()), ("ibs2", PInt64())) def fromRegionValue(rv: RegionValue): ExtendedIBDInfo = fromRegionValue(rv.region, rv.offset) @@ -291,7 +287,7 @@ object IBD { eibd = calculateIBDInfo(ibses(idx * 3), ibses(idx * 3 + 1), ibses(idx * 3 + 2), ibse, bounded) if min.forall(eibd.ibd.PI_HAT >= _) && max.forall(eibd.ibd.PI_HAT <= _) } yield { - rvb.start(ibdSignature.physicalType) + rvb.start(ibdPType) rvb.startStruct() rvb.addString(sampleIds(i)) rvb.addString(sampleIds(j)) @@ -323,17 +319,15 @@ object IBD { val computeMaf = mafFieldName.map(generateComputeMaf(input, _)) val sampleIds = input.stringSampleIds - TableLiteral(TableValue(ibdSignature, FastIndexedSeq("i", "j"), + TableLiteral(TableValue(ibdPType.virtualType, FastIndexedSeq("i", "j"), computeIBDMatrix(input, computeMaf, min, max, sampleIds, bounded))) } - private val ibdSignature = TStruct(("i", TString()), ("j", TString())) ++ ExtendedIBDInfo.signature - - private val ibdPType = ibdSignature.physicalType + private val ibdPType = PStruct(("i", PString()), ("j", PString())) ++ ExtendedIBDInfo.pType def toKeyTable(sc: HailContext, ibdMatrix: RDD[((Annotation, Annotation), ExtendedIBDInfo)]): Table = { val ktRdd = ibdMatrix.map { case ((i, j), eibd) => eibd.makeRow(i, j) } - Table(sc, ktRdd, ibdSignature, FastIndexedSeq("i", "j")) + Table(sc, ktRdd, ibdPType.virtualType, FastIndexedSeq("i", "j")) } def toRDD(tv: TableValue): RDD[((Annotation, Annotation), ExtendedIBDInfo)] = { From a438704b185777dc2c4d68e98acd3151ccc67706 Mon Sep 17 00:00:00 2001 From: Alex Kotlar Date: Thu, 20 Jun 2019 13:21:20 -0400 Subject: [PATCH 2/5] add TableValue constructor that takes PType, and make IBD use that --- hail/src/main/scala/is/hail/expr/ir/TableValue.scala | 9 +++++++++ hail/src/main/scala/is/hail/methods/IBD.scala | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/hail/src/main/scala/is/hail/expr/ir/TableValue.scala b/hail/src/main/scala/is/hail/expr/ir/TableValue.scala index a6e152e7920..3cc3e6ab38c 100644 --- a/hail/src/main/scala/is/hail/expr/ir/TableValue.scala +++ b/hail/src/main/scala/is/hail/expr/ir/TableValue.scala @@ -3,6 +3,7 @@ package is.hail.expr.ir import is.hail.HailContext import is.hail.annotations._ import is.hail.expr.TableAnnotationImpex +import is.hail.expr.types.physical.PStruct import is.hail.expr.types.{MatrixType, TableType} import is.hail.expr.types.virtual.{Field, TArray, TStruct} import is.hail.io.{CodecSpec, exportTypes} @@ -19,6 +20,14 @@ import org.apache.spark.storage.StorageLevel import org.json4s.jackson.JsonMethods object TableValue { + def apply(rowType: PStruct, key: IndexedSeq[String], rdd: ContextRDD[RVDContext, RegionValue]): TableValue = { + Interpret( + TableKeyBy(TableLiteral(TableValue(TableType(rowType.virtualType, FastIndexedSeq(), TStruct.empty()), + BroadcastRow.empty(), + RVD.unkeyed(rowType, rdd))), + key)) + } + def apply(rowType: TStruct, key: IndexedSeq[String], rdd: ContextRDD[RVDContext, RegionValue]): TableValue = { Interpret( TableKeyBy(TableLiteral(TableValue(TableType(rowType, FastIndexedSeq(), TStruct.empty()), diff --git a/hail/src/main/scala/is/hail/methods/IBD.scala b/hail/src/main/scala/is/hail/methods/IBD.scala index 678fd1fa50f..8e114c8cf3f 100644 --- a/hail/src/main/scala/is/hail/methods/IBD.scala +++ b/hail/src/main/scala/is/hail/methods/IBD.scala @@ -319,7 +319,7 @@ object IBD { val computeMaf = mafFieldName.map(generateComputeMaf(input, _)) val sampleIds = input.stringSampleIds - TableLiteral(TableValue(ibdPType.virtualType, FastIndexedSeq("i", "j"), + TableLiteral(TableValue(ibdPType, FastIndexedSeq("i", "j"), computeIBDMatrix(input, computeMaf, min, max, sampleIds, bounded))) } From 79687d10f7f17eb422d20bf46ef3c2a2fab234d6 Mon Sep 17 00:00:00 2001 From: Alex Kotlar Date: Thu, 20 Jun 2019 13:38:37 -0400 Subject: [PATCH 3/5] table constructors --- hail/src/main/scala/is/hail/methods/IBD.scala | 4 +-- hail/src/main/scala/is/hail/table/Table.scala | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/hail/src/main/scala/is/hail/methods/IBD.scala b/hail/src/main/scala/is/hail/methods/IBD.scala index 8e114c8cf3f..b1a4baed75c 100644 --- a/hail/src/main/scala/is/hail/methods/IBD.scala +++ b/hail/src/main/scala/is/hail/methods/IBD.scala @@ -325,9 +325,9 @@ object IBD { private val ibdPType = PStruct(("i", PString()), ("j", PString())) ++ ExtendedIBDInfo.pType - def toKeyTable(sc: HailContext, ibdMatrix: RDD[((Annotation, Annotation), ExtendedIBDInfo)]): Table = { + def toKeyTable(hc: HailContext, ibdMatrix: RDD[((Annotation, Annotation), ExtendedIBDInfo)]): Table = { val ktRdd = ibdMatrix.map { case ((i, j), eibd) => eibd.makeRow(i, j) } - Table(sc, ktRdd, ibdPType.virtualType, FastIndexedSeq("i", "j")) + Table(hc, ktRdd, ibdPType, FastIndexedSeq("i", "j")) } def toRDD(tv: TableValue): RDD[((Annotation, Annotation), ExtendedIBDInfo)] = { diff --git a/hail/src/main/scala/is/hail/table/Table.scala b/hail/src/main/scala/is/hail/table/Table.scala index 20acfbf492f..fcd30cabee6 100644 --- a/hail/src/main/scala/is/hail/table/Table.scala +++ b/hail/src/main/scala/is/hail/table/Table.scala @@ -4,6 +4,7 @@ import is.hail.HailContext import is.hail.annotations._ import is.hail.expr.ir._ import is.hail.expr.types._ +import is.hail.expr.types.physical.PStruct import is.hail.expr.types.virtual._ import is.hail.expr.{ir, _} import is.hail.io.plink.{FamFileConfig, LoadPlink} @@ -148,6 +149,31 @@ object Table { ).keyBy(key, isSorted) } + def apply( + hc: HailContext, + rdd: RDD[Row], + signature: PStruct, + key: IndexedSeq[String] + ): Table = apply(hc, ContextRDD.weaken[RVDContext](rdd), signature, key, TStruct.empty(), Annotation.empty, false) + + def apply( + hc: HailContext, + crdd: ContextRDD[RVDContext, Row], + signature: PStruct, + key: IndexedSeq[String], + globalSignature: TStruct, + globals: Annotation, + isSorted: Boolean + ): Table = { + val crdd2 = crdd.cmapPartitions((ctx, it) => it.toRegionValueIterator(ctx.region, signature)) + new Table(hc, TableLiteral( + TableValue( + TableType(signature.virtualType, FastIndexedSeq(), globalSignature), + BroadcastRow(globals.asInstanceOf[Row], globalSignature, hc.backend), + RVD.unkeyed(signature, crdd2))) + ).keyBy(key, isSorted) + } + def sameWithinTolerance(t: Type, l: Array[Row], r: Array[Row], tolerance: Double, absolute: Boolean): Boolean = { val used = new Array[Boolean](r.length) var i = 0 From 6875d103e07c7aecfbd29a5344c5c590b5bacc86 Mon Sep 17 00:00:00 2001 From: Alex Kotlar Date: Thu, 20 Jun 2019 14:18:25 -0400 Subject: [PATCH 4/5] LoadPlink, use ptypes --- .../hail/annotations/RegionValueBuilder.scala | 2 ++ .../scala/is/hail/expr/types/TableType.scala | 5 ++++- .../scala/is/hail/io/plink/LoadPlink.scala | 19 ++++++++++--------- hail/src/main/scala/is/hail/table/Table.scala | 6 +++--- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala b/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala index eead5834dc1..6616f117b64 100644 --- a/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala +++ b/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala @@ -505,6 +505,8 @@ class RegionValueBuilder(var region: Region) { addRegionValue(t, uis.region, uis.aoff) } + def addAnnotation(t: PType, a: Annotation): Unit = addAnnotation(t.virtualType, a) + def addAnnotation(t: Type, a: Annotation) { if (a == null) setMissing() diff --git a/hail/src/main/scala/is/hail/expr/types/TableType.scala b/hail/src/main/scala/is/hail/expr/types/TableType.scala index f9755260d2a..717dbe3aedf 100644 --- a/hail/src/main/scala/is/hail/expr/types/TableType.scala +++ b/hail/src/main/scala/is/hail/expr/types/TableType.scala @@ -1,6 +1,7 @@ package is.hail.expr.types import is.hail.expr.ir._ +import is.hail.expr.types.physical.{PStruct, PType} import is.hail.expr.types.virtual.{TStruct, Type} import is.hail.rvd.RVDType import is.hail.utils._ @@ -12,7 +13,9 @@ class TableTypeSerializer extends CustomSerializer[TableType](format => ( { case tt: TableType => JString(tt.toString) })) case class TableType(rowType: TStruct, key: IndexedSeq[String], globalType: TStruct) extends BaseType { - lazy val canonicalRVDType = RVDType(rowType.physicalType, key) + lazy val canonicalPType = PType.canonical(rowType).asInstanceOf[PStruct] + lazy val canonicalRVDType = RVDType(canonicalPType, key) + key.foreach {k => if (!rowType.hasField(k)) throw new RuntimeException(s"key field $k not in row type: $rowType") diff --git a/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala b/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala index 79da4e913fe..f1d02032b90 100644 --- a/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala +++ b/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala @@ -4,6 +4,7 @@ import is.hail.HailContext import is.hail.annotations._ import is.hail.expr.ir.{LowerMatrixIR, MatrixHybridReader, MatrixRead, MatrixReader, MatrixValue, PruneDeadFields, TableRead, TableValue} import is.hail.expr.types._ +import is.hail.expr.types.physical.{PBoolean, PFloat64, PString, PStruct} import is.hail.expr.types.virtual._ import is.hail.io.vcf.LoadVCF import is.hail.rvd.{RVD, RVDContext, RVDType} @@ -44,14 +45,14 @@ object LoadPlink { """^-?(?:\d+|\d*\.\d+)(?:[eE]-?\d+)?$""".r def parseFam(filename: String, ffConfig: FamFileConfig, - fs: FS): (IndexedSeq[Row], TStruct) = { + fs: FS): (IndexedSeq[Row], PStruct) = { val delimiter = unescapeString(ffConfig.delimiter) - val phenoSig = if (ffConfig.isQuantPheno) ("quant_pheno", TFloat64()) else ("is_case", TBoolean()) + val phenoSig = if (ffConfig.isQuantPheno) ("quant_pheno", PFloat64()) else ("is_case", PBoolean()) - val signature = TStruct(("id", TString()), ("fam_id", TString()), ("pat_id", TString()), - ("mat_id", TString()), ("is_female", TBoolean()), phenoSig) + val signature = PStruct(("id", PString()), ("fam_id", PString()), ("pat_id", PString()), + ("mat_id", PString()), ("is_female", PBoolean()), phenoSig) val idBuilder = new ArrayBuilder[String] val structBuilder = new ArrayBuilder[Row] @@ -181,7 +182,7 @@ case class MatrixPLINKReader( val fullMatrixType: MatrixType = MatrixType( globalType = TStruct.empty(), colKey = Array("s"), - colType = saSignature, + colType = saSignature.virtualType, rowType = TStruct( "locus" -> TLocus.schemaFromRG(referenceGenome), "alleles" -> TArray(TString()), @@ -210,7 +211,7 @@ case class MatrixPLINKReader( nPartitions.getOrElse(sc.defaultMinPartitions))) val kType = requestedType.canonicalRVDType.kType - val rvRowType = requestedType.rowType + val rvRowType = requestedType.canonicalPType val hasRsid = requestedType.rowType.hasField("rsid") val hasCmPos = requestedType.rowType.hasField("cm_position") @@ -236,7 +237,7 @@ case class MatrixPLINKReader( else { rvb.start(kType) rvb.startStruct() - rvb.addAnnotation(kType.types(0).virtualType, Locus.annotation(contig, pos, rgLocal)) + rvb.addAnnotation(kType.types(0), Locus.annotation(contig, pos, rgLocal)) rvb.startArray(2) rvb.addString(ref) rvb.addString(alt) @@ -260,9 +261,9 @@ case class MatrixPLINKReader( if (skipInvalidLociLocal && !rgLocal.forall(_.isValidLocus(contig, pos))) None else { - rvb.start(rvRowType.physicalType) + rvb.start(rvRowType) rvb.startStruct() - rvb.addAnnotation(kType.types(0).virtualType, Locus.annotation(contig, pos, rgLocal)) + rvb.addAnnotation(kType.types(0), Locus.annotation(contig, pos, rgLocal)) rvb.startArray(2) rvb.addString(ref) rvb.addString(alt) diff --git a/hail/src/main/scala/is/hail/table/Table.scala b/hail/src/main/scala/is/hail/table/Table.scala index fcd30cabee6..d87df7681f1 100644 --- a/hail/src/main/scala/is/hail/table/Table.scala +++ b/hail/src/main/scala/is/hail/table/Table.scala @@ -62,10 +62,10 @@ object Table { delimiter: String = "\\t", missingValue: String = "NA"): String = { val ffConfig = FamFileConfig(isQuantPheno, delimiter, missingValue) - val (data, typ) = LoadPlink.parseFam(path, ffConfig, HailContext.sFS) + val (data, ptyp) = LoadPlink.parseFam(path, ffConfig, HailContext.sFS) val jv = JSONAnnotationImpex.exportAnnotation( - Row(typ.toString, data), - TStruct("type" -> TString(), "data" -> TArray(typ))) + Row(ptyp.toString, data), + TStruct("type" -> TString(), "data" -> TArray(ptyp.virtualType))) JsonMethods.compact(jv) } From 09bb42415676438ace3314fa9076985393eba4a1 Mon Sep 17 00:00:00 2001 From: Alex Kotlar Date: Thu, 20 Jun 2019 14:47:41 -0400 Subject: [PATCH 5/5] fix signature --- .../main/scala/is/hail/annotations/RegionValueBuilder.scala | 2 -- hail/src/main/scala/is/hail/io/plink/LoadPlink.scala | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala b/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala index 6616f117b64..eead5834dc1 100644 --- a/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala +++ b/hail/src/main/scala/is/hail/annotations/RegionValueBuilder.scala @@ -505,8 +505,6 @@ class RegionValueBuilder(var region: Region) { addRegionValue(t, uis.region, uis.aoff) } - def addAnnotation(t: PType, a: Annotation): Unit = addAnnotation(t.virtualType, a) - def addAnnotation(t: Type, a: Annotation) { if (a == null) setMissing() diff --git a/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala b/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala index f1d02032b90..0713c34231e 100644 --- a/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala +++ b/hail/src/main/scala/is/hail/io/plink/LoadPlink.scala @@ -237,7 +237,7 @@ case class MatrixPLINKReader( else { rvb.start(kType) rvb.startStruct() - rvb.addAnnotation(kType.types(0), Locus.annotation(contig, pos, rgLocal)) + rvb.addAnnotation(kType.types(0).virtualType, Locus.annotation(contig, pos, rgLocal)) rvb.startArray(2) rvb.addString(ref) rvb.addString(alt) @@ -263,13 +263,13 @@ case class MatrixPLINKReader( else { rvb.start(rvRowType) rvb.startStruct() - rvb.addAnnotation(kType.types(0), Locus.annotation(contig, pos, rgLocal)) + rvb.addAnnotation(kType.types(0).virtualType, Locus.annotation(contig, pos, rgLocal)) rvb.startArray(2) rvb.addString(ref) rvb.addString(alt) rvb.endArray() if (hasRsid) - rvb.addAnnotation(rvRowType.types(2), rsid) + rvb.addAnnotation(rvRowType.types(2).virtualType, rsid) if (hasCmPos) rvb.addDouble(cmPos) if (!dropSamples)