Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-793] adding command to convert ADAM nucleotide contig fragments to FASTA files #816

Closed
wants to merge 1 commit into
base: master
from

Conversation

Projects
None yet
4 participants
@heuermh
Copy link
Member

heuermh commented Sep 3, 2015

Fixes #793

@heuermh heuermh referenced this pull request Sep 3, 2015

Closed

Add FASTA export #793

@AmplabJenkins

This comment has been minimized.

Copy link

AmplabJenkins commented Sep 3, 2015

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/ADAM-prb/894/
Test PASSed.

*
* @param fileName file name
*/
def adamSaveAsFasta(fileName: String) = {

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Nit: rename to saveAsFasta? I think we are moving away from prefixing functions with adam.

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

+1

rdd
.sortBy(fragment => (fragment.contig.contigName, Option(fragment.fragmentNumber) match { case Some(n) => n.toInt case None => -1 }))
.map(fragment => (fragment.contig, fragment))
.reduceByKey(merge)

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

If you're doing a reduceByKey, you shouldn't need the sortBy earlier.

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Ah, I think I see why you're doing the sort first. Are you trying to ensure that the reduceByKey runs in sorted order? If so, I'm not sure that this approach is guaranteed to work. Technically, the guaranteed correct way would be to do:

rdd.map(f => (f.getContig.getContigName, (f.getFragmentNumber, f)))
  .groupByKey()
  .map(kv => {
    val (_, records) = kv
    records.sortBy(_._1)
  .map(_._2)
  .reduce(merge)
  })

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

This has the disadvantage of materializing all fragments that make up a single contig, but will be correct, IIRC.

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

Yup, the values need to be in sorted order (fragments 1 through n of the same contig) for the reduce to work. I went after this a few different ways and will take a look at your suggestion.

def mergeFragments(): RDD[NucleotideContigFragment] = {

def merge(first: NucleotideContigFragment, second: NucleotideContigFragment): NucleotideContigFragment = {
val merged = NucleotideContigFragment.newBuilder()

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

I think you could simplify this to:

val merged = NucleotideContigFragment.newBuilder(first)
  .setFragmentNumber(null)
  .setFragmentStartPosition(null)
  .setNumberOfFragmentsInContig(null)
  .setFragmentSequence(first.fragmentSequence + second.fragmentSequence)
  .build

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

+1

On a related note, would it be worth pushing down a predicate to the parquet loader to only gather those fields I actually use?

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

That's not a bad idea. However, that assumes that the data is coming out of Parquet. That is probably a reasonable assumption though.

@@ -0,0 +1,836 @@
>chromosome:GRCh38:6:29941260:29966260:1

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Do you know how many fragments (approximately) this contig breaks down into?

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

I believe the default is 10k, so at 25k each both of the test contigs should break into 2 1/2 fragments.

sb.toString
}

rdd.map(toFasta).coalesce(1).saveAsTextFile(fileName)

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

I wouldn't coalesce here by default. Rather, I would allow the user to specify an optional argument with the number of partitions to coalesce down to.

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

+1

Let me know if there are any other arguments that might make sense to push to the command line. Hard wrap might be another one.

sb.append(record.contig.contigName)
Option(record.description).foreach(n => sb.append(" ").append(n))
if (isFragment(record)) {
sb.append(" fragment %d of %d".format(record.fragmentNumber + 1, record.numberOfFragmentsInContig))

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Why are we writing this out/where are we writing this out to?

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

Not sure what you mean here. There is a map NucleotideContigFragment --> String before saveAsTextFile at line 84.

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Sorry, I meant to ask why are we writing " fragment %d of %d" to the string as part of the map?

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

If you just call saveAsFasta on an RDD it will write out separate FASTA records for each fragment. E.g. https://github.com/bigdatagenomics/adam/pull/816/files#diff-26ce0c60df6e82a09e1c8f5e491e58b3R356

The adam2fasta command calls rdd.mergeFragments().saveAsFasta(fileName), or soon something like rdd.mergeFragments().coalesce(partitions, shuffle).saveAsFasta(fileName). Suppose merge fragments could be a command-line argument as well.

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

Added these (and hard wrap) as command line arguments in latest commit and rebased.

This comment has been minimized.

@fnothaft

fnothaft Sep 12, 2015

Member

Ah, OK. The fragments are just subdivisions of a single FASTA record. E.g., instead of storingchr1` as a 250M character string, we break it up into 25,000 10K character strings. This is useful if you want to parallelize something like counting k-mers on the reference genome.

This comment has been minimized.

@heuermh

heuermh Sep 30, 2015

Author Member

Right, the fasta2adam command splits each FASTA record into fragments. The adam2fasta command merges these fragments before coalescing and saving to FASTA format.

contigFragments
  .mergeFragments()
  .coalesce(args.partitions, args.shuffle)
  .saveAsFasta(args.outputPath, args.lineWidth)

If one wanted to save the fragments without merging, they could just call the saveAsFasta method, or I suppose we could add a -merge-fragments command line argument.

Curiously, we are using -single_dash_with_underscores in some places and -single-dash-with-dashes in other places, why not --double-dashes style for argument names?

def adamSaveAsFasta(fileName: String) = {

def isFragment(record: NucleotideContigFragment): Boolean = {
(Option(record.fragmentNumber), Option(record.numberOfFragmentsInContig)) match {

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

You can reduce this down to:

Option(record.fragmentNumber).isDefined && Option(record.numberOfFragmentsInContig).fold(false)(_ > 1)

This comment has been minimized.

@fnothaft

fnothaft Sep 3, 2015

Member

Also, as an aside, here and elsewhere, you're using record.fragmentNumber, where you should be using record.getFragmentNumber. While record.fragmentNumber is a valid field, access directly to the field should be deprecated, so you should be seeing deprecation warnings on these accesses.

This comment has been minimized.

@heuermh

heuermh Sep 3, 2015

Author Member

I thought it was that you only get deprecation warnings for record.getFragmentNumber, record.fragmentNumber and record.getFragmentNumber() are ok

@fnothaft

This comment has been minimized.

Copy link
Member

fnothaft commented Sep 3, 2015

This looks good @heuermh! Let's sort out the sortBy issue, but otherwise I am +1.

@heuermh heuermh force-pushed the heuermh:adam2fasta branch from f6c4be3 to 94cd6a3 Sep 3, 2015

@AmplabJenkins

This comment has been minimized.

Copy link

AmplabJenkins commented Sep 3, 2015

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/ADAM-prb/895/
Test PASSed.

@heuermh heuermh force-pushed the heuermh:adam2fasta branch from 94cd6a3 to b596ca9 Sep 8, 2015

@heuermh

This comment has been minimized.

Copy link
Member Author

heuermh commented Sep 8, 2015

@fnothaft I tried scaling up EC2 instances until I could get something similar to your approach to complete in less than six hours, and wasn't able to.

The code in my last commit here completes in around 47 minutes wall time on an r3.4xlarge and I've verified the sequences re-assemble correctly with lastz. I still need to verify it works ok in clustered mode.

@AmplabJenkins

This comment has been minimized.

Copy link

AmplabJenkins commented Sep 8, 2015

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/ADAM-prb/901/
Test PASSed.

@heuermh heuermh force-pushed the heuermh:adam2fasta branch 2 times, most recently from 9ad1b41 to 2e0110b Sep 9, 2015

@AmplabJenkins

This comment has been minimized.

Copy link

AmplabJenkins commented Sep 9, 2015

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/ADAM-prb/906/
Test PASSed.

}

rdd
.sortBy(fragment => (fragment.contig.contigName, Option(fragment.fragmentNumber) match { case Some(n) => n.toInt case None => -1 }))

This comment has been minimized.

@ryan-williams

ryan-williams Sep 13, 2015

Member

I might do Option(fragment.fragmentNumber).map(_.toInt).getOrElse(-1) here, fwiw. If that doesn't seem cleaner then feel free to ignore.

This comment has been minimized.

@fnothaft

This comment has been minimized.

@heuermh

heuermh Sep 14, 2015

Author Member

I like this too, thank you. I will add it the next time I rebase and push.

@heuermh heuermh force-pushed the heuermh:adam2fasta branch from 2e0110b to d27afe3 Sep 14, 2015

@AmplabJenkins

This comment has been minimized.

Copy link

AmplabJenkins commented Sep 14, 2015

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/ADAM-prb/912/
Test PASSed.

@fnothaft

This comment has been minimized.

Copy link
Member

fnothaft commented Oct 2, 2015

OK, I checked this on a full genome build on the cluster and it seems to check out. I think there are various performance improvements we can make, but I've opened #842 for that.

@fnothaft

This comment has been minimized.

Copy link
Member

fnothaft commented Oct 2, 2015

Thanks @heuermh! Merged as 3ac54a1.

@fnothaft fnothaft closed this Oct 2, 2015

@heuermh

This comment has been minimized.

Copy link
Member Author

heuermh commented Oct 2, 2015

Great, thanks!

@heuermh heuermh deleted the heuermh:adam2fasta branch Oct 2, 2015

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.