Commit

Merge 1a75ed5 into 4223f56
jpdna committed Jan 17, 2018
2 parents 4223f56 + 1a75ed5 commit 1d1f278
Showing 10 changed files with 355 additions and 2 deletions.
1 change: 0 additions & 1 deletion adam-cli/src/test/resources/artificial.fa

This file was deleted.

17 changes: 17 additions & 0 deletions adam-cli/src/test/resources/artificial.fa
@@ -0,0 +1,17 @@
>artificial fasta
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGGGGGGGGGGAAAAAAAAAAGGGGGGGGGGAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
1 change: 0 additions & 1 deletion adam-cli/src/test/resources/artificial.fa.fai

This file was deleted.

1 change: 1 addition & 0 deletions adam-cli/src/test/resources/artificial.fa.fai
@@ -0,0 +1 @@
artificial 1120 18 70 71
188 changes: 188 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1861,6 +1861,41 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name written in range-binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
*
* @note The sequence dictionary is read from an Avro file stored at
* pathName/_seqdict.avro and the record group dictionary is read from an
* Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
* not Parquet + Avro.
*
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add the "chr" prefix to contig names when filtering by region. Defaults to false.
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, addChrPrefix: Boolean = false): AlignmentRecordRDD = {

require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))

// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)

// convert avro to record group dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

val pgs = loadAvroPrograms(pathName)
val reads: AlignmentRecordRDD = ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)

val datasetBoundAlignmentRecordRDD: AlignmentRecordRDD = regions match {
case Some(x) => DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(x, addChrPrefix = addChrPrefix)), reads.sequences, reads.recordGroups, reads.processingSteps)
case _ => DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
}
datasetBoundAlignmentRecordRDD
}
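
A brief usage sketch (the output path and region below are hypothetical; with the generated query, records are restricted to those whose start and end fall inside the requested region):

import org.bdgenomics.adam.models.ReferenceRegion

// load only reads falling inside 1:26472000-26473000 from a range-binned partitioned directory
val region = ReferenceRegion("1", 26472000L, 26473000L)
val reads = sc.loadPartitionedParquetAlignments("alignments.adam", regions = Some(Seq(region)))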

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2244,6 +2279,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name written in range-binned partitioned Parquet + Avro format into a GenotypeRDD.
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add the "chr" prefix to contig names when filtering by region. Defaults to false.
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, addChrPrefix: Boolean = false): GenotypeRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
// load header lines
val headers = loadHeaderLines(pathName)
// load sequence info
val sd = loadAvroSequenceDictionary(pathName)
// load avro samples
val samples = loadAvroSamples(pathName)

val genotypes = ParquetUnboundGenotypeRDD(sc, pathName, sd, samples, headers)

val datasetBoundGenotypeRDD: GenotypeRDD = regions match {
case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset
.filter(referenceRegionsToDatasetQueryString(x, addChrPrefix = addChrPrefix)), genotypes.sequences, genotypes.samples, genotypes.headerLines)
case _ => DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines)
}
datasetBoundGenotypeRDD
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2277,6 +2341,34 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name written in range-binned partitioned Parquet + Avro format into a VariantRDD.
*
* @param pathName The path name to load variants from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add the "chr" prefix to contig names when filtering by region. Defaults to false.
* @return Returns a VariantRDD.
*/
def loadPartitionedParquetVariants(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, addChrPrefix: Boolean = false): VariantRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)

// load header lines
val headers = loadHeaderLines(pathName)

val variants = ParquetUnboundVariantRDD(sc, pathName, sd, headers)

val datasetBoundVariantRDD: VariantRDD = regions match {
case Some(x) => DatasetBoundVariantRDD(variants.dataset
.filter(referenceRegionsToDatasetQueryString(x, addChrPrefix = addChrPrefix)), variants.sequences, headers)
case _ => DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
}
datasetBoundVariantRDD
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2593,6 +2685,32 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name written in range-binned partitioned Parquet + Avro format into a FeatureRDD.
*
* @param pathName The path name to load features from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add the "chr" prefix to contig names when filtering by region. Defaults to false.
* @return Returns a FeatureRDD.
*/
def loadPartitionedParquetFeatures(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, addChrPrefix: Boolean = false): FeatureRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)
val features = ParquetUnboundFeatureRDD(sc, pathName, sd)

val datasetBoundFeatureRDD: FeatureRDD = regions match {
case Some(x) => DatasetBoundFeatureRDD(features.dataset
.filter(referenceRegionsToDatasetQueryString(x, addChrPrefix = addChrPrefix)), features.sequences)
case _ => DatasetBoundFeatureRDD(features.dataset, features.sequences)
}

datasetBoundFeatureRDD

}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2625,6 +2743,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name written in range-binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @param addChrPrefix Flag to add the "chr" prefix to contig names when filtering by region. Defaults to false.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetFragments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None, addChrPrefix: Boolean = false): NucleotideContigFragmentRDD = {
require(checkPartitionedParquetFlag(pathName),
"Input Parquet files (%s) are not partitioned.".format(pathName))
val sd = loadAvroSequenceDictionary(pathName)
val nucleotideContigFragments = ParquetUnboundNucleotideContigFragmentRDD(sc, pathName, sd)

val datasetBoundNucleotideContigFragmentRDD: NucleotideContigFragmentRDD = regions match {
case Some(x) => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset
.filter(referenceRegionsToDatasetQueryString(x, addChrPrefix = addChrPrefix)), nucleotideContigFragments.sequences)
case _ => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset, nucleotideContigFragments.sequences)
}

datasetBoundNucleotideContigFragmentRDD
}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -3046,4 +3188,50 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
* Test whether the Parquet files at a path were written as range-binned partitioned Parquet.
*
* @param filePath The path name to check.
* @return Returns true if the _isPartitionedByStartPos flag file exists at the path.
*/
def checkPartitionedParquetFlag(filePath: String): Boolean = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(sc.hadoopConfiguration)
fs.exists(path)
}

/**
* Returns a query string used to filter a dataset based on an input list of ReferenceRegions.
*
* @param regions List of regions to include in the query.
* @param partitionSize Size of the genomic range partitions used when writing the Parquet files.
* @param addChrPrefix Add the prefix "chr" to contig names; use if the originally loaded data and sequence
* dictionary used the "chr" prefix.
* @return Returns a query string suitable for filtering the dataset by region.
*/
def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000, addChrPrefix: Boolean = false): String = {

def maybeAddChrPrefix(contig: String): String = {
val chromosomes = List("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "X", "Y", "MT")
if (chromosomes.contains(contig) && addChrPrefix) {
"chr" + contig
} else {
contig
}
}

// build the clause for the first region
var regionQueryString = "(contigName=" + "\'" + maybeAddChrPrefix(regions.head.referenceName) + "\' and posBin >= \'" +
scala.math.floor(regions.head.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(regions.head.end / partitionSize).toInt + 1) + "\' and start >= " + regions.head.start + " and end <= " + regions.head.end + ")"
// append a clause for each additional region, skipping the head so it is not duplicated
if (regions.size > 1) {
regions.tail.foreach((i) => {
regionQueryString = regionQueryString + " or " + "(contigName=" + "\'" +
maybeAddChrPrefix(i.referenceName) + "\' and posBin >= \'" + scala.math.floor(i.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(i.end / partitionSize).toInt + 1) + "\' and start >= " + i.start + " and end <= " + i.end + ")"
})
}
regionQueryString
}
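
For illustration, with the default partitionSize of 1000000 and addChrPrefix left false, a single region spanning 1:26472783-26472883 yields a filter string of the form:

(contigName='1' and posBin >= '26' and posBin < '27' and start >= 26472783 and end <= 26472883)

Clauses for any additional regions are appended with " or ".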

}
26 changes: 26 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -2517,4 +2518,29 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

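/**
* Writes an empty flag file, _isPartitionedByStartPos, into the output directory to
* mark the Parquet files as range-binned partitioned by genomic start position.
*
* @param filePath Path of the partitioned Parquet output directory.
* @return Returns true if the flag file was newly created, false if it already existed.
*/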
def writePartitionedParquetFlag(filePath: String): Boolean = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs = path.getFileSystem(toDF().sqlContext.sparkContext.hadoopConfiguration)
fs.createNewFile(path)
}

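/**
* Saves this RDD as range-binned partitioned Parquet + Avro, partitioned by contigName
* and by posBin, where posBin is the start position divided into partitionSize-wide bins.
*
* @param filePath Path to save the partitioned Parquet files to.
* @param compressCodec Compression codec to use. Defaults to GZIP.
* @param partitionSize Width, in bases, of each genomic range bin. Defaults to 1000000.
*/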
def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()

df.withColumn("posBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "posBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath)
saveMetadata(filePath)
}

}
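
A minimal usage sketch of the save/load round trip (the paths below are hypothetical; the default partition size of 1000000 is used):

val features = sc.loadFeatures("small.1.bed")
features.saveAsPartitionedParquet("features.adam")
val reloaded = sc.loadPartitionedParquetFeatures("features.adam")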
7 changes: 7 additions & 0 deletions adam-core/src/test/resources/multi_chr.sam
@@ -0,0 +1,7 @@
@SQ SN:1 LN:249250621
@SQ SN:2 LN:243199373
@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0
@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1
simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0
simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39
simread:1:189606653:true 0 2 189606654 60 75M * 0 0 TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0
@@ -140,6 +140,37 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
assert(fragments3.dataset.count === 8L)
}

sparkTest("round trip a ncf to partitioned parquet") {
def testMetadata(fRdd: NucleotideContigFragmentRDD) {
val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L))
assert(sequenceRdd.sequences.containsRefName("aSequence"))
}

val fragments1 = sc.loadFasta(testFile("HLA_DQB1_05_01_01_02.fa"), 1000L)
assert(fragments1.rdd.count === 8L)
assert(fragments1.dataset.count === 8L)
testMetadata(fragments1)

// save using dataset path
val output1 = tmpFile("ctg.adam")
val dsBound = fragments1.transformDataset(ds => ds)
testMetadata(dsBound)
dsBound.saveAsPartitionedParquet(output1)
val fragments2 = sc.loadPartitionedParquetFragments(output1)
testMetadata(fragments2)
assert(fragments2.rdd.count === 8L)
assert(fragments2.dataset.count === 8L)

// save using rdd path
val output2 = tmpFile("ctg.adam")
val rddBound = fragments2.transform(rdd => rdd)
testMetadata(rddBound)
rddBound.saveAsPartitionedParquet(output2)
val fragments3 = sc.loadPartitionedParquetFragments(output2)
assert(fragments3.rdd.count === 8L)
assert(fragments3.dataset.count === 8L)
}

sparkTest("save fasta back as a single file") {
val origFasta = testFile("artificial.fa")
val tmpFasta = tmpFile("test.fa")
@@ -857,4 +888,5 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {

checkSave(variantContexts)
}

}
@@ -925,6 +925,33 @@ class FeatureRDDSuite extends ADAMFunSuite {
assert(rdd3.dataset.count === 4)
}

sparkTest("load paritioned parquet to sql, save, re-read from avro") {
def testMetadata(fRdd: FeatureRDD) {
val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L))
assert(sequenceRdd.sequences.containsRefName("aSequence"))
}

val inputPath = testFile("small.1.bed")
val outputPath = tmpLocation()
val rrdd = sc.loadFeatures(inputPath)
testMetadata(rrdd)
val rdd = rrdd.transformDataset(ds => ds) // no-op but force to ds
testMetadata(rdd)
assert(rdd.dataset.count === 4)
assert(rdd.rdd.count === 4)
rdd.saveAsPartitionedParquet(outputPath)
val rdd2 = sc.loadPartitionedParquetFeatures(outputPath)
testMetadata(rdd2)
assert(rdd2.rdd.count === 4)
assert(rdd2.dataset.count === 4)
val outputPath2 = tmpLocation()
rdd.transform(rdd => rdd) // no-op but force to rdd
.saveAsPartitionedParquet(outputPath2)
val rdd3 = sc.loadPartitionedParquetFeatures(outputPath2)
assert(rdd3.rdd.count === 4)
assert(rdd3.dataset.count === 4)
}

sparkTest("transform features to contig rdd") {
val features = sc.loadFeatures(testFile("sample_coverage.bed"))

