Skip to content

Commit

Permalink
[ADAM-439] Fix ADAM to account for BDG-FORMATS-35
Browse files Browse the repository at this point in the history
Includes commit by @fnothaft: Fixed flipped index in indel realigner.

Fixes #439
  • Loading branch information
laserson committed Nov 17, 2014
1 parent 60ca9d4 commit 606c78b
Show file tree
Hide file tree
Showing 17 changed files with 47 additions and 52 deletions.
Expand Up @@ -44,7 +44,7 @@ class AlleleCountArgs extends Args4jBase with ParquetArgs {
}

object AlleleCountHelper extends Serializable {
def chooseAllele(x: (CharSequence, java.lang.Long, CharSequence, CharSequence, GenotypeAllele)) =
def chooseAllele(x: (String, java.lang.Long, String, String, GenotypeAllele)) =
x match {
case (chr, position, refAllele, varAllele, GenotypeAllele.Ref) => Some(chr, position, refAllele)
case (chr, position, refAllele, varAllele, GenotypeAllele.Alt) => Some(chr, position, varAllele)
Expand Down
Expand Up @@ -61,7 +61,7 @@ object CompareADAM extends ADAMCommandCompanion with Serializable {
new CompareADAM(Args4j[CompareADAMArgs](cmdLine))
}

type GeneratedResults[A] = RDD[(CharSequence, Seq[A])]
type GeneratedResults[A] = RDD[(String, Seq[A])]

/**
* @see CompareADAMArgs.recurse1, CompareADAMArgs.recurse2
Expand Down
Expand Up @@ -122,13 +122,13 @@ class FindReads(protected val args: FindReadsArgs) extends ADAMSparkCommand[Find

val generated: CompareADAM.GeneratedResults[Any] = engine.generate(generator)

val filtered: RDD[(CharSequence, Seq[Any])] = generated.filter {
case (name: CharSequence, values: Seq[Any]) =>
val filtered: RDD[(String, Seq[Any])] = generated.filter {
case (name: String, values: Seq[Any]) =>
values.exists(filter.passesFilter)
}

val filteredJoined = engine.joined.join(filtered).map {
case (name: CharSequence, ((bucket1: ReadBucket, bucket2: ReadBucket), generated: Seq[Any])) => {
case (name: String, ((bucket1: ReadBucket, bucket2: ReadBucket), generated: Seq[Any])) => {
val rec1 = bucket1.allReads().head
val rec2 = bucket2.allReads().head
(name, "%s:%d".format(rec1.getContig.getContigName, rec1.getStart), "%s:%d".format(rec2.getContig.getContigName, rec2.getStart))
Expand Down
Expand Up @@ -88,10 +88,10 @@ class VCFLine(vcfLine: String, val samples: Array[String], sampleIndices: Seq[In
val position = array(1).toInt
val id = array(2)
val ref = array(3)
val alts: List[CharSequence] = array(4).split(",").toList
val alts: List[String] = array(4).split(",").toList
val qual = array(5) match {
case "." => null
case x: CharSequence => x.toDouble
case "." => null
case x: String => x.toDouble
}
val filter = array(6)

Expand Down Expand Up @@ -128,7 +128,7 @@ object VCFLineConverter {
None
} else {
val gts = sampleFieldMap("GT").split("\\||/").map(_.toInt)
val genotypes: Seq[CharSequence] = gts.map(idx => line.alleleArray(idx))
val genotypes: Seq[String] = gts.map(idx => line.alleleArray(idx))
val sampleId = line.samples(i)

val flatGenotype = FlatGenotype.newBuilder()
Expand Down
Expand Up @@ -35,7 +35,7 @@ object VariantContextConverter {
else GenotypeAllele.Alt
}

private def convertAllele(allele: CharSequence, isRef: Boolean = false): Seq[Allele] = {
private def convertAllele(allele: String, isRef: Boolean = false): Seq[Allele] = {
if (allele == null)
Seq()
else
Expand Down
Expand Up @@ -33,14 +33,14 @@ object BaseFeature {
}

def frameChar(feature: Feature): Char = {
val opt: Map[CharSequence, CharSequence] = feature.getAttributes
val opt: Map[String, String] = feature.getAttributes
opt.get("frame").map(_.charAt(0)).getOrElse('.')
}

def attributeString(feature: Feature): String =
feature.getAttributes.mkString(",")

def attrs(f: Feature): Map[CharSequence, CharSequence] =
def attrs(f: Feature): Map[String, String] =
JavaConversions.mapAsScalaMap(f.getAttributes)

def attributeString(feature: Feature, attrName: String): Option[String] =
Expand Down
Expand Up @@ -174,7 +174,7 @@ class SequenceRecord(
}

// No md5/url is "equal" to any md5/url in this setting
private def optionEq(o1: Option[CharSequence], o2: Option[CharSequence]) = (o1, o2) match {
private def optionEq(o1: Option[String], o2: Option[String]) = (o1, o2) match {
case (Some(c1), Some(c2)) => c1 == c2
case _ => true
}
Expand All @@ -186,12 +186,12 @@ object SequenceRecord {

def apply(name: String,
length: Long,
md5: CharSequence = null,
url: CharSequence = null,
refseq: CharSequence = null,
genbank: CharSequence = null,
assembly: CharSequence = null,
species: CharSequence = null): SequenceRecord = {
md5: String = null,
url: String = null,
refseq: String = null,
genbank: String = null,
assembly: String = null,
species: String = null): SequenceRecord = {
new SequenceRecord(
name,
length,
Expand Down
Expand Up @@ -68,9 +68,9 @@ object ADAMContext {
seqAsJavaList(list.map(i => i: java.lang.Integer))
}

implicit def charSequenceToString(cs: CharSequence): String = cs.toString
// implicit def charSequenceToString(cs: CharSequence): String = cs.toString

implicit def charSequenceToList(cs: CharSequence): List[Char] = cs.toCharArray.toList
// implicit def charSequenceToList(cs: CharSequence): List[Char] = cs.toCharArray.toList

implicit def mapToJavaMap[A, B](map: Map[A, B]): java.util.Map[A, B] = mapAsJavaMap(map)

Expand Down
Expand Up @@ -52,7 +52,7 @@ object GTFParser {
* @param attributeField The original string of tokens
* @return The Map of attributes
*/
def parseAttrs(attributeField: String): Map[CharSequence, CharSequence] =
def parseAttrs(attributeField: String): Map[String, String] =
attributeField.split(";").flatMap {
case token: String =>
attr_regex.findFirstMatchIn(token).map(m => (m.group(1), m.group(2)))
Expand Down Expand Up @@ -106,7 +106,7 @@ class GTFParser extends FeatureParser {
case _ => (attrs.get("id"), None)
}
_id.foreach(f.setFeatureId)
_parentId.foreach(parentId => f.setParentIds(List[CharSequence](parentId)))
_parentId.foreach(parentId => f.setParentIds(List[String](parentId)))

f.setAttributes(attrs)

Expand Down Expand Up @@ -149,7 +149,7 @@ class BEDParser extends FeatureParser {
case _ => Strand.Independent
})
}
val attributes = new ArrayBuffer[(CharSequence, CharSequence)]()
val attributes = new ArrayBuffer[(String, String)]()
if (fields.length > 6) {
attributes += ("thickStart" -> fields(6))
}
Expand Down Expand Up @@ -210,7 +210,7 @@ class NarrowPeakParser extends FeatureParser {
case _ => Strand.Independent
})
}
val attributes = new ArrayBuffer[(CharSequence, CharSequence)]()
val attributes = new ArrayBuffer[(String, String)]()
if (fields.length > 6) {
attributes += ("signalValue" -> fields(6))
}
Expand Down
Expand Up @@ -62,7 +62,7 @@ private[rdd] object MarkDuplicates extends Serializable {
def apply(rdd: RDD[AlignmentRecord]): RDD[AlignmentRecord] = {

// Group by library and left position
def leftPositionAndLibrary(p: (ReferencePositionPair, SingleReadBucket)): (Option[ReferencePositionWithOrientation], CharSequence) = {
def leftPositionAndLibrary(p: (ReferencePositionPair, SingleReadBucket)): (Option[ReferencePositionWithOrientation], String) = {
(p._1.read1refPos, p._2.allReads.head.getRecordGroupLibrary)
}

Expand Down
Expand Up @@ -61,25 +61,25 @@ class ComparisonTraversalEngine(schema: Seq[FieldValue], input1: RDD[AlignmentRe
}.count()
}

def generate[T](generator: BucketComparisons[T]): RDD[(CharSequence, Seq[T])] =
def generate[T](generator: BucketComparisons[T]): RDD[(String, Seq[T])] =
ComparisonTraversalEngine.generate[T](joined, generator)

def find[T](filter: GeneratorFilter[T]): RDD[CharSequence] =
def find[T](filter: GeneratorFilter[T]): RDD[String] =
ComparisonTraversalEngine.find[T](joined, filter)
}

object ComparisonTraversalEngine {

type JoinedType = RDD[(CharSequence, (ReadBucket, ReadBucket))]
type GeneratedType[T] = RDD[(CharSequence, Seq[T])]
type JoinedType = RDD[(String, (ReadBucket, ReadBucket))]
type GeneratedType[T] = RDD[(String, Seq[T])]

def generate[T](joined: JoinedType, generator: BucketComparisons[T]): GeneratedType[T] =
joined.map {
case (name, (bucket1, bucket2)) =>
(name, generator.matchedByName(bucket1, bucket2))
}

def find[T](joined: JoinedType, filter: GeneratorFilter[T]): RDD[CharSequence] =
def find[T](joined: JoinedType, filter: GeneratorFilter[T]): RDD[String] =
joined.filter {
case (name, (bucket1, bucket2)) =>
filter.comparison.matchedByName(bucket1, bucket2).exists(filter.passesFilter)
Expand Down
Expand Up @@ -326,6 +326,7 @@ private[rdd] class RealignIndels(val consensusModel: ConsensusGenerator = new Co

// if read alignment is improved by aligning against new consensus, realign
if (remapping != -1) {

realignedReadCount += 1

// bump up mapping quality by 10
Expand All @@ -335,24 +336,26 @@ private[rdd] class RealignIndels(val consensusModel: ConsensusGenerator = new Co
builder.setStart(refStart + remapping)

// recompute cigar
val newCigar: Cigar = if (refStart + remapping >= bestConsensus.index.start && refStart + remapping <= bestConsensus.index.end - 1) {
val newCigar: Cigar = {
// if element overlaps with consensus indel, modify cigar with indel
val (idElement, endLength) = if (bestConsensus.index.start == bestConsensus.index.end - 1) {
val (idElement, endLength, endPenalty) = if (bestConsensus.index.start == bestConsensus.index.end - 1) {
(new CigarElement(bestConsensus.consensus.length, CigarOperator.I),
r.getSequence.length - bestConsensus.consensus.length - (bestConsensus.index.start - (refStart + remapping)))
r.getSequence.length - bestConsensus.consensus.length - (bestConsensus.index.start - (refStart + remapping)),
-bestConsensus.consensus.length)
} else {
(new CigarElement((bestConsensus.index.end - 1 - bestConsensus.index.start).toInt, CigarOperator.D),
r.getSequence.length - (bestConsensus.index.start - (refStart + remapping)))
r.getSequence.length - (bestConsensus.index.start - (refStart + remapping)),
bestConsensus.consensus.length)
}

val cigarElements = List[CigarElement](new CigarElement((refStart + remapping - bestConsensus.index.start).toInt, CigarOperator.M),
// compensate the end
builder.setEnd(refStart + remapping + r.getSequence.length + endPenalty)

val cigarElements = List[CigarElement](new CigarElement((bestConsensus.index.start - (refStart + remapping)).toInt, CigarOperator.M),
idElement,
new CigarElement(endLength.toInt, CigarOperator.M))

new Cigar(cigarElements)
} else {
// else, new cigar is all matches
new Cigar(List[CigarElement](new CigarElement(r.getSequence.length, CigarOperator.M)))
}

// update mdtag and cigar
Expand Down
Expand Up @@ -31,8 +31,8 @@ object ImplicitJavaConversions {
seqAsJavaList(list.map(i => i: java.lang.Integer))
}

implicit def charSequenceToString(cs: CharSequence): String = cs.toString
// implicit def charSequenceToString(cs: CharSequence): String = cs.toString

implicit def charSequenceToList(cs: CharSequence): List[Char] = cs.toList
// implicit def charSequenceToList(cs: CharSequence): List[Char] = cs.toList

}
Expand Up @@ -142,13 +142,9 @@ class SequenceDictionarySuite extends FunSuite {
record("chr3"))
val str0: String = "chr0"
val str1: java.lang.String = "chr1"
val str2: CharSequence = "chr2"
val str3: java.lang.CharSequence = "chr3"

assert(dict.containsRefName(str0))
assert(dict.containsRefName(str1))
assert(dict.containsRefName(str2))
assert(dict.containsRefName(str3))
}

test("Apply on name works correctly for different String types") {
Expand All @@ -159,13 +155,9 @@ class SequenceDictionarySuite extends FunSuite {
record("chr3"))
val str0: String = "chr0"
val str1: java.lang.String = "chr1"
val str2: CharSequence = "chr2"
val str3: java.lang.CharSequence = "chr3"

assert(dict(str0).get.name === "chr0")
assert(dict(str1).get.name === "chr1")
assert(dict(str2).get.name === "chr2")
assert(dict(str3).get.name === "chr3")
}

def record(name: String, length: Long = 1000, url: Option[String] = None, md5: Option[String] = None): SequenceRecord =
Expand Down
Expand Up @@ -129,7 +129,7 @@ class GenomicRegionPartitionerSuite extends SparkFunSuite {
.build()
}

def record(name: CharSequence, length: Long) = SequenceRecord(name.toString, length.toInt)
def record(name: String, length: Long) = SequenceRecord(name.toString, length.toInt)
}

class PositionKeyed[U <: Serializable] extends Serializable {
Expand Down
Expand Up @@ -60,7 +60,7 @@ class ComparisonTraversalEngineSuite extends SparkFunSuite {

val generator = MappedPosition

val generated: RDD[(CharSequence, Seq[Long])] = engine.generate(generator)
val generated: RDD[(String, Seq[Long])] = engine.generate(generator)

val genMap = Map(generated.collect().map { case (key, value) => (key.toString, value) }: _*)

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -305,7 +305,7 @@
<dependency>
<groupId>org.bdgenomics.bdg-formats</groupId>
<artifactId>bdg-formats</artifactId>
<version>0.3.2</version>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down

0 comments on commit 606c78b

Please sign in to comment.