Permalink
Browse files

completed resending of lock requests to new coord and copying of serv…

…er locks to new backups
  • Loading branch information...
1 parent 9846a25 commit 25f234021786b987485392b9b5c68e691e9659ab Bela Ban committed Jan 19, 2011
@@ -120,10 +120,10 @@ public void receive(Message msg) {
handleLockDeniedResponse(req.lock_name, req.owner);
break;
case CREATE_LOCK:
- handleCreateLockRequest(req.lock_name, req.owner, msg.getSrc());
+ handleCreateLockRequest(req.lock_name, req.owner);
break;
case DELETE_LOCK:
- handleDeleteLockRequest(req.lock_name, msg.getSrc());
+ handleDeleteLockRequest(req.lock_name);
break;
default:
log.error("Request of type " + req.type + " not known");
@@ -258,14 +258,14 @@ protected void handleLockDeniedResponse(String lock_name, Owner owner) {
lock.lockDenied();
}
- protected void handleCreateLockRequest(String lock_name, Owner owner, Address sender) {
+ protected void handleCreateLockRequest(String lock_name, Owner owner) {
synchronized(server_locks) {
server_locks.put(lock_name, new ServerLock(lock_name, owner));
}
}
- protected void handleDeleteLockRequest(String lock_name, Address sender) {
+ protected void handleDeleteLockRequest(String lock_name) {
synchronized(server_locks) {
server_locks.remove(lock_name);
}
@@ -511,8 +511,11 @@ public String toString() {
protected class ClientLock implements Lock {
protected final String name;
+ protected Owner owner;
protected volatile boolean acquired;
protected volatile boolean denied;
+ protected volatile boolean is_trylock;
+ protected long timeout;
public ClientLock(String name) {
this.name=name;
@@ -571,7 +574,8 @@ protected void handleLockGrantedResponse(Owner owner, Address sender) {
protected synchronized void acquire() throws InterruptedException {
if(!acquired) {
- sendGrantLockRequest(name, getOwner(), 0, false);
+ owner=getOwner();
+ sendGrantLockRequest(name, owner, 0, false);
while(!acquired)
this.wait();
}
@@ -580,7 +584,10 @@ protected synchronized void acquire() throws InterruptedException {
protected synchronized void _unlock(boolean force) {
if(!acquired && !denied && !force)
return;
+ this.timeout=0;
+ this.is_trylock=false;
sendReleaseLockRequest(name, getOwner());
+ owner=null;
acquired=denied=false;
notifyAll();
@@ -592,7 +599,10 @@ protected synchronized boolean acquireTryLock(long timeout, boolean use_timeout)
if(denied)
return false;
if(!acquired) {
- sendGrantLockRequest(name, getOwner(), timeout, true);
+ is_trylock=true;
+ this.timeout=timeout;
+ owner=getOwner();
+ sendGrantLockRequest(name, owner, timeout, true);
long target_time=use_timeout? System.currentTimeMillis() + timeout : 0;
while(!acquired && !denied) {
@@ -601,6 +611,7 @@ protected synchronized boolean acquireTryLock(long timeout, boolean use_timeout)
if(wait_time <= 0)
break;
else {
+ this.timeout=wait_time;
this.wait(wait_time);
}
}
@@ -610,7 +621,6 @@ protected synchronized boolean acquireTryLock(long timeout, boolean use_timeout)
}
if(!acquired || denied)
_unlock(true);
-
return acquired && !denied;
}
}
@@ -6,10 +6,7 @@
import org.jgroups.annotations.Experimental;
import org.jgroups.util.Util;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Implementation of a lock service which acquires locks by contacting the coordinator.</p> Because the central
@@ -75,24 +72,44 @@ protected void sendDeleteLockRequest(Address dest, String lock_name) {
public void viewAccepted(View view) {
super.viewAccepted(view);
+ Address old_coord=coord;
if(view.size() > 0) {
coord=view.getMembers().firstElement();
is_coord=coord.equals(ch.getAddress());
if(log.isDebugEnabled())
log.debug("local_addr=" + ch.getAddress() + ", coord=" + coord + ", is_coord=" + is_coord);
}
- if(num_backups > 0) {
- List<Address> new_joiners=null;
+ if(is_coord && num_backups > 0) {
+ List<Address> new_backups=Util.pickNext(view.getMembers(), ch.getAddress(), num_backups);
+ List<Address> copy_locks_list=null;
synchronized(backups) {
- List<Address> tmp=Util.pickNext(view.getMembers(), ch.getAddress(), num_backups);
- if(!tmp.isEmpty() && !tmp.equals(backups))
- new_joiners=determineNewJoiners();
- backups.clear();
- backups.addAll(tmp);
+ if(!backups.equals(new_backups)) {
+ copy_locks_list=new ArrayList<Address>(new_backups);
+ copy_locks_list.removeAll(backups);
+ backups.clear();
+ backups.addAll(new_backups);
+ }
+ }
+
+ if(copy_locks_list != null && !copy_locks_list.isEmpty())
+ copyLocksTo(copy_locks_list);
+ }
+
+ // For all non-acquired client locks, send the GRANT_LOCK request to the new coordinator (if changed)
+ if(old_coord != null && !old_coord.equals(coord)) {
+ Map<String,Map<Owner,ClientLock>> copy;
+ synchronized(client_locks) {
+ copy=new HashMap<String,Map<Owner,ClientLock>>(client_locks);
+ }
+ if(!copy.isEmpty()) {
+ for(Map<Owner,ClientLock> map: copy.values()) {
+ for(ClientLock lock: map.values()) {
+ if(!lock.acquired && !lock.denied)
+ sendGrantLockRequest(lock.name, lock.owner, lock.timeout, lock.is_trylock);
+ }
+ }
}
- if(new_joiners != null && !new_joiners.isEmpty())
- copyLocksTo(new_joiners);
}
}
@@ -103,11 +120,13 @@ public void lockDeleted(String name) {
}
public void locked(String lock_name, Owner owner) {
- updateBackups(Type.CREATE_LOCK, lock_name, owner);
+ if(is_coord)
+ updateBackups(Type.CREATE_LOCK, lock_name, owner);
}
public void unlocked(String lock_name, Owner owner) {
- updateBackups(Type.DELETE_LOCK, lock_name, owner);
+ if(is_coord)
+ updateBackups(Type.DELETE_LOCK, lock_name, owner);
}
protected void updateBackups(Type type, String lock_name, Owner owner) {
@@ -119,18 +138,16 @@ protected void updateBackups(Type type, String lock_name, Owner owner) {
- protected List<Address> determineNewJoiners() {
- return null;
- }
-
protected void copyLocksTo(List<Address> new_joiners) {
Map<String,ServerLock> copy;
synchronized(server_locks) {
copy=new HashMap<String,ServerLock>(server_locks);
}
- for(Map.Entry<String,ServerLock> entry: server_locks.entrySet()) {
+ if(log.isTraceEnabled())
+ log.trace("copying locks to " + new_joiners);
+ for(Map.Entry<String,ServerLock> entry: copy.entrySet()) {
for(Address joiner: new_joiners)
sendCreateLockRequest(joiner, entry.getKey(), entry.getValue().current_owner);
}

0 comments on commit 25f2340

Please sign in to comment.