Skip to content

Commit

Permalink
[ADAM-1169] Write GFF header line pragma in single file mode.
Browse files Browse the repository at this point in the history
Resolves #1169.
  • Loading branch information
fnothaft committed May 15, 2017
1 parent 18191f9 commit a13ff4a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
Expand Up @@ -333,11 +333,13 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile If true, combines all partition shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
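* @param optHeaderPath If provided, the path of a header file to include
* when the shards are merged into a single file. Must be empty unless
* asSingleFile is true.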
*/
private def writeTextRdd[T](rdd: RDD[T],
outputPath: String,
asSingleFile: Boolean,
disableFastConcat: Boolean) {
disableFastConcat: Boolean,
optHeaderPath: Option[String] = None) {
if (asSingleFile) {

// write rdd to disk
Expand All @@ -352,8 +354,10 @@ case class FeatureRDD(rdd: RDD[Feature],
fs,
new Path(outputPath),
new Path(tailPath),
disableFastConcat = disableFastConcat)
disableFastConcat = disableFastConcat,
optHeaderPath = optHeaderPath.map(p => new Path(p)))
} else {
assert(optHeaderPath.isEmpty)
rdd.saveAsTextFile(outputPath)
}
}
Expand Down Expand Up @@ -390,10 +394,18 @@ case class FeatureRDD(rdd: RDD[Feature],
def saveAsGff3(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
val optHeaderPath = if (asSingleFile) {
val headerPath = "%s_head".format(fileName)
GFF3HeaderWriter(headerPath, rdd.context)
Some(headerPath)
} else {
None
}
writeTextRdd(rdd.map(FeatureRDD.toGff3),
fileName,
asSingleFile,
disableFastConcat)
disableFastConcat,
optHeaderPath = optHeaderPath)
}

/**
Expand Down
@@ -0,0 +1,45 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.feature

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

/**
 * Writes the header for a GFF3 file to an otherwise empty file.
 *
 * The header is a single line holding the GFF3 version pragma.
 */
private[feature] object GFF3HeaderWriter {

  import java.nio.charset.StandardCharsets

  /** The GFF3 version pragma, without a trailing newline. */
  val HEADER_STRING = "##gff-version 3.2.1"

  /**
   * Writes a GFF3 Header pragma to a file.
   *
   * The pragma and its trailing newline are encoded as UTF-8 rather than
   * with the platform default charset, and the output stream is closed even
   * if the write fails.
   *
   * @param filePath The path to write the file to.
   * @param sc The SparkContext, to access the Hadoop FS Configuration.
   */
  def apply(filePath: String,
            sc: SparkContext): Unit = {
    val path = new Path(filePath)
    val fs = path.getFileSystem(sc.hadoopConfiguration)
    val os = fs.create(path)
    try {
      os.write((HEADER_STRING + "\n").getBytes(StandardCharsets.UTF_8))
    } finally {
      // always release the stream, even when the write throws
      os.close()
    }
  }
}
Expand Up @@ -23,6 +23,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro.{ Feature, Strand }
import org.scalactic.{ Equivalence, TypeCheckedTripleEquals }
import scala.io.Source

class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
implicit val strongFeatureEq = new Equivalence[Feature] {
Expand Down Expand Up @@ -173,6 +174,12 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val outputPath = tempLocation(".gff3")
expected.saveAsGff3(outputPath, asSingleFile = true)

val lines = Source.fromFile(outputPath)
.getLines
.toSeq
assert(lines.size > 1)
assert(lines.head === GFF3HeaderWriter.HEADER_STRING)

val feature = expected.rdd.first
val gff3Columns = FeatureRDD.toGff3(feature).split('\t')
assert(gff3Columns.size === 9)
Expand Down
@@ -0,0 +1,34 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.feature

import org.bdgenomics.adam.util.ADAMFunSuite
import scala.io.Source

/**
 * Checks that GFF3HeaderWriter emits exactly one line, the GFF3 version pragma.
 */
class GFF3HeaderWriterSuite extends ADAMFunSuite {

  sparkTest("write gff3 header pragma") {
    val tmp = tmpFile(".gff3")
    GFF3HeaderWriter(tmp, sc)

    // Read the lines eagerly into a List so the file handle can be closed
    // before the assertions run; a lazily evaluated sequence would otherwise
    // try to read from a closed source.
    val source = Source.fromFile(tmp)
    val lines = try {
      source.getLines.toList
    } finally {
      source.close()
    }

    assert(lines.size === 1)
    assert(lines.head === GFF3HeaderWriter.HEADER_STRING)
  }
}

0 comments on commit a13ff4a

Please sign in to comment.