Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ExtractRegions #1637

Merged
merged 1 commit into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,43 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
}
}

/**
* From a set of contigs, returns a list of sequences based on reference regions provided
* @param regions List of Reference regions over which to get sequences
* @return RDD[(ReferenceRegion, String)] of region -> sequence pairs.
*/
def extractRegions(regions: Iterable[ReferenceRegion]): RDD[(ReferenceRegion, String)] = {

def extractSequence(fragmentRegion: ReferenceRegion, fragment: NucleotideContigFragment, region: ReferenceRegion): (ReferenceRegion, String) = {
val merged = fragmentRegion.intersection(region)
val start = (merged.start - fragmentRegion.start).toInt
val end = (merged.end - fragmentRegion.start).toInt
val fragmentSequence: String = fragment.getSequence
(merged, fragmentSequence.substring(start, end))
}

def reduceRegionSequences(
kv1: (ReferenceRegion, String),
kv2: (ReferenceRegion, String)): (ReferenceRegion, String) = {
(kv1._1.merge(kv2._1), if (kv1._1.compareTo(kv2._1) <= 0) {
kv1._2 + kv2._2
} else {
kv2._2 + kv1._2
})
}

val places = flattenRddByRegions()
.flatMap {
case (fragmentRegion, fragment) =>
regions.collect {
case region if fragmentRegion.overlaps(region) =>
(region, extractSequence(fragmentRegion, fragment, region))
}
}.sortByKey()

places.reduceByKey(reduceRegionSequences).values
}

/**
* For all adjacent records in the RDD, we extend the records so that the adjacent
* records now overlap by _n_ bases, where _n_ is the flank length.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,52 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
assert(rdd.extract(region1) === "GTACTCTCATG")
}

sparkTest("extract sequences based on the list of reference regions") {
val test = "test"

def dnas2fragments(dnas: Seq[String]): List[NucleotideContigFragment] = {
val (_, frags) = dnas.foldLeft((0L, List.empty[NucleotideContigFragment])) {
case ((start, acc), str) =>
val fragment = NucleotideContigFragment.newBuilder()
.setContigName("test")
.setStart(start)
.setLength(str.length: Long)
.setSequence(str)
.setEnd(start + str.length)
.build()
(start + str.length, fragment :: acc)
}
frags.reverse
}

val dnas: Seq[String] = Vector(
"ACAGCTGATCTCCAGATATGACCATGGGTT",
"CAGCTGATCTCCAGATATGACCATGGGTTT",
"CCAGAAGTTTGAGCCACAAACCCATGGTCA"
)

val merged = dnas.reduce(_ + _)

val record = SequenceRecord("test", merged.length)

val dic = new SequenceDictionary(Vector(record))
val frags = sc.parallelize(dnas2fragments(dnas))
val fragments = NucleotideContigFragmentRDD(frags, dic)

val byRegion = fragments.rdd.keyBy(ReferenceRegion(_))

val regions = List(
new ReferenceRegion(test, 0, 5),
new ReferenceRegion(test, 25, 35),
new ReferenceRegion(test, 40, 50),
new ReferenceRegion(test, 50, 70)
)

val results: Set[(ReferenceRegion, String)] = fragments.extractRegions(regions).collect().toSet
val seqs = regions.zip(List("ACAGC", "GGGTTCAGCT", "CCAGATATGA", "CCATGGGTTTCCAGAAGTTT")).toSet
assert(seqs === results)
}

sparkTest("recover trimmed reference string from multiple contig fragments") {

val sequence = "ACTGTACTC"
Expand Down