From 905e513612142a934bde78e54bf8a311c25a3107 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Wed, 22 Mar 2017 09:48:42 +0800 Subject: [PATCH 1/9] Fix bug in ByteArrayOutputStreamWithPos --- .../memory/ByteArrayOutputStreamWithPos.java | 2 +- .../ByteArrayOutputStreamWithPosTest.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java index abf65b1830122..0c9c984605de3 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java @@ -110,7 +110,7 @@ public int getPosition() { } public void setPosition(int position) { - Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds."); + Preconditions.checkArgument(position <= getEndPosition(), "Position out of bounds."); count = position; } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java new file mode 100644 index 0000000000000..2ceefe16ce890 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.core.memory; + +import org.junit.Test; + +public class ByteArrayOutputStreamWithPosTest { + @Test + public void setPositionWhenBufferIsFull() throws Exception { + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(); + stream.write(new byte[64]); + stream.setPosition(stream.getPosition()); + } + +} From 09cc96adf52f78052bb3555f2ccda0d860da4895 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Wed, 22 Mar 2017 14:30:59 +0800 Subject: [PATCH 2/9] fix typo --- .../flink/core/memory/ByteArrayOutputStreamWithPosTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index 2ceefe16ce890..6511dbf3847a9 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.core.memory; import org.junit.Test; From 69fb3c372a77b092a9f65a6c4c4d37beff54da08 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 23 Mar 2017 09:47:05 +0800 Subject: [PATCH 3/9] use explicit initial buffer size --- .../flink/core/memory/ByteArrayOutputStreamWithPosTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index 6511dbf3847a9..b0712493506c2 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -23,8 +23,8 @@ public class ByteArrayOutputStreamWithPosTest { @Test public void setPositionWhenBufferIsFull() throws Exception { - ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(); - stream.write(new byte[64]); + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(32); + stream.write(new byte[32]); stream.setPosition(stream.getPosition()); } From d7cf66e91bbabdcce3636481a7b498fccc82dd1c Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Fri, 24 Mar 2017 12:25:14 +0800 Subject: [PATCH 4/9] address comments --- .../ByteArrayOutputStreamWithPosTest.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index b0712493506c2..23fb6b55a9a21 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -18,14 +18,30 @@ package org.apache.flink.core.memory; +import org.junit.Assert; import org.junit.Test; public class ByteArrayOutputStreamWithPosTest { @Test public void setPositionWhenBufferIsFull() throws Exception { - ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(32); - stream.write(new byte[32]); - stream.setPosition(stream.getPosition()); + + int initBufferSize = 32; + + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(initBufferSize); + + stream.write(new byte[initBufferSize]); + + // check whether the buffer is filled fully + Assert.assertEquals(initBufferSize, stream.getBuf().length); + + // check current position is the end of the buffer + Assert.assertEquals(initBufferSize, stream.getPosition()); + + stream.setPosition(initBufferSize); + + // confirm current position is at where we expects. + Assert.assertEquals(initBufferSize, stream.getPosition()); + } } From cad4845bc9e3fbd3a8ad6eb401f856a9f2e681d2 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Fri, 24 Mar 2017 12:27:51 +0800 Subject: [PATCH 5/9] fix typo --- .../flink/core/memory/ByteArrayOutputStreamWithPosTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index 23fb6b55a9a21..b908e59e7fd97 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -39,7 +39,7 @@ public void setPositionWhenBufferIsFull() throws Exception { stream.setPosition(initBufferSize); - // confirm current position is at where we expects. + // confirm current position is at where we expect. Assert.assertEquals(initBufferSize, stream.getPosition()); } From 0e4db9ea10a197037d81f0c6f000f70068848ec9 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Tue, 28 Mar 2017 14:22:51 +0800 Subject: [PATCH 6/9] add check and test in ByteArrayInputStream --- .../memory/ByteArrayInputStreamWithPos.java | 3 +- .../ByteArrayInputStreamWithPosTest.java | 50 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java index dd381a467b668..8138d398a12e4 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java @@ -118,7 +118,8 @@ public int getPosition() { return position; } - public void setPos(int pos) { + public void setPosition(int pos) { + Preconditions.checkArgument(pos < count, "Position out of bounds."); this.position = pos; } } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java new file mode 100644 index 0000000000000..a3e2fafd07340 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.junit.Assert; +import org.junit.Test; + +public class ByteArrayInputStreamWithPosTest { + + /** + * This tests setting position on a {@link ByteArrayInputStreamWithPos} + */ + @Test + public void testSetPosition() throws Exception { + byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; + ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); + inputStreamWithPos.setPosition(1); + Assert.assertEquals('1', inputStreamWithPos.read()); + inputStreamWithPos.setPosition(3); + Assert.assertEquals('3', inputStreamWithPos.read()); + } + + /** + * This tests that the expected position exceeds the capacity of the byte array. + */ + @Test(expected = IllegalArgumentException.class) + public void testSetErrorPosition() throws Exception { + byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; + ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); + inputStreamWithPos.setPosition(data.length); + Assert.fail("Should not reach here !!!!"); + } + +} From 0f62595e49a9d1b032dae39defcec9456682568e Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 30 Mar 2017 13:24:00 +0800 Subject: [PATCH 7/9] address comments --- .../memory/ByteArrayInputStreamWithPos.java | 2 +- .../memory/ByteArrayOutputStreamWithPos.java | 7 +- .../ByteArrayInputStreamWithPosTest.java | 24 ++++++- .../ByteArrayOutputStreamWithPosTest.java | 72 ++++++++++++++++++- 4 files changed, 96 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java index 8138d398a12e4..e4fc3e4fbb2b5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java @@ -119,7 +119,7 @@ public int getPosition() { } public void setPosition(int pos) { - Preconditions.checkArgument(pos < count, "Position out of bounds."); + Preconditions.checkArgument(pos >=0 && pos <= count, "Position out of bounds."); this.position = pos; } } diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java index 0c9c984605de3..545989a2ceb9d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java @@ -101,16 +101,13 @@ public String toString() { return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET); } - private int getEndPosition() { - return buffer.length; - } - public int getPosition() { return count; } public void setPosition(int position) { - Preconditions.checkArgument(position <= getEndPosition(), "Position out of bounds."); + Preconditions.checkArgument(position >=0, "Position out of bounds."); + ensureCapacity(position + 1); count = position; } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java index a3e2fafd07340..8b32e8d16c045 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java @@ -19,10 +19,15 @@ package org.apache.flink.core.memory; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class ByteArrayInputStreamWithPosTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + /** * This tests setting position on a {@link ByteArrayInputStreamWithPos} */ @@ -39,12 +44,27 @@ public void testSetPosition() throws Exception { /** * This tests that the expected position exceeds the capacity of the byte array. */ - @Test(expected = IllegalArgumentException.class) - public void testSetErrorPosition() throws Exception { + @Test + public void testSetTooLargePosition() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Position out of bounds."); byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); inputStreamWithPos.setPosition(data.length); Assert.fail("Should not reach here !!!!"); } + /** + * This tests setting a negative position + */ + @Test + public void testSetNegativePosition() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Position out of bounds."); + byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; + ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); + inputStreamWithPos.setPosition(-1); + Assert.fail("Should not reach here !!!!"); + } + } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index b908e59e7fd97..6000c1447a260 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -18,12 +18,25 @@ package org.apache.flink.core.memory; +import org.apache.flink.configuration.ConfigConstants; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Arrays; public class ByteArrayOutputStreamWithPosTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * This tests setting position which is exactly the same with the buffer size. + */ @Test - public void setPositionWhenBufferIsFull() throws Exception { + public void testSetPositionWhenBufferIsFull() throws Exception { int initBufferSize = 32; @@ -44,4 +57,61 @@ public void setPositionWhenBufferIsFull() throws Exception { } + /** + * This tests setting negative position + */ + @Test + public void testSetNegativePosition() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Position out of bounds"); + + int initBufferSize = 32; + + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(initBufferSize); + + stream.write(new byte[initBufferSize]); + + stream.setPosition(-1); + + Assert.fail("Should not reach here !!!!"); + } + + /** + * This tests setting position larger than buffer size + */ + @Test + public void testSetPositionLargerThanBufferSize() throws Exception { + int initBufferSize = 32; + + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(initBufferSize); + + stream.write(new byte[initBufferSize]); + + Assert.assertEquals(initBufferSize, stream.getBuf().length); + + stream.setPosition(33); + + Assert.assertEquals(64, stream.getBuf().length); + + Assert.assertEquals(33, stream.getPosition()); + } + + /** + * THis tests that toString returns a substring of the buffer with range(0, position) + */ + @Test + public void testToString() throws IOException { + ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos(); + + byte[] data = "1234567890".getBytes(ConfigConstants.DEFAULT_CHARSET); + stream.write(data); + Assert.assertArrayEquals(data, stream.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + stream.setPosition(1); + Assert.assertArrayEquals(Arrays.copyOf(data, 1), stream.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + stream.setPosition(data.length + 1); + Assert.assertArrayEquals(Arrays.copyOf(data, data.length + 1), stream.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + } } From 312b5787890040585f1316cb532ceb20e4111ddf Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 30 Mar 2017 13:27:59 +0800 Subject: [PATCH 8/9] update test in ByteArrayInputStreamWithPosTest --- .../flink/core/memory/ByteArrayInputStreamWithPosTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java index 8b32e8d16c045..a497d968153aa 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java @@ -36,9 +36,14 @@ public void testSetPosition() throws Exception { byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); inputStreamWithPos.setPosition(1); + Assert.assertEquals(data.length - 1, inputStreamWithPos.available()); Assert.assertEquals('1', inputStreamWithPos.read()); inputStreamWithPos.setPosition(3); + Assert.assertEquals(data.length - 3, inputStreamWithPos.available()); Assert.assertEquals('3', inputStreamWithPos.read()); + inputStreamWithPos.setPosition(data.length); + Assert.assertEquals(0, inputStreamWithPos.available()); + Assert.assertEquals(-1, inputStreamWithPos.read()); } /** @@ -50,7 +55,7 @@ public void testSetTooLargePosition() throws Exception { thrown.expectMessage("Position out of bounds."); byte[] data = new byte[] {'0','1','2','3','4','5','6','7','8','9'}; ByteArrayInputStreamWithPos inputStreamWithPos = new ByteArrayInputStreamWithPos(data); - inputStreamWithPos.setPosition(data.length); + inputStreamWithPos.setPosition(data.length + 1); Assert.fail("Should not reach here !!!!"); } From cf9f0b48eeb74c02d99071bcdb91366cc5aa5eb5 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 30 Mar 2017 13:30:12 +0800 Subject: [PATCH 9/9] update test in ByteArrayOutputStreamWithPosTest --- .../flink/core/memory/ByteArrayOutputStreamWithPosTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java index 6000c1447a260..42e75c6ad9436 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java @@ -89,11 +89,11 @@ public void testSetPositionLargerThanBufferSize() throws Exception { Assert.assertEquals(initBufferSize, stream.getBuf().length); - stream.setPosition(33); + stream.setPosition(initBufferSize + 1); - Assert.assertEquals(64, stream.getBuf().length); + Assert.assertEquals(initBufferSize * 2, stream.getBuf().length); - Assert.assertEquals(33, stream.getPosition()); + Assert.assertEquals(initBufferSize + 1, stream.getPosition()); } /**