Skip to content

Commit

Permalink
Merge pull request #2024, binding attachment before a clusterInvoker …
Browse files Browse the repository at this point in the history
…invoke.

Fixes #1978
  • Loading branch information
carryxyh authored and chickenlj committed Jul 31, 2018
1 parent 0882c83 commit 0278a01
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Directory;
Expand All @@ -33,6 +35,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -225,6 +228,13 @@ private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,50 +57,55 @@ public ForkingClusterInvoker(Directory<T> directory) {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.cluster.Directory;
Expand Down Expand Up @@ -132,6 +133,31 @@ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance load

}

@Test
public void testBindingAttachment() {
final String attachKey = "attach";
final String attachValue = "value";

// setup attachment
RpcContext.getContext().setAttachment(attachKey, attachValue);
Map<String, String> attachments = RpcContext.getContext().getAttachments();
Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);

cluster = new AbstractClusterInvoker(dic) {
@Override
protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
throws RpcException {
// attachment will be bind to invocation
String value = invocation.getAttachment(attachKey);
Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
return null;
}
};

// invoke
cluster.invoke(invocation);
}

@Test
public void testSelect_Invokersize0() throws Exception {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testNoInvoke() {
* then we should reselect from the latest invokers before retry.
*/
@Test
public void testInvokerDestoryAndReList() {
public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.cluster.Directory;

import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
* ForkingClusterInvokerTest
*
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
Expand Down Expand Up @@ -71,6 +75,7 @@ public void setUp() throws Exception {
invokers.add(invoker3);

}

private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
Expand Down Expand Up @@ -106,7 +111,7 @@ private void resetInvokerToNoException() {
}

@Test
public void testInvokeExceptoin() {
public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Expand All @@ -115,20 +120,44 @@ public void testInvokeExceptoin() {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
Assert.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
assertFalse(expected.getCause() instanceof RpcException);
}
}

@Test
public void testClearRpcContext() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);

String attachKey = "attach";
String attachValue = "value";

RpcContext.getContext().setAttachment(attachKey, attachValue);

Map<String, String> attachments = RpcContext.getContext().getAttachments();
assertThat("set attachment failed!", attachments != null && attachments.size() == 1, is(true));
try {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
assertThat(afterInvoke != null && afterInvoke.size() == 0, is(true));
}

@Test()
public void testInvokeNoExceptoin() {
public void testInvokeNoException() {

resetInvokerToNoException();

ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Result ret = invoker.invoke(invocation);
Assert.assertSame(result, ret);
assertSame(result, ret);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Result invoke(Invocation inv) throws RpcException {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null) {
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
Expand Down

0 comments on commit 0278a01

Please sign in to comment.