
On the multifile problem in spark-fits #54

Open

JulienPeloton opened this issue on Oct 18, 2018 · 4 comments

Labels: bug, enhancement, IO

@JulienPeloton (Member) commented:
The current implementation is not great for reading many files (100+).

Current implementation, and why this is not great.

The way we read and distribute the data from many files is to:

  1. list all the files to read,
  2. create an RDD from the HDU data of each file,
  3. recursively take the union of these RDDs:
// Union if more than one file:
// `rdd` already holds the data of the first file (fns(0)); union it with
// the RDD built from each remaining file, one by one.
for (file <- fns.slice(1, nFiles)) {
  rdd = if (implemented) {
    rdd.union(loadOneHDU(file))
  } else {
    rdd.union(loadOneEmpty)
  }
}

While this is very simple and works well for a small number of files, it breaks down once the number of files gets large (100+). There are two problems: (1) the RDD lineage gets extremely long (to enforce fault tolerance, the DAG keeps track of every single RDD), so the Spark overhead ends up dominating everything; and (2) when files are smaller than the partition size, the union creates one quasi-empty partition per file, which is highly sub-optimal. Here is the DAG for reading 3 files, where you can see how the final RDD is built:

[Screenshot: DAG for reading 3 files with the recursive union]
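To illustrate point (2), a minimal sketch of how one could inspect the partitioning (reusing the names rdd, loadOneHDU and fns from the snippet above; numDesiredPartitions is a hypothetical target, not something in spark-fits):

// Sketch only: after the chain of unions, each input file contributes its own
// (often quasi-empty) partitions, so N small files yield roughly N partitions.
println(s"Partitions after union: ${rdd.getNumPartitions}")

// coalesce can merge the tiny partitions afterwards, but it neither shortens
// the RDD lineage nor removes the per-file scheduling overhead.
val merged = rdd.coalesce(numDesiredPartitions)
println(s"Partitions after coalesce: ${merged.getNumPartitions}")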

Moreover, for a large number of files (i.e. a large number of rdd.union calls), the result is not just sub-optimal: you run into a deeper problem

Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError
       at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2505)
       at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:348)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

Small fix, and why this is not great either.

Instead of performing many successive RDD unions, one can merge all the RDDs in a single union (changing step 3 above).
For that, one can call the union method of the SparkContext directly:

// Union if more than one file
val rdd = if (implemented) {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneHDU(file))
  )
} else {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneEmpty)
  )
}

The DAG gets updated nicely:

[Screenshot: DAG after switching to a single sparkContext.union]

and I could go up to 1000 files. While I no longer hit the StackOverflowError, for more than 1000 files I got the nasty:

2018-10-18 08:33:30 WARN  TransportChannelHandler:78 - Exception in connection from /134.158.75.162:47942
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:357)
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
	at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271)

Of course! Looking at the DAG, it is obvious that this series of unions cannot scale... We need to implement a different approach.

How this could be fixed

We could instead forget about this union strategy, and focus on what is used by other connectors (PartitionedFile).
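As an illustration, here is a rough sketch of the shape of that approach, assuming Spark's internal FileFormat / PartitionedFile API (signatures as in Spark 2.x, which may change across versions); the FitsFileFormat class and the ??? bodies are placeholders, not the spark-fits code. With this machinery Spark itself lists the files, splits them into PartitionedFile chunks and schedules them, so no union of per-file RDDs is ever built:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch of a FITS source plugged into Spark's file-based source machinery.
class FitsFileFormat extends FileFormat {

  // Derive the Spark schema from (a subset of) the input file headers.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = ???

  // Writing FITS files is out of scope for this issue.
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = ???

  // Called once on the driver; the returned closure is shipped to executors and
  // invoked per PartitionedFile (a chunk of one file), not per unioned RDD.
  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    (file: PartitionedFile) => ???  // decode the HDU rows of this chunk
  }
}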

What should really be done

Rewrite everything to comply with the Datasource V2 API... ;-)
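For illustration only, a rough sketch of the relevant read-side pieces of the Datasource V2 API (shown in its Spark 3 org.apache.spark.sql.connector.read form; the Fits* classes below are hypothetical, not the spark-fits implementation). The key point is that partitions are planned per file (or per block) and scheduled by Spark directly, with no union anywhere:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory}

// One InputPartition per file (or per block of a file).
case class FitsInputPartition(path: String, start: Long, length: Long) extends InputPartition

class FitsBatch(files: Seq[String]) extends Batch {
  override def planInputPartitions(): Array[InputPartition] =
    files.map(f => FitsInputPartition(f, 0L, Long.MaxValue)).toArray

  override def createReaderFactory(): PartitionReaderFactory = new FitsReaderFactory
}

class FitsReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[FitsInputPartition]
    new PartitionReader[InternalRow] {
      // Hypothetical: iterate over the decoded FITS rows of the chunk `p`.
      override def next(): Boolean = ???
      override def get(): InternalRow = ???
      override def close(): Unit = ()
    }
  }
}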

JulienPeloton added the bug, enhancement and IO labels on Oct 18, 2018
JulienPeloton self-assigned this on Oct 18, 2018
JulienPeloton changed the title from "On the multifiles problem in spark-fits" to "On the multifile problem in spark-fits" on Oct 18, 2018
@JulienPeloton (Member, Author) commented:

Problem solved in #55! For the record, after the fix, the DAG is:

[Screenshot: DAG after the fix in #55, with no explicit union]

No more explicit union! Under the hood it uses PartitionedFile, but the only thing I had to do was to tell Spark directly that I have many files... (rather than creating the RDDs one by one and unioning them...).
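For completeness, a hedged usage sketch (the "fits" format name and the hdu option follow the spark-fits README; the path, glob and HDU index are illustrative): pointing the reader at a whole directory or glob of FITS files lets Spark plan the partitions itself:

// Read many FITS files in one go; Spark lists and splits them internally.
val df = spark.read
  .format("fits")
  .option("hdu", 1)
  .load("hdfs://path/to/catalogs/*.fits")

// Partitioning now reflects file sizes/blocks, not one union per file.
println(df.rdd.getNumPartitions)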

@JulienPeloton (Member, Author) commented:

Still need to understand the error when reading >> 10000 files at once... Keeping this issue open.

@ptallada commented:

Hi!

Is there any fix yet for reading > 10000 files?

Thanks!

@JulienPeloton (Member, Author) commented:

Hi @ptallada,

Unfortunately there has been no work on this recently, and I have little time these days.
Eventually, this should be solved when spark-fits moves to Spark Datasource V2 (work stalled for the moment).
