Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binding attachment before a clusterInvoker invoke. #2024

Merged
merged 3 commits into from
Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Directory;
Expand All @@ -33,6 +35,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -225,6 +228,13 @@ private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,50 +57,55 @@ public ForkingClusterInvoker(Directory<T> directory) {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LinkedBlockingQueue You should give an initialization size, The default value should not be used: Integer.max_value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Integer.max_value has problems?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the queue producer is faster than the consumer, and the queue is not fully blocked, memory consumption is high

Copy link
Member Author

@carryxyh carryxyh Jul 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In this case, it will only be consumed once.
  2. This queue is a linked queue so why the memory consumption is high?
  3. The producer count is controllable and every producer only produce result once.

So it isn't a problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of producers is controllable, and that should be fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that make sence.
In this pr, it is to fix the clearing RpcContext.
Maybe next pr to fix the capacity problem :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GOOD JOB

for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.cluster.Directory;
Expand Down Expand Up @@ -132,6 +133,31 @@ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance load

}

@Test
public void testBindingAttachment() {
final String attachKey = "attach";
final String attachValue = "value";

// setup attachment
RpcContext.getContext().setAttachment(attachKey, attachValue);
Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);

cluster = new AbstractClusterInvoker(dic) {
@Override
protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
throws RpcException {
// attachment will be bind to invocation
String value = invocation.getAttachment(attachKey);
Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
return null;
}
};

// invoke
cluster.invoke(invocation);
}

@Test
public void testSelect_Invokersize0() throws Exception {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testNoInvoke() {
* then we should reselect from the latest invokers before retry.
*/
@Test
public void testInvokerDestoryAndReList() {
public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.cluster.Directory;

import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
* ForkingClusterInvokerTest
*
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
Expand Down Expand Up @@ -71,6 +75,7 @@ public void setUp() throws Exception {
invokers.add(invoker3);

}

private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
Expand Down Expand Up @@ -106,7 +111,7 @@ private void resetInvokerToNoException() {
}

@Test
public void testInvokeExceptoin() {
public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Expand All @@ -115,20 +120,44 @@ public void testInvokeExceptoin() {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
Assert.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
assertFalse(expected.getCause() instanceof RpcException);
}
}

@Test
public void testClearRpcContext() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);

String attachKey = "attach";
String attachValue = "value";

RpcContext.getContext().setAttachment(attachKey, attachValue);

Map<String, String> attachments = RpcContext.getContext().getAttachments();
assertThat("set attachment failed!", attachments != null && attachments.size() == 1, is(true));
try {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
assertThat(afterInvoke != null && afterInvoke.size() == 0, is(true));
}

@Test()
public void testInvokeNoExceptoin() {
public void testInvokeNoException() {

resetInvokerToNoException();

ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Result ret = invoker.invoke(invocation);
Assert.assertSame(result, ret);
assertSame(result, ret);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Result invoke(Invocation inv) throws RpcException {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null) {
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
Expand Down