
Conversation

@akalash (Contributor) commented Oct 8, 2021

What is the purpose of the change

This PR fixes an NPE that occurs when the channel context is used inside of the ClientHandler before the channel has been activated.

Brief change log

  • Wait for the channel activation before creating the partition request client (see the sketch below)
  • Ignore exceptions during the throughput calculation so that a single failure does not break subsequent calculations
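
A minimal sketch of the first change-log item, assuming a hypothetical channelActivationFuture that the client handler completes in channelActive(); the class and method names are illustrative, not the actual Flink APIs, and plain io.netty imports are used even though Flink shades Netty:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    // Illustrative only: the handler completes a future once the channel becomes active,
    // so the factory can wait on it before handing out the partition request client.
    class ClientHandlerActivationSketch extends ChannelInboundHandlerAdapter {

        private final CompletableFuture<Void> channelActivationFuture = new CompletableFuture<>();

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // From this point on, the channel context can be used safely.
            channelActivationFuture.complete(null);
            super.channelActive(ctx);
        }

        void waitForChannelActivation() throws ExecutionException, InterruptedException {
            // Blocks the caller until the channel has been activated.
            channelActivationFuture.get();
        }
    }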

Verifying this change

This change added tests and can be verified as follows:

  • Added test for CreditBasedPartitionRequestClientHandler

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Oct 8, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 37cf758 (Fri Oct 08 15:42:04 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Oct 8, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@dawidwys (Contributor) left a comment


A higher-level question: do we really just stop debloating, or do we trigger a failover?

Shouldn't the exception from the mailbox processor fail the task? Honestly, I am a bit hesitant about simply ignoring exceptions there and continuing processing.

@dawidwys (Contributor)

Another question: why does it happen only for announcing the new buffer size? Shouldn't we have the same issue for all other methods of CreditBasedPartitionRequestClientHandler, such as notifyCreditAvailable?

@akalash (Contributor, Author) commented Oct 11, 2021

Do we really just stop debloating or do we trigger a failover?

As far as I can see, we just stop debloating and no failover is triggered.

Shouldn't the exception from the mailbox processor fail the task?

I was also surprised, but it looks like we just ignore this error (this is exactly what should be figured out later).

Why does it happen only for announcing the new buffer size?

If we already have input data, then this NPE is impossible, and that is exactly the case for all methods other than the new buffer size announcement. So everything except the announcement of the buffer size is a reaction to input data.

Shouldn't we have the same issue for all other methods of CreditBasedPartitionRequestClientHandler such as e.g. notifyCreditAvailable?

As you can see, I made this NPE impossible by waiting for channel initialization at start-up, so everything should work well now.

Is it an error if we continue processing? IMO, at best it should be a warning; I am even wondering whether it shouldn't just be a debug message. What are users supposed to do with that line in the logs?

I am not really sure that we need this try/catch block at all. My main fix is about waiting for the initialization; I made these changes just in case, but perhaps it would be much better to understand Flink's behavior when we have exceptions inside the action. (In general, though, I agree that debug suits here better.)

@dawidwys (Contributor) commented Oct 12, 2021

I found the reason why the exception was swallowed. Please see: https://issues.apache.org/jira/browse/FLINK-24515.

I think we should not swallow exceptions if debloating fails. Therefore I'd remove the try/catch block and replace the submit with execute in org.apache.flink.streaming.runtime.tasks.StreamTask#scheduleBufferDebloater:

    private void scheduleBufferDebloater() {
        // See https://issues.apache.org/jira/browse/FLINK-23560
        // If there are no input gates, there is no point of calculating the throughput and running
        // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
        // lock, enqueuing even a single mailbox action can cause performance regression. This is
        // especially visible in batch, with disabled checkpointing and no processing time timers.
        if (getEnvironment().getAllInputGates().length == 0) {
            return;
        }
        systemTimerService.registerTimer(
                systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
                timestamp ->
                        mainMailboxExecutor.execute(
                                () -> {
                                    debloat();
                                    scheduleBufferDebloater();
                                },
                                "Buffer size recalculation"));
    }
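
The submit/execute distinction mirrors plain java.util.concurrent executors: submit() captures an exception in the returned Future (and effectively swallows it if nobody calls get()), while execute() lets it propagate to the worker thread. A small, self-contained illustration of that general behavior (standard JDK classes, not Flink's MailboxExecutor itself):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class SubmitVsExecute {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Runnable failing = () -> { throw new RuntimeException("boom"); };

            // submit(): the exception is stored in the returned Future; since get() is
            // never called, it is silently swallowed.
            executor.submit(failing);

            // execute(): the exception propagates out of the task and is reported by the
            // worker thread's uncaught-exception handling (printed to stderr).
            executor.execute(failing);

            executor.shutdown();
        }
    }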

As for the waitForActivation, I need to think about it a bit more.

@dawidwys (Contributor)

So I spent some time looking into the waitForChannelActivation thingy. I am a bit concerned that the synchronous waiting might prolong the startup too much. If I understand the code correctly, we would sequentially wait for each channel to become active before requesting a partition on another channel.

As the initial error shows, it may take some time. I was rather thinking that we could return the future itself (I know we would have to pass it through a couple of layers). Then we could start buffer debloating once the future for all channel activations completes.
Something like:

        CompletableFuture.allOf(
                        Arrays.stream(environment.getAllInputGates())
                                .map(InputGate::getAllChannelsActivatedFuture)
                                .toArray(CompletableFuture[]::new))
                .whenComplete(
                        (r, ex) -> {
                            scheduleBufferDebloater();
                        });

@akalash (Contributor, Author) commented Oct 14, 2021

I am not really sure about this proposal, because we already create the client via clientBuilder, and then, for some reason, we need to wait on a future before calling one particular method. It looks pretty fragile, because if tomorrow we add one more method like that, we will need to duplicate this waiting.
In my opinion, the problem is returning an unprepared client. When we create the PartitionRequestClient, we cannot use it as is but have to wait until it is ready, which seems wrong. At the same time, I agree with you that at least we can safely call the requestPartition method without waiting for channel initialization.
So, at least right now, I see two solutions:

  • Separating PartitionRequestClient into two classes, one for requestPartition and another for everything else (this change looks pretty expensive and I don't think it makes sense right now)
  • Adding a wait for channel activation inside every method that uses the context:
    Context receiveContext() {
        if (ctx == null) {
            channelActivationFuture.get();
        }
        return ctx;
    }

    void announceBufferSize(int bufferSize) {
        Context ctx = receiveContext();
        ctx.....
    }

The only open question for this solution is how to handle the InterruptedException, which is not typical for this place right now.
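
One common pattern for that (a sketch only, assuming the surrounding method may declare IOException; not a decision for this PR) is to restore the interrupt flag and rethrow:

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    final class ActivationWaitSketch {

        private final CompletableFuture<Void> channelActivationFuture = new CompletableFuture<>();

        void waitForChannelActivation() throws IOException {
            try {
                channelActivationFuture.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption is not lost.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for channel activation", e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
        }
    }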

I will try to come up with other ideas.

@dawidwys (Contributor)

Another idea to throw into the mix:

Why do we even have methods like notifyCreditAvailable, notifyNewBufferSize, resumeConsumption and acknowledgeAllRecordsProcessed in the NetworkClientHandler? It seems to me those methods go against the purpose of a ChannelHandler in netty. As far as I understand, handlers are supposed to be reactive components that act on a certain event happening in the network stack. That is not the case with the aforementioned methods.

On the other hand, we have the NettyPartitionRequestClient, which is an active client of the network stack; there we can e.g. requestSubpartition or sendTaskEvent. I am wondering why we don't implement methods like notifyNewBufferSize there.
Something like:

    @Override
    public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
        tcpChannel
                .pipeline()
                .fireUserEventTriggered(
                        new CreditBasedPartitionRequestClientHandler
                                .NewBufferSizeMessage(inputChannel, bufferSize));
    }

or (I am not sure about the threading model)

        tcpChannel
                .eventLoop()
                .execute(
                        () -> {
                            tcpChannel
                                    .pipeline()
                                    .fireUserEventTriggered(
                                            new CreditBasedPartitionRequestClientHandler
                                                    .NewBufferSizeMessage(
                                                    inputChannel, bufferSize));
                        });

I am sorry, I do not have much expertise in netty, so it would be really nice to hear from @pnowojski on the idea above.

@pnowojski (Contributor)

Most likely we added notifyNewBufferSize to the ChannelHandler to follow the same path as notifyCreditAvailable. At least I myself have not given much thought to whether it should actually be implemented the way you are proposing. Maybe you are right, @dawidwys.

I don't know why notifyCreditAvailable was added the way it was added. Maybe that was also a wrong place to add it.

I haven't read the full thread, so I'm not sure. If we changed it the way you are proposing, @dawidwys, would it solve (or help solve) the problems in this ticket?

@dawidwys (Contributor)

I understand notifyNewBufferSize, resumeConsumption and acknowledgeAllRecordsProcessed were added that way because of how notifyCreditAvailable was introduced.

I think it was added inside of the NetworkClientHandler in order to have two parallel implementations for credit-based and legacy flow control, since there were already two classes for those. Overall, I think that was an unfortunate choice. I'd recommend trying out what I am suggesting; it should fix the issue of this thread, because we would use netty mechanisms exclusively and would not depend on the activation message coming first. @akalash, would you like to try it out?
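
To make the proposed flow concrete, here is a minimal sketch (not the actual Flink implementation) of a handler reacting to such a user event; NewBufferSizeMessage below is a stand-in for the real message class, and plain io.netty imports are used even though Flink shades Netty:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    // Stand-in for the real CreditBasedPartitionRequestClientHandler.NewBufferSizeMessage.
    final class NewBufferSizeMessage {
        final int bufferSize;

        NewBufferSizeMessage(int bufferSize) {
            this.bufferSize = bufferSize;
        }
    }

    class ReactiveClientHandlerSketch extends ChannelInboundHandlerAdapter {

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof NewBufferSizeMessage) {
                // The event is delivered through the netty pipeline, so the handler receives a
                // valid context as an argument and does not depend on the activation message
                // having been processed first.
                announceBufferSize(ctx, ((NewBufferSizeMessage) evt).bufferSize);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

        private void announceBufferSize(ChannelHandlerContext ctx, int bufferSize) {
            // Placeholder for writing the actual buffer size announcement to the channel.
        }
    }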

@akalash (Contributor, Author) commented Oct 18, 2021

@dawidwys, I will try to implement it this way.

@akalash akalash marked this pull request as draft October 18, 2021 16:24
@dawidwys (Contributor)

Looking at the test results, it seems to be working just fine (the e2e failure is unrelated). How do you feel about the solution, @akalash?

If you see no obstacles, I'd suggest proceeding with it.

@akalash akalash marked this pull request as ready for review October 19, 2021 14:30
@akalash (Contributor, Author) commented Oct 19, 2021

@dawidwys, thanks for the idea. I have checked it, and it looks like it should work without any problem; this implementation also looks more correct than the current one. So let's wait for the tests on the last commits and then you can take a look at it again.

@dawidwys (Contributor) left a comment


Just some nitpicking on class scope. I can fix it while merging; I'd just like to hear a confirmation.

    }

    private void sendToChannel(ClientOutboundMessage message) {
        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));
Contributor

Did you investigate whether we need to submit it through the eventLoop, or would it be enough to call tcpChannel.pipeline().fireUserEventTriggered(message)?

Contributor Author

Yes, I did. Before these changes, we always submitted it through the eventLoop, so I don't see a reason why we should change that.

Contributor

I am fine merging it as is. However, I am curious myself whether it was/is necessary (I could not find an answer, but I have not searched for long). But I am not too familiar with how netty works.

BTW, I've just looked into it again, and if I go into ChannelPipeline#fireUserEventTriggered -> DefaultPipeline#fireUserEventTriggered -> AbstractChannelHandlerContext.invokeUserEventTriggered(this.head, event); I can see code like this:

    static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
        ObjectUtil.checkNotNull(event, "event");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeUserEventTriggered(event);
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeUserEventTriggered(event);
                }
            });
        }

    }

which would indicate that submitting it explicitly via the eventLoop is unnecessary.

Contributor Author

Unfortunately, it is not possible to get rid of tcpChannel.eventLoop().execute, because executor.inEventLoop() is always true since this call happens in the network thread; but we still don't want to block the current execution, and we want to send the message in another thread (or in the same thread, but later).
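
For reference, a small illustration of the threading point above, using plain Netty types (Flink shades them under a different package): when the caller is already on the channel's event loop, firing the event directly runs the handler synchronously, while going through eventLoop().execute() only enqueues it to run later on the same thread.

    import io.netty.channel.Channel;

    final class FireEventExamples {

        static void fireInline(Channel channel, Object message) {
            // If called from the event loop thread, userEventTriggered() runs before this returns.
            channel.pipeline().fireUserEventTriggered(message);
        }

        static void fireDeferred(Channel channel, Object message) {
            // Always enqueued: the handler still runs on the event loop thread, but only after
            // the currently executing work has finished.
            channel.eventLoop().execute(
                    () -> channel.pipeline().fireUserEventTriggered(message));
        }
    }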

Contributor Author

Actually, I may be wrong here, and this problem may be related only to the test. But right now, I'm not ready to answer more precisely.

…tworkClientHandler to NettyPartitioonRequestClient since it is not responsibility of handler to send them
@dawidwys (Contributor) left a comment


LGTM

@dawidwys dawidwys merged commit 60f39ed into apache:master Oct 21, 2021