From 92efcd34a5da2bccb07666f2c647974ea3e7c94f Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 1 Feb 2016 14:39:24 +0100 Subject: [PATCH] [hotfix] Remove 'ByteArrayInputView' and replace deserialization in TypeInformationSerializationSchema with more efficient reusable buffers. --- .../typeutils/runtime/ByteArrayInputView.java | 40 ---------------- .../runtime/kryo/KryoClearedBufferTest.java | 8 +++- .../runtime/util/DataInputDeserializer.java | 48 ++++++++++++------- .../runtime/util/DataOutputSerializer.java | 25 ++++++---- ...nformationKeyValueSerializationSchema.java | 44 ++++++++++++----- .../kafka/KafkaConsumerTestBase.java | 17 +++++-- .../TypeInformationSerializationSchema.java | 14 ++++-- ...ypeInformationSerializationSchemaTest.java | 2 +- 8 files changed, 109 insertions(+), 89 deletions(-) delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java deleted file mode 100644 index 48d6a3da21dcc..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.core.memory.DataInputView; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; - -public class ByteArrayInputView extends DataInputStream implements DataInputView { - - public ByteArrayInputView(byte[] data) { - super(new ByteArrayInputStream(data)); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - while (numBytes > 0) { - int skipped = skipBytes(numBytes); - numBytes -= skipped; - } - } -} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java index ab2e45fac6472..75724088bcdb0 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java @@ -22,13 +22,16 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; + import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; + import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.io.Serializable; @@ -69,7 +72,8 @@ public void testOutputBufferedBeingClearedInCaseOfException() throws Exception { // now the Kryo Output should have been cleared } - TestRecord actualRecord = kryoSerializer.deserialize(new ByteArrayInputView(target.getBuffer())); + TestRecord actualRecord = kryoSerializer.deserialize( + new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer()))); Assert.assertEquals(testRecord, actualRecord); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java index e8e8f6d52517a..bdccdd121c01b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.util; import java.io.EOFException; @@ -31,7 +30,11 @@ /** * A simple and efficient deserializer for the {@link java.io.DataInput} interface. */ -public class DataInputDeserializer implements DataInputView { +public class DataInputDeserializer implements DataInputView, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + // ------------------------------------------------------------------------ private byte[] buffer; @@ -39,8 +42,9 @@ public class DataInputDeserializer implements DataInputView { private int position; - public DataInputDeserializer() { - } + // ------------------------------------------------------------------------ + + public DataInputDeserializer() {} public DataInputDeserializer(byte[] buffer, int start, int len) { setBuffer(buffer, start, len); @@ -50,6 +54,10 @@ public DataInputDeserializer(ByteBuffer buffer) { setBuffer(buffer); } + // ------------------------------------------------------------------------ + // Chaning buffers + // ------------------------------------------------------------------------ + public void setBuffer(ByteBuffer buffer) { if (buffer.hasArray()) { this.buffer = buffer.array(); @@ -311,44 +319,36 @@ public int skipBytes(int n) throws IOException { return n; } } - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); @Override public void skipBytesToRead(int numBytes) throws IOException { int skippedBytes = skipBytes(numBytes); - if(skippedBytes < numBytes){ + if (skippedBytes < numBytes){ throw new EOFException("Could not skip " + numBytes +" bytes."); } } @Override public int read(byte[] b, int off, int len) throws IOException { - if(b == null){ + if (b == null){ throw new NullPointerException("Byte array b cannot be null."); } - if(off < 0){ + if (off < 0){ throw new IndexOutOfBoundsException("Offset cannot be negative."); } - if(len < 0){ + if (len < 0){ throw new IndexOutOfBoundsException("Length cannot be negative."); } - if(b.length - off < len){ + if (b.length - off < len){ throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + "."); } - if(this.position >= this.end) { + if (this.position >= this.end) { return -1; } else { int toRead = Math.min(this.end-this.position, len); @@ -363,4 +363,16 @@ public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b) throws IOException { return read(b, 0, b.length); } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index 0e93544d8d272..18940ede761a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -21,6 +21,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemoryUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class DataOutputSerializer implements DataOutputView { private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class); private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; + + // ------------------------------------------------------------------------ private final byte[] startBuffer; @@ -47,6 +50,8 @@ public class DataOutputSerializer implements DataOutputView { private int position; private ByteBuffer wrapper; + + // ------------------------------------------------------------------------ public DataOutputSerializer(int startSize) { if (startSize < 1) { @@ -303,14 +308,6 @@ private void resize(int minCapacityAdd) throws IOException { this.buffer = nb; this.wrapper = ByteBuffer.wrap(this.buffer); } - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); @Override public void skipBytesToWrite(int numBytes) throws IOException { @@ -330,4 +327,16 @@ public void write(DataInputView source, int numBytes) throws IOException { source.read(this.buffer, this.position, numBytes); this.position += numBytes; } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index a35c01e600cd5..a3f8ba15f2517 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; +import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; import java.io.IOException; @@ -46,10 +46,16 @@ public class TypeInformationKeyValueSerializationSchema implements KeyedDe /** The serializer for the value */ private final TypeSerializer valueSerializer; - /** reusable output serialization buffers */ + /** reusable input deserialization buffer */ + private final DataInputDeserializer inputDeserializer; + + /** reusable output serialization buffer for the key */ private transient DataOutputSerializer keyOutputSerializer; - private transient DataOutputSerializer valueOutputSerializer; + /** reusable output serialization buffer for the value */ + private transient DataOutputSerializer valueOutputSerializer; + + /** The type information, to be returned by {@link #getProducedType()}. It is * transient, because it is not serializable. Note that this means that the type information * is not available at runtime, but only prior to the first serialization / deserialization */ @@ -68,11 +74,22 @@ public TypeInformationKeyValueSerializationSchema(TypeInformation keyTypeInfo this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); this.keySerializer = keyTypeInfo.createSerializer(ec); this.valueSerializer = valueTypeInfo.createSerializer(ec); + this.inputDeserializer = new DataInputDeserializer(); } + /** + * Creates a new de-/serialization schema for the given types. This constructor accepts the types + * as classes and internally constructs the type information from the classes. + * + *

If the types are parametrized and cannot be fully defined via classes, use the constructor + * that accepts {@link TypeInformation} instead. + * + * @param keyClass The class of the key de-/serialized by this schema. + * @param valueClass The class of the value de-/serialized by this schema. + * @param config The execution config, which is used to parametrize the type serializers. + */ public TypeInformationKeyValueSerializationSchema(Class keyClass, Class valueClass, ExecutionConfig config) { - //noinspection unchecked - this( (TypeInformation) TypeExtractor.createTypeInfo(keyClass), (TypeInformation) TypeExtractor.createTypeInfo(valueClass), config); + this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config); } // ------------------------------------------------------------------------ @@ -81,12 +98,15 @@ public TypeInformationKeyValueSerializationSchema(Class keyClass, Class va @Override public Tuple2 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { K key = null; - if(messageKey != null) { - key = keySerializer.deserialize(new ByteArrayInputView(messageKey)); - } V value = null; - if(message != null) { - value = valueSerializer.deserialize(new ByteArrayInputView(message)); + + if (messageKey != null) { + inputDeserializer.setBuffer(messageKey, 0, messageKey.length); + key = keySerializer.deserialize(inputDeserializer); + } + if (message != null) { + inputDeserializer.setBuffer(message, 0, message.length); + value = valueSerializer.deserialize(inputDeserializer); } return new Tuple2<>(key, value); } @@ -104,7 +124,7 @@ public boolean isEndOfStream(Tuple2 nextElement) { @Override public byte[] serializeKey(Tuple2 element) { - if(element.f0 == null) { + if (element.f0 == null) { return null; } else { // key is not null. serialize it: @@ -132,7 +152,7 @@ public byte[] serializeKey(Tuple2 element) { @Override public byte[] serializeValue(Tuple2 element) { // if the value is null, its serialized value is null as well. - if(element.f1 == null) { + if (element.f1 == null) { return null; } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 85921820022b8..3d39869b27b5b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -39,9 +39,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -82,6 +83,7 @@ import org.junit.Rule; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; @@ -734,14 +736,16 @@ public void flatMap(Tuple3 value, Collector o private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema> { - TypeSerializer ts; + private final TypeSerializer> ts; + public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) { - ts = TypeInfoParser.parse("Tuple2").createSerializer(ec); + ts = TypeInfoParser.>parse("Tuple2").createSerializer(ec); } @Override public Tuple3 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - Tuple2 t2 = (Tuple2) ts.deserialize(new ByteArrayInputView(message)); + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Tuple2 t2 = ts.deserialize(in); return new Tuple3<>(t2.f0, t2.f1, topic); } @@ -1103,8 +1107,10 @@ public void flatMap(Tuple2 value, Collector out) throws } public static class FixedNumberDeserializationSchema implements DeserializationSchema> { + final int finalCount; int count = 0; + TypeInformation> ti = TypeInfoParser.parse("Tuple2"); TypeSerializer> ser = ti.createSerializer(new ExecutionConfig()); @@ -1114,7 +1120,8 @@ public FixedNumberDeserializationSchema(int finalCount) { @Override public Tuple2 deserialize(byte[] message) throws IOException { - return ser.deserialize(new ByteArrayInputView(message)); + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + return ser.deserialize(in); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java index 6577be8eefda4..61876e4572e5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; +import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; import java.io.IOException; @@ -29,7 +29,6 @@ /** * A serialization and deserialization schema that uses Flink's serialization stack to * transform typed from and to byte arrays. - * * * @param The type to be serialized. */ @@ -42,6 +41,9 @@ public class TypeInformationSerializationSchema implements DeserializationSch /** The reusable output serialization buffer */ private transient DataOutputSerializer dos; + + /** The reusable input deserialization buffer */ + private transient DataInputDeserializer dis; /** The type information, to be returned by {@link #getProducedType()}. It is * transient, because it is not serializable. Note that this means that the type information @@ -65,8 +67,14 @@ public TypeInformationSerializationSchema(TypeInformation typeInfo, Execution @Override public T deserialize(byte[] message) { + if (dis != null) { + dis.setBuffer(message, 0, message.length); + } else { + dis = new DataInputDeserializer(message, 0, message.length); + } + try { - return serializer.deserialize(new ByteArrayInputView(message)); + return serializer.deserialize(dis); } catch (IOException e) { throw new RuntimeException("Unable to deserialize message", e); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java index 1c0f850b38cef..e722f5363f0d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java @@ -112,7 +112,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return String.format("MyPOJO " + aField + " " + aList); + return "MyPOJO " + aField + " " + aList; } } }