Skip to content

Commit

Permalink
state transfer of counter values and versions to new backup coordinators
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 6, 2011
1 parent 9ccbf0a commit 3e4f5a8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 32 deletions.
36 changes: 20 additions & 16 deletions src/org/jgroups/protocols/COUNTER.java
Expand Up @@ -263,10 +263,6 @@ protected void handleRequest(Request req, Address sender) {
sendCounterNotFoundExceptionResponse(sender, ((SimpleRequest)req).owner, ((SimpleRequest)req).name); sendCounterNotFoundExceptionResponse(sender, ((SimpleRequest)req).owner, ((SimpleRequest)req).name);
return; return;
} }

System.out.println("<< INCR " + val);


result=val.addAndGet(((AddAndGetRequest)req).value); result=val.addAndGet(((AddAndGetRequest)req).value);
rsp=new ValueResponse(((SimpleRequest)req).owner, result[0], result[1]); rsp=new ValueResponse(((SimpleRequest)req).owner, result[0], result[1]);
sendResponse(sender, rsp); sendResponse(sender, rsp);
Expand All @@ -289,8 +285,6 @@ protected void handleRequest(Request req, Address sender) {


// return all values except those with lower or same versions than the ones in the ReconcileRequest // return all values except those with lower or same versions than the ones in the ReconcileRequest
ReconcileRequest reconcile_req=(ReconcileRequest)req; ReconcileRequest reconcile_req=(ReconcileRequest)req;
System.out.println(local_addr + " <-- RECONCILE from " + sender);

Map<String,VersionedValue> map=new HashMap<String,VersionedValue>(counters); Map<String,VersionedValue> map=new HashMap<String,VersionedValue>(counters);
if(reconcile_req.names != null) { if(reconcile_req.names != null) {
for(int i=0; i < reconcile_req.names.length; i++) { for(int i=0; i < reconcile_req.names.length; i++) {
Expand All @@ -317,14 +311,13 @@ protected void handleRequest(Request req, Address sender) {
} }


rsp=new ReconcileResponse(names, values, versions); rsp=new ReconcileResponse(names, values, versions);
System.out.println(local_addr + " --> RECONCILE-RSP to " + sender);
sendResponse(sender, rsp); sendResponse(sender, rsp);
break; break;
case RESEND_PENDING_REQUESTS: case RESEND_PENDING_REQUESTS:
System.out.println("-- resending " + pending_requests.values().size() + " requests:");
for(Tuple<Request,Promise> tuple: pending_requests.values()) { for(Tuple<Request,Promise> tuple: pending_requests.values()) {
Request request=tuple.getVal1(); Request request=tuple.getVal1();
System.out.println("-- resending " + request); if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + coord + "] resending " + request);
sendRequest(coord, request); sendRequest(coord, request);
} }
break; break;
Expand All @@ -342,12 +335,12 @@ protected VersionedValue getCounter(String name) {
return val; return val;
} }


@SuppressWarnings("unchecked")
protected void handleResponse(Response rsp, Address sender) { protected void handleResponse(Response rsp, Address sender) {
if(rsp instanceof ReconcileResponse) { if(rsp instanceof ReconcileResponse) {

if(log.isTraceEnabled() && ((ReconcileResponse)rsp).names != null && ((ReconcileResponse)rsp).names.length > 0)
System.out.println(local_addr + " <-- RECONCILE-RSP from " + sender + ": " + log.trace("[" + local_addr + "] <-- [" + sender + "] RECONCILE-RSP: " +
dump(((ReconcileResponse)rsp).names, ((ReconcileResponse)rsp).values, ((ReconcileResponse)rsp).versions)); dump(((ReconcileResponse)rsp).names, ((ReconcileResponse)rsp).values, ((ReconcileResponse)rsp).versions));

if(reconciliation_task != null) if(reconciliation_task != null)
reconciliation_task.add((ReconcileResponse)rsp, sender); reconciliation_task.add((ReconcileResponse)rsp, sender);
return; return;
Expand Down Expand Up @@ -409,8 +402,19 @@ protected void handleView(View view) {
if(!members.isEmpty()) if(!members.isEmpty())
coord=members.get(0); coord=members.get(0);


if(coord != null && coord.equals(local_addr)) if(coord != null && coord.equals(local_addr)) {
List<Address> old_backups=backup_coords != null? new ArrayList<Address>(backup_coords) : null;
backup_coords=new CopyOnWriteArrayList<Address>(Util.pickNext(members, local_addr, num_backups)); backup_coords=new CopyOnWriteArrayList<Address>(Util.pickNext(members, local_addr, num_backups));

// send the current values to all *new* backups
List<Address> new_backups=Util.newElements(old_backups,backup_coords);
for(Address new_backup: new_backups) {
for(Map.Entry<String,VersionedValue> entry: counters.entrySet()) {
UpdateRequest update=new UpdateRequest(entry.getKey(), entry.getValue().value, entry.getValue().version);
sendRequest(new_backup, update);
}
}
}
else else
backup_coords=null; backup_coords=null;


Expand Down Expand Up @@ -1092,7 +1096,8 @@ public void readFrom(DataInput in) throws IOException, IllegalAccessException, I
} }


public String toString() { public String toString() {
return "ReconcileResponse (" + names.length + ") entries"; int num=names != null? names.length : 0;
return "ReconcileResponse (" + num + ") entries";
} }
} }


Expand Down Expand Up @@ -1180,7 +1185,6 @@ protected void _run() {
targets.remove(local_addr); targets.remove(local_addr);
responses=new ResponseCollector<ReconcileResponse>(targets); // send to everyone but us responses=new ResponseCollector<ReconcileResponse>(targets); // send to everyone but us
Request req=new ReconcileRequest(names, values, versions); Request req=new ReconcileRequest(names, values, versions);
System.out.println(local_addr + ": --> RECONCILE mcast");
sendRequest(null, req); sendRequest(null, req);


responses.waitForAllResponses(reconciliation_timeout); responses.waitForAllResponses(reconciliation_timeout);
Expand Down
25 changes: 9 additions & 16 deletions src/org/jgroups/util/Util.java
Expand Up @@ -2144,6 +2144,15 @@ public static List<Address> newMembers(List<Address> old_list, List<Address> new
return retval; return retval;
} }


public static <T> List<T> newElements(List<T> old_list, List<T> new_list) {
if(new_list == null)
return new ArrayList<T>();
List<T> retval=new ArrayList<T>(new_list);
if(old_list != null)
retval.removeAll(old_list);
return retval;
}



/** /**
* Selects a random subset of members according to subset_percentage and returns them. * Selects a random subset of members according to subset_percentage and returns them.
Expand Down Expand Up @@ -2355,22 +2364,6 @@ public static List<Address> determineLeftMembers(List<Address> old_mbrs, List<Ad
} }




/**
* Returns the members which joined between 2 subsequent views
* @param old_mbrs
* @param new_mbrs
* @return
*/
public static List<Address> determineNewMembers(List<Address> old_mbrs, List<Address> new_mbrs) {
if(old_mbrs == null || new_mbrs == null)
return new ArrayList<Address>();

List<Address> retval=new ArrayList<Address>(new_mbrs);
retval.removeAll(old_mbrs);
return retval;
}




public static String printViews(Collection<View> views) { public static String printViews(Collection<View> views) {
StringBuilder sb=new StringBuilder(); StringBuilder sb=new StringBuilder();
Expand Down

0 comments on commit 3e4f5a8

Please sign in to comment.