Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1879] Prevent blocked by group transfer service #1881

Merged
merged 1 commit into from Apr 8, 2020

Conversation

duhenglucky
Copy link
Contributor

What is the purpose of the change

close issue

Brief changelog

XX

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in test module.
  • Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 50.933% when pulling 14969aa on duhenglucky:issue_1879 into 1558e05 on apache:develop.

@RongtongJin RongtongJin changed the title Prevent blocked by group transfer service [ISSUE#1879] Prevent blocked by group transfer service Mar 25, 2020
@RongtongJin RongtongJin changed the title [ISSUE#1879] Prevent blocked by group transfer service [ISSUE #1879] Prevent blocked by group transfer service Mar 25, 2020
Copy link
Contributor

@rushsky518 rushsky518 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run example org.apache.rocketmq.example.quickstart.Producer to send a msg.

for debug, i add a sout here

@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
	asyncProcessRequest(ctx, request).thenAcceptAsync(
			r -> {
				System.out.println(r);
				responseCallback.callback(r);
			},
			this.brokerController.getSendMessageExecutor());
}

debug result:

  1. org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult is invoked in GroupCommitService thread.
{
  System.out.println(r);
  responseCallback.callback(r);
}

is invoked in SendMessageThread_1, and r is null

rushsky518 added a commit to rushsky518/rocketmq that referenced this pull request Apr 7, 2020
@duhenglucky
Copy link
Contributor Author

Run example org.apache.rocketmq.example.quickstart.Producer to send a msg.

for debug, i add a sout here

@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
	asyncProcessRequest(ctx, request).thenAcceptAsync(
			r -> {
				System.out.println(r);
				responseCallback.callback(r);
			},
			this.brokerController.getSendMessageExecutor());
}

debug result:

  1. org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult is invoked in GroupCommitService thread.
{
  System.out.println(r);
  responseCallback.callback(r);
}

is invoked in SendMessageThread_1, and r is null

r is null just because of the succeed send message response returned to the client directly by org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#doResponse in org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult method and these responses will not be processed by AfterRPCHook, the code here is a little confusing

@rushsky518
Copy link
Contributor

Run example org.apache.rocketmq.example.quickstart.Producer to send a msg.
for debug, i add a sout here

@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
	asyncProcessRequest(ctx, request).thenAcceptAsync(
			r -> {
				System.out.println(r);
				responseCallback.callback(r);
			},
			this.brokerController.getSendMessageExecutor());
}

debug result:

  1. org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult is invoked in GroupCommitService thread.
{
  System.out.println(r);
  responseCallback.callback(r);
}

is invoked in SendMessageThread_1, and r is null

r is null just because of the succeed send message response returned to the client directly by org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#doResponse in org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult method and these responses will not be processed by AfterRPCHook, the code here is a little confusing

Yes, that's why i apply thenApplyAsync method on SendMessageProcessor#handlePutMessageResult, it returns the real response

@duhenglucky
Copy link
Contributor Author

Run example org.apache.rocketmq.example.quickstart.Producer to send a msg.
for debug, i add a sout here

@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
	asyncProcessRequest(ctx, request).thenAcceptAsync(
			r -> {
				System.out.println(r);
				responseCallback.callback(r);
			},
			this.brokerController.getSendMessageExecutor());
}

debug result:

  1. org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult is invoked in GroupCommitService thread.
{
  System.out.println(r);
  responseCallback.callback(r);
}

is invoked in SendMessageThread_1, and r is null

r is null just because of the succeed send message response returned to the client directly by org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#doResponse in org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult method and these responses will not be processed by AfterRPCHook, the code here is a little confusing

Yes, that's why i apply thenApplyAsync method on SendMessageProcessor#handlePutMessageResult, it returns the real response

yep, actually ctx.writeAndFlush is also a blocking operation, let us polish it in your PR.

@duhenglucky duhenglucky merged commit dd822ae into apache:develop Apr 8, 2020
GenerousMan pushed a commit to GenerousMan/rocketmq that referenced this pull request Aug 12, 2022
[ISSUE apache#1879] Prevent blocked by group transfer service
pulllock pushed a commit to pulllock/rocketmq that referenced this pull request Oct 19, 2023
[ISSUE apache#1879] Prevent blocked by group transfer service
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants