
header challenge: header with multivalued columns raises "java.lang.ArithmeticException: / by zero" during df.select(any_column).show() or df.select(any_column).take() #69

Closed
jacobic opened this issue May 6, 2019 · 5 comments

jacobic commented May 6, 2019

Hi guys, just wanted to give some more feedback about my favourite Spark package!

I encountered an error while reading a FITS file with an "exotic" header. I assume the issue comes from the columns that contain data arrays. I would expect spark-fits to load multivalued columns as vectors, but I think they might be causing bigger problems, as I cannot view any columns at all.

For example when I read the data:

path = 'photoObj-001000-1-0027.fits'
df = sqlc.read.format("fits").option("hdu", 1).load(path)

The following error is thrown when calling:

df.select('OBJID').show()

The header is attached as example.txt and the file itself is zipped as photoObj-001000-1-0027.fits.zip.

Before the error the schema is inferred:

[screenshot: the inferred schema]

Despite this, the multivalued columns (e.g. code 5E with shape 5, such as 'MODELMAG') are treated as floats. I would expect them to be treated as vectors. Is this possible?

[screenshots: the 5E columns, e.g. 'MODELMAG', appear as plain floats in the schema and output]
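
For reference, a quick check outside Spark (just a sketch, assuming astropy is installed) confirms that these columns hold 5 values per row:

from astropy.io import fits

# Read the first table HDU directly with astropy for comparison
data = fits.getdata('photoObj-001000-1-0027.fits', ext=1)

print(data['OBJID'][:2])        # scalar column: one value per row
print(data['MODELMAG'].shape)   # multivalued column (TFORM 5E): (n_rows, 5)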

The error itself occurs when selecting any column (even a regular, non-multivalued one) and then applying the .take(n) or .show(n) method:

com.github.astrolabsoftware#spark-fits_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb;1.0
	confs: [default]
	found com.github.astrolabsoftware#spark-fits_2.11;0.7.3 in central
:: resolution report :: resolve 183ms :: artifacts dl 3ms
	:: modules in use:
	com.github.astrolabsoftware#spark-fits_2.11;0.7.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/5ms)
2019-05-06 19:07:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2019-05-06 19:08:28 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 0:>                                                          (0 + 1) / 1]2019-05-06 19:08:30 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArithmeticException: / by zero
	at com.astrolabsoftware.sparkfits.FitsRecordReader.nextKeyValue(FitsRecordReader.scala:318)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-05-06 19:08:30 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArithmeticException: / by zero
	at com.astrolabsoftware.sparkfits.FitsRecordReader.nextKeyValue(FitsRecordReader.scala:318)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Please let me know if you require any additional information or have any questions,
Cheers,
Jacob

JulienPeloton (Member) commented May 6, 2019

Many thanks @jacobic for reporting the issue and providing a detailed explanation!
Indeed, spark-fits currently handles only scalar entries; there is no support yet for column elements that are arrays :-(

I will have a closer look by the end of the week and try to provide a fix quickly, though I suspect it will require some development.

JulienPeloton (Member) commented May 9, 2019

Hi @jacobic,

A bit more on this: the error you see is actually not related to the multivalued columns. It is thrown when the recordLength parameter (default: 1 KB) is smaller than the size of one row of the FITS table (about 3 KB here). The recordlength option controls how the data is split and read inside each HDFS block (or, more precisely, inside each InputSplit, since the two are not the same) by individual mappers. 1 KB seemed to give good performance (and is large enough for most of the FITS files I was using); with larger values you might suffer from longer garbage collection times.

Hence, if you do:

# Use 5KB for recordLength
df = spark.read.format("fits")\
  .option("hdu", 1)\
  .option("recordlength", 5*1024)\
  .load("path/to/photoObj-001000-1-0027.fits")

df.show(2)

It no longer crashes. However, that does not mean it works correctly: only the first value of each multivalued column is kept. I am working on a fix for this.
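
You can see the current behaviour with a quick check (a minimal sketch):

df.printSchema()                        # MODELMAG is reported as float, not array<float>
df.select('OBJID', 'MODELMAG').show(2)  # a single float per row instead of 5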

jacobic (Author) commented May 9, 2019 via email

JulienPeloton (Member) commented May 9, 2019

Hi @jacobic

> By one line, do you mean a single row? If so, does this mean that the large recordlength is due to a large number of columns or the data size (in bytes) for each element in a column?

Yes, by one line I mean a single row, whose size is given by the number of columns and the type of object in each column.

> I assume it would not be related to the number of columns because things were lazily evaluated?

When you call show or take, things are no longer lazy and the data has to be read. The recordLength is a very low-level object, inherited from the Hadoop I/O classes that Spark uses. The record reader simply loads a chunk of data and decodes it on the fly. By default, the record reader decodes row by row, and this forces the recordLength to be at the very least the size of one row. If not, it fails:

// in FitsRecordReader.scala
// Convert each row
// 1 task: 32 MB @ 2s
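// The chunk just read (recordValueBytes) is decoded row by row: the loop
// runs recordLength / rowSize times and slices one row (rowSizeInt bytes)
// per iteration, hence recordLength must hold at least one full row.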
val tmp = Seq.newBuilder[Row]
for (i <- 0 to recordLength / rowSizeLong.toInt - 1) {
  tmp += Row.fromSeq(fits.getRow(
      recordValueBytes.slice(
        rowSizeInt*i, rowSizeInt*(i+1))))
}
recordValue = tmp.result

One could be cleverer and decode only specific columns (Parquet does this, for example); this is something that will be added in the future.

> In short, how do I find the optimal recordlength for an arbitrary fits file in order to avoid the recordlength exception?

The current value of 1 KB was found purely empirically, through a series of benchmarks and profiling. For the moment, the user has to make sure manually that the recordLength option (default 1 KB) is larger than the row size (given in the header).
This is far from optimal, and I will change it in the future so that recordLength defaults to either 1 KB (if the row size is below 1 KB) or the size of one row. The first option ensures we process several rows at once when possible (more efficient).
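
In the meantime, one workaround (just a sketch, assuming astropy is available; the max(1024, ...) choice is illustrative) is to read the row size from the header yourself and pass it as recordlength:

from astropy.io import fits

path = 'photoObj-001000-1-0027.fits'

# For a binary table HDU, NAXIS1 is the number of bytes per row
row_size = fits.getheader(path, ext=1)['NAXIS1']

# Never go below the 1 KB default; otherwise use (at least) one full row
recordlength = max(1024, row_size)

df = spark.read.format("fits")\
  .option("hdu", 1)\
  .option("recordlength", recordlength)\
  .load(path)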

JulienPeloton added a commit that referenced this issue May 10, 2019
Issue 69: Add support for vector elements
JulienPeloton (Member)

Fixed in #70!
