NPE when trying to map *loadVariants* over RDD #1713

Closed
pbashyal-nmdp opened this Issue Sep 6, 2017 · 3 comments

@pbashyal-nmdp
Contributor

pbashyal-nmdp commented Sep 6, 2017

Trying to map loadVariants over an RDD of filenames with the following:

// Does not work
import org.bdgenomics.adam.rdd.ADAMContext._

// RDD of filenames (e.g. from HDFS: sc.textFile("vcfList.txt"))
val vcfFiles = sc.parallelize(Array("./adam-core/src/test/resources/small.vcf", "./adam-core/src/test/resources/random.vcf"))

val allVariants = vcfFiles.map(vcfFile => {
    val variants = sc.loadVariants(vcfFile)
    (vcfFile, variants)
})

allVariants.take(1)

gives a NullPointerException.

adam git/master*  $ bin/adam-shell
Using SPARK_SHELL=/Users/pbashyal/Applications/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2017-09-06 16:25:08 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-09-06 16:25:12 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.5.45.119:4040
Spark context available as 'sc' (master = local[*], app id = local-1504733108818).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContext._

scala> val vcfFiles = sc.parallelize(Array("./adam-core/src/test/resources/small.vcf", "./adam-core/src/test/resources/random.vcf"))
vcfFiles: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val allVariants = vcfFiles.map(vcfFile => {
     |     val variants = sc.loadVariants(vcfFile)
     |     (vcfFile, variants)
     | })
allVariants: org.apache.spark.rdd.RDD[(String, org.bdgenomics.adam.rdd.variant.VariantRDD)] = MapPartitionsRDD[1] at map at <console>:29

scala>

scala> allVariants.take(1)
2017-09-06 16:25:20 ERROR Executor:91 - Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.NullPointerException
	at org.bdgenomics.utils.misc.HadoopUtil$.newJob(HadoopUtil.scala:48)
	at org.bdgenomics.adam.rdd.ADAMContext.org$bdgenomics$adam$rdd$ADAMContext$$readVcfRecords(ADAMContext.scala:1976)
	at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVcf$1.apply(ADAMContext.scala:2008)
	at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVcf$1.apply(ADAMContext.scala:2005)
	at scala.Option.fold(Option.scala:158)
	at org.apache.spark.rdd.Timer.time(Timer.scala:48)
	at org.bdgenomics.adam.rdd.ADAMContext.loadVcf(ADAMContext.scala:2005)
	at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVariants$1.apply(ADAMContext.scala:2776)
	at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVariants$1.apply(ADAMContext.scala:2774)
	at scala.Option.fold(Option.scala:158)
	at org.apache.spark.rdd.Timer.time(Timer.scala:48)
	at org.bdgenomics.adam.rdd.ADAMContext.loadVariants(ADAMContext.scala:2772)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:30)

But if I use a Scala Array, there is no NPE:

val vcfFiles = Array("./adam-core/src/test/resources/small.vcf", "./adam-core/src/test/resources/random.vcf")
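For reference, the complete driver-side version would presumably be the same map over the local Array (untested sketch using the paths from above):

import org.bdgenomics.adam.rdd.ADAMContext._

// Local Scala Array: the map and the loadVariants calls both run on the driver
val vcfFiles = Array("./adam-core/src/test/resources/small.vcf", "./adam-core/src/test/resources/random.vcf")

val allVariants = vcfFiles.map(vcfFile => {
    val variants = sc.loadVariants(vcfFile)
    (vcfFile, variants)
})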
@heuermh
Member

heuermh commented Sep 6, 2017
I'm thinking it has something to do with call ordering and implicits. At the sc.parallelize call, the context is not yet an ADAMContext; later, in loadVariants, the reference to the SparkContext hasn't been set properly.

Starting with this may work:

scala> import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.ADAMContext

scala> val ac = new ADAMContext(sc)
ac: org.bdgenomics.adam.rdd.ADAMContext = org.bdgenomics.adam.rdd.ADAMContext@4257163f
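If the theory holds, the explicit context could then be used directly, e.g. (a hypothetical next step, and still driver-side only, per the comment below):

scala> val variants = ac.loadVariants("./adam-core/src/test/resources/small.vcf")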
@fnothaft
Member

fnothaft commented Sep 6, 2017
You can't make SparkContext/RDD calls inside a closure that operates on an RDD; the Array approach is correct. I'd suggest closing as won't fix.

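The same restriction holds in plain Spark, independent of ADAM; a minimal sketch with hypothetical file names:

val paths = sc.parallelize(Seq("a.txt", "b.txt"))

// Anti-pattern: sc is captured into an executor-side closure and is not usable there
// paths.map(p => sc.textFile(p).count())

// Correct: bring the paths back to the driver, then use sc
paths.collect().map(p => sc.textFile(p).count())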

@pbashyal-nmdp
Contributor

pbashyal-nmdp commented Sep 7, 2017
Thanks for the explanation. In my case, I can fix this by using vcfFiles.collect().map.

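That would presumably look something like this (a sketch combining the snippets above, with vcfList.txt from the original report):

import org.bdgenomics.adam.rdd.ADAMContext._

// Collect the file names to the driver, then call loadVariants there
val allVariants = sc.textFile("vcfList.txt")
    .collect()
    .map(vcfFile => (vcfFile, sc.loadVariants(vcfFile)))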

@heuermh heuermh added this to the 0.23.0 milestone Dec 7, 2017

@heuermh heuermh added this to Completed in Release 0.23.0 Jan 4, 2018
