Skip to content

Commit

Permalink
Clean up DeflateCompressor after exception (#87163) (#87222)
Browse files Browse the repository at this point in the history
We must reset the thread-local buffer in `DeflateCompressor` whether the
compression succeeds or fails.

Closes #87160
  • Loading branch information
DaveCTurner committed May 30, 2022
1 parent ddd8378 commit 61d51e5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 15 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/87163.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 87163
summary: Clean up `DeflateCompressor` after exception
area: Infra/Core
type: bug
issues:
- 87160
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,17 @@ public void close() throws IOException {
@Override
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
final BytesStreamOutput buffer = baos.get();
final Inflater inflater = inflaterRef.get();
try (InflaterOutputStream ios = new InflaterOutputStream(buffer, inflater)) {
bytesReference.slice(HEADER.length, bytesReference.length() - HEADER.length).writeTo(ios);
try {
final Inflater inflater = inflaterRef.get();
try (InflaterOutputStream ios = new InflaterOutputStream(buffer, inflater)) {
bytesReference.slice(HEADER.length, bytesReference.length() - HEADER.length).writeTo(ios);
} finally {
inflater.reset();
}
return buffer.copyBytes();
} finally {
inflater.reset();
buffer.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
}

// Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
Expand All @@ -225,15 +227,17 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOExcepti
@Override
public BytesReference compress(BytesReference bytesReference) throws IOException {
final BytesStreamOutput buffer = baos.get();
buffer.write(HEADER);
final Deflater deflater = deflaterRef.get();
try (DeflaterOutputStream dos = new DeflaterOutputStream(buffer, deflater, true)) {
bytesReference.writeTo(dos);
try {
buffer.write(HEADER);
final Deflater deflater = deflaterRef.get();
try (DeflaterOutputStream dos = new DeflaterOutputStream(buffer, deflater, true)) {
bytesReference.writeTo(dos);
} finally {
deflater.reset();
}
return buffer.copyBytes();
} finally {
deflater.reset();
buffer.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.lucene.tests.util.LineFileDocs;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayInputStream;
Expand All @@ -20,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.zip.ZipException;

/**
* Test streaming compression (e.g. used for recovery)
Expand Down Expand Up @@ -361,6 +364,43 @@ public void run() {
}
}

public void testCompressUncompressWithCorruptions() throws Exception {
final Random r = random();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)];
r.nextBytes(bytes);
final var offset = between(0, bytes.length - 1);
final var length = between(0, bytes.length - offset);
final var original = new BytesArray(bytes, offset, length);
final var compressed = compressor.compress(original);

if (randomBoolean()) {
var corruptIndex = between(0, compressed.length() - 1);
BytesRef bytesRef;
final var iterator = compressed.iterator();
while ((bytesRef = iterator.next()) != null) {
if (corruptIndex < bytesRef.length) {
bytesRef.bytes[bytesRef.offset + corruptIndex] = randomValueOtherThan(
bytesRef.bytes[bytesRef.offset + corruptIndex],
() -> (byte) (r.nextInt() & 0xff)
);
break;
} else {
corruptIndex -= bytesRef.length;
}
}
try {
compressor.uncompress(compressed);
} catch (ZipException e) {
// ok
}
} else {
var uncompressed = compressor.uncompress(compressed);
assertEquals(original, uncompressed);
}
}
}

private void doTest(byte bytes[]) throws IOException {
InputStream rawIn = new ByteArrayInputStream(bytes);
Compressor c = compressor;
Expand Down

0 comments on commit 61d51e5

Please sign in to comment.