Skip to content

Conversation

@klion26
Copy link
Member

@klion26 klion26 commented Jun 26, 2019

What is the purpose of the change

Currently it is non-blocking in case of credit-based flow control (default), however for SpilledBufferOrEventSequence it is blocking on reading from file. We might want to consider reimplementing it to be non blocking with CompletableFuture<?> isAvailable() method.

Otherwise we will block mailbox processing for the duration of reading from file - for example we will block processing time timers and potentially in the future network flushes.

Brief change log

Use a ByteBuffer pool to read the BufferOrEvent asynchronous.

  • the pool size if default 2, can be configurated by key taskmanager.memory.async-load.buffer-count
  • will reuse the read thread in IOManager

Verifying this change

This change is already covered by existing tests, such as:

  • SpilledBufferOrEventSequenceTest.java

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@klion26 klion26 changed the title [FLINK-12536][coordinator]Make BufferOrEventSequence#getNext non blocking [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking Jun 26, 2019
@klion26
Copy link
Member Author

klion26 commented Jun 27, 2019

Travis build failed seems irrelevant, have filed a issue to track it.

Test testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase) failed with:
java.lang.NullPointerException: java.lang.NullPointerException
	at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics.getAggregateAppResourceUsage(RMAppAttemptMetrics.java:128)
	at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.getApplicationResourceUsageReport(RMAppAttemptImpl.java:900)
	at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.createAndGetApplicationReport(RMAppImpl.java:660)
	at org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplications(ClientRMService.java:930)
	at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplications(ApplicationClientProtocolPBServiceImpl.java:273)
	at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:507)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)

Copy link
Contributor

@StephanEwen StephanEwen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I also mentioned in the JIRA discussion I am unsure about this change.

This is adding a lot of extra complexity for a legacy code path for a situation where we don't yet know whether this is a problem in practice.
When trading off overall project code complexity with possibly improved behavior, I am not sure we should change this here.

I think we need to solve that discussion first...

*/
public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException {
int toRead = buffer.limit() - buffer.position();
while (toRead > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified to while (buffer.hasRemaining())


public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;

public abstract BufferFileReader createBufferOrEventFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go forward, I would suggest to factor this out into an interface (that is also implemented by IOManager) so we do not pass such a big dependency. This component does not need to know about the IOManager as a whole if it is only interested in this sub-functionality.


private void readBufferOrEventMeta(ByteBuffer buffer) {
// ignore channel.
buffer.order(ByteOrder.LITTLE_ENDIAN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing the values through fields seems like passing information sideways or through side effects.
If you want to avoid object creation for the return type, I would suggest to look at splitting the methods differently.

@klion26
Copy link
Member Author

klion26 commented Jul 3, 2019

@StephanEwen thanks for the comments, I'll continue after the discussion has been solved.

@klion26 klion26 closed this Jun 1, 2020
@klion26 klion26 deleted the 12536_Make_BufferOrEventSequence_getNExt_non_blocking branch June 1, 2020 05:52
@klion26
Copy link
Member Author

klion26 commented Jun 1, 2020

closed the pr, as this issue won't do.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants