Question: How to transform a line of SAM to AlignmentRecord? #1425

Closed
xubo245 opened this Issue Mar 7, 2017 · 16 comments

3 participants

xubo245 commented Mar 7, 2017

How do I transform a line of SAM into an AlignmentRecord?

SRR062634.19197346 99 22 16054958 60 100M = 16055045 187 CACATTCCCAGTTTGAGGGAGCCAGGTGTATTCCTGAGCCTGGGCATTGGATGAAAATGTTAGGGTGGGCCTATGATTAGGCCTGAGAGGTTGTTTGTCT GGGGGGGGGGGFGGGGGGGGGGGGGGEGEGGGGGGGGFGGEGFGG?GEGFGGGEBDDFFCEEEEEAFCCCEEBFEEEEDDE?EEECECDD/:=>>:AAAA RG:Z:SRR062634 XT:A:U NM:i:0 SM:i:23 AM:i:23 X0:i:1 X1:i:1 XM:i:0 XO:i:0 XG:i:0 MD:Z:100 XA:Z:14,-19787951,100M,1;

SequenceDictionary and RecordGroupDictionary are ready:

val firstRecord = samToADAMConverter.convert(newSAMRecord.next(), newSequenceDictionary, new RecordGroupDictionary(Seq()))

I want to transform a line (in String format) of SAM into an AlignmentRecord, rather than reading it from a file.

Is there a function that does this?

Thanks

Member

heuermh commented Mar 7, 2017

Parsing a line into a SAMRecord is handled by htsjdk; then you can use the same ADAM converter as above.
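For reference, a SAM alignment line is tab-delimited. It has 11 mandatory fields (QNAME, FLAG, RNAME, POS, MAPQ, CIGAR, RNEXT, PNEXT, TLEN, SEQ, QUAL), followed by optional TAG:TYPE:VALUE fields such as RG:Z:SRR062634. htsjdk's SAMLineParser consumes this layout and also checks it against the SAMFileHeader.

The following stdlib-only Scala sketch shows what the parser works with. It only splits a line into those pieces. It does no validation and is not a substitute for SAMLineParser. The SamFields and SamLineSketch names are hypothetical and are not part of htsjdk or ADAM.

```scala
// Hypothetical, stdlib-only sketch of the SAM text layout. This is NOT
// htsjdk's SAMLineParser: no validation, no SAMFileHeader lookup.
case class SamFields(
    qname: String, flag: Int, rname: String, pos: Int, mapq: Int,
    cigar: String, rnext: String, pnext: Int, tlen: Int,
    seq: String, qual: String, tags: Map[String, String])

object SamLineSketch {
  // Split one tab-delimited SAM line: 11 mandatory fields, then optional tags
  def split(line: String): SamFields = {
    val f = line.split("\t")
    require(f.length >= 11, s"expected at least 11 fields, got ${f.length}")
    // Key each optional field by its two-letter tag, keeping "TYPE:VALUE"
    val tags = f.drop(11).map { t =>
      val i = t.indexOf(':')
      t.substring(0, i) -> t.substring(i + 1)
    }.toMap
    SamFields(f(0), f(1).toInt, f(2), f(3).toInt, f(4).toInt,
      f(5), f(6), f(7).toInt, f(8).toInt, f(9), f(10), tags)
  }
}
```

Consider the read in the question, with its fields separated by tabs as in a real SAM file. For that read, split gives flag 99, rname "22", pos 16054958 and tags("RG") == "Z:SRR062634".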

xubo245 commented Mar 8, 2017

I use converters from samtools and ADAM, but I get an error:

code:

  def samToAlignmentRecordRDD(samRDD: RDD[String], sequenceDictionary: SequenceDictionary, recordGroupDictionary: RecordGroupDictionary): AlignmentRecordRDD = {
    val samRecordConverter = new SAMRecordConverter
    val arc = new AlignmentRecordConverter
    var SAMHeader = arc.createSAMHeader(sequenceDictionary, recordGroupDictionary)
    val samLineParser = new SAMLineParser(SAMHeader)

    var rdd = samRDD.map { each =>
      var sAMRecordSerializable=new SAMRecordSerializable
//      var samRecord = samLineParser.parseLine(each)
//      samRecordConverter.convert(samRecord, sequenceDictionary, recordGroupDictionary)
      sAMRecordSerializable.samRecord = samLineParser.parseLine(each)
      samRecordConverter.convert(sAMRecordSerializable.samRecord, sequenceDictionary, recordGroupDictionary)
    }

    AlignedReadRDD(rdd, sequenceDictionary, recordGroupDictionary)
  }

error:

90
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.RDD.map(RDD.scala:317)
	at org.gcdss.alignment.transform.Converter.samToAlignmentRecordRDD(Converter.scala:22)
	at org.gcdss.alignment.CloudBWA$.saveAsAdam(CloudBWA.scala:199)
	at org.gcdss.alignment.saveAsAdamTest$.main(saveAsAdamTest.scala:47)
	at org.gcdss.alignment.saveAsAdamTest.main(saveAsAdamTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: htsjdk.samtools.DefaultSAMRecordFactory
Serialization stack:
	- object not serializable (class: htsjdk.samtools.DefaultSAMRecordFactory, value: htsjdk.samtools.DefaultSAMRecordFactory@77c10a5f)
	- field (class: htsjdk.samtools.SAMLineParser, name: samRecordFactory, type: interface htsjdk.samtools.SAMRecordFactory)
	- object (class htsjdk.samtools.SAMLineParser, htsjdk.samtools.SAMLineParser@2f9a4401)
	- field (class: org.gcdss.alignment.transform.Converter$$anonfun$1, name: samLineParser$1, type: class htsjdk.samtools.SAMLineParser)
	- object (class org.gcdss.alignment.transform.Converter$$anonfun$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 18 more

How to fix it?

Member

fnothaft commented Mar 8, 2017

Ah, interesting! I haven't tried it like that before. I would try:

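import htsjdk.samtools.SAMLineParser
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.{ AlignmentRecordConverter, SAMRecordConverter }
import org.bdgenomics.adam.models.{ RecordGroupDictionary, SequenceDictionary }
import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD }

def samToAlignmentRecordRDD(samRDD: RDD[String], sequenceDictionary: SequenceDictionary, recordGroupDictionary: RecordGroupDictionary): AlignmentRecordRDD = {
  // The converters and dictionaries are captured by the closure below, so they must be serializable
  val samRecordConverter = new SAMRecordConverter
  val arc = new AlignmentRecordConverter

  val rdd = samRDD.mapPartitions(iter => {
    // Build the header and the (non-serializable) SAMLineParser on the worker,
    // once per partition, so Spark never has to serialize them
    val samHeader = arc.createSAMHeader(sequenceDictionary, recordGroupDictionary)
    val samLineParser = new SAMLineParser(samHeader)
    iter.map(each => {
      val samRecord = samLineParser.parseLine(each)
      samRecordConverter.convert(samRecord, sequenceDictionary, recordGroupDictionary)
    })
  })

  AlignedReadRDD(rdd, sequenceDictionary, recordGroupDictionary)
}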

This moves the creation of the SAMLineParser inside of the code that gets executed on the worker, which means that it doesn't need to get serialized. That said, I haven't tried this, so YMMV.
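You can reproduce this without Spark. Spark's ClosureCleaner Java-serializes each closure before shipping it, and serializing a closure serializes everything the closure captures.

The following plain-Scala sketch uses a hypothetical LineParser class as a stand-in for SAMLineParser. Neither class implements java.io.Serializable. The sketch checks two closures with ObjectOutputStream:

- The first closure captures a parser built on the driver.
- The second closure builds its parser inside itself and captures only the header string.

The sketch assumes Scala 2.11 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for htsjdk's SAMLineParser: like the class in the
// stack trace, it does not implement java.io.Serializable.
class LineParser(val header: String) {
  def parseLine(line: String): Array[String] = line.split("\t")
}

// Serializable so the result depends only on what each closure captures,
// even if the compiler also captures the enclosing object
object ClosureSerializationSketch extends Serializable {
  // Java-serialize an object, roughly the check Spark's ClosureCleaner performs
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (capturing closure serializable?, per-partition closure serializable?)
  def demo(): (Boolean, Boolean) = {
    val header = "@HD\tVN:1.5"

    // Parser built on the driver and captured by the closure: serializing
    // the closure means serializing the parser, which fails
    val driverParser = new LineParser(header)
    val capturing: Iterator[String] => Iterator[Array[String]] =
      iter => iter.map(line => driverParser.parseLine(line))

    // Parser built inside the closure, once per partition: only the
    // (serializable) header String is captured
    val perPartition: Iterator[String] => Iterator[Array[String]] =
      iter => {
        val parser = new LineParser(header)
        iter.map(line => parser.parseLine(line))
      }

    (isSerializable(capturing), isSerializable(perPartition))
  }
}
```

demo() returns (false, true). The first closure fails with NotSerializableException, as in the stack trace above, while the second closure serializes. Building the parser inside mapPartitions also means it is created once per partition rather than once per line.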

xubo245 commented Mar 8, 2017

Haven't you tried it like this upstream of Avocado? Read mapping returns an RDD in SAM format, which needs to be transformed to ADAM. Writing it to the local filesystem first wastes time...

If you fix the problem, please tell me.
Thanks.

Member

fnothaft commented Mar 8, 2017

Ah, no, I haven't done this downstream in Avocado. However, we've read SAM from a pipe and used it to create AlignmentRecords. That code is in https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AnySAMOutFormatter.scala, which gets used in https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala#L284-L309. That code is a little different from what you're trying to do, but might be an alternative solution. In your case, what's generating the SAM string?
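SAM text read from a pipe has the same shape as a file. It is a block of header lines starting with @, followed by one alignment per line, so nothing in the format ties it to a file on disk. htsjdk's SamInputResource.of also accepts an InputStream.

The following minimal stdlib-only sketch reads SAM text from any java.io.InputStream and separates header lines from alignment lines. It uses a hypothetical readSam helper, which is not ADAM's AnySAMOutFormatter.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

object SamStreamSketch {
  // Hypothetical helper: split SAM text arriving on any InputStream (a pipe
  // from an aligner, a socket, an in-memory buffer) into header and records
  def readSam(in: InputStream): (Seq[String], Seq[String]) = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    val header = ArrayBuffer.empty[String]
    val records = ArrayBuffer.empty[String]
    try {
      var line = reader.readLine()
      while (line != null) {
        // Header lines start with '@'; every other non-empty line is one alignment
        if (line.startsWith("@")) header += line
        else if (line.nonEmpty) records += line
        line = reader.readLine()
      }
    } finally reader.close()
    (header.toList, records.toList)
  }
}
```

The header lines describe the references and read groups. The remaining lines are the strings you would hand to a line parser one at a time.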

xubo245 commented Mar 8, 2017

I have just fixed it.

BWA (the BWA-MEM algorithm) generates the SAM strings.

I run BWA in Spark and generate an RDD[String] of SAM lines.

Because of the problem in #1424 (comment), which occurs in both ADAM 0.21.0 and ADAM 0.20.0, I use ADAM 0.19.0. ADAM 0.19.0 has no AnySAMOutFormatter.scala, and GenomicRDD is a trait there...

Do the InputStream and SamReaderFactory.makeDefault().open in AnySAMOutFormatter.scala only support a file stream? Other functions in samtools seem to work the same way, and I cannot find a function or API in ADAM or samtools that meets a need like mine.

xubo245 commented Mar 8, 2017

I want to open a pull request against ADAM. Where should I submit it?

@fnothaft

Member

heuermh commented Mar 8, 2017

Use the New pull request button here https://github.com/bigdatagenomics/adam/pulls

Member

fnothaft commented Mar 8, 2017

That's great to hear! We'd love to see a pull request. Here are some additional docs from GitHub about opening a PR: https://help.github.com/articles/about-pull-requests/. Let us know if there's any way we can help you!

xubo245 commented Mar 9, 2017

I want to know which function or Scala file I should put it in...

Member

fnothaft commented Mar 9, 2017

Oh, gotcha! I would add it to ADAMContext as a method named something like def loadReadsFromSamString(rdd: RDD[String]).

xubo245 commented Mar 9, 2017

OK, thanks!

Member

fnothaft commented Mar 9, 2017

Oh, thank you!

xubo245 commented Mar 10, 2017

I have finished it (loadReadsFromSamString) in ADAM 0.19.0 and updated it for ADAM 0.21.1-SNAPSHOT today.
PR link: #1434
@fnothaft @heuermh

If there are any problems, please tell me and I will fix them.

fnothaft added the wontfix label Jan 9, 2018

Member

fnothaft commented Jan 9, 2018

Long term, we plan to direct this usage pattern to the pipe API. Closing as won't fix.

fnothaft closed this Jan 9, 2018

heuermh added this to the 0.24.0 milestone Jan 9, 2018

xubo245 commented Jan 10, 2018

You asked me to raise this PR before, but now it's marked won't fix and not merged... Why?

heuermh added this to Completed in Release 0.24.0 Feb 10, 2018