This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Feat no union #95

Closed

Conversation

koertkuipers
Contributor

I am not entirely sure why spark-avro creates an RDD for every input path and then merges them together with UnionRDD. Perhaps there is a compelling reason for this, and if so then this pull request is probably misguided.

We noticed that when reading using globs like /data/* many RDDs were created, and for each one the Configuration object would get broadcast. In one situation this went on for 30 minutes or so (or maybe longer, I killed it).

So given that AvroInputFormat already has excellent built-in support for globs and multiple paths, why not use it and lose the UnionRDD? That's the basic idea of this pull request.

See also here:
https://www.mail-archive.com/user@spark.apache.org/msg39393.html
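
A minimal sketch of that idea (not the PR's actual code; readAvro, sc, and paths are illustrative names):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext

// One RDD covering every path/glob in a single hadoopFile call. FileInputFormat
// handles the comma-separated list and glob expansion itself, so there is no
// per-path RDD, no UnionRDD, and only one configuration broadcast.
def readAvro(sc: SparkContext, paths: Seq[String]) =
  sc.hadoopFile(
    paths.mkString(","),
    classOf[AvroInputFormat[GenericRecord]],
    classOf[AvroWrapper[GenericRecord]],
    classOf[NullWritable]
  ).keys.map(_.datum())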

@JoshRosen
Contributor

I think that we ran into similar issues in other parts of Spark: https://issues.apache.org/jira/browse/SPARK-7410. In that issue, I think the ultimate resolution was to provide an API where we can create the configuration broadcast once and then pass it into the individual RDDs. See apache/spark#6252.

I also forget why we need the union, but maybe those links would help figure it out.
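
For reference, a minimal sketch of that broadcast-once pattern, assuming a small hand-rolled wrapper since Spark's own SerializableConfiguration is private (SerializableConf and broadcastConf are illustrative names):

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hadoop's Configuration is not Java-serializable, so wrap it; this mirrors what
// Spark's internal (private) SerializableConfiguration does.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

// Broadcast the configuration once, then hand the same Broadcast to every RDD
// instead of re-broadcasting the configuration per RDD.
def broadcastConf(sc: SparkContext, conf: Configuration): Broadcast[SerializableConf] =
  sc.broadcast(new SerializableConf(conf))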

@codecov-io

Current coverage is 93.68%

Merging #95 into master will increase coverage by +0.23% as of 8ba4b3f

@@            master     #95   diff @@
======================================
  Files            6       6       
  Stmts          275     269     -6
  Branches        45      40     -5
  Methods          0       0       
======================================
- Hit            257     252     -5
  Partial          0       0       
+ Missed          18      17     -1

Review entire Coverage Diff as of 8ba4b3f

Powered by Codecov. Updated on successful CI builds.

@koertkuipers
Contributor Author

Could the issue be that different paths contain Avro files with different schemas?

If so, I think it can be fixed by forcing Avro to output all of them with the same schema (and while we are at it, why not also force it to output just the required columns, using Avro's built-in projection).

Something like:
read the schema from the first file
project only the required columns
job.set(AvroJob.INPUT_SCHEMA, schema.toString)
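
A rough sketch of those steps, assuming the Avro 1.7 mapred API that spark-avro uses (schemaOf and setReaderSchema are hypothetical helper names):

import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.mapred.{AvroJob, FsInput}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

// Read the writer schema from one part file.
def schemaOf(path: Path, conf: Configuration): Schema = {
  val reader = DataFileReader.openReader(new FsInput(path, conf), new GenericDatumReader[AnyRef]())
  try reader.getSchema finally reader.close()
}

// Project only the required columns and register the result as the reader schema,
// so Avro resolves every part file against the same (pruned) schema.
def setReaderSchema(jobConf: JobConf, full: Schema, requiredColumns: Seq[String]): Unit = {
  val fields = requiredColumns.map { name =>
    val f = full.getField(name)
    new Schema.Field(f.name, f.schema, f.doc, f.defaultValue)
  }
  val projected = Schema.createRecord(full.getName, full.getDoc, full.getNamespace, false)
  projected.setFields(fields.asJava)
  jobConf.set(AvroJob.INPUT_SCHEMA, projected.toString)
}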

@koertkuipers
Contributor Author

I think I do currently have an issue with different Avro files having different schemas with this approach. I will do more investigation, maybe a unit test with Avro files that differ in schema.

I also just realized that the UnionRDD approach does not just create an RDD per input path... it creates one per input file (so per part file)! Why would that be? That can easily mean thousands of RDDs.

@JoshRosen
Contributor

@koertkuipers, if you write a unit test for multiple part files then I'd be glad to commit it. PRs that only add new tests are always my favorite.

@koertkuipers
Contributor Author

I did some more testing, and the good news is that HadoopFsRelation reads every partition separately. This is good news because I would say it's safe to assume all files in a partition have the same schema, making a union unnecessary from a schema perspective.

The bad news is that it currently does indeed create an RDD per part file (and broadcast a configuration). Since it's not unusual to have 10k part files in a large dataset, this means 10k RDDs. That doesn't sound like a good idea to me and explains why I saw it hang for 30 minutes.

@jaley
Contributor

jaley commented Oct 28, 2015

I think we're running into problems relating to this.

We have an application that merges Avro files into larger Parquet files. This app will regularly merge many thousands of input Avro files. The result is that we see a huge amount of heap space being lost to the broadcast manager, apparently containing Hadoop configurations. It seems to be several MBs of data retained for every file processed.

Making things worse, I believe, is that that memory is never recovered, as data that has been broadcast remains in the driver application until it closes? The result is that we find that if we try to merge "too much" Avro data in one run, our driver application gets killed by YARN for running beyond memory limits. Something about the way the broadcast manager holds onto that data seems to prevent the usual Java OOM exceptions from occurring.

@koertkuipers
Contributor Author

I think I found a way to deal with different Avro files having different schemas (without creating an RDD per part file).

In Avro you can simply pass in a schema when you read, and Avro makes sure all data will be converted to that schema. You can also use this to do projections, which comes in handy with requiredColumns.

So I extract the Avro schema from a single part file, select only the required columns, and pass that into AvroInputFormat as the reader schema. Now I only get out the requiredColumns, and all data has consistent types across part files.
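
A rough sketch of that combination, reusing the schemaOf/setReaderSchema helpers sketched in the earlier comment (readProjected and paths are illustrative names):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.SparkContext

def readProjected(sc: SparkContext, paths: Seq[String], fullSchema: Schema, requiredColumns: Seq[String]) = {
  val jobConf = new JobConf(sc.hadoopConfiguration)
  FileInputFormat.setInputPaths(jobConf, paths.mkString(","))

  // fullSchema is the schema of a single part file (e.g. obtained with schemaOf above);
  // prune it to the required columns and set it as the reader schema.
  setReaderSchema(jobConf, fullSchema, requiredColumns)

  // One RDD for all paths: only the required columns come out, with consistent
  // types across part files, and only one configuration gets broadcast.
  sc.hadoopRDD(
    jobConf,
    classOf[AvroInputFormat[GenericRecord]],
    classOf[AvroWrapper[GenericRecord]],
    classOf[NullWritable]
  ).keys.map(_.datum())
}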

@koertkuipers
Contributor Author

@JoshRosen If you wanted to go the other way (which I don't see a compelling reason for) and keep the union of RDDs without doing the config broadcast per RDD, then I believe the main blocker is the fact that SerializableConfiguration is not public or DeveloperApi in Spark.

@boosh

boosh commented Nov 24, 2015

👍 from me. I've been battling this all afternoon and have only just discovered that this is the issue.

@pravilla

👍

Can we also specify a schema file while reading multiple Avro files? Something like

val df = sqlContext.read
.format("com.databricks.spark.avro")
.schema("src/test/resources/episodes.avsc")
.load("src/test/resources/episodes.avro", "src/test/resources/episodes1.avro")

@koertkuipers
Contributor Author

I think specifying a data file to read the schema from, instead of using the first data file from the first (or last) partition, is doable, yes.

@ksindi

ksindi commented Jan 31, 2016

@koertkuipers is there a way around this issue using PySpark 1.6?

@koertkuipers
Contributor Author

Sorry, I am not very familiar with PySpark. Can you use a version of spark-avro that you build yourself with PySpark?

@ksindi

ksindi commented Jan 31, 2016

I guess I can build your PR and use that?

@koertkuipers
Contributor Author

Yes, that's what we do. We maintain an in-house version that includes this PR.

@ksindi

ksindi commented Feb 1, 2016

@koertkuipers Thanks for your help. I was able to build your PR and submit with it. That said, I have to wait a while for the INFO FileInputFormat: Total input paths to process XXXX logging to complete (processing 1TB across 100K input paths). Curious if you have the same experience.

@arkadiuszbicz

Any idea if this solution will be merged into the main branch? Or maybe there is another workaround for this problem?

@koertkuipers
Contributor Author

I don't know.
The workaround is to use an older version of spark-avro.
The alternative to this branch is a fix to avoid the excessive broadcasting of the job conf (but that only solves part of the problem). The API needed to avoid repeated broadcasting is already in Spark but not exposed for use by this library.

@boosh

boosh commented Feb 2, 2016

The workaround we use is globs instead of loading lots of paths explicitly.
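
For example (illustrative paths), instead of passing thousands of explicit paths to load, a single glob covers them:

val df = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("/data/events/2016/01/*/part-*.avro")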

@koertkuipers
Contributor Author

I think using globs avoids the excessive broadcasting but not the merging of schemas across all data (part) files? How is the performance with that? It would be a good indication of how well the alternative fix (just avoiding the excessive broadcasting) would work.

@boosh

boosh commented Feb 2, 2016

Using a combination of globs and maintaining our own build of the library with your PR in it has allowed us to run Spark jobs over hundreds or thousands of input paths. I don't have any metrics other than that the job runs now, where it didn't before.

@ksindi

ksindi commented Feb 2, 2016

@boosh I also have hundreds of thousands of input paths. I'm using a jar built from this PR. Do your jobs take a lot of time outputting INFO FileInputFormat: Total input paths to process XXXX? These logs take me ~40min when dealing with 1TB across 100K input paths.

@koertkuipers
Contributor Author

Kamil, could this be caused by the issue below? Just a guess.
https://issues.apache.org/jira/browse/SPARK-11441

@ksindi

ksindi commented Feb 2, 2016

@koertkuipers thanks. SPARK-11441 seems like the issue I'm having. Do you have a workaround? I'm building this PR and am able to avoid the broadcasting issue. That said, the job hangs on INFO FileInputFormat: Total input paths to process XXXX for a while before any tasks begin.

@koertkuipers
Contributor Author

Unfortunately I do not have a workaround. Using an older version of spark-avro that is not based on HadoopFsRelation seems like it could be your best bet. And please draw attention to SPARK-11441 (watch it, vote for it, etc.) so the community is aware of the issue.

@boosh

boosh commented Feb 2, 2016

@ksindi I'm processing hundreds or thousands, not 100K+ paths. I'm not running into the linked issue and I don't recall seeing FileInputFormat: Total input paths to process XXXX, certainly nothing blocking the job for 10s of minutes now.

@robinloxley1

I encounter the same issue using PySpark and spark-avro 2.0.1, calling load with a glob path where all files have the same schema. No idea if it is going to impact performance a lot, or what the fix is.

WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_74857 in memory.

WARN MemoryStore: Not enough space to cache broadcast_74857 in memory! (computed 496.0 B so far)

INFO MemoryStore: Memory use = 4.1 GB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 4.1 GB. Storage limit = 4.1 GB.

WARN MemoryStore: Persisting block broadcast_74857 to disk instead.

INFO MemoryStore: ensureFreeSpace(19816) called with curMem=4445629456, maxMem=4445763010

INFO MemoryStore: Block broadcast_74857_piece0 stored as bytes in memory (estimated size 19.4 KB, free 111.1 KB)

INFO BlockManagerInfo: Added broadcast_74857_piece0 in memory on 10.0.x.x:49958 (size: 19.4 KB, free: 2.8 GB)

INFO SparkContext: Created broadcast 74857 from hadoopFile at AvroRelation.scala:121

74857 is just one broadcast; the number keeps growing indefinitely. Eventually the program dies, or I have to kill it.

Broadcasting consumes a lot of driver memory if the glob contains many files (in my case 80k+).
With 8 GB of driver memory I can only process up to 5k files. By increasing the driver memory, I can process more without having to kill my program (the driver).

petrovg pushed a commit to petrovg/spark-avro that referenced this pull request Feb 18, 2016
@jaley
Contributor

jaley commented Feb 19, 2016

Is anyone able to comment on progress towards resolving this issue? I'm assuming that if the problem lies in HadoopFsRelation then it will be dealt with outside spark-avro, but it would really help us out with our dev cycle planning if we had some idea of when we can expect to be able to merge 50k+ input Hadoop files using Spark.

Right now we're using a fork with this patch. Looking at the reference links above, it seems many others have their own forks of this repo to pull in this patch. We find that merging one month of data for one of our roughly 200 Avro data sets requires reading ~50k inputs. With 2.0.1 this OOMs even with 20GB+ heap space available to the driver for broadcasts, but with this patch the whole job runs with no problem using < 100 KB of broadcast space.

@Gauravshah

This pull request also helps to merge two Avro files with different schemas. There are other PRs that address this issue: #109 and #113. But both of them are waiting on this pull request since it solves the problem in a better way. Unsure how I should move ahead; all three pull requests are in a stuck state. Currently using a fork of this repo too.

@tomseddon

Not sure which out of #109, #113 and this PR is the best way, but I really need a way to be able to specify a superset schema that can allow processing of older files with optional fields without errors. What can we do to progress this?

@Gauravshah

@tomseddon Unsure how to take this forward. The branch for this one works well, but it now has merge conflicts.

@JoshRosen
Contributor

The 3.x releases of this library are based on Spark 2.0's FileFormat and will therefore no longer use UnionRDD and a separate RDD per file. Given this, I'm going to close this PR since it's not relevant for the 3.x releases. If it's really, really important to support this in Spark 1.x-compatible versions of spark-avro then we should re-target a new PR against branch-2.0 in this repository. Thanks!

@JoshRosen closed this Nov 21, 2016