Commit ac7cf6e

Collapse GenomicRDD trait into GenomicDataset.

Frank Austin Nothaft committed Feb 14, 2018
1 parent 8f19161 commit ac7cf6e
Showing 56 changed files with 1,434 additions and 2,311 deletions.
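In type terms, the collapse replaces the two-parameter, F-bounded GenomicRDD trait with the three-parameter GenomicDataset, whose extra parameter is the Product type that backs the Spark SQL view of the data. A minimal before/after sketch, inferred only from the bounds that change in this diff (the real traits declare many more members, and the dataset member below is an assumption):

// Before: one F-bounded abstraction, RDD-backed only.
trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
  def rdd: org.apache.spark.rdd.RDD[T]
}

// After: U is the Product (Spark SQL row) type, so one trait can expose both
// an RDD view and a Dataset view of the same genomic data.
trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] {
  def rdd: org.apache.spark.rdd.RDD[T]
  def dataset: org.apache.spark.sql.Dataset[U]
}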

Large diffs are not rendered by default.

@@ -23,7 +23,7 @@ import org.bdgenomics.adam.models.{
Coverage,
VariantContext
}
-import org.bdgenomics.adam.rdd.{ ADAMContext, GenomicRDD }
+import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.adam.rdd.feature.{ CoverageRDD, FeatureRDD }
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
@@ -35,16 +35,13 @@ import org.bdgenomics.adam.rdd.variant.{
}
import org.bdgenomics.formats.avro._

-sealed trait SameTypeConversion[T, U <: GenomicRDD[T, U]] extends Function2[U, RDD[T], U] {
+final class ContigsToContigsConverter extends Function2[NucleotideContigFragmentRDD, RDD[NucleotideContigFragment], NucleotideContigFragmentRDD] {

-  def call(v1: U, v2: RDD[T]): U = {
-    ADAMContext.sameTypeConversionFn(v1, v2)
+  def call(v1: NucleotideContigFragmentRDD, v2: RDD[NucleotideContigFragment]): NucleotideContigFragmentRDD = {
+    ADAMContext.contigsToContigsConversionFn(v1, v2)
  }
}
-
-final class ContigsToContigsConverter extends SameTypeConversion[NucleotideContigFragment, NucleotideContigFragmentRDD] {
-}
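With the SameTypeConversion trait gone, each same-type converter now spells out its own call method and delegates to the matching conversion function on ADAMContext. A minimal usage sketch, where contigs (an existing NucleotideContigFragmentRDD) and newRdd (a replacement RDD[NucleotideContigFragment]) are assumed values:

// The converter keeps the first argument's metadata and swaps in the second
// argument's records.
val converter = new ContigsToContigsConverter()
val updated: NucleotideContigFragmentRDD = converter.call(contigs, newRdd)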

final class ContigsToCoverageConverter extends Function2[NucleotideContigFragmentRDD, RDD[Coverage], CoverageRDD] {

def call(v1: NucleotideContigFragmentRDD, v2: RDD[Coverage]): CoverageRDD = {
@@ -101,7 +98,11 @@ final class CoverageToContigsConverter extends Function2[CoverageRDD, RDD[Nucleo
}
}

-final class CoverageToCoverageConverter extends SameTypeConversion[Coverage, CoverageRDD] {
+final class CoverageToCoverageConverter extends Function2[CoverageRDD, RDD[Coverage], CoverageRDD] {
+
+  def call(v1: CoverageRDD, v2: RDD[Coverage]): CoverageRDD = {
+    ADAMContext.coverageToCoverageConversionFn(v1, v2)
+  }
}

final class CoverageToFeaturesConverter extends Function2[CoverageRDD, RDD[Feature], FeatureRDD] {
@@ -160,7 +161,11 @@ final class FeaturesToCoverageConverter extends Function2[FeatureRDD, RDD[Covera
}
}

-final class FeaturesToFeatureConverter extends SameTypeConversion[Feature, FeatureRDD] {
+final class FeaturesToFeatureConverter extends Function2[FeatureRDD, RDD[Feature], FeatureRDD] {
+
+  def call(v1: FeatureRDD, v2: RDD[Feature]): FeatureRDD = {
+    ADAMContext.featuresToFeaturesConversionFn(v1, v2)
+  }
}

final class FeaturesToFragmentsConverter extends Function2[FeatureRDD, RDD[Fragment], FragmentRDD] {
@@ -219,7 +224,11 @@ final class FragmentsToFeaturesConverter extends Function2[FragmentRDD, RDD[Feat
}
}

-final class FragmentsToFragmentConverter extends SameTypeConversion[Fragment, FragmentRDD] {
+final class FragmentsToFragmentsConverter extends Function2[FragmentRDD, RDD[Fragment], FragmentRDD] {
+
+  def call(v1: FragmentRDD, v2: RDD[Fragment]): FragmentRDD = {
+    ADAMContext.fragmentsToFragmentsConversionFn(v1, v2)
+  }
}

final class FragmentsToAlignmentRecordsConverter extends Function2[FragmentRDD, RDD[AlignmentRecord], AlignmentRecordRDD] {
@@ -278,7 +287,11 @@ final class AlignmentRecordsToFragmentsConverter extends Function2[AlignmentReco
}
}

-final class AlignmentRecordsToAlignmentRecordsConverter extends SameTypeConversion[AlignmentRecord, AlignmentRecordRDD] {
+final class AlignmentRecordsToAlignmentRecordsConverter extends Function2[AlignmentRecordRDD, RDD[AlignmentRecord], AlignmentRecordRDD] {
+
+  def call(v1: AlignmentRecordRDD, v2: RDD[AlignmentRecord]): AlignmentRecordRDD = {
+    ADAMContext.alignmentRecordsToAlignmentRecordsConversionFn(v1, v2)
+  }
}

final class AlignmentRecordsToGenotypesConverter extends Function2[AlignmentRecordRDD, RDD[Genotype], GenotypeRDD] {
@@ -337,7 +350,11 @@ final class GenotypesToAlignmentRecordsConverter extends Function2[GenotypeRDD,
}
}

-final class GenotypesToGenotypesConverter extends SameTypeConversion[Genotype, GenotypeRDD] {
+final class GenotypesToGenotypesConverter extends Function2[GenotypeRDD, RDD[Genotype], GenotypeRDD] {
+
+  def call(v1: GenotypeRDD, v2: RDD[Genotype]): GenotypeRDD = {
+    ADAMContext.genotypesToGenotypesConversionFn(v1, v2)
+  }
}

final class GenotypesToVariantsConverter extends Function2[GenotypeRDD, RDD[Variant], VariantRDD] {
@@ -396,7 +413,11 @@ final class VariantsToGenotypesConverter extends Function2[VariantRDD, RDD[Genot
}
}

-final class VariantsToVariantsConverter extends SameTypeConversion[Variant, VariantRDD] {
+final class VariantsToVariantsConverter extends Function2[VariantRDD, RDD[Variant], VariantRDD] {
+
+  def call(v1: VariantRDD, v2: RDD[Variant]): VariantRDD = {
+    ADAMContext.variantsToVariantsConversionFn(v1, v2)
+  }
}

final class VariantsToVariantContextConverter extends Function2[VariantRDD, RDD[VariantContext], VariantContextRDD] {
@@ -455,5 +476,9 @@ final class VariantContextsToVariantsConverter extends Function2[VariantContextR
}
}

-final class VariantContextsToVariantContextConverter extends SameTypeConversion[VariantContext, VariantContextRDD] {
+final class VariantContextsToVariantContextConverter extends Function2[VariantContextRDD, RDD[VariantContext], VariantContextRDD] {
+
+  def call(v1: VariantContextRDD, v2: RDD[VariantContext]): VariantContextRDD = {
+    ADAMContext.variantContextsToVariantContextsConversionFn(v1, v2)
+  }
}
@@ -21,7 +21,7 @@ import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicRDD }
+import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

@@ -81,12 +81,12 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
val stringency = ValidationStringency.valueOf(args.stringency)

/**
-   * Coalesce the specified GenomicRDD if requested.
+   * Coalesce the specified GenomicDataset if requested.
    *
-   * @param rdd GenomicRDD to coalesce.
-   * @return The specified GenomicRDD coalesced if requested.
+   * @param rdd GenomicDataset to coalesce.
+   * @return The specified GenomicDataset coalesced if requested.
    */
-  private def maybeCoalesce[U <: GenomicRDD[_, U]](rdd: U): U = {
+  private def maybeCoalesce[U <: GenomicDataset[_, _, U]](rdd: U): U = {
if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
if (args.coalesce > rdd.rdd.partitions.length || args.forceShuffle) {
@@ -100,12 +100,12 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
}

/**
-   * Sort the specified GenomicRDD if requested.
+   * Sort the specified GenomicDataset if requested.
    *
-   * @param rdd GenomicRDD to sort.
-   * @return The specified GenomicRDD sorted if requested.
+   * @param rdd GenomicDataset to sort.
+   * @return The specified GenomicDataset sorted if requested.
    */
-  private def maybeSort[U <: GenomicRDD[_, U]](rdd: U): U = {
+  private def maybeSort[U <: GenomicDataset[_, _, U]](rdd: U): U = {
if (args.sort) {
log.info("Sorting before saving")
rdd.sort()
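Because U is F-bounded (U <: GenomicDataset[_, _, U]), each helper returns the caller's concrete type rather than a bare GenomicDataset, so the two compose without casts. A hedged sketch, assuming genotypes is a GenotypeRDD loaded earlier in the transform:

// Both helpers preserve the concrete GenotypeRDD type, so the chained result
// can still be saved with genotype-specific methods.
val prepared: GenotypeRDD = maybeSort(maybeCoalesce(genotypes))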
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.cli
import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicRDD }
+import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

@@ -77,12 +77,12 @@ class TransformVariants(val args: TransformVariantsArgs)
val stringency = ValidationStringency.valueOf(args.stringency)

/**
-   * Coalesce the specified GenomicRDD if requested.
+   * Coalesce the specified GenomicDataset if requested.
    *
-   * @param rdd GenomicRDD to coalesce.
-   * @return The specified GenomicRDD coalesced if requested.
+   * @param rdd GenomicDataset to coalesce.
+   * @return The specified GenomicDataset coalesced if requested.
    */
-  private def maybeCoalesce[U <: GenomicRDD[_, U]](rdd: U): U = {
+  private def maybeCoalesce[U <: GenomicDataset[_, _, U]](rdd: U): U = {
if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
if (args.coalesce > rdd.rdd.partitions.length || args.forceShuffle) {
@@ -96,12 +96,12 @@ class TransformVariants(val args: TransformVariantsArgs)
}

/**
-   * Sort the specified GenomicRDD if requested.
+   * Sort the specified GenomicDataset if requested.
    *
-   * @param rdd GenomicRDD to sort.
-   * @return The specified GenomicRDD sorted if requested.
+   * @param rdd GenomicDataset to sort.
+   * @return The specified GenomicDataset sorted if requested.
    */
-  private def maybeSort[U <: GenomicRDD[_, U]](rdd: U): U = {
+  private def maybeSort[U <: GenomicDataset[_, _, U]](rdd: U): U = {
if (args.sort) {
log.info("Sorting before saving")
rdd.sort()
62 changes: 52 additions & 10 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -144,8 +144,8 @@ private case class LocatableReferenceRegion(rr: ReferenceRegion) extends Locatab
object ADAMContext {

// conversion functions for pipes
-  implicit def sameTypeConversionFn[T, U <: GenomicRDD[T, U]](gRdd: U,
-                                                              rdd: RDD[T]): U = {
+  implicit def contigsToContigsConversionFn(gRdd: NucleotideContigFragmentRDD,
+                                            rdd: RDD[NucleotideContigFragment]): NucleotideContigFragmentRDD = {
// hijack the transform function to discard the old RDD
gRdd.transform(oldRdd => rdd)
}
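Per the "conversion functions for pipes" comment above, these implicits let the pipe API rebuild a typed dataset around the RDD that comes back from an external process. The "hijack" works because transform keeps the dataset's metadata and only swaps the records; a minimal sketch of that behavior, with contigs and freshRdd as assumed values:

// transform rebuilds the same NucleotideContigFragmentRDD type around the
// function's result, so ignoring oldRdd replaces the records while metadata
// such as the sequence dictionary is carried over unchanged.
val replaced: NucleotideContigFragmentRDD =
  contigs.transform((oldRdd: RDD[NucleotideContigFragment]) => freshRdd)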
@@ -269,6 +269,12 @@ object ADAMContext {
new DatasetBoundNucleotideContigFragmentRDD(ds, gRdd.sequences)
}

+  implicit def coverageToCoverageConversionFn(gRdd: CoverageRDD,
+                                              rdd: RDD[Coverage]): CoverageRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def coverageToFeaturesConversionFn(
gRdd: CoverageRDD,
rdd: RDD[Feature]): FeatureRDD = {
@@ -388,6 +394,12 @@ object ADAMContext {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
}

+  implicit def featuresToFeaturesConversionFn(gRdd: FeatureRDD,
+                                              rdd: RDD[Feature]): FeatureRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def featuresToFragmentsConversionFn(
gRdd: FeatureRDD,
rdd: RDD[Fragment]): FragmentRDD = {
@@ -507,6 +519,12 @@ object ADAMContext {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
}

+  implicit def fragmentsToFragmentsConversionFn(gRdd: FragmentRDD,
+                                                rdd: RDD[Fragment]): FragmentRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def fragmentsToAlignmentRecordsConversionFn(
gRdd: FragmentRDD,
rdd: RDD[AlignmentRecord]): AlignmentRecordRDD = {
@@ -571,25 +589,25 @@ object ADAMContext {
DefaultHeaderLines.allHeaderLines)
}

-  implicit def genericToContigsConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToContigsConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[NucleotideContigFragment]): NucleotideContigFragmentRDD = {
new RDDBoundNucleotideContigFragmentRDD(rdd, gRdd.sequences, None)
}

-  implicit def genericToCoverageConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToCoverageConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
}

-  implicit def genericToFeatureConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToFeatureConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
}

-  implicit def genericToFragmentsConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToFragmentsConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Fragment]): FragmentRDD = {
new RDDBoundFragmentRDD(rdd,
@@ -599,7 +617,7 @@ object ADAMContext {
None)
}

-  implicit def genericToAlignmentRecordsConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToAlignmentRecordsConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[AlignmentRecord]): AlignmentRecordRDD = {
new RDDBoundAlignmentRecordRDD(rdd,
@@ -609,7 +627,7 @@ object ADAMContext {
None)
}

-  implicit def genericToGenotypesConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToGenotypesConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Genotype]): GenotypeRDD = {
new RDDBoundGenotypeRDD(rdd,
@@ -619,7 +637,7 @@ object ADAMContext {
None)
}

-  implicit def genericToVariantsConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToVariantsConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Variant]): VariantRDD = {
new RDDBoundVariantRDD(rdd,
@@ -628,7 +646,7 @@ object ADAMContext {
None)
}

-  implicit def genericToVariantContextsConversionFn[Y <: GenericGenomicRDD[_]](
+  implicit def genericToVariantContextsConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[VariantContext]): VariantContextRDD = {
new RDDBoundVariantContextRDD(rdd,
@@ -693,6 +711,12 @@ object ADAMContext {
gRdd.processingSteps)
}

+  implicit def alignmentRecordsToAlignmentRecordsConversionFn(gRdd: AlignmentRecordRDD,
+                                                              rdd: RDD[AlignmentRecord]): AlignmentRecordRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def alignmentRecordsToGenotypesConversionFn(
gRdd: AlignmentRecordRDD,
rdd: RDD[Genotype]): GenotypeRDD = {
@@ -812,6 +836,12 @@ object ADAMContext {
Seq.empty)
}

+  implicit def genotypesToGenotypesConversionFn(gRdd: GenotypeRDD,
+                                                rdd: RDD[Genotype]): GenotypeRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def genotypesToVariantsConversionFn(
gRdd: GenotypeRDD,
rdd: RDD[Variant]): VariantRDD = {
@@ -931,6 +961,12 @@ object ADAMContext {
gRdd.headerLines)
}

+  implicit def variantsToVariantsConversionFn(gRdd: VariantRDD,
+                                              rdd: RDD[Variant]): VariantRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

implicit def variantsToVariantContextConversionFn(
gRdd: VariantRDD,
rdd: RDD[VariantContext]): VariantContextRDD = {
@@ -997,6 +1033,12 @@ object ADAMContext {
None)
}

+  implicit def variantContextsToVariantContextsConversionFn(gRdd: VariantContextRDD,
+                                                            rdd: RDD[VariantContext]): VariantContextRDD = {
+    // hijack the transform function to discard the old RDD
+    gRdd.transform(oldRdd => rdd)
+  }

// Add ADAM Spark context methods
implicit def sparkContextToADAMContext(sc: SparkContext): ADAMContext = new ADAMContext(sc)

@@ -0,0 +1,27 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import org.apache.spark.broadcast.Broadcast
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.utils.interval.array.IntervalArray

case class GenomicBroadcast[T, U <: Product, V <: GenomicDataset[T, U, V]] private[rdd] (
backingDataset: V,
broadcastTree: Broadcast[IntervalArray[ReferenceRegion, T]]) {
}
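GenomicBroadcast pairs a dataset with a broadcast interval tree of its records keyed by ReferenceRegion, i.e. the ingredients of a broadcast region join. Because the constructor is private[rdd], only code inside org.bdgenomics.adam.rdd can build one; a hypothetical helper in that package might look like the sketch below (everything except GenomicBroadcast itself is an assumption):

package org.bdgenomics.adam.rdd

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.utils.interval.array.IntervalArray

private[rdd] object BroadcastSketch {

  // Wrap an already-built interval tree and its source dataset; constructing
  // the IntervalArray from dataset.rdd is left out of this sketch.
  def toBroadcast[T, U <: Product, V <: GenomicDataset[T, U, V]](
    sc: SparkContext,
    dataset: V,
    tree: IntervalArray[ReferenceRegion, T]): GenomicBroadcast[T, U, V] = {
    GenomicBroadcast(dataset, sc.broadcast(tree))
  }
}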
