Skip to content

Commit

Permalink
[hotfix] Remove 'ByteArrayInputView' and replace deserialization in T…
Browse files Browse the repository at this point in the history
…ypeInformationSerializationSchema with more efficient reusable buffers.
  • Loading branch information
StephanEwen committed Feb 1, 2016
1 parent 67b380d commit 92efcd3
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 89 deletions.

This file was deleted.

Expand Up @@ -22,13 +22,16 @@
import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;

import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;


import java.io.ByteArrayInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
Expand Down Expand Up @@ -69,7 +72,8 @@ public void testOutputBufferedBeingClearedInCaseOfException() throws Exception {
// now the Kryo Output should have been cleared // now the Kryo Output should have been cleared
} }


TestRecord actualRecord = kryoSerializer.deserialize(new ByteArrayInputView(target.getBuffer())); TestRecord actualRecord = kryoSerializer.deserialize(
new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));


Assert.assertEquals(testRecord, actualRecord); Assert.assertEquals(testRecord, actualRecord);


Expand Down
Expand Up @@ -16,7 +16,6 @@
* limitations under the License. * limitations under the License.
*/ */



package org.apache.flink.runtime.util; package org.apache.flink.runtime.util;


import java.io.EOFException; import java.io.EOFException;
Expand All @@ -31,16 +30,21 @@
/** /**
* A simple and efficient deserializer for the {@link java.io.DataInput} interface. * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
*/ */
public class DataInputDeserializer implements DataInputView { public class DataInputDeserializer implements DataInputView, java.io.Serializable {

private static final long serialVersionUID = 1L;

// ------------------------------------------------------------------------


private byte[] buffer; private byte[] buffer;


private int end; private int end;


private int position; private int position;


public DataInputDeserializer() { // ------------------------------------------------------------------------
}
public DataInputDeserializer() {}


public DataInputDeserializer(byte[] buffer, int start, int len) { public DataInputDeserializer(byte[] buffer, int start, int len) {
setBuffer(buffer, start, len); setBuffer(buffer, start, len);
Expand All @@ -50,6 +54,10 @@ public DataInputDeserializer(ByteBuffer buffer) {
setBuffer(buffer); setBuffer(buffer);
} }


// ------------------------------------------------------------------------
// Chaning buffers
// ------------------------------------------------------------------------

public void setBuffer(ByteBuffer buffer) { public void setBuffer(ByteBuffer buffer) {
if (buffer.hasArray()) { if (buffer.hasArray()) {
this.buffer = buffer.array(); this.buffer = buffer.array();
Expand Down Expand Up @@ -311,44 +319,36 @@ public int skipBytes(int n) throws IOException {
return n; return n;
} }
} }

@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

@SuppressWarnings("restriction")
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);


@Override @Override
public void skipBytesToRead(int numBytes) throws IOException { public void skipBytesToRead(int numBytes) throws IOException {
int skippedBytes = skipBytes(numBytes); int skippedBytes = skipBytes(numBytes);


if(skippedBytes < numBytes){ if (skippedBytes < numBytes){
throw new EOFException("Could not skip " + numBytes +" bytes."); throw new EOFException("Could not skip " + numBytes +" bytes.");
} }
} }


@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
if(b == null){ if (b == null){
throw new NullPointerException("Byte array b cannot be null."); throw new NullPointerException("Byte array b cannot be null.");
} }


if(off < 0){ if (off < 0){
throw new IndexOutOfBoundsException("Offset cannot be negative."); throw new IndexOutOfBoundsException("Offset cannot be negative.");
} }


if(len < 0){ if (len < 0){
throw new IndexOutOfBoundsException("Length cannot be negative."); throw new IndexOutOfBoundsException("Length cannot be negative.");
} }


if(b.length - off < len){ if (b.length - off < len){
throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
"."); ".");
} }


if(this.position >= this.end) { if (this.position >= this.end) {
return -1; return -1;
} else { } else {
int toRead = Math.min(this.end-this.position, len); int toRead = Math.min(this.end-this.position, len);
Expand All @@ -363,4 +363,16 @@ public int read(byte[] b, int off, int len) throws IOException {
public int read(byte[] b) throws IOException { public int read(byte[] b) throws IOException {
return read(b, 0, b.length); return read(b, 0, b.length);
} }

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

@SuppressWarnings("restriction")
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
} }
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemoryUtils; import org.apache.flink.core.memory.MemoryUtils;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -39,6 +40,8 @@ public class DataOutputSerializer implements DataOutputView {
private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class); private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);


private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;

// ------------------------------------------------------------------------


private final byte[] startBuffer; private final byte[] startBuffer;


Expand All @@ -47,6 +50,8 @@ public class DataOutputSerializer implements DataOutputView {
private int position; private int position;


private ByteBuffer wrapper; private ByteBuffer wrapper;

// ------------------------------------------------------------------------


public DataOutputSerializer(int startSize) { public DataOutputSerializer(int startSize) {
if (startSize < 1) { if (startSize < 1) {
Expand Down Expand Up @@ -303,14 +308,6 @@ private void resize(int minCapacityAdd) throws IOException {
this.buffer = nb; this.buffer = nb;
this.wrapper = ByteBuffer.wrap(this.buffer); this.wrapper = ByteBuffer.wrap(this.buffer);
} }

@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

@SuppressWarnings("restriction")
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);


@Override @Override
public void skipBytesToWrite(int numBytes) throws IOException { public void skipBytesToWrite(int numBytes) throws IOException {
Expand All @@ -330,4 +327,16 @@ public void write(DataInputView source, int numBytes) throws IOException {
source.read(this.buffer, this.position, numBytes); source.read(this.buffer, this.position, numBytes);
this.position += numBytes; this.position += numBytes;
} }

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

@SuppressWarnings("restriction")
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
} }
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.runtime.util.DataOutputSerializer;


import java.io.IOException; import java.io.IOException;
Expand All @@ -46,10 +46,16 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
/** The serializer for the value */ /** The serializer for the value */
private final TypeSerializer<V> valueSerializer; private final TypeSerializer<V> valueSerializer;


/** reusable output serialization buffers */ /** reusable input deserialization buffer */
private final DataInputDeserializer inputDeserializer;

/** reusable output serialization buffer for the key */
private transient DataOutputSerializer keyOutputSerializer; private transient DataOutputSerializer keyOutputSerializer;
private transient DataOutputSerializer valueOutputSerializer;


/** reusable output serialization buffer for the value */
private transient DataOutputSerializer valueOutputSerializer;


/** The type information, to be returned by {@link #getProducedType()}. It is /** The type information, to be returned by {@link #getProducedType()}. It is
* transient, because it is not serializable. Note that this means that the type information * transient, because it is not serializable. Note that this means that the type information
* is not available at runtime, but only prior to the first serialization / deserialization */ * is not available at runtime, but only prior to the first serialization / deserialization */
Expand All @@ -68,11 +74,22 @@ public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo
this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
this.keySerializer = keyTypeInfo.createSerializer(ec); this.keySerializer = keyTypeInfo.createSerializer(ec);
this.valueSerializer = valueTypeInfo.createSerializer(ec); this.valueSerializer = valueTypeInfo.createSerializer(ec);
this.inputDeserializer = new DataInputDeserializer();
} }


/**
* Creates a new de-/serialization schema for the given types. This constructor accepts the types
* as classes and internally constructs the type information from the classes.
*
* <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
* that accepts {@link TypeInformation} instead.
*
* @param keyClass The class of the key de-/serialized by this schema.
* @param valueClass The class of the value de-/serialized by this schema.
* @param config The execution config, which is used to parametrize the type serializers.
*/
public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
//noinspection unchecked this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config);
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand All @@ -81,12 +98,15 @@ public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> va
@Override @Override
public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
K key = null; K key = null;
if(messageKey != null) {
key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
}
V value = null; V value = null;
if(message != null) {
value = valueSerializer.deserialize(new ByteArrayInputView(message)); if (messageKey != null) {
inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
key = keySerializer.deserialize(inputDeserializer);
}
if (message != null) {
inputDeserializer.setBuffer(message, 0, message.length);
value = valueSerializer.deserialize(inputDeserializer);
} }
return new Tuple2<>(key, value); return new Tuple2<>(key, value);
} }
Expand All @@ -104,7 +124,7 @@ public boolean isEndOfStream(Tuple2<K,V> nextElement) {


@Override @Override
public byte[] serializeKey(Tuple2<K, V> element) { public byte[] serializeKey(Tuple2<K, V> element) {
if(element.f0 == null) { if (element.f0 == null) {
return null; return null;
} else { } else {
// key is not null. serialize it: // key is not null. serialize it:
Expand Down Expand Up @@ -132,7 +152,7 @@ public byte[] serializeKey(Tuple2<K, V> element) {
@Override @Override
public byte[] serializeValue(Tuple2<K, V> element) { public byte[] serializeValue(Tuple2<K, V> element) {
// if the value is null, its serialized value is null as well. // if the value is null, its serialized value is null as well.
if(element.f1 == null) { if (element.f1 == null) {
return null; return null;
} }


Expand Down
Expand Up @@ -39,9 +39,10 @@
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
Expand Down Expand Up @@ -82,6 +83,7 @@


import org.junit.Rule; import org.junit.Rule;


import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
Expand Down Expand Up @@ -734,14 +736,16 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o


private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> { private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {


TypeSerializer ts; private final TypeSerializer<Tuple2<Integer, Integer>> ts;

public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) { public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) {
ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(ec); ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
} }


@Override @Override
public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
Tuple2<Integer, Integer> t2 = (Tuple2<Integer, Integer>) ts.deserialize(new ByteArrayInputView(message)); DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
Tuple2<Integer, Integer> t2 = ts.deserialize(in);
return new Tuple3<>(t2.f0, t2.f1, topic); return new Tuple3<>(t2.f0, t2.f1, topic);
} }


Expand Down Expand Up @@ -1103,8 +1107,10 @@ public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws
} }


public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> { public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {

final int finalCount; final int finalCount;
int count = 0; int count = 0;

TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>"); TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig()); TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());


Expand All @@ -1114,7 +1120,8 @@ public FixedNumberDeserializationSchema(int finalCount) {


@Override @Override
public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException { public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
return ser.deserialize(new ByteArrayInputView(message)); DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
return ser.deserialize(in);
} }


@Override @Override
Expand Down

0 comments on commit 92efcd3

Please sign in to comment.