
add hive style partitioning for contigName #1620

Closed
wants to merge 10 commits
176 changes: 118 additions & 58 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,15 +18,10 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }

Member:
Can you remove this whitespace?

import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
VCFCompoundHeaderLine,
VCFFormatHeaderLine,
VCFHeaderLine,
VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
Member:
Can you undo this? We break at 80 chars or 4 items.

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
@@ -42,72 +37,30 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{ DataFrame, Dataset }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{
FeatureField,
Projection
}
import org.bdgenomics.adam.rdd.contig.{
DatasetBoundNucleotideContigFragmentRDD,
NucleotideContigFragmentRDD,
ParquetUnboundNucleotideContigFragmentRDD,
RDDBoundNucleotideContigFragmentRDD
}
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
Member:
Can you break this line? We indent at 80 chars/4 imports.

import org.bdgenomics.adam.rdd.feature._
import org.bdgenomics.adam.rdd.fragment.{
DatasetBoundFragmentRDD,
FragmentRDD,
ParquetUnboundFragmentRDD,
RDDBoundFragmentRDD
}
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
DatasetBoundAlignmentRecordRDD,
RepairPartitions,
ParquetUnboundAlignmentRecordRDD,
RDDBoundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
Member:
Can you break this line? We indent at 80 chars/4 imports.

import org.bdgenomics.adam.rdd.variant._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.sql.{
AlignmentRecord => AlignmentRecordProduct,
Feature => FeatureProduct,
Fragment => FragmentProduct,
Genotype => GenotypeProduct,
NucleotideContigFragment => NucleotideContigFragmentProduct,
Variant => VariantProduct
}
import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct }
Member:
Can you break this line? We indent at 80 chars/4 imports.

import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.adam.util.{
GenomeFileReader,
ReferenceContigMap,
ReferenceFile,
SequenceDictionaryReader,
TwoBitFile
}
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Contig,
Feature,
Fragment,
Genotype,
NucleotideContigFragment,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample,
Variant
}
import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata }
Member:
Can you break this line? We indent at 80 chars/4 imports.

import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.seqdoop.hadoop_bam._
import org.seqdoop.hadoop_bam.util._

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -1789,6 +1742,27 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadPartitionedParquetAlignments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, partitionSize: Int = 1000000): AlignmentRecordRDD = {
Member:
I don't think partitionSize is used, no?


require(checkPartitionedParquetFlag(pathName),
"input Parquet files are not Partitioned")

// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)

// convert avro to record group dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

val pgs = loadAvroPrograms(pathName)
val reads: AlignmentRecordRDD = ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)

val datasetBoundAlignmentRecordRDD: AlignmentRecordRDD = regions match {
case Some(x) => DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(x)), reads.sequences, reads.recordGroups, reads.processingSteps)
case _ => DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
}
datasetBoundAlignmentRecordRDD
}
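
For orientation, a usage sketch of this loader; the path and region below are made up for illustration, while loadPartitionedParquetAlignments and ReferenceRegion are as defined in this PR:

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

// assumes the directory was written with saveAsPartitionedParquet, so the
// _isPartitionedByStartPos flag file is present
val region = ReferenceRegion("1", 26472000L, 26473000L)
val reads = sc.loadPartitionedParquetAlignments(
  "sample.alignments.adam",
  regions = Some(Seq(region)))
println(reads.dataset.count())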

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2105,6 +2079,28 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadPartitionedParquetGenotypes(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, partitionSize: Int = 1000000): GenotypeRDD = {
Member:
I don't think partitionSize is used, no?

if (!checkPartitionedParquetFlag(pathName)) {
throw new IllegalArgumentException("input Parquet files are not Partitioned")
Member:
Replace with:

require(checkPartitionedParquetFlag(pathName),
  "Input Parquet files (%s) are not partitioned.".format(pathName))

}
// load header lines
val headers = loadHeaderLines(pathName)
// load sequence info
val sd = loadAvroSequenceDictionary(pathName)
// load avro record group dictionary and convert to samples
val samples = loadAvroSamples(pathName)

val genotypes = ParquetUnboundGenotypeRDD(sc, pathName, sd, samples, headers)

val datasetBoundGenotypeRDD: GenotypeRDD = regions match {
Member:
Instead of having regions: Option[Iterable[ReferenceRegion]] and running:

regions match {
  case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset.filter(referenceRegionsToDatasetQueryString(x)), ...)
  case _ => DatasetBoundGenotypeRDD(genotypes.dataset, ...)
}

Change the type of regions to Iterable[ReferenceRegion] and then apply the region filters by running:

regions.foldLeft(genotypes)(p => p._1.transformDataset(_.filter(referenceRegionsToDatasetQueryString(p._2))))

Member Author:
I think I understand you want to use the more elegant fold left over a potentially empty regions list, with default being the unfiltered dataset if regions is empty.

However, I'm not sure that the code you suggest implements the OR logic that is intended for this region filter, as won't the consecutive applications of filter as the list is folded result in AND logic?

I'm inclined to leave the clunky regions match code for now if we think it is correct, as it seems to work.

case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset
.filter(referenceRegionsToDatasetQueryString(x)), genotypes.sequences, genotypes.samples, genotypes.headerLines)
case _ => DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines)
}
datasetBoundGenotypeRDD

Member:
Extra whitespace.

}
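
To make the OR-versus-AND point in the thread above concrete: folding a per-region filter over the dataset chains the predicates with AND, whereas building one combined query string up front keeps the intended OR semantics. A rough sketch, not part of this PR, using transformDataset and referenceRegionsToDatasetQueryString as they appear elsewhere in this diff:

def filterGenotypesByRegions(genotypes: GenotypeRDD,
                             regions: Iterable[ReferenceRegion]): GenotypeRDD = {
  if (regions.isEmpty) {
    // no regions requested: return the unfiltered dataset-bound RDD
    genotypes
  } else {
    // one disjunctive query string covering all regions, applied in a single filter
    genotypes.transformDataset(_.filter(referenceRegionsToDatasetQueryString(regions)))
  }
}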

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2138,6 +2134,25 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadPartitionedParquetVariants(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, partitionSize: Int = 1000000): VariantRDD = {
Contributor:
Is there any interest in combining this functionality into the existing load functions? It may be nice so the user does not have to know whether the files are partitioned when loading.

Member Author:
I looked at that earlier, using a flag file to detect whether the input was partitioned, but the trouble is that the existing load functions have predicate and projection parameters that we don't currently have a way to translate directly into something spark-sql/dataset can use.

Member:
I think it's fine if the predicate/projection parameters can't be translated over. In the main loadParquet* functions, what I would do is:

  • Check for the partitionedParquetFlag
  • If the flag is set, check if predicate/projection are provided. If projection is provided, log a warning. If predicate is provided, throw an exception.
  • If we didn't throw an exception, call loadPartitionedParquet*
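
A minimal sketch of that dispatch, assuming a simplified loadVariants signature; the real wrapper and its FilterPredicate/Schema parameter types would come from the existing ADAMContext loaders:

def loadVariants(pathName: String,
                 optPredicate: Option[FilterPredicate] = None,
                 optProjection: Option[Schema] = None): VariantRDD = {
  if (checkPartitionedParquetFlag(pathName)) {
    // partitioned layout: projections are ignored, predicates are unsupported
    optProjection.foreach(_ => log.warn("Projection is ignored for partitioned Parquet input."))
    require(optPredicate.isEmpty,
      "Predicates are not supported for partitioned Parquet input (%s).".format(pathName))
    loadPartitionedParquetVariants(pathName)
  } else {
    loadParquetVariants(pathName, optPredicate = optPredicate, optProjection = optProjection)
  }
}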

if (!checkPartitionedParquetFlag(pathName)) {
throw new IllegalArgumentException("input Parquet files are not Partitioned")
Member:
Replace with:

require(checkPartitionedParquetFlag(pathName),
  "Input Parquet files (%s) are not partitioned.".format(pathName))

}
val sd = loadAvroSequenceDictionary(pathName)

// load header lines
val headers = loadHeaderLines(pathName)

val variants = ParquetUnboundVariantRDD(sc, pathName, sd, headers)

Member:
Drop whitespace

val datasetBoundVariantRDD: VariantRDD = regions match {
case Some(x) => DatasetBoundVariantRDD(variants.dataset
.filter(referenceRegionsToDatasetQueryString(x)), variants.sequences, headers)
case _ => DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
}
datasetBoundVariantRDD
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2454,6 +2469,17 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadPartitionedParquetFeatures(pathName: String): FeatureRDD = {
Member:
This is missing regions, no?

if (!checkPartitionedParquetFlag(pathName)) {
throw new IllegalArgumentException("input Parquet files are not Partitioned")
Member:
Replace with:

require(checkPartitionedParquetFlag(pathName),
  "Input Parquet files (%s) are not partitioned.".format(pathName))

}
val sd = loadAvroSequenceDictionary(pathName)
val features = ParquetUnboundFeatureRDD(sc, pathName, sd)

DatasetBoundFeatureRDD(features.dataset, features.sequences)

Member:
Extra whitespace.

}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2486,6 +2512,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

def loadPartitionedParquetFragments(pathName: String): NucleotideContigFragmentRDD = {
if (!checkPartitionedParquetFlag(pathName)) {
throw new IllegalArgumentException("input Parquet files are not Partitioned")
Member:
Replace with:

require(checkPartitionedParquetFlag(pathName),
  "Input Parquet files (%s) are not partitioned.".format(pathName))

}
val sd = loadAvroSequenceDictionary(pathName)
val nucleotideContigFragments = ParquetUnboundNucleotideContigFragmentRDD(sc, pathName, sd)
DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset, nucleotideContigFragments.sequences)
Member:
Break long line.

}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -2904,4 +2939,29 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

def writePartitionedParquetFlag(filePath: String): Boolean = {
Member:
This should be private (and go in GenomicRDD.scala, methinks?)

val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(sc.hadoopConfiguration)
fs.createNewFile(path)
}

def checkPartitionedParquetFlag(filePath: String): Boolean = {
Member:
Nit: should have whitespace before this line.

Member:
This should be private.

Member Author:
I actually need to use this from the Mango library to determine whether a dataset is partitioned, and therefore whether it should use the partitioned read functions, since we needed to keep them separate. Perhaps a better option would be some wrapper function like maybeReadPartitioned, but for now I prefer to keep it public and give client code a chance to explicitly check.

val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(sc.hadoopConfiguration)
fs.exists(path)
}
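
Along the lines of the maybeReadPartitioned idea mentioned above, a hypothetical wrapper (not in this PR) could hide the flag check from callers; shown here for alignments only:

def maybeLoadPartitionedAlignments(pathName: String,
                                   regions: Option[Iterable[ReferenceRegion]] = None): AlignmentRecordRDD = {
  if (checkPartitionedParquetFlag(pathName)) {
    loadPartitionedParquetAlignments(pathName, regions)
  } else {
    // fall back to the ordinary loader; region filtering would have to happen afterwards
    loadAlignments(pathName)
  }
}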

Member:
Nit: shouldn't have whitespace here.

def referenceRegionsToDatasetQueryString(x: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {
Member:
This should be private.

var regionQueryString = "(contigName=" + "\'" + x.head.referenceName.replaceAll("chr", "") + "\' and posBin >= \'" +
Member:
Why are we replacing "chr"s?

scala.math.floor(x.head.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(x.head.end / partitionSize).toInt + 1) + "\' and start >= " + x.head.start + " and end <= " + x.head.end + ")"
if (x.size > 1) {
x.foreach((i) => {
regionQueryString = regionQueryString + " or " + "(contigName=" + "\'" +
i.referenceName.replaceAll("chr", "") + "\' and posBin >= \'" + scala.math.floor(i.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(i.end / partitionSize).toInt + 1) + "\' and start >= " + i.start + " and end <= " + i.end + ")"
})
}
regionQueryString
}
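
As a worked example of the string this builds: with the default partitionSize of 1000000, a single region spanning chr1:240,997,000-240,998,000 yields

referenceRegionsToDatasetQueryString(Seq(ReferenceRegion("chr1", 240997000L, 240998000L)))
// (contigName='1' and posBin >= '240' and posBin < '241' and start >= 240997000 and end <= 240998000)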

}
20 changes: 20 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -38,6 +39,7 @@ import org.bdgenomics.adam.models.{
SequenceDictionary,
SequenceRecord
}
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.{
Contig,
ProcessingStep,
@@ -2476,4 +2478,22 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()

df.withColumn("posBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "posBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
rdd.context.writePartitionedParquetFlag(filePath)
saveMetadata(filePath)
Member:
Nit: whitespace before this line, no whitespace afterwards.

}
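
As a usage sketch of this save path, paired with the partitioned loaders added in ADAMContext (paths and codec choice are illustrative):

import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.bdgenomics.adam.rdd.ADAMContext._

val features = sc.loadFeatures("small.1.bed")
features.saveAsPartitionedParquet("features.partitioned.adam",
  compressCodec = CompressionCodecName.SNAPPY,
  partitionSize = 2000000)
val reloaded = sc.loadPartitionedParquetFeatures("features.partitioned.adam")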

}
@@ -203,7 +203,7 @@ case class ParquetUnboundAlignmentRecordRDD private[rdd] (
sc.loadParquet(parquetFilename)
}

lazy val dataset = {
lazy val dataset: Dataset[AlignmentRecordProduct] = {
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
sqlContext.read.parquet(parquetFilename).as[AlignmentRecordProduct]
7 changes: 7 additions & 0 deletions adam-core/src/test/resources/multi_chr.sam
@@ -0,0 +1,7 @@
@SQ SN:1 LN:249250621
@SQ SN:2 LN:243199373
@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0
@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1
simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0
simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39
simread:1:189606653:true 0 2 189606654 60 75M * 0 0 TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0
@@ -140,6 +140,37 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
assert(fragments3.dataset.count === 8L)
}

sparkTest("round trip a ncf to partitioned parquet") {
def testMetadata(fRdd: NucleotideContigFragmentRDD) {
val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L))
assert(sequenceRdd.sequences.containsRefName("aSequence"))
}

val fragments1 = sc.loadFasta(testFile("HLA_DQB1_05_01_01_02.fa"), 1000L)
assert(fragments1.rdd.count === 8L)
assert(fragments1.dataset.count === 8L)
testMetadata(fragments1)

// save using dataset path
val output1 = tmpFile("ctg.adam")
val dsBound = fragments1.transformDataset(ds => ds)
testMetadata(dsBound)
dsBound.saveAsPartitionedParquet(output1)
val fragments2 = sc.loadPartitionedParquetFragments(output1)
testMetadata(fragments2)
assert(fragments2.rdd.count === 8L)
assert(fragments2.dataset.count === 8L)

// save using rdd path
val output2 = tmpFile("ctg.adam")
val rddBound = fragments2.transform(rdd => rdd)
testMetadata(rddBound)
rddBound.saveAsPartitionedParquet(output2)
val fragments3 = sc.loadPartitionedParquetFragments(output2)
assert(fragments3.rdd.count === 8L)
assert(fragments3.dataset.count === 8L)
}

sparkTest("save fasta back as a single file") {
val origFasta = testFile("artificial.fa")
val tmpFasta = tmpFile("test.fa")
@@ -857,4 +888,5 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {

checkSave(variantContexts)
}

}
@@ -925,6 +925,33 @@ class FeatureRDDSuite extends ADAMFunSuite {
assert(rdd3.dataset.count === 4)
}

sparkTest("load paritioned parquet to sql, save, re-read from avro") {
def testMetadata(fRdd: FeatureRDD) {
val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L))
assert(sequenceRdd.sequences.containsRefName("aSequence"))
}

val inputPath = testFile("small.1.bed")
val outputPath = tmpLocation()
val rrdd = sc.loadFeatures(inputPath)
testMetadata(rrdd)
val rdd = rrdd.transformDataset(ds => ds) // no-op but force to ds
testMetadata(rdd)
assert(rdd.dataset.count === 4)
assert(rdd.rdd.count === 4)
rdd.saveAsPartitionedParquet(outputPath)
val rdd2 = sc.loadPartitionedParquetFeatures(outputPath)
testMetadata(rdd2)
assert(rdd2.rdd.count === 4)
assert(rdd2.dataset.count === 4)
val outputPath2 = tmpLocation()
rdd.transform(rdd => rdd) // no-op but force to rdd
.saveAsPartitionedParquet(outputPath2)
val rdd3 = sc.loadPartitionedParquetFeatures(outputPath2)
assert(rdd3.rdd.count === 4)
assert(rdd3.dataset.count === 4)
}

sparkTest("transform features to contig rdd") {
val features = sc.loadFeatures(testFile("sample_coverage.bed"))
