Skip to content

Commit

Permalink
- using cache for remote addresses
Browse files Browse the repository at this point in the history
- in down(), all unicast destinations check if they're part of the remote cache: if yes, forward to other cluster
  • Loading branch information
belaban committed Feb 16, 2011
1 parent 7448678 commit c0997ac
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion src/org/jgroups/protocols/RELAY.java
Expand Up @@ -2,6 +2,7 @@

import org.jgroups.*;
import org.jgroups.annotations.*;
import org.jgroups.blocks.LazyRemovalSet;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;
Expand Down Expand Up @@ -53,6 +54,10 @@ public class RELAY extends Protocol {
protected boolean present_global_views=true;


@Property(description="Max size of the cache for remote addresses")
protected int cache_size=200;


/* --------------------------------------------- Fields ------------------------------------------------ */
protected Address local_addr;
@ManagedAttribute
Expand All @@ -79,6 +84,7 @@ public class RELAY extends Protocol {

protected TimeScheduler timer;

protected LazyRemovalSet<Address> remote_cache;



Expand Down Expand Up @@ -108,10 +114,25 @@ public String getGlobalView() {
return global_view != null? global_view.toString() : "n/a";
}

@ManagedOperation(description="Prints the contents of the remote cache")
public String printRemoteCache() {
return remote_cache.printCache(new LazyRemovalSet.Printable<Address>() {
public String print(Address val) {
return val.toString();
}
});
}

@ManagedOperation(description="Evicts all elements in the remote cache which are marked as removable")
public void evictRemoteCache() {
remote_cache.removeMarkedElements(false);
}


public void init() throws Exception {
super.init();
timer=getTransport().getTimer();
remote_cache=new LazyRemovalSet<Address>(cache_size, 300000);
}

public void stop() {
Expand All @@ -127,7 +148,7 @@ public Object down(Event evt) {

// forward non local destinations to the coordinator, to relay to the remote cluster
// if(dest instanceof ProxyAddress || !local_view.containsMember(dest)) {
if(remote_view != null && remote_view.containsMember(dest)) {
if(remote_cache.contains(dest)) {
forwardToCoord(msg);
return null;
}
Expand All @@ -148,6 +169,7 @@ public Object down(Event evt) {
case Event.GET_PHYSICAL_ADDRESS:
// fix to prevent exception by JBossAS, which checks whether a physical
// address is present and throw an ex if not
// Remove this when the AS code removes that check
PhysicalAddress addr=(PhysicalAddress)down_prot.down(evt);
if(addr == null)
addr=new IpAddress(6666);
Expand Down Expand Up @@ -267,6 +289,10 @@ protected Object installView(byte[] buf, int offset, int length) {
UUID.add(data.uuids);

remote_view=data.remote_view;
if(data.remote_view != null) {
remote_cache.add(data.remote_view.getMembers());
remote_cache.retainAll(data.remote_view.getMembers());
}
if(global_view == null || (data.global_view != null &&!global_view.equals(data.global_view))) {
global_view=data.global_view;
synchronized(this) {
Expand Down

0 comments on commit c0997ac

Please sign in to comment.