configureCompression only creates compressor once (fixes compression problems with Deflate and Snappy) #352
Since 0.8.5 I have noticed that compression is broken when using Avro, but I was never bothered enough to look into it. With the latest 0.9.0 release I finally took some time and found the issue. The problem manifests as either an OutOfMemoryError or YARN killing tasks for going "beyond memory limits" when using Deflate or Snappy. It isn't specific to Avro; I only noticed it there because Avro silently changes GZip to Deflate.
This stacktrace from explicitly using Snappy is what helped me. Basically the issue is that DataSink.configureCompression is being called for every emit from EvnDoFn. It creates a compressor to test that the settings are working and then promptly discards it. The problem with this is that the Deflate and Snappy compressors create off-heap buffers using ByteBuffer.allocateDirect, and this memory does not get GC'd as you'd expect. So for each emit you get a 64 KB memory leak (for Snappy; I'm not sure of the buffer size for Deflate).

Anyway, I added a simple check in DataSink so that configureCompression only actually acts once, and things seem to work now. I haven't run a full regression, but this is a pretty mild change.
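For anyone skimming, here is a rough sketch of the kind of once-only guard described above. It is not the actual diff: `OnceOnlySink`, `probeCompressor`, and the counters are hypothetical names made up for illustration, the direct buffer merely stands in for what a Snappy or Deflate compressor would allocate, and using `AtomicBoolean.compareAndSet` for the check is my assumption rather than what the change necessarily does.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a sink; NOT the project's real DataSink. */
final class OnceOnlySink {
    // 64 KB is the per-compressor figure quoted above for Snappy.
    private static final int PROBE_BUFFER_BYTES = 64 * 1024;

    private final AtomicBoolean compressionConfigured = new AtomicBoolean(false);
    private final AtomicInteger probesCreated = new AtomicInteger();
    private final AtomicLong offHeapBytesAllocated = new AtomicLong();

    /** Stand-in for creating a throwaway compressor to validate the settings. */
    private void probeCompressor() {
        // A real compressor would allocate its off-heap buffers at this point.
        ByteBuffer probe = ByteBuffer.allocateDirect(PROBE_BUFFER_BYTES);
        offHeapBytesAllocated.addAndGet(probe.capacity());
        probesCreated.incrementAndGet();
    }

    /** Only the first call does any work; later calls return immediately. */
    void configureCompression() {
        if (compressionConfigured.compareAndSet(false, true)) {
            probeCompressor();
        }
    }

    /** Mimics the per-record path that used to trigger a probe on every emit. */
    void emit(String record) {
        configureCompression();
        // Writing the record is omitted in this sketch.
    }

    int probesCreated() {
        return probesCreated.get();
    }

    long offHeapBytesAllocated() {
        return offHeapBytesAllocated.get();
    }
}
```

With this guard, calling `emit` a thousand times on one sink creates a single probe, so the off-heap allocation stays at one buffer instead of growing with the number of emits.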
By the way, this would have been much easier to diagnose with debug output: configureCompression has a debug log in it, so you'd see thousands of messages in the logs. But I couldn't get debug output working in the latest build. No matter what I tried, I got either default logging or no logging.