Skip to content

Commit

Permalink
first imple of generics (https://issues.jboss.org/browse/JGRP-1141)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 3, 2011
1 parent 7411b23 commit 7bb4327
Show file tree
Hide file tree
Showing 25 changed files with 352 additions and 220 deletions.
46 changes: 46 additions & 0 deletions doc/API_Changes.txt
Expand Up @@ -49,6 +49,52 @@ API changes in 3.0.0

- Removed Address.isMulticastAddress(): a null address is a multicast address

- Rsp, RspList, RpcDispatcher.callRemoteMethodsXXX() and MessageDispatcher.cast/sendXXX() methods now use
generics, so the code below:

RspList rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000));
for(Map.Entry<Address,Rsp> rsp: rsps.entrySet())
System.out.println(rsp.getKey() + ": " + rsp.getValue().getValue());

has to be changed to:

RspList<Date> rsps=disp.callRemoteMethods(null, call, new RequestOptions(ResponseMode.GET_ALL, 5000));
for(Map.Entry<Address,Rsp<Date>> rsp: rsps.entrySet())
System.out.println(rsp.getKey() + ": " + rsp.getValue().getValue());

- RpcDispatcher.callRemoteMethod() now throw an exception if the target method threw an exception

- RpcDispatcher.callRemoteMethods() return a RspList. If a member P threw an exception, then the Rsp for
P will have it in the field 'exception'. The 'result' field is *not* used for exceptions any longer !

An example to check for exceptions is:


Multicast:

RspList<Long> rsps=dispatcher.callRemoteMethods(null, "foo", null, null, new RequestOptions(...));
for(Rsp<Long> rsp: rsps.values) {
if(rsp.getException() != null) {
// we have an exception
}
else {
Long val=rsp.getValue();
// do something with the value
}
}

Unicast:

try {
Rsp<Long> rsp=disp.callRemoteMethod(target, "foo", null, null, new RequestOptions(...));
Long val=rsp.getValue();
// do something with the return value
}
catch(Throwable t) {
// "foo" threw an exception
}





Expand Down
46 changes: 25 additions & 21 deletions src/org/jgroups/blocks/GroupRequest.java
Expand Up @@ -48,11 +48,11 @@
*
* @author Bela Ban
*/
public class GroupRequest extends Request {
public class GroupRequest<T> extends Request {

/** Correlates requests and responses */
@GuardedBy("lock")
private final Map<Address,Rsp> requests;
private final Map<Address,Rsp<T>> requests;

@GuardedBy("lock")
int num_received, num_not_received, num_suspected;
Expand All @@ -72,21 +72,21 @@ public class GroupRequest extends Request {
public GroupRequest(Message msg, RequestCorrelator corr, Collection<Address> targets, RequestOptions options) {
super(msg, corr, null, options);
int size=targets.size();
requests=new HashMap<Address,Rsp>(size);
requests=new HashMap<Address,Rsp<T>>(size);
setTargets(targets);
}

public GroupRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options) {
super(m, corr, null, options);
requests=new HashMap<Address,Rsp>(1);
requests=new HashMap<Address,Rsp<T>>(1);
setTarget(target);
}


public GroupRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options) {
super(m, null, transport, options);
int size=mbrs.size();
requests=new HashMap<Address,Rsp>(size);
requests=new HashMap<Address,Rsp<T>>(size);
setTargets(mbrs);
}

Expand All @@ -111,18 +111,22 @@ public void sendRequest() throws Exception {
* Adds a response to the response table. When all responses have been received,
* <code>execute()</code> returns.
*/
public void receiveResponse(Object response_value, Address sender) {
public void receiveResponse(Object response_value, Address sender, boolean is_exception) {
if(done)
return;
Rsp rsp=requests.get(sender);
Rsp<T> rsp=requests.get(sender);
if(rsp == null)
return;

RspFilter rsp_filter=options.getRspFilter();
boolean responseReceived=false;
if(!rsp.wasReceived()) {
if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
rsp.setValue(response_value);
if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
if(is_exception && response_value instanceof Throwable)
rsp.setException((Throwable)response_value);
else
rsp.setValue((T)response_value);
}
rsp.setReceived(responseReceived);
}

Expand Down Expand Up @@ -155,7 +159,7 @@ public void suspect(Address suspected_member) {
return;

boolean changed=false;
Rsp rsp=requests.get(suspected_member);
Rsp<T> rsp=requests.get(suspected_member);
if(rsp != null) {
if(rsp.setSuspected(true)) {
rsp.setValue(null);
Expand Down Expand Up @@ -204,10 +208,10 @@ public void viewChange(View new_view) {

lock.lock();
try {
for(Map.Entry<Address,Rsp> entry: requests.entrySet()) {
for(Map.Entry<Address,Rsp<T>> entry: requests.entrySet()) {
Address mbr=entry.getKey();
if(!mbrs.contains(mbr)) {
Rsp rsp=entry.getValue();
Rsp<T> rsp=entry.getValue();
rsp.setValue(null);
if(rsp.setSuspected(true)) {
num_suspected++;
Expand All @@ -231,14 +235,14 @@ public void viewChange(View new_view) {


/** Returns the results as a RspList */
public RspList getResults() {
Collection<Rsp> rsps=requests.values();
return new RspList(rsps);
public RspList<T> getResults() {
Collection<Rsp<T>> rsps=requests.values();
return new RspList<T>(rsps);
}



public RspList get() throws InterruptedException, ExecutionException {
public RspList<T> get() throws InterruptedException, ExecutionException {
lock.lock();
try {
waitForResults(0);
Expand All @@ -249,7 +253,7 @@ public RspList get() throws InterruptedException, ExecutionException {
return getResults();
}

public RspList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public RspList<T> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean ok;
lock.lock();
try {
Expand All @@ -269,9 +273,9 @@ public String toString() {

if(!requests.isEmpty()) {
ret.append(", entries:\n");
for(Map.Entry<Address,Rsp> entry: requests.entrySet()) {
for(Map.Entry<Address,Rsp<T>> entry: requests.entrySet()) {
Address mbr=entry.getKey();
Rsp rsp=entry.getValue();
Rsp<T> rsp=entry.getValue();
ret.append(mbr).append(": ").append(rsp).append("\n");
}
}
Expand All @@ -282,13 +286,13 @@ public String toString() {
/* --------------------------------- Private Methods -------------------------------------*/

private void setTarget(Address mbr) {
requests.put(mbr, new Rsp(mbr));
requests.put(mbr, new Rsp<T>(mbr));
num_not_received=1;
}

private void setTargets(Collection<Address> mbrs) {
for(Address mbr: mbrs)
requests.put(mbr, new Rsp(mbr));
requests.put(mbr, new Rsp<T>(mbr));
num_not_received=requests.size();
}

Expand Down
24 changes: 14 additions & 10 deletions src/org/jgroups/blocks/MessageDispatcher.java
Expand Up @@ -236,18 +236,18 @@ else if (canReplace) {
* @return
* @since 2.9
*/
public RspList castMessage(final Collection<Address> dests, Message msg, RequestOptions options) {
GroupRequest req=cast(dests, msg, options, true);
public <T> RspList<T> castMessage(final Collection<Address> dests, Message msg, RequestOptions options) {
GroupRequest<T> req=cast(dests, msg, options, true);
return req != null? req.getResults() : RspList.EMPTY_RSP_LIST;
}


public NotifyingFuture<RspList> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) {
GroupRequest req=cast(dests, msg, options, false);
public <T> NotifyingFuture<RspList<T>> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) {
GroupRequest<T> req=cast(dests, msg, options, false);
return req != null? req : new NullFuture<RspList>(RspList.EMPTY_RSP_LIST);
}

protected GroupRequest cast(final Collection<Address> dests, Message msg, RequestOptions options, boolean block_for_results) {
protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg, RequestOptions options, boolean block_for_results) {
List<Address> real_dests;

// we need to clone because we don't want to modify the original
Expand Down Expand Up @@ -289,7 +289,7 @@ protected GroupRequest cast(final Collection<Address> dests, Message msg, Reques
return null;
}

GroupRequest req=new GroupRequest(msg, corr, real_dests, options);
GroupRequest<T> req=new GroupRequest<T>(msg, corr, real_dests, options);
if(options != null) {
req.setResponseFilter(options.getRspFilter());
req.setAnycasting(options.getAnycasting());
Expand All @@ -315,7 +315,7 @@ public void done(long req_id) {



public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutException, SuspectedException {
public <T> T sendMessage(Message msg, RequestOptions opts) throws Throwable {
Address dest=msg.getDest();
if(dest == null) {
if(log.isErrorEnabled())
Expand All @@ -329,7 +329,7 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
msg.setScope(opts.getScope());
}

UnicastRequest req=new UnicastRequest(msg, corr, dest, opts);
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, opts);
try {
req.execute();
}
Expand All @@ -340,9 +340,13 @@ public Object sendMessage(Message msg, RequestOptions opts) throws TimeoutExcept
if(opts != null && opts.getMode() == ResponseMode.GET_NONE)
return null;

Rsp rsp=req.getResult();
Rsp<T> rsp=req.getResult();
if(rsp.wasSuspected())
throw new SuspectedException(dest);

if(rsp.getException() != null)
throw rsp.getException();

if(!rsp.wasReceived())
throw new TimeoutException("timeout sending message to " + dest);
return rsp.getValue();
Expand Down Expand Up @@ -379,7 +383,7 @@ public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions


/* ------------------------ RequestHandler Interface ---------------------- */
public Object handle(Message msg) {
public Object handle(Message msg) throws Throwable {
if(req_handler != null) {
return req_handler.handle(msg);
}
Expand Down
10 changes: 7 additions & 3 deletions src/org/jgroups/blocks/MultiRequest.java
Expand Up @@ -125,7 +125,7 @@ Rsp findResponse(Address target) {
* Adds a response to the response table. When all responses have been received,
* <code>execute()</code> returns.
*/
public void receiveResponse(Object response_value, Address sender) {
public void receiveResponse(Object response_value, Address sender, boolean is_exception) {
if(done)
return;
Rsp rsp=findResponse(sender);
Expand All @@ -135,8 +135,12 @@ public void receiveResponse(Object response_value, Address sender) {
RspFilter rsp_filter=options.getRspFilter();
boolean responseReceived=false;
if(!rsp.wasReceived()) {
if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
rsp.setValue(response_value);
if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender))) {
if(is_exception && response_value instanceof Throwable)
rsp.setException((Throwable)response_value);
else
rsp.setValue(response_value);
}
rsp.setReceived(responseReceived);
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/ReplCache.java
Expand Up @@ -423,7 +423,7 @@ public V get(K key) {

// 3. Execute a cluster wide GET
try {
RspList rsps=disp.callRemoteMethods(null,
RspList<Object> rsps=disp.callRemoteMethods(null,
new MethodCall(GET, key),
new RequestOptions(ResponseMode.GET_ALL, call_timeout));
for(Rsp rsp: rsps.values()) {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/Request.java
Expand Up @@ -99,7 +99,7 @@ public boolean execute() throws Exception {

protected abstract void sendRequest() throws Exception;

public abstract void receiveResponse(Object response_value, Address sender);
public abstract void receiveResponse(Object response_value, Address sender, boolean is_exception);

public abstract void viewChange(View new_view);

Expand Down

0 comments on commit 7bb4327

Please sign in to comment.