diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index ee79ba927a3c1..4c59eeb45a6b2 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -160,6 +160,11 @@ junit test + + org.assertj + assertj-core + test + commons-beanutils commons-beanutils diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java index 35f84b950e427..8be2dce06d1fe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java @@ -126,7 +126,7 @@ private void addPair(T compressor, E decompressor, String name) { builder.add(new TesterPair(name, compressor, decompressor)); } - public void test() throws InstantiationException, IllegalAccessException { + public void test() throws Exception { pairs = builder.build(); pairs = assertionDelegate.filterOnAssumeWhat(pairs); @@ -287,47 +287,45 @@ private boolean checkSetInputArrayIndexOutOfBoundsException( @Override public void assertCompression(String name, Compressor compressor, - Decompressor decompressor, byte[] rawData) { + Decompressor decompressor, byte[] rawData) throws Exception { int cSize = 0; int decompressedSize = 0; - byte[] compressedResult = new byte[rawData.length]; + // Snappy compression can increase data size + int maxCompressedLength = 32 + rawData.length + rawData.length/6; + byte[] compressedResult = new byte[maxCompressedLength]; byte[] decompressedBytes = new byte[rawData.length]; - try { - assertTrue( - joiner.join(name, "compressor.needsInput before error !!!"), - compressor.needsInput()); - assertTrue( + assertTrue( + joiner.join(name, "compressor.needsInput before error !!!"), + compressor.needsInput()); + assertEquals( joiner.join(name, "compressor.getBytesWritten before error !!!"), - compressor.getBytesWritten() == 0); - compressor.setInput(rawData, 0, rawData.length); - compressor.finish(); - while (!compressor.finished()) { - cSize += compressor.compress(compressedResult, 0, - compressedResult.length); - } - compressor.reset(); - - assertTrue( - joiner.join(name, "decompressor.needsInput() before error !!!"), - decompressor.needsInput()); - decompressor.setInput(compressedResult, 0, cSize); - assertFalse( - joiner.join(name, "decompressor.needsInput() after error !!!"), - decompressor.needsInput()); - while (!decompressor.finished()) { - decompressedSize = decompressor.decompress(decompressedBytes, 0, - decompressedBytes.length); - } - decompressor.reset(); - assertTrue(joiner.join(name, " byte size not equals error !!!"), - decompressedSize == rawData.length); - assertArrayEquals( - joiner.join(name, " byte arrays not equals error !!!"), rawData, - decompressedBytes); - } catch (Exception ex) { - fail(joiner.join(name, ex.getMessage())); + 0, compressor.getBytesWritten()); + compressor.setInput(rawData, 0, rawData.length); + compressor.finish(); + while (!compressor.finished()) { + cSize += compressor.compress(compressedResult, 0, + compressedResult.length); + } + compressor.reset(); + + assertTrue( + joiner.join(name, "decompressor.needsInput() before error !!!"), + decompressor.needsInput()); + decompressor.setInput(compressedResult, 0, cSize); + assertFalse( + joiner.join(name, "decompressor.needsInput() after error !!!"), + decompressor.needsInput()); + while (!decompressor.finished()) { + decompressedSize = decompressor.decompress(decompressedBytes, 0, + decompressedBytes.length); } + decompressor.reset(); + assertEquals(joiner.join(name, " byte size not equals error !!!"), + rawData.length, decompressedSize); + assertArrayEquals( + joiner.join(name, " byte arrays not equals error !!!"), rawData, + decompressedBytes); } }), @@ -519,6 +517,6 @@ abstract static class TesterCompressionStrategy { protected final Logger logger = Logger.getLogger(getClass()); abstract void assertCompression(String name, Compressor compressor, - Decompressor decompressor, byte[] originalRawData); + Decompressor decompressor, byte[] originalRawData) throws Exception; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java index cc986c7e0aea4..c8900bad1df56 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.compress.snappy; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -44,11 +45,16 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assume.*; public class TestSnappyCompressorDecompressor { + public static final Logger LOG = + LoggerFactory.getLogger(TestSnappyCompressorDecompressor.class); + @Before public void before() { assumeTrue(SnappyCodec.isNativeCodeLoaded()); @@ -167,40 +173,41 @@ public void testSnappyDecompressorCompressAIOBException() { } @Test - public void testSnappyCompressDecompress() { + public void testSnappyCompressDecompress() throws Exception { int BYTE_SIZE = 1024 * 54; byte[] bytes = BytesGenerator.get(BYTE_SIZE); SnappyCompressor compressor = new SnappyCompressor(); - try { - compressor.setInput(bytes, 0, bytes.length); - assertTrue("SnappyCompressDecompress getBytesRead error !!!", - compressor.getBytesRead() > 0); - assertTrue( - "SnappyCompressDecompress getBytesWritten before compress error !!!", - compressor.getBytesWritten() == 0); - - byte[] compressed = new byte[BYTE_SIZE]; - int cSize = compressor.compress(compressed, 0, compressed.length); - assertTrue( - "SnappyCompressDecompress getBytesWritten after compress error !!!", - compressor.getBytesWritten() > 0); - - SnappyDecompressor decompressor = new SnappyDecompressor(BYTE_SIZE); - // set as input for decompressor only compressed data indicated with cSize - decompressor.setInput(compressed, 0, cSize); - byte[] decompressed = new byte[BYTE_SIZE]; - decompressor.decompress(decompressed, 0, decompressed.length); - - assertTrue("testSnappyCompressDecompress finished error !!!", - decompressor.finished()); - Assert.assertArrayEquals(bytes, decompressed); - compressor.reset(); - decompressor.reset(); - assertTrue("decompressor getRemaining error !!!", - decompressor.getRemaining() == 0); - } catch (Exception e) { - fail("testSnappyCompressDecompress ex error!!!"); - } + compressor.setInput(bytes, 0, bytes.length); + assertTrue("SnappyCompressDecompress getBytesRead error !!!", + compressor.getBytesRead() > 0); + assertEquals( + "SnappyCompressDecompress getBytesWritten before compress error !!!", + 0, compressor.getBytesWritten()); + + // snappy compression may increase data size. + // This calculation comes from "Snappy::MaxCompressedLength(size_t)" + int maxSize = 32 + BYTE_SIZE + BYTE_SIZE / 6; + byte[] compressed = new byte[maxSize]; + int cSize = compressor.compress(compressed, 0, compressed.length); + LOG.info("input size: {}", BYTE_SIZE); + LOG.info("compressed size: {}", cSize); + assertTrue( + "SnappyCompressDecompress getBytesWritten after compress error !!!", + compressor.getBytesWritten() > 0); + + SnappyDecompressor decompressor = new SnappyDecompressor(); + // set as input for decompressor only compressed data indicated with cSize + decompressor.setInput(compressed, 0, cSize); + byte[] decompressed = new byte[BYTE_SIZE]; + decompressor.decompress(decompressed, 0, decompressed.length); + + assertTrue("testSnappyCompressDecompress finished error !!!", + decompressor.finished()); + Assert.assertArrayEquals(bytes, decompressed); + compressor.reset(); + decompressor.reset(); + assertEquals("decompressor getRemaining error !!!", + 0, decompressor.getRemaining()); } @Test @@ -278,7 +285,38 @@ public void testSnappyBlockCompression() { fail("testSnappyBlockCompression ex error !!!"); } } - + + @Test + // The buffer size is smaller than the input. + public void testSnappyCompressDecompressWithSmallBuffer() throws Exception { + int inputSize = 1024 * 50; + int bufferSize = 512; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buffer = new byte[bufferSize]; + byte[] input = BytesGenerator.get(inputSize); + + SnappyCompressor compressor = new SnappyCompressor(); + compressor.setInput(input, 0, inputSize); + compressor.finish(); + while (!compressor.finished()) { + int len = compressor.compress(buffer, 0, buffer.length); + out.write(buffer, 0, len); + } + byte[] compressed = out.toByteArray(); + assertThat(compressed).hasSizeGreaterThan(0); + out.reset(); + + SnappyDecompressor decompressor = new SnappyDecompressor(); + decompressor.setInput(compressed, 0, compressed.length); + while (!decompressor.finished()) { + int len = decompressor.decompress(buffer, 0, buffer.length); + out.write(buffer, 0, len); + } + byte[] decompressed = out.toByteArray(); + + assertThat(decompressed).isEqualTo(input); + } + private void compressDecompressLoop(int rawDataSize) throws IOException { byte[] rawData = BytesGenerator.get(rawDataSize); byte[] compressedResult = new byte[rawDataSize+20]; diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ed1ad44fa625b..08d2e820bd6a6 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -184,6 +184,7 @@ 1.2.6 2.0.0-beta-1 4.13.2 + 3.12.2 5.3.0 2.4.2 9.8.1 @@ -1492,7 +1493,7 @@ org.assertj assertj-core - 3.8.0 + ${assertj.version} test