From cf25277e36e481b1eb038d4197c06aaf3cb7f166 Mon Sep 17 00:00:00 2001 From: Vamsi Date: Fri, 22 May 2026 17:35:30 +0530 Subject: [PATCH 1/4] fix(common): reuse Inflater/Deflater in BitCaskDiskMap to avoid JDK8 finalizer contention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CompressionHandler currently allocates a new Deflater on every compressBytes() and a new Inflater on every decompressBytes(). On JDK 8 both classes register a Finalizer on construction. Under sustained, multi-threaded disk-map traffic (observed during MDT/RLI compaction merging millions of records across several Spark task threads on the same executor), the rate of zlib allocations exceeds the rate at which the single Finalizer thread can drain its queue. Native ZStreamRef handles pile up in old gen, the heap saturates, and G1 enters a mixed-GC death spiral while application threads make no progress. CompressionHandler is already held in a ThreadLocal, so a single Deflater/ Inflater pair per worker thread is sufficient. This change: - Adds transient Deflater/Inflater fields and lazy accessors (transient so the class remains Serializable; lazy so deserialized instances rebuild the codecs on first use). - Calls reset() on the cached codecs at the start of each call. - Passes the user-supplied codecs to DeflaterOutputStream(out, def) and InflaterInputStream(in, inf), which sets usesDefaultDeflater/Inflater to false so close() does not call end() on the codec — the codec survives the try-with-resources for reuse on the next call. On-disk format, compression level, and error semantics are unchanged. Allocation rate drops from O(records) to O(threads). On JDK 9+ this also removes per-call Cleaner registration overhead. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../util/collection/BitCaskDiskMap.java | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index e7b298dc2b8d4..6fe689a704d33 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -58,6 +58,7 @@ import java.util.stream.Stream; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; @@ -408,6 +409,14 @@ private static class CompressionHandler implements Serializable { private final ByteArrayOutputStream compressBaos; private final ByteArrayOutputStream decompressBaos; private final byte[] decompressIntermediateBuffer; + // Reusable zlib engines. Each CompressionHandler is held in a ThreadLocal, + // so a single Deflater/Inflater pair per worker thread is sufficient and + // avoids per-call construction. On JDK 8 every new Deflater/Inflater + // registers a finalizer; under concurrent disk-map traffic the Finalizer + // thread cannot drain the queue, pinning native zlib handles in old gen + // and driving the JVM into a GC death spiral. + private transient Deflater deflater; + private transient Inflater inflater; CompressionHandler() { compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); @@ -415,22 +424,42 @@ private static class CompressionHandler implements Serializable { decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE]; } + // Lazy accessors so the handler stays usable after Java deserialization + // (transient fields come back null). + private Deflater deflater() { + if (deflater == null) { + deflater = new Deflater(Deflater.BEST_COMPRESSION); + } + return deflater; + } + + private Inflater inflater() { + if (inflater == null) { + inflater = new Inflater(); + } + return inflater; + } + private byte[] compressBytes(final byte[] value) throws IOException { compressBaos.reset(); - Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION); - DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, deflater); - try { + Deflater def = deflater(); + def.reset(); + // Passing our own Deflater puts DeflaterOutputStream in usesDefaultDeflater=false + // mode, so close() will finish() the stream but will NOT call def.end() — + // exactly what we want for reuse. + try (DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, def)) { dos.write(value); - } finally { - dos.close(); - deflater.end(); } return compressBaos.toByteArray(); } private byte[] decompressBytes(final byte[] bytes) throws IOException { decompressBaos.reset(); - try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes))) { + Inflater inf = inflater(); + inf.reset(); + // Passing our own Inflater puts InflaterInputStream in usesDefaultInflater=false + // mode, so close() will not call inf.end() — exactly what we want for reuse. + try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes), inf)) { int len; while ((len = in.read(decompressIntermediateBuffer)) > 0) { decompressBaos.write(decompressIntermediateBuffer, 0, len); From 1b8a581c2343f63033dac11833b1fcf979272c0d Mon Sep 17 00:00:00 2001 From: Vamsi Date: Fri, 22 May 2026 20:17:17 +0530 Subject: [PATCH 2/4] remove extra comments Co-Authored-By: Claude Opus 4.7 (1M context) --- .../common/util/collection/BitCaskDiskMap.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 6fe689a704d33..8d56be3d4b447 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -409,12 +409,9 @@ private static class CompressionHandler implements Serializable { private final ByteArrayOutputStream compressBaos; private final ByteArrayOutputStream decompressBaos; private final byte[] decompressIntermediateBuffer; - // Reusable zlib engines. Each CompressionHandler is held in a ThreadLocal, + // Each CompressionHandler is held in a ThreadLocal, // so a single Deflater/Inflater pair per worker thread is sufficient and - // avoids per-call construction. On JDK 8 every new Deflater/Inflater - // registers a finalizer; under concurrent disk-map traffic the Finalizer - // thread cannot drain the queue, pinning native zlib handles in old gen - // and driving the JVM into a GC death spiral. + // avoids per-call construction. private transient Deflater deflater; private transient Inflater inflater; @@ -424,8 +421,6 @@ private static class CompressionHandler implements Serializable { decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE]; } - // Lazy accessors so the handler stays usable after Java deserialization - // (transient fields come back null). private Deflater deflater() { if (deflater == null) { deflater = new Deflater(Deflater.BEST_COMPRESSION); @@ -444,9 +439,6 @@ private byte[] compressBytes(final byte[] value) throws IOException { compressBaos.reset(); Deflater def = deflater(); def.reset(); - // Passing our own Deflater puts DeflaterOutputStream in usesDefaultDeflater=false - // mode, so close() will finish() the stream but will NOT call def.end() — - // exactly what we want for reuse. try (DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, def)) { dos.write(value); } @@ -457,8 +449,6 @@ private byte[] decompressBytes(final byte[] bytes) throws IOException { decompressBaos.reset(); Inflater inf = inflater(); inf.reset(); - // Passing our own Inflater puts InflaterInputStream in usesDefaultInflater=false - // mode, so close() will not call inf.end() — exactly what we want for reuse. try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes), inf)) { int len; while ((len = in.read(decompressIntermediateBuffer)) > 0) { From 89c34d3ccf1e593159c6f04e9ee25fc6cd231add Mon Sep 17 00:00:00 2001 From: Vamsi Date: Sat, 23 May 2026 12:54:13 +0530 Subject: [PATCH 3/4] address comments --- .../common/util/collection/BitCaskDiskMap.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 8d56be3d4b447..4f3e315ff239e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -409,7 +409,7 @@ private static class CompressionHandler implements Serializable { private final ByteArrayOutputStream compressBaos; private final ByteArrayOutputStream decompressBaos; private final byte[] decompressIntermediateBuffer; - // Each CompressionHandler is held in a ThreadLocal, + // ENG-41975 - Each CompressionHandler is held in a ThreadLocal, // so a single Deflater/Inflater pair per worker thread is sufficient and // avoids per-call construction. private transient Deflater deflater; @@ -421,14 +421,14 @@ private static class CompressionHandler implements Serializable { decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE]; } - private Deflater deflater() { + private Deflater getDeflater() { if (deflater == null) { deflater = new Deflater(Deflater.BEST_COMPRESSION); } return deflater; } - private Inflater inflater() { + private Inflater getInflater() { if (inflater == null) { inflater = new Inflater(); } @@ -437,9 +437,9 @@ private Inflater inflater() { private byte[] compressBytes(final byte[] value) throws IOException { compressBaos.reset(); - Deflater def = deflater(); - def.reset(); - try (DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, def)) { + Deflater deflater = getDeflater(); + deflater.reset(); + try (DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, deflater)) { dos.write(value); } return compressBaos.toByteArray(); @@ -447,9 +447,9 @@ private byte[] compressBytes(final byte[] value) throws IOException { private byte[] decompressBytes(final byte[] bytes) throws IOException { decompressBaos.reset(); - Inflater inf = inflater(); - inf.reset(); - try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes), inf)) { + Inflater inflater = getInflater(); + inflater.reset(); + try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes), inflater)) { int len; while ((len = in.read(decompressIntermediateBuffer)) > 0) { decompressBaos.write(decompressIntermediateBuffer, 0, len); From 709a39b70d0ec09886c64e02d241dfb275d02728 Mon Sep 17 00:00:00 2001 From: Vamsi Date: Mon, 25 May 2026 11:37:06 +0530 Subject: [PATCH 4/4] address nits --- .../org/apache/hudi/common/util/collection/BitCaskDiskMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 4f3e315ff239e..fb22c9869ae26 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -409,7 +409,7 @@ private static class CompressionHandler implements Serializable { private final ByteArrayOutputStream compressBaos; private final ByteArrayOutputStream decompressBaos; private final byte[] decompressIntermediateBuffer; - // ENG-41975 - Each CompressionHandler is held in a ThreadLocal, + // Each CompressionHandler is held in a ThreadLocal, // so a single Deflater/Inflater pair per worker thread is sufficient and // avoids per-call construction. private transient Deflater deflater;