Make Spark SQL APIs supported across all types #1921

Merged
merged 4 commits into from Mar 20, 2018

5 participants
@fnothaft
Member

fnothaft commented Feb 21, 2018

Resolves #1580 and #1867, and is a work in progress towards #1728. Merges the GenomicRDD and GenomicDataset traits, and makes the VariantContextRDD and GenericGenomicRDD implementations support GenomicDataset. This is not done yet, but I wanted to get it out for review early because it is a big change. Remaining TODO:

  • Clear merge conflicts
  • Plumb join and GenericGenomicRDD APIs out to Python
  • Plumb join and GenericGenomicRDD APIs out to R
  • Resolve a serialization issue that occurs when using GenericGenomicRDD from the dataset API
  • Update user docs
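
For readers unfamiliar with the `Z <: GenomicDataset[X, Y, Z]` bound that appears in the signatures under review, here is a minimal sketch of the self-typed (F-bounded) trait pattern it relies on. This is not ADAM's actual definition: `records`, `replaceRecords`, `Read`, `ReadProduct`, `ReadDataset`, and `countBoth` are hypothetical names, and a plain `Seq` stands in for the underlying RDD/Dataset.

```scala
object GenomicDatasetSketch {
  // Hypothetical, simplified stand-in for a merged trait.
  // T: record type, U: Product type for a SQL-style view, V: the concrete self type.
  trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] {
    // The records held by this dataset (a Seq stands in for an RDD/Dataset).
    def records: Seq[T]

    // Builds a new instance of the concrete type with replaced records.
    protected def replaceRecords(newRecords: Seq[T]): V

    // Because V is the concrete type, transform returns it, not the base trait.
    def transform(fn: Seq[T] => Seq[T]): V = replaceRecords(fn(records))
  }

  // Toy record and product types, invented for illustration only.
  case class Read(name: String, start: Long)
  case class ReadProduct(name: String, start: Long)

  case class ReadDataset(records: Seq[Read])
      extends GenomicDataset[Read, ReadProduct, ReadDataset] {
    protected def replaceRecords(newRecords: Seq[Read]): ReadDataset =
      ReadDataset(newRecords)
  }

  // A method that accepts any dataset type, mirroring the shape of the
  // three-parameter join signatures in this PR.
  def countBoth[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]](
    left: ReadDataset,
    right: GenomicDataset[X, Y, Z]): Int =
    left.records.size + right.records.size
}
```

The point of the third type parameter is that methods such as `transform` can return the concrete dataset type without a cast, which is presumably why the join methods now take three type parameters instead of two.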

One general stylistic question: this PR goes halfway to renaming everything "GenomicDataset" instead of "GenomicRDD". Is this a good change, or should we hold on to the GenomicRDD name for the sake of consistency?

@fnothaft fnothaft requested review from heuermh and akmorrow13 Feb 21, 2018

@coveralls

coveralls commented Feb 21, 2018

Coverage Status

Coverage decreased (-1.8%) to 80.786% when pulling b75295e on fnothaft:issues/1867-1580-vc-sql into 26f0608 on bigdatagenomics:master.

@coveralls

coveralls commented Feb 21, 2018

Coverage Status

Coverage decreased (-2.8%) to 79.74% when pulling a26047a on fnothaft:issues/1867-1580-vc-sql into 9ea870c on bigdatagenomics:master.

@AmplabJenkins

AmplabJenkins commented Feb 21, 2018

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2678/

Build result: FAILURE

[...truncated 7 lines...]
> /home/jenkins/git2/bin/git init /home/jenkins/workspace/ADAM-prb # timeout=10
Fetching upstream changes from https://github.com/bigdatagenomics/adam.git
> /home/jenkins/git2/bin/git --version # timeout=10
> /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/heads/:refs/remotes/origin/ # timeout=15
> /home/jenkins/git2/bin/git config remote.origin.url https://github.com/bigdatagenomics/adam.git # timeout=10
> /home/jenkins/git2/bin/git config --add remote.origin.fetch +refs/heads/:refs/remotes/origin/ # timeout=10
> /home/jenkins/git2/bin/git config remote.origin.url https://github.com/bigdatagenomics/adam.git # timeout=10
Fetching upstream changes from https://github.com/bigdatagenomics/adam.git
> /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
> /home/jenkins/git2/bin/git rev-parse b75295e^{commit} # timeout=10
> /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains b75295e # timeout=10
Checking out Revision b75295e (origin/pr/1921/head)
> /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
> /home/jenkins/git2/bin/git checkout -f b75295e6d1fb816321a5f4bf5206b9541ca7cb8b
First time build. Skipping changelog.
Triggering ADAM-prb » 2.6.2,2.10,2.2.1,centos
Triggering ADAM-prb » 2.6.2,2.11,2.2.1,centos
Triggering ADAM-prb » 2.7.3,2.10,2.2.1,centos
Triggering ADAM-prb » 2.7.3,2.11,2.2.1,centos
ADAM-prb » 2.6.2,2.10,2.2.1,centos completed with result SUCCESS
ADAM-prb » 2.6.2,2.11,2.2.1,centos completed with result FAILURE
ADAM-prb » 2.7.3,2.10,2.2.1,centos completed with result SUCCESS
ADAM-prb » 2.7.3,2.11,2.2.1,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.

@heuermh

Member

heuermh commented Feb 22, 2018

One general stylistic question: this PR goes halfway to renaming everything "GenomicDataset" instead of "GenomicRDD".

+1

@heuermh

a few random review comments/questions, carry on!

/**
* If true and saving as FASTQ, we will sort by read name.
*/
var sortFastqOutput: Boolean

@heuermh

heuermh Feb 22, 2018

Member

is now a good time to rid the world of this?

@fnothaft

fnothaft Mar 7, 2018

Member

Don't think so.

@heuermh
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @return Returns a GenotypeRDD.

@heuermh

heuermh Feb 22, 2018

Member

doc doesn't match

import sqlContext.implicits._
val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]
new DatasetBoundVariantContextRDD(ds, sd, samples, headers)

@heuermh

heuermh Feb 22, 2018

Member

we're intentionally bound to dataset here, is the idea to move other loadParquetXxx methods in the same direction?

@fnothaft

fnothaft Mar 7, 2018

Member

Nah, we don't have an Avro impl of VariantContext, so we can't use the normal loadParquet method.

@heuermh
val uTag: TypeTag[U]
/**
* This data as a Spark SQL Dataset.

@heuermh

heuermh Feb 22, 2018

Member

These data?

*
* @param tFn A function that transforms the underlying RDD as a DataFrame.
* @return A new RDD where the RDD of genomic data has been replaced, but the
* metadata (sequence dictionary, and etc) is copied without modification.

@heuermh

heuermh Feb 22, 2018

Member

metadata ... are?

genomicRdd: GenomicRDD[X, Y],
flankSize: Long)(
def rightOuterBroadcastRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]](
genomicRdd: GenomicDataset[X, Y, Z])(

@heuermh

heuermh Feb 22, 2018

Member

genomicRdd → genomicDataset

@fnothaft

fnothaft Mar 7, 2018

Member

How much do you want me to go through and patch these? For now, I've left them as is (genomicRdd). I'd rather make a cleanup pass through later.

@heuermh

heuermh Mar 8, 2018

Member

ok, please create an issue to track consistency in names and doc for RDD vs Dataset

}
/**
* Performs a broadcast inner join between this RDD and another RDD.

@heuermh

heuermh Feb 22, 2018

Member

RDD → Dataset

regionFn)
@transient val uTag: TypeTag[U] = typeTag[U]
def saveAsParquet(filePath: String,

@heuermh

heuermh Feb 22, 2018

Member

does this one need an override?

pageSize: Int = 1 * 1024 * 1024,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
disableDictionaryEncoding: Boolean = false,
optSchema: Option[Schema] = None): Unit = SaveAsADAM.time {

@heuermh

heuermh Feb 22, 2018

Member

would be nice to have separate SaveRddAsParquet and SaveDatasetAsParquet timers

@fnothaft

fnothaft Mar 7, 2018

Member

We're not really going to get meaningful timing info out of the SaveDatasetAsParquet timer.

@heuermh
@@ -210,7 +210,10 @@ case class RDDBoundGenotypeRDD private[rdd] (
   }
 }
-sealed abstract class GenotypeRDD extends MultisampleAvroGenomicRDD[Genotype, GenotypeProduct, GenotypeRDD] {
+sealed abstract class GenotypeRDD extends MultisampleAvroGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] {

@heuermh

heuermh Feb 22, 2018

Member

will you be proposing to move this to GenotypeDataset?

@fnothaft fnothaft added this to the 0.24.0 milestone Mar 5, 2018

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2702/

Build result: FAILURE

[...truncated 7 lines...]
> /home/jenkins/git2/bin/git init /home/jenkins/workspace/ADAM-prb # timeout=10
Fetching upstream changes from https://github.com/bigdatagenomics/adam.git
> /home/jenkins/git2/bin/git --version # timeout=10
> /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/heads/:refs/remotes/origin/ # timeout=15
> /home/jenkins/git2/bin/git config remote.origin.url https://github.com/bigdatagenomics/adam.git # timeout=10
> /home/jenkins/git2/bin/git config --add remote.origin.fetch +refs/heads/:refs/remotes/origin/ # timeout=10
> /home/jenkins/git2/bin/git config remote.origin.url https://github.com/bigdatagenomics/adam.git # timeout=10
Fetching upstream changes from https://github.com/bigdatagenomics/adam.git
> /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
> /home/jenkins/git2/bin/git rev-parse origin/pr/1921/merge^{commit} # timeout=10
> /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains c8dd89d # timeout=10
Checking out Revision c8dd89d (origin/pr/1921/merge)
> /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
> /home/jenkins/git2/bin/git checkout -f c8dd89d25204e4dd7fa729ef22d01b3a1aa9d280
First time build. Skipping changelog.
Triggering ADAM-prb » 2.6.2,2.10,2.2.1,centos
Triggering ADAM-prb » 2.6.2,2.11,2.2.1,centos
Triggering ADAM-prb » 2.7.3,2.10,2.2.1,centos
Triggering ADAM-prb » 2.7.3,2.11,2.2.1,centos
ADAM-prb » 2.6.2,2.10,2.2.1,centos completed with result SUCCESS
ADAM-prb » 2.6.2,2.11,2.2.1,centos completed with result FAILURE
ADAM-prb » 2.7.3,2.10,2.2.1,centos completed with result SUCCESS
ADAM-prb » 2.7.3,2.11,2.2.1,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.

@fnothaft

Member

fnothaft commented Mar 7, 2018

Jenkins, retest this please.

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2704/
Test PASSed.

@fnothaft

Member

fnothaft commented Mar 7, 2018

@heuermh I think everything in Scala is going to be stable from here on out; can you make a review pass? I'm going to finish up the R bit in the morning.

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2705/
Test PASSed.

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2706/
Test PASSed.

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2710/
Test PASSed.

@AmplabJenkins

AmplabJenkins commented Mar 7, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2711/
Test PASSed.

tManifest: ClassTag[T],
xManifest: ClassTag[X]): Y = {
def pipe[X, Y <: Product, Z <: GenomicDataset[X, Y, Z], W <: InFormatter[T, U, V, W]](cmd: Seq[String],
files: Seq[String] = Seq.empty,

@heuermh

heuermh Mar 8, 2018

Member

nit: reformat as above for line length. Same with other pipe methods

implicit val txTag = ClassTag.AnyRef.asInstanceOf[ClassTag[(Option[T], X)]]
implicit val u1Tag: TypeTag[U] = uTag
implicit val u2Tag: TypeTag[Y] = genomicRdd.uTag
implicit val uyTag = typeTag[(Option[U], Y)]

@heuermh

heuermh Mar 8, 2018

Member

hope I never need to touch this method signature ;)
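
To unpack what the implicit tags in the snippet above are doing, here is a minimal sketch, assuming Scala 2 with `scala-reflect` on the classpath. With a `TypeTag` for each element type in implicit scope, the compiler can materialize a `TypeTag` for the joined pair type `(Option[U], Y)`. `Tagged`, `LeftProduct`, `RightProduct`, and `joinedTag` are hypothetical names for illustration, not ADAM classes.

```scala
import scala.reflect.runtime.universe.{ TypeTag, typeTag }

object TypeTagSketch {
  // Hypothetical holder that captures the TypeTag of its product type U,
  // loosely analogous to the uTag member discussed in this review.
  class Tagged[U <: Product: TypeTag] {
    val uTag: TypeTag[U] = typeTag[U]
  }

  // Toy product types, invented for illustration only.
  case class LeftProduct(id: Int)
  case class RightProduct(name: String)

  // Bring both element tags into implicit scope, then ask the compiler
  // for the tag of the joined pair type.
  def joinedTag[U <: Product, Y <: Product](
    left: Tagged[U],
    right: Tagged[Y]): TypeTag[(Option[U], Y)] = {
    implicit val u1Tag: TypeTag[U] = left.uTag
    implicit val u2Tag: TypeTag[Y] = right.uTag
    typeTag[(Option[U], Y)]
  }
}
```

Without the two implicit vals, `typeTag[(Option[U], Y)]` would not compile inside `joinedTag`, because `U` and `Y` are abstract there and no tags are available for them.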

mvn -U \
-P python,r \
test \
-Dsuites=select.no.suites\* \

@heuermh

heuermh Mar 8, 2018

Member

Is this to skip the Scala tests while still running the Python and R ones?

@antonkulaga

Contributor

antonkulaga commented Mar 8, 2018

Is this a good change, or should we hold on to the GenomicRDD name for the sake of consistency?

I don't know; for my ADAM-based code and libraries, renaming is not a problem. What I noticed with the previous version was that in many cases using genomic RDDs was faster than using Spark SQL.

@AmplabJenkins

AmplabJenkins commented Mar 15, 2018

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2729/
Test PASSed.

@heuermh heuermh merged commit a28da44 into bigdatagenomics:master Mar 20, 2018

1 of 2 checks passed

Codacy/PR Quality Review Not so good... This pull request quality could be better.
Details
default Merged build finished.
Details
@heuermh

Member

heuermh commented Mar 20, 2018

Thank you, @fnothaft
