Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge VariantAnnotation and DatabaseVariantAnnotation records #1250

Closed
Expand Up @@ -27,9 +27,9 @@ import org.bdgenomics.adam.rdd.features.FeatureRDD
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.variation.{
DatabaseVariantAnnotationRDD,
GenotypeRDD,
VariantRDD
VariantRDD,
VariantAnnotationRDD
}
import org.bdgenomics.formats.avro._
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -93,9 +93,9 @@ class JavaADAMContext private (val ac: ADAMContext) extends Serializable {
* Loads in variant annotations.
*
* @param filePath The path to load the file from.
* @return Returns a DatabaseVariantAnnotationRDD.
* @return Returns a VariantAnnotationRDD.
*/
def loadVariantAnnotations(filePath: java.lang.String): DatabaseVariantAnnotationRDD = {
def loadVariantAnnotations(filePath: java.lang.String): VariantAnnotationRDD = {
ac.loadVariantAnnotations(filePath)
}

Expand Down
Expand Up @@ -25,20 +25,20 @@
import org.bdgenomics.adam.models.RecordGroupDictionary;
import org.bdgenomics.adam.models.SequenceDictionary;
import org.bdgenomics.adam.rdd.ADAMContext;
import org.bdgenomics.adam.rdd.variation.DatabaseVariantAnnotationRDD;
import org.bdgenomics.adam.rdd.variation.VariantAnnotationRDD;

/**
* A simple test class for the JavaADAMRDD/Context. Writes an RDD of annotations to
* disk and reads it back.
*/
class JavaADAMAnnotationConduit {
public static DatabaseVariantAnnotationRDD conduit(DatabaseVariantAnnotationRDD recordRdd,
ADAMContext ac) throws IOException {
public static VariantAnnotationRDD conduit(VariantAnnotationRDD annotationRdd,
ADAMContext ac) throws IOException {

// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.annotation.adam";
recordRdd.save(fileName);
annotationRdd.save(fileName);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Expand Up @@ -49,8 +49,8 @@ class AlleleCountArgs extends Args4jBase with ParquetArgs {
object AlleleCountHelper extends Serializable {
def chooseAllele(x: (String, java.lang.Long, String, String, GenotypeAllele)) =
x match {
case (chr, position, refAllele, varAllele, GenotypeAllele.Ref) => Some(chr, position, refAllele)
case (chr, position, refAllele, varAllele, GenotypeAllele.Alt) => Some(chr, position, varAllele)
case (chr, position, refAllele, varAllele, GenotypeAllele.REF) => Some(chr, position, refAllele)
case (chr, position, refAllele, varAllele, GenotypeAllele.ALT) => Some(chr, position, varAllele)
case _ => None
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.variation.DatabaseVariantAnnotationRDD
import org.bdgenomics.adam.rdd.variation.VariantAnnotationRDD
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.cli._
Expand Down Expand Up @@ -60,7 +60,7 @@ class VcfAnnotation2ADAM(val args: VcfAnnotation2ADAMArgs) extends BDGSparkComma
val keyedAnnotations = existingAnnotations.rdd.keyBy(anno => new RichVariant(anno.getVariant))
val joinedAnnotations = keyedAnnotations.join(annotations.rdd.keyBy(anno => new RichVariant(anno.getVariant)))
val mergedAnnotations = joinedAnnotations.map(kv => VariantContext.mergeAnnotations(kv._2._1, kv._2._2))
DatabaseVariantAnnotationRDD(mergedAnnotations, existingAnnotations.sequences).saveAsParquet(args)
VariantAnnotationRDD(mergedAnnotations, existingAnnotations.sequences).saveAsParquet(args)
} else {
annotations.saveAsParquet(args)
}
Expand Down
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.converters

import htsjdk.samtools.ValidationStringency
Expand All @@ -31,7 +30,14 @@ import org.bdgenomics.formats.avro.{
import org.bdgenomics.utils.misc.Logging
import scala.collection.JavaConverters._

object VariantAnnotations extends Serializable with Logging {
/**
* Convert between htsjdk VCF INFO reserved key "ANN" values and TranscriptEffects.
*/
private[adam] object TranscriptEffectConverter extends Serializable with Logging {

/**
* Look up variant annotation messages by name and message code.
*/
private val MESSAGES: Map[String, VariantAnnotationMessage] = Map(
// name -> enum
ERROR_CHROMOSOME_NOT_FOUND.name() -> ERROR_CHROMOSOME_NOT_FOUND,
Expand All @@ -57,15 +63,33 @@ object VariantAnnotations extends Serializable with Logging {
"I3" -> INFO_NON_REFERENCE_ANNOTATION
)

/**
* Split effects by <code>&amp;</code> character.
*
* @param s effects to split
* @return effects split by <code>&amp;</code> character
*/
private def parseEffects(s: String): List[String] = {
s.split("&").toList
}

/**
* Split variant effect messages by <code>&amp;</code> character.
*
* @param s variant effect messages to split
* @return variant effect messages split by <code>&amp;</code> character
*/
private def parseMessages(s: String): List[VariantAnnotationMessage] = {
// todo: haven't seen a delimiter here, assuming it is also '&'
s.split("&").map(MESSAGES.get(_)).toList.flatten
}

/**
* Split a single or fractional value into optional numerator and denominator values.
*
* @param s single or fractional value to split
* @return single or fractional value split into optional numerator and denominator values
*/
private def parseFraction(s: String): (Option[Integer], Option[Integer]) = {
if ("".equals(s)) {
return (None, None)
Expand All @@ -78,10 +102,23 @@ object VariantAnnotations extends Serializable with Logging {
}
}

def setIfNotEmpty(s: String, setFn: String => Unit) {
/**
* Set a value via a function if the value is not empty.
*
* @param s value to set
* @param setFn function to call if the value is not empty
*/
private def setIfNotEmpty(s: String, setFn: String => Unit) {
Option(s).filter(_.nonEmpty).foreach(setFn)
}

/**
* Parse zero or one transcript effects from the specified string value.
*
* @param s value to parse
* @param stringency validation stringency
* @return zero or one transcript effects parsed from the specified string value
*/
private[converters] def parseTranscriptEffect(
s: String,
stringency: ValidationStringency): Seq[TranscriptEffect] = {
Expand Down Expand Up @@ -110,7 +147,8 @@ object VariantAnnotations extends Serializable with Logging {

val te = TranscriptEffect.newBuilder()
setIfNotEmpty(alternateAllele, te.setAlternateAllele(_))
if (!effects.isEmpty) te.setEffects(effects.asJava)
if (effects.nonEmpty) te.setEffects(effects.asJava)
// note: annotationImpact is output by SnpEff but is not part of the VCF ANN specification
setIfNotEmpty(geneName, te.setGeneName(_))
setIfNotEmpty(geneId, te.setGeneId(_))
setIfNotEmpty(featureType, te.setFeatureType(_))
Expand All @@ -132,26 +170,115 @@ object VariantAnnotations extends Serializable with Logging {
Seq(te.build())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, as this line is unchanged, but whenever possible, I prefer Iterable to Seq unless you need random lookup by index.

}

/**
* Parse the VCF INFO reserved key "ANN" value into zero or more TranscriptEffects.
*
* @param s string to parse
* @param stringency validation stringency
* @return the VCF INFO reserved key "ANN" value parsed into zero or more TranscriptEffects
*/
private[converters] def parseAnn(
s: String,
stringency: ValidationStringency): List[TranscriptEffect] = {

s.split(",").map(parseTranscriptEffect(_, stringency)).flatten.toList
}

def createVariantAnnotation(
/**
* Convert the htsjdk VCF INFO reserved key "ANN" value into zero or more TranscriptEffects,
* matching on alternate allele.
*
* @param variant variant
* @param vc htsjdk variant context
* @param stringency validation stringency, defaults to strict
* @return the htsjdk VCF INFO reserved key "ANN" value converted into zero or more
* TranscriptEffects, matching on alternate allele, and wrapped in an option
*/
def convertToTranscriptEffects(
variant: Variant,
vc: VariantContext,
stringency: ValidationStringency = ValidationStringency.STRICT): VariantAnnotation = {
stringency: ValidationStringency = ValidationStringency.STRICT): Option[List[TranscriptEffect]] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning Option[List[TranscriptEffect]] I would just return List[TranscriptEffect]. If you would return a None, I would just return a List.empty instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would make my brain hurt less. The thought is elsewhere it matters whether this field has been set, so checking Option seemed more correct than checking for an empty list.


def parseAndFilter(attr: String): Option[List[TranscriptEffect]] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be private.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error: illegal start of statement (no modifiers allowed here) [ERROR] private def parseAndFilter(attr: String): Option[List[TranscriptEffect]] = {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, I misread this and didn't notice that it is nested inside another function.

if (attr == VCFConstants.MISSING_VALUE_v4) {
None
} else {
val filtered = parseAnn(attr, stringency)
.filter(_.getAlternateAllele == variant.getAlternateAllele)
if (filtered.isEmpty) {
None
} else {
Some(filtered)
}
}
}

val va = VariantAnnotation.newBuilder()
.setVariant(variant)
val attrOpt = Option(vc.getAttributeAsString("ANN", null))
try {
attrOpt.flatMap(attr => parseAndFilter(attr))
} catch {
case t: Throwable => {
if (stringency == ValidationStringency.STRICT) {
throw t
} else if (stringency == ValidationStringency.LENIENT) {
log.warn("Could not convert VCF INFO reserved key ANN value to TranscriptEffect, caught %s.".format(t))
}
None
}
}
}

/**
* Convert the specified transcript effects into a string suitable for a VCF INFO reserved
* key "ANN" value.
*
* @param effects zero or more transcript effects
* @return the specified transcript effects converted into a string suitable for a VCF INFO
* reserved key "ANN" value
*/
def convertToVcfInfoAnnValue(effects: Seq[TranscriptEffect]): String = {
def toFraction(numerator: java.lang.Integer, denominator: java.lang.Integer): String = {
val numOpt = Option(numerator)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen Option do odd things with integers before. I think you're fine here since you're using java.lang.Integer which can be null (scala's Int type "cannot", IIRC, which triggers the odd behavior), but I would make sure to have a unit test covering the null integer case just to be safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, with Int unit tests fail with NPE before they can be wrapped in Option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, scala tries to coerce to the Scala native type. You need to do:

Option(myJavaInt: java.lang.Integer)

To prevent that. That may not be the exact syntax but the syntax shows up in AlignmentRecordComverter, IIRC.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This NPE with Option types still needs to be fixed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see, this is fine with java.lang.Integer parameters.

The unit test "convert transcript effect to VCF ANN attribute value" in TranscriptEffectConverterSuite covers nulls in various places (cdnaPosition, cdnaLength, cdsLength).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, SGTM!

val denomOpt = Option(denominator)

(numOpt, denomOpt) match {
case (None, None) => {
""
}
case (Some(n), None) => {
"%d".format(n)
}
case (None, Some(d)) => {
log.warn("Incorrect fractional value ?/%d, missing numerator".format(d))
""
}
case (Some(n), Some(d)) => {
"%d/%d".format(n, d)
}
}
}

val attr = vc.getAttributeAsString("ANN", null)
if (attr != null && attr != VCFConstants.MISSING_VALUE_v4) {
va.setTranscriptEffects(parseAnn(attr, stringency).asJava)
def toAnn(te: TranscriptEffect): String = {
Seq(
Option(te.getAlternateAllele).getOrElse(""), // 0
te.getEffects.asScala.mkString("&"), // 1
"", // annotationImpact
Option(te.getGeneName).getOrElse(""), // 3
Option(te.getGeneId).getOrElse(""),
Option(te.getFeatureType).getOrElse(""),
Option(te.getFeatureId).getOrElse(""),
Option(te.getBiotype).getOrElse(""),
toFraction(te.getRank, te.getTotal), // 8
Option(te.getTranscriptHgvs).getOrElse(""), // 9
Option(te.getProteinHgvs).getOrElse(""), // 10
toFraction(te.getCdnaPosition, te.getCdnaLength), // 11
toFraction(te.getCdsPosition, te.getCdsLength), // 12
toFraction(te.getProteinPosition, te.getProteinLength), // 13
Option(te.getDistance).getOrElse(""),
te.getMessages.asScala.mkString("&")
).mkString("|")
}

va.build()
effects.map(toAnn).mkString(",")
}
}
Expand Up @@ -22,8 +22,8 @@ import htsjdk.variant.vcf._
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord
import org.bdgenomics.formats.avro.{
DatabaseVariantAnnotation,
Genotype,
VariantAnnotation,
VariantCallingAnnotations
}
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -206,13 +206,13 @@ private[converters] object VariantAnnotationConverter extends Serializable {
* Mapping betweem VCF info field names and fields IDs in the
* VariantCallingAnnotations schema.
*/
lazy val VCF2VariantCallingAnnotations: Map[String, (Int, Object => Object)] =
lazy val vcfToVariantCallingAnnotations: Map[String, (Int, Object => Object)] =
createFieldMap(INFO_KEYS, VariantCallingAnnotations.getClassSchema)

/**
* Mapping between VCF format field names and field IDs in the Genotype schema.
*/
lazy val VCF2GenotypeAnnotations: Map[String, (Int, Object => Object)] =
lazy val vcfToGenotypeAnnotations: Map[String, (Int, Object => Object)] =
createFieldMap(FORMAT_KEYS, Genotype.getClassSchema)

/**
Expand All @@ -223,11 +223,10 @@ private[converters] object VariantAnnotationConverter extends Serializable {
DBNSFP_KEYS) // ::: COSMIC_KEYS

/**
* Mapping between VCF info field names and DatabaseVariantAnnotation schema
* field IDs for all database specific fields.
* Mapping between VCF info field names and VariantAnnotation schema field IDs.
*/
lazy val VCF2DatabaseAnnotations: Map[String, (Int, Object => Object)] = createFieldMap(EXTERNAL_DATABASE_KEYS,
DatabaseVariantAnnotation.getClassSchema)
lazy val vcfToVariantAnnotations: Map[String, (Int, Object => Object)] = createFieldMap(EXTERNAL_DATABASE_KEYS,
VariantAnnotation.getClassSchema)

/**
* Creates a mapping between a Seq of attribute keys, and the field ID for
Expand Down Expand Up @@ -277,15 +276,15 @@ private[converters] object VariantAnnotationConverter extends Serializable {
}

/**
* Remaps fields from an htsjdk variant context into a site annotation.
* Remaps fields from an htsjdk variant context into a variant annotation.
*
* @param vc htsjdk variant context for a site.
* @param annotation Pre-populated site annotation in Avro.
* @return Annotation with additional info fields filled in.
* @param annotation Pre-populated variant annotation in Avro.
* @return Variant annotation with additional info fields filled in.
*/
def convert(vc: VariantContext,
annotation: DatabaseVariantAnnotation): DatabaseVariantAnnotation = {
fillRecord(VCF2DatabaseAnnotations, vc, annotation)
annotation: VariantAnnotation): VariantAnnotation = {
fillRecord(vcfToVariantAnnotations, vc, annotation)
}

/**
Expand All @@ -298,7 +297,7 @@ private[converters] object VariantAnnotationConverter extends Serializable {
*/
def convert(vc: VariantContext,
call: VariantCallingAnnotations): VariantCallingAnnotations = {
fillRecord(VCF2VariantCallingAnnotations, vc, call)
fillRecord(vcfToVariantCallingAnnotations, vc, call)
}

/**
Expand All @@ -312,7 +311,7 @@ private[converters] object VariantAnnotationConverter extends Serializable {
*/
def convert(g: htsjdk.variant.variantcontext.Genotype,
genotype: Genotype): Genotype = {
for ((v, a) <- VariantAnnotationConverter.VCF2GenotypeAnnotations) {
for ((v, a) <- VariantAnnotationConverter.vcfToGenotypeAnnotations) {
// Add extended attributes if present
val attr = g.getExtendedAttribute(v)
if (attr != null && attr != VCFConstants.MISSING_VALUE_v4) {
Expand Down