Skip to content

Commit

Permalink
Add count kmers methods to SequenceDataset.
Browse files Browse the repository at this point in the history
  • Loading branch information
heuermh committed May 9, 2022
1 parent cc368a5 commit 4fa2b08
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ sealed abstract class AlignmentDataset extends AvroReadGroupGenomicDataset[Align
}

/**
* Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
* (Scala-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
Expand All @@ -762,7 +762,21 @@ sealed abstract class AlignmentDataset extends AvroReadGroupGenomicDataset[Align
}

/**
* Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
* (Java-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns a JavaRDD containing k-mer/count pairs.
*/
def countKmers(kmerLength: java.lang.Integer): JavaRDD[(String, java.lang.Long)] = {
  // unbox explicitly so that the call resolves to the overload taking a Scala Int
  val scalaCounts = countKmers(kmerLength.intValue)

  // box each count so the k-mer/count pairs can be consumed from Java
  val boxedCounts = scalaCounts.map(pair => (pair._1, java.lang.Long.valueOf(pair._2)))

  boxedCounts.toJavaRDD()
}

/**
* Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer
* as a Dataset.
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns a Dataset containing k-mer/count pairs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.bdgenomics.adam.ds.sequence

import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction }
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
Expand Down Expand Up @@ -527,6 +528,54 @@ sealed abstract class SequenceDataset extends AvroGenomicDataset[Sequence, Seque
disableFastConcat = disableFastConcat)
}

/**
 * (Scala-specific) Cuts sequences into _k_-mers, and then counts the number of occurrences of each _k_-mer.
 *
 * Records whose sequence is null are skipped, and sequences shorter than
 * kmerLength contribute no _k_-mers.
 *
 * @param kmerLength The value of _k_ to use for cutting _k_-mers. Must be positive.
 * @return Returns an RDD containing k-mer/count pairs.
 * @throws IllegalArgumentException if kmerLength is not positive.
 */
def countKmers(kmerLength: Int): RDD[(String, Long)] = {
  // fail fast at the call site; sliding would otherwise reject a non-positive
  // window size only once the flatMap is actually evaluated
  require(kmerLength > 0, s"k-mer length must be positive, was ${kmerLength}.")

  rdd.flatMap(r => {
    // cut each sequence into k-mers, and attach a count of 1L
    Option(r.getSequence)
      .iterator
      .flatMap(_.sliding(kmerLength))
      // sliding returns the whole string as a single window when the string is
      // shorter than the window size; such fragments are not k-mers
      .filter(_.length == kmerLength)
      .map(kmer => (kmer, 1L))
  }).reduceByKey((left: Long, right: Long) => left + right)
}

/**
* (Java-specific) Cuts sequences into _k_-mers, and then counts the number of occurrences of each _k_-mer.
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns a JavaRDD containing k-mer/count pairs.
*/
def countKmers(kmerLength: java.lang.Integer): JavaRDD[(String, java.lang.Long)] = {
  // unbox explicitly so that the call resolves to the Scala-specific Int overload
  val scalaCounts = countKmers(kmerLength.intValue)

  // box each count so the k-mer/count pairs can be consumed from Java
  val boxedCounts = scalaCounts.map(pair => (pair._1, java.lang.Long.valueOf(pair._2)))

  boxedCounts.toJavaRDD()
}

/**
 * Cuts sequences into _k_-mers, and then counts the number of occurrences of each _k_-mer
 * as a Dataset.
 *
 * Rows whose sequence column is null are skipped, and sequences shorter than
 * kmerLength contribute no _k_-mers.
 *
 * @param kmerLength The value of _k_ to use for cutting _k_-mers. Must be positive.
 * @return Returns a Dataset containing k-mer/count pairs.
 * @throws IllegalArgumentException if kmerLength is not positive.
 */
def countKmersAsDataset(kmerLength: Int): Dataset[(String, Long)] = {
  // fail fast at the call site; sliding would otherwise reject a non-positive
  // window size only once the query is actually executed
  require(kmerLength > 0, s"k-mer length must be positive, was ${kmerLength}.")

  import spark.implicits._

  val kmers = dataset
    // drop rows without a sequence before handing values to the typed flatMap
    .where($"sequence".isNotNull)
    .select($"sequence".as[String])
    // sliding returns the whole string as a single window when the string is
    // shorter than the window size; such fragments are not k-mers
    .flatMap(_.sliding(kmerLength).filter(_.length == kmerLength))

  // a Dataset[String] exposes its single column as "value"
  kmers.toDF()
    .groupBy($"value")
    .count()
    .select($"value".as("kmer"), $"count")
    .as[(String, Long)]
}

/**
* @param newRdd The RDD to replace the underlying RDD with.
* @param newPartitionMap New partition map, if any.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ sealed abstract class SliceDataset extends AvroGenomicDataset[Slice, SliceProduc
}

/**
* (Java-friendly) For all adjacent slices in this genomic dataset, we extend the slices so that the adjacent
* (Java-specific) For all adjacent slices in this genomic dataset, we extend the slices so that the adjacent
* slices now overlap by _n_ bases, where _n_ is the flank length.
*
* @param flankLength The length to extend adjacent slices by.
Expand All @@ -560,7 +560,7 @@ sealed abstract class SliceDataset extends AvroGenomicDataset[Slice, SliceProduc
}

/**
* (Scala-friendly) For all adjacent slices in this genomic dataset, we extend the slices so that the adjacent
* (Scala-specific) For all adjacent slices in this genomic dataset, we extend the slices so that the adjacent
* slices now overlap by _n_ bases, where _n_ is the flank length.
*
* @param flankLength The length to extend adjacent slices by.
Expand All @@ -573,9 +573,10 @@ sealed abstract class SliceDataset extends AvroGenomicDataset[Slice, SliceProduc
}

/**
* (Scala-friendly) Counts the k-mers contained in this genomic dataset of slices.
* (Scala-specific) Cuts slices after flanking into _k_-mers, and then counts the
* number of occurrences of each _k_-mer.
*
* @param kmerLength The length of k-mers to count.
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
*/
def countKmers(kmerLength: Int): RDD[(String, Long)] = {
Expand All @@ -596,10 +597,11 @@ sealed abstract class SliceDataset extends AvroGenomicDataset[Slice, SliceProduc
}

/**
* (Java-friendly) Counts the k-mers contained in this genomic dataset of slices.
* (Java-specific) Cuts slices after flanking into _k_-mers, and then counts the
* number of occurrences of each _k_-mer.
*
* @param kmerLength The length of k-mers to count.
* @return Returns an RDD containing k-mer/count pairs.
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns a JavaRDD containing k-mer/count pairs.
*/
def countKmers(
kmerLength: java.lang.Integer): JavaRDD[(String, java.lang.Long)] = {
Expand Down

0 comments on commit 4fa2b08

Please sign in to comment.