[ADAM-1768] Add InFormatter for unpaired FASTQ.
Resolves #1768.
fnothaft committed Oct 19, 2017
1 parent 65dde41 commit 1ccbe33
Showing 2 changed files with 91 additions and 1 deletion.
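As a usage sketch (not taken from the diff itself, but mirroring the test added below): an AlignmentRecordRDD can be piped to an external process as unpaired FASTQ, with its SAM output read back in. The input path, script name, and the loadAlignments call are placeholders, not values from this commit.

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, AnySAMOutFormatter, FASTQInFormatter }

def pipeUnpairedFastq(sc: SparkContext): AlignmentRecordRDD = {
  // placeholder input; any AlignmentRecordRDD can be piped this way
  val reads = sc.loadAlignments("reads.fq")

  // the companion added in this commit streams each read to the child
  // process as an unpaired FASTQ record; the SAM it emits is read back
  implicit val tFormatter = FASTQInFormatter
  implicit val uFormatter = new AnySAMOutFormatter

  // as in the test added below, $0 refers to the first entry of `files`
  reads.pipe("python $0", files = Seq("fastq_to_usam.py"))
}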
FASTQInFormatter.scala (new file)
@@ -0,0 +1,67 @@
/**
 * Licensed to Big Data Genomics (BDG) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The BDG licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.bdgenomics.adam.rdd.read

import java.io.OutputStream
import org.apache.hadoop.conf.Configuration
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.rdd.{ InFormatter, InFormatterCompanion }
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.utils.misc.Logging

/**
 * InFormatter companion that creates an InFormatter that writes FASTQ.
 */
object FASTQInFormatter extends InFormatterCompanion[AlignmentRecord, AlignmentRecordRDD, FASTQInFormatter] {

  /**
   * Builds a FASTQInFormatter to write FASTQ.
   *
   * @param gRdd GenomicRDD of AlignmentRecords. Used to get the Hadoop Configuration.
   * @return Returns a new unpaired FASTQ InFormatter.
   */
  def apply(gRdd: AlignmentRecordRDD): FASTQInFormatter = {
    new FASTQInFormatter(gRdd.rdd.context.hadoopConfiguration)
  }
}

class FASTQInFormatter private (
  conf: Configuration) extends InFormatter[AlignmentRecord, AlignmentRecordRDD, FASTQInFormatter] with Logging {

  protected val companion = FASTQInFormatter
  private val converter = new AlignmentRecordConverter
  private val writeSuffixes = conf.getBoolean(FragmentRDD.WRITE_SUFFIXES, false)
  private val writeOriginalQualities = conf.getBoolean(FragmentRDD.WRITE_ORIGINAL_QUALITIES, false)

  /**
   * Writes alignment records to an output stream in FASTQ format.
   *
   * @param os An OutputStream connected to a process we are piping to.
   * @param iter An iterator of records to write.
   */
  def write(os: OutputStream, iter: Iterator[AlignmentRecord]) {
    iter.foreach(read => {
      val fastqRead = converter.convertToFastq(read,
        maybeAddSuffix = writeSuffixes,
        outputOriginalBaseQualities = writeOriginalQualities) + "\n"

      os.write(fastqRead.getBytes)
    })
  }
}
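The two configuration flags read by the formatter map directly onto AlignmentRecordConverter.convertToFastq's maybeAddSuffix and outputOriginalBaseQualities parameters. A minimal sketch of opting into both before piping, using a hypothetical helper name; the test below sets WRITE_SUFFIXES the same way.

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.fragment.FragmentRDD

// hypothetical helper, not part of this commit
def enableFastqOutputOptions(sc: SparkContext): Unit = {
  // forwarded to convertToFastq as maybeAddSuffix
  sc.hadoopConfiguration.setBoolean(FragmentRDD.WRITE_SUFFIXES, true)
  // forwarded to convertToFastq as outputOriginalBaseQualities
  sc.hadoopConfiguration.setBoolean(FragmentRDD.WRITE_ORIGINAL_QUALITIES, true)
}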
AlignmentRecordRDDSuite.scala
@@ -861,6 +861,29 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
    assert(records === newRecords)
  }

  sparkTest("don't lose any reads when piping fastq to sam") {
    // write suffixes at end of reads
    sc.hadoopConfiguration.setBoolean(FragmentRDD.WRITE_SUFFIXES, true)

    val fragmentsPath = testFile("interleaved_fastq_sample1.ifq")
    val ardd = sc.loadFragments(fragmentsPath).toReads
    val records = ardd.rdd.count
    assert(records === 6)
    assert(ardd.dataset.count === 6)
    assert(ardd.dataset.rdd.count === 6)

    implicit val tFormatter = FASTQInFormatter
    implicit val uFormatter = new AnySAMOutFormatter

    // this script converts unpaired fastq to unaligned sam
    val scriptPath = testFile("fastq_to_usam.py")

    val pipedRdd: AlignmentRecordRDD = ardd.pipe("python $0",
      files = Seq(scriptPath))
    val newRecords = pipedRdd.rdd.count
    assert(records === newRecords)
  }

  sparkTest("can properly set environment variables inside of a pipe") {
    val reads12Path = testFile("reads12.sam")
    val smallPath = testFile("small.sam")
@@ -991,7 +1014,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
      .build)), sd)

    val joined = reads.shuffleRegionJoin(features).rdd.collect
    println(joined.mkString("\n"))

    assert(joined.size === 2)
    assert(joined.exists(_._1.getStart == 20L))
    assert(joined.exists(_._1.getStart == 40L))
