From e29d5ef3099d5f5ffc817f3e5cdbb79c523b34c7 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 18 Jan 2017 18:52:57 +0100 Subject: [PATCH 1/4] [FLINK-5561] fix DataInputDeserializer#available() 1 smaller than correct --- .../runtime/util/DataInputDeserializer.java | 4 +- .../util/DataInputDeserializerTest.java | 58 +++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java index 9822a834f6283..0f994966305f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java @@ -55,7 +55,7 @@ public DataInputDeserializer(ByteBuffer buffer) { } // ------------------------------------------------------------------------ - // Chaning buffers + // Changing buffers // ------------------------------------------------------------------------ public void setBuffer(ByteBuffer buffer) { @@ -98,7 +98,7 @@ public void releaseArrays() { public int available() { if (position < end) { - return end - position - 1; + return end - position; } else { return 0; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java new file mode 100644 index 0000000000000..c6c19cbba7524 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test suite for the {@link DataInputDeserializer} class. + */ +public class DataInputDeserializerTest { + + @Test + public void testAvailable() throws Exception { + byte[] bytes; + DataInputDeserializer dis; + + bytes = new byte[] {}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + bytes = new byte[] {1, 2, 3}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + dis.readByte(); + Assert.assertEquals(2, dis.available()); + dis.readByte(); + Assert.assertEquals(1, dis.available()); + dis.readByte(); + Assert.assertEquals(0, dis.available()); + + try { + dis.readByte(); + } catch (IOException e) { + // ignore + } + Assert.assertEquals(0, dis.available()); + } +} From e589058e92d1f2a57d242987327777baa1fe64d4 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 19 Jan 2017 17:06:00 +0100 Subject: [PATCH 2/4] [FLINK-5576] let KvStateRequestSerializer#deserializeValue() detect unconsumed bytes KvStateRequestSerializer#deserializeValue deserializes a given byte array. This is used by clients and unit tests and it is fair to assume that these byte arrays represent a complete value since we do not offer a method to continue reading from the middle of the array anyway. Therefore, we can treat unconsumed bytes as errors, e.g. from a wrong serializer being used, and throw a IOException with an appropriate failure message. --- .../message/KvStateRequestSerializer.java | 12 +++++- .../message/KvStateRequestSerializerTest.java | 38 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 0ae60f60ca77f..2285c4e8e916a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -429,8 +429,16 @@ public static T deserializeValue(byte[] serializedValue, TypeSerializer s if (serializedValue == null) { return null; } else { - DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - return serializer.deserialize(deser); + final DataInputDeserializer deser = + new DataInputDeserializer(serializedValue, 0, serializedValue.length); + final T value = serializer.deserialize(deser); + if (deser.available() > 0) { + throw new IOException( + "Unconsumed bytes in the deserialized value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access."); + } + return value; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index a68c84b214304..304d807cca5a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.query.KvStateID; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -223,6 +224,43 @@ public void testValueSerialization() throws Exception { assertEquals(expectedValue, actualValue); } + /** + * Tests value deserialization with too few bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueEmpty() throws Exception { + KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too few bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooShort() throws Exception { + // 1 byte (incomplete Long) + KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too many bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooMany1() throws Exception { + // Long + 1 byte + KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, + LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too many bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooMany2() throws Exception { + // Long + 2 bytes + KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, + LongSerializer.INSTANCE); + } + /** * Tests list serialization utils. */ From 12d2108333b23e01b7b978a08e868a5f888506f9 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 19 Jan 2017 17:10:54 +0100 Subject: [PATCH 3/4] [FLINK-5576] add a better failure message to exceptions in KvStateRequestSerializer#deserializeList() As in FLINK-5559, wrap the original IOException into a new one with an appropriate error message to better diagnose it. --- .../message/KvStateRequestSerializer.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 2285c4e8e916a..4eb4d1cb56ff1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -488,24 +488,32 @@ public static byte[] serializeList(Iterable values, TypeSerializer ser */ public static List deserializeList(byte[] serializedValue, TypeSerializer serializer) throws IOException { if (serializedValue != null) { - DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - - List result = new ArrayList<>(); - while (in.available() > 0) { - result.add(serializer.deserialize(in)); - - // The expected binary format has a single byte separator. We - // want a consistent binary format in order to not need any - // special casing during deserialization. A "cleaner" format - // would skip this extra byte, but would require a memory copy - // for RocksDB, which stores the data serialized in this way - // for lists. - if (in.available() > 0) { - in.readByte(); + final DataInputDeserializer in = + new DataInputDeserializer(serializedValue, 0, serializedValue.length); + + try { + final List result = new ArrayList<>(); + while (in.available() > 0) { + result.add(serializer.deserialize(in)); + + // The expected binary format has a single byte separator. We + // want a consistent binary format in order to not need any + // special casing during deserialization. A "cleaner" format + // would skip this extra byte, but would require a memory copy + // for RocksDB, which stores the data serialized in this way + // for lists. + if (in.available() > 0) { + in.readByte(); + } } - } - return result; + return result; + } catch (IOException e) { + throw new IOException( + "Unable to deserialize value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access.", e); + } } else { return null; } From 9770e7f3269adfa5b80dd2125bcdbf8cebc4dab8 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 19 Jan 2017 17:12:02 +0100 Subject: [PATCH 4/4] [FLINK-5576] add more unit tests for KvStateRequestSerializer#deserializeList() These tests ensure that some special cases not properly tested before are handled correctly in future. --- .../message/KvStateRequestSerializerTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index 304d807cca5a1..76d04185b0299 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -288,6 +288,35 @@ public void testListSerialization() throws Exception { assertEquals(expectedValue, actualValue.get(0).longValue()); } + /** + * Tests list deserialization with too few bytes. + */ + @Test + public void testDeserializeListEmpty() throws Exception { + List actualValue = KvStateRequestSerializer + .deserializeList(new byte[] {}, LongSerializer.INSTANCE); + assertEquals(0, actualValue.size()); + } + + /** + * Tests list deserialization with too few bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeListTooShort1() throws Exception { + // 1 byte (incomplete Long) + KvStateRequestSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE); + } + + /** + * Tests list deserialization with too few bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeListTooShort2() throws Exception { + // Long + 1 byte (separator) + 1 byte (incomplete Long) + KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, + LongSerializer.INSTANCE); + } + private byte[] randomByteArray(int capacity) { byte[] bytes = new byte[capacity]; ThreadLocalRandom.current().nextBytes(bytes);