[FLINK-2438] [runtime] Improve channel event serialization performance.
Because channel events may become very frequent now (frequent at-least-once checkpointing), their serialization performance starts to matter.
StephanEwen committed Aug 2, 2015
1 parent aa0105a commit af88aa0
Showing 68 changed files with 247 additions and 141 deletions.
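
The change targets the hot serialization path for channel events. As a rough sanity check of the claimed effect, a micro-benchmark like the sketch below can be used. This is an illustrative, self-contained loop and not part of the commit; a rigorous measurement would use a harness such as JMH. It relies only on EventSerializer.toSerializedEvent(AbstractEvent) and the CheckpointBarrier(id, timestamp) constructor that appear in the diff below.

import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

public class EventSerializationBenchmark {

    public static void main(String[] args) {
        final int rounds = 1_000_000;

        // warm up the JIT before timing
        for (int i = 0; i < 100_000; i++) {
            EventSerializer.toSerializedEvent(new CheckpointBarrier(i, i));
        }

        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            // with this commit, a CheckpointBarrier serializes into a fixed 20-byte buffer
            // instead of going through writeUTF(className) plus reflective instantiation
            EventSerializer.toSerializedEvent(new CheckpointBarrier(i, System.currentTimeMillis()));
        }
        long nanos = System.nanoTime() - start;

        System.out.println("avg ns per event: " + (nanos / (double) rounds));
    }
}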
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.runtime.event.task;
+package org.apache.flink.runtime.event;

 import org.apache.flink.core.io.IOReadableWritable;

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.runtime.event.task;
+package org.apache.flink.runtime.event;

 /**
  * Subclasses of this event are recognized as events exchanged by the core runtime.

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.runtime.event.task;
+package org.apache.flink.runtime.event;

 /**
  * Subclasses of this event are recognized as custom events that are not part of the core

@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network;

 import com.google.common.collect.Maps;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -16,32 +16,33 @@
  * limitations under the License.
  */

-package org.apache.flink.streaming.runtime.tasks;
+package org.apache.flink.runtime.io.network.api;

 import java.io.IOException;

 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;

 /**
- * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The
+ * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
  * barriers are emitted by the sources when instructed to do so by the JobManager. When
- * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing
- * of further elements on this input until all inputs received the checkpoint barrier
- * corresponding to to that checkpoint. Once all inputs received the checkpoint barrier for
- * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to
- * downstream operators.
- *
- * <p>
- * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier}
- * for a checkpoint with a higher id it is to discard all barriers that it received from previous
- * checkpoints and unblock all other inputs.
+ * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point
+ * between the pre-checkpoint and post-checkpoint data.
+ *
+ * <p>Once an operator has received a checkpoint barrier from all its input channels, it
+ * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
+ * behavior and broadcast the barrier to downstream operators.</p>
+ *
+ * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
+ * is complete (exactly once)</p>
+ *
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
  */
-public class CheckpointBarrier extends TaskEvent {
+public class CheckpointBarrier extends RuntimeEvent {

-    protected long id;
-    protected long timestamp;
+    private long id;
+    private long timestamp;

     public CheckpointBarrier() {}

@@ -55,7 +56,7 @@ public long getId() {
     }

     public long getTimestamp() {
-        return id;
+        return timestamp;
     }

     // ------------------------------------------------------------------------
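
The rewritten javadoc above describes barrier alignment only in prose. The sketch below illustrates the bookkeeping it implies; the class and method names are invented for illustration and are not Flink's actual implementation. It simply records, per input channel, whether the barrier for the current checkpoint has arrived and reports when all channels are aligned.

/**
 * Illustrative only: tracks which input channels have delivered the barrier
 * for the current checkpoint ID. Names are hypothetical, not Flink classes.
 */
public class BarrierAlignmentSketch {

    private final boolean[] barrierReceived;   // one flag per input channel
    private long currentCheckpointId = -1;
    private int numReceived;

    public BarrierAlignmentSketch(int numberOfInputChannels) {
        this.barrierReceived = new boolean[numberOfInputChannels];
    }

    /** Returns true once barriers for this checkpoint arrived on all channels. */
    public boolean onBarrier(int channelIndex, long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            // a newer checkpoint starts: reset the alignment state
            currentCheckpointId = checkpointId;
            java.util.Arrays.fill(barrierReceived, false);
            numReceived = 0;
        }
        if (checkpointId == currentCheckpointId && !barrierReceived[channelIndex]) {
            barrierReceived[channelIndex] = true;
            numReceived++;
        }
        return numReceived == barrierReceived.length;
    }
}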
@@ -20,7 +20,7 @@

 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;

 /**
  * This event marks a subpartition as fully consumed.

@@ -20,7 +20,7 @@

 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;

 /**
  * Marks the end of a superstep of one particular iteration superstep.

@@ -20,7 +20,7 @@

 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;

 /**

@@ -18,8 +18,8 @@

 package org.apache.flink.runtime.io.network.api.reader;

-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;

@@ -21,7 +21,7 @@
 import java.io.IOException;

 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;

 /**
@@ -19,63 +19,122 @@
 package org.apache.flink.runtime.io.network.api.serialization;

 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;

+/**
+ * Utility class to serialize and deserialize task events.
+ */
 public class EventSerializer {

-    public final static BufferRecycler RECYCLER = new BufferRecycler() {
-        @Override
-        public void recycle(MemorySegment memorySegment) {
-            memorySegment.free();
-        }
-    };
+    private static final int END_OF_PARTITION_EVENT = 0;
+
+    private static final int CHECKPOINT_BARRIER_EVENT = 1;
+
+    private static final int END_OF_SUPERSTEP_EVENT = 2;
+
+    private static final int OTHER_EVENT = 3;
+
+    // ------------------------------------------------------------------------

     public static ByteBuffer toSerializedEvent(AbstractEvent event) {
-        try {
-            final DataOutputSerializer serializer = new DataOutputSerializer(128);
-
-            serializer.writeUTF(event.getClass().getName());
-            event.write(serializer);
-
-            return serializer.wrapAsByteBuffer();
+        final Class<?> eventClass = event.getClass();
+        if (eventClass == EndOfPartitionEvent.class) {
+            return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
         }
-        catch (IOException e) {
-            throw new RuntimeException("Error while serializing event.", e);
+        else if (eventClass == CheckpointBarrier.class) {
+            CheckpointBarrier barrier = (CheckpointBarrier) event;
+
+            ByteBuffer buf = ByteBuffer.allocate(20);
+            buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+            buf.putLong(4, barrier.getId());
+            buf.putLong(12, barrier.getTimestamp());
+            return buf;
+        }
+        else if (eventClass == EndOfSuperstepEvent.class) {
+            return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
+        }
+        else {
+            try {
+                final DataOutputSerializer serializer = new DataOutputSerializer(128);
+                serializer.writeInt(OTHER_EVENT);
+                serializer.writeUTF(event.getClass().getName());
+                event.write(serializer);
+
+                return serializer.wrapAsByteBuffer();
+            }
+            catch (IOException e) {
+                throw new RuntimeException("Error while serializing event.", e);
+            }
         }
     }

     public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+        if (buffer.remaining() < 4) {
+            throw new RuntimeException("Incomplete event");
+        }
+
+        final ByteOrder bufferOrder = buffer.order();
+        buffer.order(ByteOrder.BIG_ENDIAN);
+
         try {
-            final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
-            final String className = deserializer.readUTF();
-
-            final Class<? extends AbstractEvent> clazz;
-            try {
-                clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+            int type = buffer.getInt();
+
+            if (type == END_OF_PARTITION_EVENT) {
+                return EndOfPartitionEvent.INSTANCE;
             }
-            catch (ClassNotFoundException e) {
-                throw new RuntimeException("Could not load event class '" + className + "'.", e);
+            else if (type == CHECKPOINT_BARRIER_EVENT) {
+                long id = buffer.getLong();
+                long timestamp = buffer.getLong();
+                return new CheckpointBarrier(id, timestamp);
             }
-            catch (ClassCastException e) {
-                throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+            else if (type == END_OF_SUPERSTEP_EVENT) {
+                return EndOfSuperstepEvent.INSTANCE;
+            }
+            else if (type == OTHER_EVENT) {
+                try {
+                    final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+
+                    final String className = deserializer.readUTF();
+
+                    final Class<? extends AbstractEvent> clazz;
+                    try {
+                        clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+                    }
+                    catch (ClassNotFoundException e) {
+                        throw new RuntimeException("Could not load event class '" + className + "'.", e);
+                    }
+                    catch (ClassCastException e) {
+                        throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+                                + AbstractEvent.class.getName() + "'.", e);
+                    }
+
+                    final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+                    event.read(deserializer);
+
+                    return event;
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Error while deserializing or instantiating event.", e);
+                }
+            }
+            else {
+                throw new RuntimeException("Corrupt byte stream for event");
             }
-
-            final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-            event.read(deserializer);
-
-            return event;
         }
-        catch (IOException e) {
-            throw new RuntimeException("Error while deserializing event.", e);
+        finally {
+            buffer.order(bufferOrder);
         }
     }

@@ -86,7 +145,8 @@ public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader c
     public static Buffer toBuffer(AbstractEvent event) {
         final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);

-        final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()), RECYCLER, false);
+        final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()),
+                FreeingBufferRecycler.INSTANCE, false);
         buffer.setSize(serializedEvent.remaining());

         return buffer;
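
As a usage sketch of the serializer rewritten above: a CheckpointBarrier now round-trips through a small fixed-size buffer (a 4-byte type tag followed by two longs) with no class-name lookup or reflection. The EventSerializer methods and the CheckpointBarrier constructor are the ones shown in the hunk; only the surrounding main method is added for illustration.

import java.nio.ByteBuffer;

import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

public class EventSerializerRoundTrip {

    public static void main(String[] args) {
        CheckpointBarrier barrier = new CheckpointBarrier(42L, System.currentTimeMillis());

        // fast path: 4-byte tag + 8-byte id + 8-byte timestamp = 20 bytes
        ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier);

        // decoding dispatches on the integer tag, no reflection involved
        AbstractEvent deserialized = EventSerializer.fromSerializedEvent(
                serialized, EventSerializerRoundTrip.class.getClassLoader());

        CheckpointBarrier restored = (CheckpointBarrier) deserialized;
        System.out.println("id=" + restored.getId() + ", timestamp=" + restored.getTimestamp());
    }
}

Keeping the generic writeUTF-plus-reflection path under the OTHER_EVENT tag preserves support for arbitrary user-defined task events, while the frequent runtime events take the cheap fixed-layout path.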
@@ -20,7 +20,7 @@

 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

@@ -18,8 +18,8 @@

 package org.apache.flink.runtime.io.network.api.writer;

-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

@@ -16,20 +16,26 @@
  * limitations under the License.
  */

-package org.apache.flink.streaming.runtime.io;
+package org.apache.flink.runtime.io.network.buffer;

 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;

 /**
- * A simple buffer recycler that only frees the memory segments.
+ * A simple buffer recycler that frees the memory segments.
  */
 public class FreeingBufferRecycler implements BufferRecycler {

     public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();

     // ------------------------------------------------------------------------

+    // Not instantiable
+    private FreeingBufferRecycler() {}
+
+    /**
+     * Frees the given memory segment.
+     * @param memorySegment The memory segment to be recycled.
+     */
     @Override
     public void recycle(MemorySegment memorySegment) {
         memorySegment.free();

@@ -32,7 +32,7 @@
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;

@@ -21,7 +21,7 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

@@ -21,10 +21,11 @@
 import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -300,7 +301,7 @@ else if (bufferProvider.isDestroyed()) {
             byte[] byteArray = new byte[bufferOrEvent.getSize()];
             bufferOrEvent.getNettyBuffer().readBytes(byteArray);

-            Buffer buffer = new Buffer(new MemorySegment(byteArray), EventSerializer.RECYCLER, false);
+            Buffer buffer = new Buffer(new MemorySegment(byteArray), FreeingBufferRecycler.INSTANCE, false);

             inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
