Skip to content

Commit

Permalink
[FLINK-8805][runtime] Optimize EvenSerializer.isEvent method
Browse files Browse the repository at this point in the history
For example, previously if the method was used to check for EndOfPartitionEvent
and the Buffer contained huge custom event, the even had to be deserialized before
performing the actual check. Now we are quickly entering the correct if/else branch
and doing full costly deserialization only if we have to.

Other calls to isEvent() then checking against EndOfPartitionEvent were not used.
  • Loading branch information
pnowojski authored and StefanRRichter committed Feb 28, 2018
1 parent ebd39f3 commit 767027f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 57 deletions.
Expand Up @@ -101,16 +101,15 @@ else if (eventClass == CancelCheckpointMarker.class) {
} }


/** /**
* Identifies whether the given buffer encodes the given event. * Identifies whether the given buffer encodes the given event. Custom events are not supported.
* *
* <p><strong>Pre-condition</strong>: This buffer must encode some event!</p> * <p><strong>Pre-condition</strong>: This buffer must encode some event!</p>
* *
* @param buffer the buffer to peak into * @param buffer the buffer to peak into
* @param eventClass the expected class of the event type * @param eventClass the expected class of the event type
* @param classLoader the class loader to use for custom event classes
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt> * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
*/ */
private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException { private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException {
if (buffer.remaining() < 4) { if (buffer.remaining() < 4) {
throw new IOException("Incomplete event"); throw new IOException("Incomplete event");
} }
Expand All @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoad
try { try {
int type = buffer.getInt(); int type = buffer.getInt();


switch (type) { if (eventClass.equals(EndOfPartitionEvent.class)) {
case END_OF_PARTITION_EVENT: return type == END_OF_PARTITION_EVENT;
return eventClass.equals(EndOfPartitionEvent.class); } else if (eventClass.equals(CheckpointBarrier.class)) {
case CHECKPOINT_BARRIER_EVENT: return type == CHECKPOINT_BARRIER_EVENT;
return eventClass.equals(CheckpointBarrier.class); } else if (eventClass.equals(EndOfSuperstepEvent.class)) {
case END_OF_SUPERSTEP_EVENT: return type == END_OF_SUPERSTEP_EVENT;
return eventClass.equals(EndOfSuperstepEvent.class); } else if (eventClass.equals(CancelCheckpointMarker.class)) {
case CANCEL_CHECKPOINT_MARKER_EVENT: return type == CANCEL_CHECKPOINT_MARKER_EVENT;
return eventClass.equals(CancelCheckpointMarker.class); } else {
case OTHER_EVENT: throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
try {
final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
final String className = deserializer.readUTF();

final Class<? extends AbstractEvent> clazz;
try {
clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
}
catch (ClassNotFoundException e) {
throw new IOException("Could not load event class '" + className + "'.", e);
}
catch (ClassCastException e) {
throw new IOException("The class '" + className + "' is not a valid subclass of '"
+ AbstractEvent.class.getName() + "'.", e);
}
return eventClass.equals(clazz);
}
catch (Exception e) {
throw new IOException("Error while deserializing or instantiating event.", e);
}
default:
throw new IOException("Corrupt byte stream for event");
} }
} }
finally { finally {
Expand Down Expand Up @@ -314,17 +291,13 @@ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) t
} }


/** /**
* Identifies whether the given buffer encodes the given event. * Identifies whether the given buffer encodes the given event. Custom events are not supported.
* *
* @param buffer the buffer to peak into * @param buffer the buffer to peak into
* @param eventClass the expected class of the event type * @param eventClass the expected class of the event type
* @param classLoader the class loader to use for custom event classes
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt> * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
*/ */
public static boolean isEvent(final Buffer buffer, public static boolean isEvent(Buffer buffer, Class<?> eventClass) throws IOException {
final Class<?> eventClass, return !buffer.isBuffer() && isEvent(buffer.getNioBufferReadable(), eventClass);
final ClassLoader classLoader) throws IOException {
return !buffer.isBuffer() &&
isEvent(buffer.getNioBufferReadable(), eventClass, classLoader);
} }
} }
Expand Up @@ -287,8 +287,7 @@ private NetworkSequenceViewReader pollAvailableReader() {
} }


private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException { private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class, return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class);
getClass().getClassLoader());
} }


@Override @Override
Expand Down
Expand Up @@ -33,11 +33,13 @@


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;


/** /**
* Tests for the {@link EventSerializer}. * Tests for the {@link EventSerializer}.
Expand Down Expand Up @@ -95,7 +97,7 @@ public void testSerializeDeserializeEvent() throws Exception {
} }


/** /**
* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} * Tests {@link EventSerializer#isEvent(Buffer, Class)}
* whether it peaks into the buffer only, i.e. after the call, the buffer * whether it peaks into the buffer only, i.e. after the call, the buffer
* is still de-serializable. * is still de-serializable.
*/ */
Expand All @@ -106,8 +108,7 @@ public void testIsEventPeakOnly() throws Exception {
try { try {
final ClassLoader cl = getClass().getClassLoader(); final ClassLoader cl = getClass().getClassLoader();
assertTrue( assertTrue(
EventSerializer EventSerializer.isEvent(serializedEvent, EndOfPartitionEvent.class));
.isEvent(serializedEvent, EndOfPartitionEvent.class, cl));
EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer
.fromBuffer(serializedEvent, cl); .fromBuffer(serializedEvent, cl);
assertEquals(EndOfPartitionEvent.INSTANCE, event); assertEquals(EndOfPartitionEvent.INSTANCE, event);
Expand All @@ -117,7 +118,7 @@ public void testIsEventPeakOnly() throws Exception {
} }


/** /**
* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
* the correct answer for various encoded event buffers. * the correct answer for various encoded event buffers.
*/ */
@Test @Test
Expand All @@ -130,36 +131,48 @@ public void testIsEvent() throws Exception {
new CancelCheckpointMarker(287087987329842L) new CancelCheckpointMarker(287087987329842L)
}; };


Class[] expectedClasses = Arrays.stream(events)
.map(AbstractEvent::getClass)
.toArray(Class[]::new);

for (AbstractEvent evt : events) { for (AbstractEvent evt : events) {
for (AbstractEvent evt2 : events) { for (Class<?> expectedClass: expectedClasses) {
if (evt == evt2) { if (expectedClass.equals(TestTaskEvent.class)) {
assertTrue(checkIsEvent(evt, evt2.getClass())); try {
checkIsEvent(evt, expectedClass);
fail("This should fail");
}
catch (UnsupportedOperationException ex) {
// expected
}
}
else if (evt.getClass().equals(expectedClass)) {
assertTrue(checkIsEvent(evt, expectedClass));
} else { } else {
assertFalse(checkIsEvent(evt, evt2.getClass())); assertFalse(checkIsEvent(evt, expectedClass));
} }
} }
} }
} }


/** /**
* Returns the result of * Returns the result of
* {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} on a buffer * {@link EventSerializer#isEvent(Buffer, Class)} on a buffer
* that encodes the given <tt>event</tt>. * that encodes the given <tt>event</tt>.
* *
* @param event the event to encode * @param event the event to encode
* @param eventClass the event class to check against * @param eventClass the event class to check against
* *
* @return whether {@link EventSerializer#isEvent(ByteBuffer, Class, ClassLoader)} * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class)}
* thinks the encoded buffer matches the class * thinks the encoded buffer matches the class
*/ */
private boolean checkIsEvent( private boolean checkIsEvent(
AbstractEvent event, AbstractEvent event,
Class<? extends AbstractEvent> eventClass) throws IOException { Class<?> eventClass) throws IOException {


final Buffer serializedEvent = EventSerializer.toBuffer(event); final Buffer serializedEvent = EventSerializer.toBuffer(event);
try { try {
final ClassLoader cl = getClass().getClassLoader(); return EventSerializer.isEvent(serializedEvent, eventClass);
return EventSerializer.isEvent(serializedEvent, eventClass, cl);
} finally { } finally {
serializedEvent.recycleBuffer(); serializedEvent.recycleBuffer();
} }
Expand Down

0 comments on commit 767027f

Please sign in to comment.