Skip to content

Commit

Permalink
- Preventing chaining of RspFilters with NoMuxHandlerRspFilter (https…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 28, 2011
1 parent af99c7e commit d270553
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/org/jgroups/blocks/mux/MuxRpcDispatcher.java
Expand Up @@ -97,9 +97,16 @@ public void stop() {
super.stop();
}

/**
@Override
protected <T> GroupRequest<T> cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) throws Exception {
RspFilter filter = options.getRspFilter();
return super.cast(dests, msg, options.setRspFilter((filter != null) ? new NoMuxHandlerRspFilter(filter) : new NoMuxHandlerRspFilter()), blockForResults);
}*/

@Override
protected <T> GroupRequest <T> cast(Collection<Address> dests, Message msg, RequestOptions options, boolean blockForResults) throws Exception {
RspFilter filter = options.getRspFilter();
return super.cast(dests, msg, options.setRspFilter(NoMuxHandlerRspFilter.createInstance(filter)), blockForResults);
}
}
12 changes: 11 additions & 1 deletion src/org/jgroups/blocks/mux/NoMuxHandlerRspFilter.java
Expand Up @@ -18,7 +18,17 @@ public NoMuxHandlerRspFilter() {
public NoMuxHandlerRspFilter(RspFilter filter) {
this.filter = filter;
}


public static RspFilter createInstance(RspFilter filter) {
if(filter instanceof NoMuxHandlerRspFilter)
return filter;
return new NoMuxHandlerRspFilter(filter) ;
}

public RspFilter getFilter() {
return filter;
}

@Override
public boolean isAcceptable(Object response, Address sender) {
return !(response instanceof NoMuxHandler) && ((filter == null) || filter.isAcceptable(response, sender));
Expand Down
55 changes: 55 additions & 0 deletions tests/junit-functional/org/jgroups/tests/RequestOptionsTest.java
@@ -0,0 +1,55 @@
package org.jgroups.tests;

import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.blocks.*;
import org.jgroups.blocks.mux.MuxRpcDispatcher;
import org.jgroups.blocks.mux.NoMuxHandlerRspFilter;
import org.jgroups.util.Util;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


@Test(groups=Global.FUNCTIONAL,sequential=true)
public class RequestOptionsTest {
protected JChannel channel;
protected RequestOptions reqOpt = new RequestOptions(ResponseMode.GET_ALL, 5000);
protected static final String simple_props="SHARED_LOOPBACK:PING(timeout=1000):" +
"pbcast.NAKACK(log_discard_msgs=false;log_not_found_msgs=false)" +
":UNICAST:pbcast.STABLE(stability_delay=200):pbcast.GMS:MFC:UFC:FRAG2";


@BeforeMethod
protected void start() throws Exception {
channel=new JChannel(simple_props);
}

protected void stop() throws Exception {
Util.close(channel);
}

/**
* Tests https://issues.jboss.org/browse/JGRP-1369
*/
public void testRequestOptionsChaining() throws Exception {
MuxRpcDispatcher muxRpc = new MuxRpcDispatcher((short) 1, channel, null, null, new Server());
channel.connect("group");
for(int i=0; i < 20; i++)
muxRpc.callRemoteMethods(null, new MethodCall(Server.class.getMethod("foo", null)), reqOpt);

RspFilter filter=reqOpt.getRspFilter();
int count=count(filter);
System.out.println("count=" + count);
assert count == 1;
}

protected static int count(RspFilter filter) {
if(filter instanceof NoMuxHandlerRspFilter)
return 1 + count(((NoMuxHandlerRspFilter)filter).getFilter());
return 0;
}

static public class Server {
public static void foo() {System.out.println("Entering foo"); }
}
}

0 comments on commit d270553

Please sign in to comment.