Skip to content

Conversation

NicoK
Copy link
Contributor

@NicoK NicoK commented Nov 14, 2016

Only serialise events once in RecordWriter#broadcastEvent.

While adapting this, also clean up some related APIs which is now unused or used similar patterns.

@zentol
Copy link
Contributor

zentol commented Nov 14, 2016

Please adjust the PR title to the following format: [FLINK-5059] <issue title/description>, this allows the mirroring of comments made here to JIRA.

final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
output.writeEventToAllChannels(message);
final Buffer eventBuffer = EventSerializer.toBuffer(message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add a method to the ResultPartitionWriter that encapsulates this behavior? The IterationHeadTask now contains an exact copy of this code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was the case before and I could have adapted ResultPartitionWriter#writeEventToAllChannels() accordingly. The question is, however, whether we want ResultPartitionWriter to be aware of the difference between events and buffers or offer a cleaner API that is based on buffers only...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in any case the entire try/finally block could be moved into the ResultPartitionWriter, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's right - Let's go with writeBufferToAllChannels(Buffer) instead.

@NicoK NicoK changed the title Flink 5059 [FLINK-5059] only serialise events once in RecordWriter#broadcastEvent Nov 15, 2016
@NicoK NicoK force-pushed the FLINK-5059 branch 3 times, most recently from 5df291d to a96c169 Compare November 15, 2016 09:31
Copy link
Contributor

@uce uce left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes look good to me in general. It's a very low hanging fruit to only serialize the event buffers once since they don't come out of the buffer pools anyways. +1 to merge this after the comments are addressed.

}

private <T> void addBufferToOutputList(RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
NonReusingDeserializationDelegate<T> delegate, Buffer buffer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In test scope, this is not a problem, but in main scope the style guidelines (and check style plugin) do not allow leading whitespace. Personally, I don't think it's good practice to differentiate between tests and main code. You might consider removing the leading space here, too (for example one argument per line).

}
}

private <T> void addBufferToOutputList(RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that it was commented before, but you might want to add a high level comment about what happens here.

if (buffer != null) {
writeAndClearBuffer(buffer, targetChannel, serializer);
// retain the buffer so that it can be recycled by each channel of targetPartition
eventBuffer.retain();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a special RecordWriterTest that ensure that the reference counting logic works.

@Test
public void testBroadcastEventBufferReferenceCounting() throws Exception {
  Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);

  // Partial mocking of static method...
  PowerMockito.stub(PowerMockito.method(EventSerializer.class, "toBuffer")).toReturn(buffer);

  @SuppressWarnings("unchecked")
  ArrayDeque<BufferOrEvent>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };

  ResultPartitionWriter partition = createCollectingPartitionWriter(queues, new TestInfiniteBufferProvider());
  RecordWriter<?> writer = new RecordWriter<>(partition);

  writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

  // Verify added to all queues
  assertEquals(1, queues[0].size());
  assertEquals(1, queues[1].size());

  assertTrue(buffer.isRecycled());
}

You have to adjust the createCollectingPartitionWriter to correctly recycle event buffers and replace the PrepareForTest class annotation with @PrepareForTest({ResultPartitionWriter.class, EventSerializer.class}).


Not changed in this PR, but to work correctly this relies on the ResultPartition to recycle the buffer if the add calls fails. It might make sense to add a special test (to ResultPartitionTest or RecordWriterTest) where we ensure that this actually happens to guard against future behaviour changes in ResultPartition. A possible better behaviour would be to let the RecordWriter recycle it if an Exception occurs. This should be addressed in a different PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks

I opened [FLINK-5277] for the ResultPartition#add test: https://issues.apache.org/jira/browse/FLINK-5277

try {
for (int targetChannel = 0; targetChannel < partition.getNumberOfSubpartitions(); targetChannel++) {
// retain the buffer so that it can be recycled by each channel of targetPartition
eventBuffer.retain();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comments as in the RecordWriter apply here.

} else {
// is event:
AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was missing before, too, but we should recycle the buffer after creating the event.

@uce
Copy link
Contributor

uce commented Dec 12, 2016

Thanks for addressing the comments, Nico. Going to merge this with the next batch.

uce pushed a commit to uce/flink that referenced this pull request Dec 12, 2016
uce pushed a commit to uce/flink that referenced this pull request Jan 9, 2017
@asfgit asfgit closed this in 9cff8c9 Jan 9, 2017
liuyuzhong7 pushed a commit to liuyuzhong7/flink that referenced this pull request Jan 17, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants