Skip to content

Commit

Permalink
Merge dc6f761 into aa43a56
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Feb 5, 2018
2 parents aa43a56 + dc6f761 commit 36011fc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
Expand Up @@ -35,7 +35,10 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{ MultisampleAvroGenomicRDD, VCFHeaderUtils }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.adam.sql.{ Genotype => GenotypeProduct }
import org.bdgenomics.adam.sql.{
Genotype => GenotypeProduct,
Variant => VariantProduct
}
import org.bdgenomics.utils.interval.array.{ IntervalArray, IntervalArraySerializer }
import org.bdgenomics.formats.avro.{ Genotype, Sample }
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -308,6 +311,39 @@ sealed abstract class GenotypeRDD extends MultisampleAvroGenomicRDD[Genotype, Ge
VariantContextRDD(vcRdd, sequences, samples, headerLines)
}

/**
 * Extracts the variants contained in this RDD of genotypes.
 *
 * Does not perform any filtering looking at whether the variant was called or
 * not. By default, does not deduplicate variants.
 *
 * @param dedupe If true, drops variants described in more than one genotype
 *   record.
 * @return Returns the variants described by this GenotypeRDD.
 */
def toVariants(dedupe: Boolean = false): VariantRDD = {
  val sqlContext = SQLContext.getOrCreate(rdd.context)
  import sqlContext.implicits._

  // Project the nested variant struct out of every genotype row.
  val allVariants = dataset.select($"variant.*")
    .as[VariantProduct]

  val variants = if (!dedupe) {
    allVariants
  } else {
    // dropDuplicates must be given an explicit column list here: set
    // operations cannot run over a schema containing map/array typed
    // columns.
    allVariants.dropDuplicates("contigName",
      "start",
      "end",
      "referenceAllele",
      "alternateAllele")
  }

  VariantRDD(variants, sequences, headerLines)
}

/**
* @param newRdd An RDD to replace the underlying RDD with.
* @return Returns a new GenotypeRDD with the underlying RDD replaced.
Expand Down
Expand Up @@ -20,6 +20,7 @@ package org.bdgenomics.adam.rdd.variant
import htsjdk.variant.vcf.VCFHeaderLine
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.models.{
Coverage,
ReferenceRegion,
Expand Down Expand Up @@ -119,7 +120,6 @@ class GenotypeRDDSuite extends ADAMFunSuite {
ReferenceRegion("1", 752790L, 752793L))
val filteredGenotypes = sc.loadParquetGenotypes(outputPath,
optPredicate = Some(predicate))
filteredGenotypes.rdd.foreach(println)
assert(filteredGenotypes.rdd.count === 9)
val starts = filteredGenotypes.rdd.map(_.getStart).distinct.collect.toSet
assert(starts.size === 3)
Expand Down Expand Up @@ -555,4 +555,23 @@ class GenotypeRDDSuite extends ADAMFunSuite {

checkSave(variantContexts)
}

sparkTest("loading genotypes then converting to variants yields same output as loading variants") {
  // Annotations must be nested into the genotype records so that the
  // extracted variants compare equal to the directly loaded ones.
  VariantContextConverter.setNestAnnotationInGenotypesProperty(sc.hadoopConfiguration,
    true)

  val loadedGenotypes = sc.loadGenotypes(testFile("small.vcf"))
  val loadedVariants = sc.loadVariants(testFile("small.vcf"))

  // Without deduplication, one variant row is emitted per genotype record.
  assert(loadedGenotypes.toVariants().dataset.count() === loadedGenotypes.dataset.count())

  // With deduplication, we should recover exactly as many variants as the
  // directly loaded file describes.
  val dedupedFromGenotypes = loadedGenotypes.toVariants(dedupe = true)
  val extractedVariants = dedupedFromGenotypes.rdd.collect
  assert(extractedVariants.size === loadedVariants.rdd.count())

  val expectedVariants = loadedVariants.rdd.collect().toSet

  // Every extracted variant must appear in the directly loaded set, and
  // the two sets must be the same size, so they are equal.
  assert(extractedVariants.size === expectedVariants.size)
  extractedVariants.foreach(v => assert(expectedVariants(v)))
}
}

0 comments on commit 36011fc

Please sign in to comment.