Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merged branch JGRP-1298 (support for Condition objects, contributed b…

  • Loading branch information...
commit 2e43ba918788dbe8663dc5bb4ef47967eea2eac9 2 parents 7ac81b8 + 6c39b0e
Bela Ban authored
4  src/org/jgroups/Event.java
@@ -55,6 +55,8 @@
55 55
     public static final int LOCK                               = 95; // arg=LockInfo
56 56
     public static final int UNLOCK                             = 96; // arg=LockInfo
57 57
     public static final int UNLOCK_ALL                         = 97; // arg=null
  58
+    public static final int LOCK_AWAIT                         = 98; // arg=LockInfo
  59
+    public static final int LOCK_SIGNAL                        = 99; // arg=AwaitInfo
58 60
 
59 61
 
60 62
     public static final int USER_DEFINED                       = 1000; // arg = <user def., e.g. evt type + data>
@@ -149,6 +151,8 @@ public static String type2String(int t) {
149 151
             case LOCK:                   return "LOCK";
150 152
             case UNLOCK:                 return "UNLOCK";
151 153
             case UNLOCK_ALL:             return "UNLOCK_ALL";
  154
+            case LOCK_AWAIT:             return "LOCK_AWAIT";
  155
+            case LOCK_SIGNAL:            return "LOCK_SIGNAL";
152 156
             
153 157
             case USER_DEFINED:           return "USER_DEFINED";
154 158
             default:                     return "UNDEFINED(" + t + ")";
30  src/org/jgroups/blocks/locking/AwaitInfo.java
... ...
@@ -0,0 +1,30 @@
  1
+package org.jgroups.blocks.locking;
  2
+
  3
+
  4
+public class AwaitInfo {
  5
+    protected final String   name;
  6
+    protected final boolean  all;
  7
+    
  8
+    public AwaitInfo(String name, boolean all) {
  9
+        this.name=name;
  10
+        this.all=all;
  11
+    }
  12
+
  13
+    /**
  14
+     * @return Returns the name.
  15
+     */
  16
+    public String getName() {
  17
+        return name;
  18
+    }
  19
+
  20
+    /**
  21
+     * @return Returns whether is all.
  22
+     */
  23
+    public boolean isAll() {
  24
+        return all;
  25
+    }
  26
+    
  27
+    public String toString() {
  28
+        return name + ", awaitAll=" + all;
  29
+    }
  30
+}
2  src/org/jgroups/blocks/locking/LockNotification.java
@@ -8,4 +8,6 @@
8 8
     void lockDeleted(String name);
9 9
     void locked(String lock_name, Owner owner);
10 10
     void unlocked(String lock_name, Owner owner);
  11
+    void awaiting(String lock_name, Owner owner);
  12
+    void awaited(String lock_name, Owner owner);
11 13
 }
96  src/org/jgroups/blocks/locking/LockService.java
... ...
@@ -1,14 +1,16 @@
1 1
 package org.jgroups.blocks.locking;
2 2
 
  3
+import java.util.Date;
  4
+import java.util.concurrent.TimeUnit;
  5
+import java.util.concurrent.atomic.AtomicReference;
  6
+import java.util.concurrent.locks.Condition;
  7
+import java.util.concurrent.locks.Lock;
  8
+
3 9
 import org.jgroups.Event;
4 10
 import org.jgroups.JChannel;
5 11
 import org.jgroups.annotations.Experimental;
6 12
 import org.jgroups.protocols.Locking;
7 13
 
8  
-import java.util.concurrent.TimeUnit;
9  
-import java.util.concurrent.locks.Condition;
10  
-import java.util.concurrent.locks.Lock;
11  
-
12 14
 /**
13 15
  * LockService is the main class for to use for distributed locking functionality. LockService needs access to a
14 16
  * {@link JChannel} and interacts with a locking protocol (e.g. {@link org.jgroups.protocols.CENTRAL_LOCK}) via events.<p/>
@@ -78,6 +80,7 @@ public String printLocks() {
78 80
 
79 81
     protected class LockImpl implements Lock {
80 82
         protected final String name;
  83
+        protected final AtomicReference<Thread> holder = new AtomicReference<Thread>();
81 84
 
82 85
         public LockImpl(String name) {
83 86
             this.name=name;
@@ -85,16 +88,23 @@ public LockImpl(String name) {
85 88
 
86 89
         public void lock() {
87 90
             ch.down(new Event(Event.LOCK, new LockInfo(name, false, false, false, 0, TimeUnit.MILLISECONDS)));
  91
+            holder.set(Thread.currentThread());
88 92
         }
89 93
 
90 94
         public void lockInterruptibly() throws InterruptedException {
91 95
             ch.down(new Event(Event.LOCK, new LockInfo(name, false, true, false, 0, TimeUnit.MILLISECONDS)));
92  
-            if(Thread.currentThread().isInterrupted())
  96
+            Thread currentThread = Thread.currentThread();
  97
+            if(currentThread.isInterrupted())
93 98
                 throw new InterruptedException();
  99
+            else
  100
+                holder.set(Thread.currentThread());
94 101
         }
95 102
 
96 103
         public boolean tryLock() {
97 104
             Boolean retval=(Boolean)ch.downcall(new Event(Event.LOCK, new LockInfo(name, true, false, false, 0, TimeUnit.MILLISECONDS)));
  105
+            if (retval == Boolean.TRUE) {
  106
+                holder.set(Thread.currentThread());
  107
+            }
98 108
             return retval.booleanValue();
99 109
         }
100 110
 
@@ -102,15 +112,89 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
102 112
             Boolean retval=(Boolean)ch.downcall(new Event(Event.LOCK, new LockInfo(name, true, true, true, time, unit)));
103 113
             if(Thread.currentThread().isInterrupted())
104 114
                 throw new InterruptedException();
  115
+            if (retval == Boolean.TRUE) {
  116
+                holder.set(Thread.currentThread());
  117
+            }
105 118
             return retval.booleanValue();
106 119
         }
107 120
 
108 121
         public void unlock() {
109 122
             ch.down(new Event(Event.UNLOCK, new LockInfo(name, false, false, false, 0, TimeUnit.MILLISECONDS)));
  123
+            holder.set(null);
110 124
         }
111 125
 
  126
+        /**
  127
+         * This condition object is only allowed to work 1 for each lock.
  128
+         * If more than 1 condition is created for this lock, they both will
  129
+         * be awaiting/signalling on the same lock
  130
+         */
112 131
         public Condition newCondition() {
113  
-            throw new UnsupportedOperationException();
  132
+            return new ConditionImpl(name, holder);
  133
+        }
  134
+    }
  135
+    
  136
+    private class ConditionImpl implements Condition {
  137
+        protected final String name;
  138
+        protected final AtomicReference<Thread> holder;
  139
+
  140
+        public ConditionImpl(String name, AtomicReference<Thread> holder) {
  141
+            this.name=name;
  142
+            this.holder=holder;
  143
+        }
  144
+        
  145
+        @Override
  146
+        public void await() throws InterruptedException {
  147
+            ch.down(new Event(Event.LOCK_AWAIT, new LockInfo(name, false, 
  148
+                true, false, 0, TimeUnit.MILLISECONDS)));
  149
+            if(Thread.currentThread().isInterrupted())
  150
+                throw new InterruptedException();
  151
+        }
  152
+
  153
+        @Override
  154
+        public void awaitUninterruptibly() {
  155
+            ch.down(new Event(Event.LOCK_AWAIT, new LockInfo(name, false, 
  156
+                false, false, 0, TimeUnit.MILLISECONDS)));
  157
+        }
  158
+
  159
+        @Override
  160
+        public long awaitNanos(long nanosTimeout) throws InterruptedException {
  161
+            Long waitLeft = (Long)ch.downcall(new Event(Event.LOCK_AWAIT, 
  162
+                new LockInfo(name, false, true, true, nanosTimeout, 
  163
+                    TimeUnit.NANOSECONDS)));
  164
+            if(Thread.currentThread().isInterrupted())
  165
+                throw new InterruptedException();
  166
+            return waitLeft.longValue();
  167
+        }
  168
+
  169
+        @Override
  170
+        public boolean await(long time, TimeUnit unit)
  171
+                throws InterruptedException {
  172
+            return awaitNanos(unit.toNanos(time)) > 0;
  173
+        }
  174
+
  175
+        @Override
  176
+        public boolean awaitUntil(Date deadline) throws InterruptedException {
  177
+            long waitUntilTime = deadline.getTime();
  178
+            long currentTime = System.currentTimeMillis();
  179
+            
  180
+            long waitTime = waitUntilTime - currentTime;
  181
+            return waitTime > 0 && await(waitTime, TimeUnit.MILLISECONDS);
  182
+        }
  183
+
  184
+        @Override
  185
+        public void signal() {
  186
+            if (holder.get() != Thread.currentThread()) {
  187
+                throw new IllegalMonitorStateException();
  188
+            }
  189
+            ch.down(new Event(Event.LOCK_SIGNAL, new AwaitInfo(name, false)));
  190
+        }
  191
+
  192
+        @Override
  193
+        public void signalAll() {
  194
+            if (holder.get() != Thread.currentThread()) {
  195
+                throw new IllegalMonitorStateException();
  196
+            }
  197
+            ch.down(new Event(Event.LOCK_SIGNAL, new AwaitInfo(name, true)));
114 198
         }
115 199
     }
116 200
 }
21  src/org/jgroups/demos/LockServiceDemo.java
... ...
@@ -1,17 +1,18 @@
1 1
 package org.jgroups.demos;
2 2
 
3  
-import org.jgroups.ChannelException;
4  
-import org.jgroups.JChannel;
5  
-import org.jgroups.blocks.locking.*;
6  
-import org.jgroups.jmx.JmxConfigurator;
7  
-import org.jgroups.util.Util;
8  
-
9 3
 import java.util.ArrayList;
10 4
 import java.util.List;
11 5
 import java.util.StringTokenizer;
12 6
 import java.util.concurrent.TimeUnit;
13 7
 import java.util.concurrent.locks.Lock;
14 8
 
  9
+import org.jgroups.JChannel;
  10
+import org.jgroups.blocks.locking.LockNotification;
  11
+import org.jgroups.blocks.locking.LockService;
  12
+import org.jgroups.blocks.locking.Owner;
  13
+import org.jgroups.jmx.JmxConfigurator;
  14
+import org.jgroups.util.Util;
  15
+
15 16
 /**
16 17
  * Demos the LockService
17 18
  */
@@ -60,6 +61,13 @@ public void unlocked(String lock_name, Owner owner) {
60 61
         System.out.println("\"" + lock_name + "\" unlocked by " + owner);
61 62
     }
62 63
 
  64
+    public void awaiting(String lock_name, Owner owner) {
  65
+        System.out.println("awaiting \"" + lock_name + "\" by " + owner);
  66
+    }
  67
+
  68
+    public void awaited(String lock_name, Owner owner) {
  69
+        System.out.println("awaited \"" + lock_name + "\" by " + owner);
  70
+    }
63 71
 
64 72
     protected void loop() throws Exception {
65 73
         List<String> lock_names;
@@ -167,5 +175,4 @@ protected static void help() {
167 175
         System.out.println("Example:\nlock lock lock2 lock3\nunlock all\ntrylock bela michelle 300");
168 176
     }
169 177
 
170  
-
171 178
 }
51  src/org/jgroups/protocols/CENTRAL_LOCK.java
... ...
@@ -1,5 +1,11 @@
1 1
 package org.jgroups.protocols;
2 2
 
  3
+import java.util.ArrayList;
  4
+import java.util.HashMap;
  5
+import java.util.List;
  6
+import java.util.Map;
  7
+import java.util.Queue;
  8
+
3 9
 import org.jgroups.Address;
4 10
 import org.jgroups.View;
5 11
 import org.jgroups.annotations.Experimental;
@@ -9,11 +15,6 @@
9 15
 import org.jgroups.blocks.locking.Owner;
10 16
 import org.jgroups.util.Util;
11 17
 
12  
-import java.util.ArrayList;
13  
-import java.util.HashMap;
14  
-import java.util.List;
15  
-import java.util.Map;
16  
-
17 18
 
18 19
 /**
19 20
  * Implementation of a locking protocol which acquires locks by contacting the coordinator.</p> Because the
@@ -97,6 +98,20 @@ protected void sendDeleteLockRequest(Address dest, String lock_name) {
97 98
         sendRequest(dest, Type.DELETE_LOCK, lock_name, null, 0, false);
98 99
     }
99 100
 
  101
+    @Override
  102
+    protected void sendAwaitConditionRequest(String lock_name, Owner owner) {
  103
+        sendRequest(coord, Type.LOCK_AWAIT, lock_name, owner, 0, false);
  104
+    }
  105
+
  106
+    @Override
  107
+    protected void sendSignalConditionRequest(String lock_name, boolean all) {
  108
+        sendRequest(coord, all ? Type.COND_SIG_ALL : Type.COND_SIG, lock_name, null, 0, false);
  109
+    }
  110
+    
  111
+    @Override
  112
+    protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner) {
  113
+        sendRequest(coord, Type.DELETE_LOCK_AWAIT, lock_name, owner, 0, false);
  114
+    }
100 115
 
101 116
     public void handleView(View view) {
102 117
         super.handleView(view);
@@ -156,6 +171,16 @@ public void unlocked(String lock_name, Owner owner) {
156 171
         if(is_coord)
157 172
             updateBackups(Type.DELETE_LOCK, lock_name, owner);
158 173
     }
  174
+    
  175
+    public void awaiting(String lock_name, Owner owner) {
  176
+        if(is_coord)
  177
+            updateBackups(Type.CREATE_AWAITER, lock_name, owner);
  178
+    }
  179
+
  180
+    public void awaited(String lock_name, Owner owner) {
  181
+        if(is_coord)
  182
+            updateBackups(Type.DELETE_AWAITER, lock_name, owner);
  183
+    }
159 184
 
160 185
     protected void updateBackups(Type type, String lock_name, Owner owner) {
161 186
         synchronized(backups) {
@@ -176,11 +201,19 @@ protected void copyLocksTo(List<Address> new_joiners) {
176 201
         if(log.isTraceEnabled())
177 202
             log.trace("copying locks to " + new_joiners);
178 203
         for(Map.Entry<String,ServerLock> entry: copy.entrySet()) {
179  
-            for(Address joiner: new_joiners)
180  
-                sendCreateLockRequest(joiner, entry.getKey(), entry.getValue().current_owner);
  204
+            for(Address joiner: new_joiners) {
  205
+                ServerLock lock = entry.getValue();
  206
+                if (lock.current_owner != null) {
  207
+                    sendCreateLockRequest(joiner, entry.getKey(), entry.getValue().current_owner);
  208
+                }
  209
+                synchronized (lock.condition) {
  210
+                    Queue<Owner> queue = lock.condition.queue;
  211
+                    for (Owner owner : queue) {
  212
+                        sendAwaitConditionRequest(lock.lock_name, owner);
  213
+                    }
  214
+                }
  215
+            }
181 216
         }
182 217
     }
183  
-
184  
-
185 218
 }
186 219
 
489  src/org/jgroups/protocols/Locking.java
... ...
@@ -1,10 +1,37 @@
1 1
 package org.jgroups.protocols;
2 2
 
3  
-import org.jgroups.*;
  3
+import java.io.DataInputStream;
  4
+import java.io.DataOutputStream;
  5
+import java.io.IOException;
  6
+import java.util.ArrayDeque;
  7
+import java.util.ArrayList;
  8
+import java.util.Collection;
  9
+import java.util.Date;
  10
+import java.util.HashMap;
  11
+import java.util.HashSet;
  12
+import java.util.Iterator;
  13
+import java.util.List;
  14
+import java.util.Map;
  15
+import java.util.Queue;
  16
+import java.util.Set;
  17
+import java.util.concurrent.ConcurrentMap;
  18
+import java.util.concurrent.TimeUnit;
  19
+import java.util.concurrent.atomic.AtomicBoolean;
  20
+import java.util.concurrent.atomic.AtomicReference;
  21
+import java.util.concurrent.locks.Condition;
  22
+import java.util.concurrent.locks.Lock;
  23
+import java.util.concurrent.locks.LockSupport;
  24
+
  25
+import org.jgroups.Address;
  26
+import org.jgroups.Event;
  27
+import org.jgroups.Header;
  28
+import org.jgroups.Message;
  29
+import org.jgroups.View;
4 30
 import org.jgroups.annotations.MBean;
5 31
 import org.jgroups.annotations.ManagedAttribute;
6 32
 import org.jgroups.annotations.ManagedOperation;
7 33
 import org.jgroups.annotations.Property;
  34
+import org.jgroups.blocks.locking.AwaitInfo;
8 35
 import org.jgroups.blocks.locking.LockInfo;
9 36
 import org.jgroups.blocks.locking.LockNotification;
10 37
 import org.jgroups.blocks.locking.Owner;
@@ -12,15 +39,6 @@
12 39
 import org.jgroups.util.Streamable;
13 40
 import org.jgroups.util.Util;
14 41
 
15  
-import java.io.DataInputStream;
16  
-import java.io.DataOutputStream;
17  
-import java.io.IOException;
18  
-import java.util.*;
19  
-import java.util.concurrent.ConcurrentMap;
20  
-import java.util.concurrent.TimeUnit;
21  
-import java.util.concurrent.locks.Condition;
22  
-import java.util.concurrent.locks.Lock;
23  
-
24 42
 
25 43
 
26 44
 /**
@@ -50,16 +68,23 @@
50 68
     protected final Map<String,Map<Owner,ClientLock>> client_locks=new HashMap<String,Map<Owner,ClientLock>>();
51 69
 
52 70
     protected final Set<LockNotification> lock_listeners=new HashSet<LockNotification>();
53  
-
  71
+    
54 72
 
55 73
 
56 74
     protected static enum Type {
57  
-        GRANT_LOCK,    // request to acquire a lock
58  
-        LOCK_GRANTED,  // response to sender of GRANT_LOCK on succcessful lock acquisition
59  
-        LOCK_DENIED,   // response to sender of GRANT_LOCK on unsuccessful lock acquisition (e.g. on tryLock())
60  
-        RELEASE_LOCK,  // request to release a lock
61  
-        CREATE_LOCK,   // request to create a server lock (sent by coordinator to backups). Used by CentralLockService
62  
-        DELETE_LOCK    // request to delete a server lock (sent by coordinator to backups). Used by CentralLockService
  75
+        GRANT_LOCK,        // request to acquire a lock
  76
+        LOCK_GRANTED,      // response to sender of GRANT_LOCK on succcessful lock acquisition
  77
+        LOCK_DENIED,       // response to sender of GRANT_LOCK on unsuccessful lock acquisition (e.g. on tryLock())
  78
+        RELEASE_LOCK,      // request to release a lock
  79
+        CREATE_LOCK,       // request to create a server lock (sent by coordinator to backups). Used by CentralLockService
  80
+        DELETE_LOCK,       // request to delete a server lock (sent by coordinator to backups). Used by CentralLockService
  81
+        LOCK_AWAIT,        // request to await until condition is signaled
  82
+        COND_SIG,          // request to signal awaiting thread
  83
+        COND_SIG_ALL,      // request to signal all awaiting threads
  84
+        SIG_RET,           // response to alert of signal
  85
+        DELETE_LOCK_AWAIT, // request to delete a waiter
  86
+        CREATE_AWAITER,    // request to create a server lock await (sent by coordinator to backups). Used by CentralLockService
  87
+        DELETE_AWAITER     // request to delete a server lock await (sent by coordinator to backups). Used by CentralLockService
63 88
     }
64 89
 
65 90
 
@@ -141,6 +166,43 @@ public Object down(Event evt) {
141 166
             case Event.UNLOCK_ALL:
142 167
                 unlockAll();
143 168
                 return null;
  169
+            case Event.LOCK_AWAIT:
  170
+                info=(LockInfo)evt.getArg();
  171
+                lock=getLock(info.getName(), false);
  172
+                if (lock == null || !lock.acquired) {
  173
+                    throw new IllegalMonitorStateException();
  174
+                }
  175
+                Condition condition = lock.newCondition();
  176
+                if (info.isUseTimeout()) {
  177
+                    try {
  178
+                        return condition.awaitNanos(info.getTimeUnit().toNanos(
  179
+                            info.getTimeout()));
  180
+                    }
  181
+                    catch (InterruptedException e) {
  182
+                        Thread.currentThread().interrupt();
  183
+                    }
  184
+                }
  185
+                else if (info.isLockInterruptibly()) {
  186
+                    try {
  187
+                        condition.await();
  188
+                    }
  189
+                    catch (InterruptedException e) {
  190
+                        Thread.currentThread().interrupt();
  191
+                    }
  192
+                }
  193
+                else {
  194
+                    condition.awaitUninterruptibly();
  195
+                }
  196
+                break;
  197
+            case Event.LOCK_SIGNAL:
  198
+                AwaitInfo awaitInfo = (AwaitInfo)evt.getArg();
  199
+                lock=getLock(awaitInfo.getName(), false);
  200
+                if (lock == null || !lock.acquired) {
  201
+                    throw new IllegalMonitorStateException();
  202
+                }
  203
+                sendSignalConditionRequest(awaitInfo.getName(), 
  204
+                    awaitInfo.isAll());
  205
+                break;
144 206
             case Event.SET_LOCAL_ADDRESS:
145 207
                 local_addr=(Address)evt.getArg();
146 208
                 break;
@@ -180,6 +242,26 @@ public Object up(Event evt) {
180 242
                     case DELETE_LOCK:
181 243
                         handleDeleteLockRequest(req.lock_name);
182 244
                         break;
  245
+                    case COND_SIG:
  246
+                    case COND_SIG_ALL:
  247
+                        handleSignalRequest(req);
  248
+                        break;
  249
+                    case LOCK_AWAIT:
  250
+                        handleAwaitRequest(req.lock_name, req.owner);
  251
+                        handleLockRequest(req);
  252
+                        break;
  253
+                    case DELETE_LOCK_AWAIT:
  254
+                        handleDeleteAwaitRequest(req.lock_name, req.owner);
  255
+                        break;
  256
+                    case SIG_RET:
  257
+                        handleSignalResponse(req.lock_name, req.owner);
  258
+                        break;
  259
+                    case CREATE_AWAITER:
  260
+                        handleCreateAwaitingRequest(req.lock_name, req.owner);
  261
+                        break;
  262
+                    case DELETE_AWAITER:
  263
+                        handleDeleteAwaitingRequest(req.lock_name, req.owner);
  264
+                        break;
183 265
                     default:
184 266
                         log.error("Request of type " + req.type + " not known");
185 267
                         break;
@@ -277,6 +359,9 @@ protected Owner getOwner() {
277 359
 
278 360
     abstract protected void sendGrantLockRequest(String lock_name, Owner owner, long timeout, boolean is_trylock);
279 361
     abstract protected void sendReleaseLockRequest(String lock_name, Owner owner);
  362
+    abstract protected void sendAwaitConditionRequest(String lock_name, Owner owner);
  363
+    abstract protected void sendSignalConditionRequest(String lock_name, boolean all);
  364
+    abstract protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner);
280 365
 
281 366
 
282 367
     protected void sendRequest(Address dest, Type type, String lock_name, Owner owner, long timeout, boolean is_trylock) {
@@ -315,6 +400,25 @@ protected void sendLockResponse(Type type, Owner dest, String lock_name) {
315 400
     }
316 401
 
317 402
 
  403
+    protected void sendSignalResponse(Owner dest, String lock_name) {
  404
+        Request rsp=new Request(Type.SIG_RET, lock_name, dest, 0);
  405
+        Message lock_granted_rsp=new Message(dest.getAddress(), null, rsp);
  406
+        lock_granted_rsp.putHeader(id, new LockingHeader());
  407
+        if(bypass_bundling)
  408
+            lock_granted_rsp.setFlag(Message.DONT_BUNDLE);
  409
+
  410
+        if(log.isTraceEnabled())
  411
+            log.trace("[" + local_addr + "] --> [" + dest.getAddress() + "] " + rsp);
  412
+
  413
+        try {
  414
+            down_prot.down(new Event(Event.MSG, lock_granted_rsp));
  415
+        }
  416
+        catch(Exception ex) {
  417
+            log.error("failed sending " + Type.SIG_RET + " message to " + dest + ": " + ex);
  418
+        }
  419
+    }
  420
+
  421
+
318 422
     protected void handleLockRequest(Request req) {
319 423
         ServerLock lock=server_locks.get(req.lock_name);
320 424
         if(lock == null) {
@@ -322,12 +426,15 @@ protected void handleLockRequest(Request req) {
322 426
             ServerLock tmp=server_locks.putIfAbsent(req.lock_name, lock);
323 427
             if(tmp != null)
324 428
                 lock=tmp;
325  
-            else
  429
+            else {
326 430
                 notifyLockCreated(req.lock_name);
  431
+            }
327 432
         }
328 433
         lock.handleRequest(req);
329  
-        if(lock.isEmpty() && lock.current_owner == null)
  434
+        // We remove the lock if there is no waiters or owner
  435
+        if(lock.isEmpty() && lock.current_owner == null && lock.condition.queue.isEmpty()) {
330 436
             server_locks.remove(req.lock_name);
  437
+        }
331 438
     }
332 439
 
333 440
 
@@ -342,7 +449,49 @@ protected void handleLockDeniedResponse(String lock_name, Owner owner) {
342 449
          if(lock != null)
343 450
              lock.lockDenied();
344 451
     }
345  
-
  452
+    
  453
+    protected void handleAwaitRequest(String lock_name, Owner owner) {
  454
+        ServerLock lock=server_locks.get(lock_name);
  455
+        if (lock != null) {
  456
+            lock.condition.addWaiter(owner);
  457
+        }
  458
+        else {
  459
+            log.error("Condition await was received but lock was not created.  Waiter may block forever");
  460
+        }
  461
+    }
  462
+    
  463
+    protected void handleDeleteAwaitRequest(String lock_name, Owner owner) {
  464
+        ServerLock lock=server_locks.get(lock_name);
  465
+        if (lock != null) {
  466
+            lock.condition.removeWaiter(owner);
  467
+        }
  468
+        else {
  469
+            log.error("Condition await delete was received, but lock was gone");
  470
+        }
  471
+    }
  472
+    
  473
+    protected void handleSignalResponse(String lock_name, Owner owner) {
  474
+        ClientLock lock=getLock(lock_name, owner, false);
  475
+        if(lock != null) {
  476
+            synchronized (lock.condition) {
  477
+                lock.condition.signaled();
  478
+            }
  479
+        }
  480
+        else {
  481
+            log.error("Condition response was client lock was not present.  Ignored signal.");
  482
+        }
  483
+    }
  484
+    
  485
+    protected void handleSignalRequest(Request req) {
  486
+        ServerLock lock=server_locks.get(req.lock_name);
  487
+        if (lock != null) {
  488
+            lock.handleRequest(req);
  489
+        }
  490
+        else {
  491
+            log.error("Condition signal was received but lock was not created.  Couldn't notify anyone.");
  492
+        }
  493
+    }
  494
+    
346 495
     protected void handleCreateLockRequest(String lock_name, Owner owner) {
347 496
         synchronized(server_locks) {
348 497
             server_locks.put(lock_name, new ServerLock(lock_name, owner));
@@ -352,7 +501,41 @@ protected void handleCreateLockRequest(String lock_name, Owner owner) {
352 501
 
353 502
     protected void handleDeleteLockRequest(String lock_name) {
354 503
         synchronized(server_locks) {
355  
-            server_locks.remove(lock_name);
  504
+            ServerLock lock = server_locks.get(lock_name);
  505
+            synchronized (lock.condition) {
  506
+                if (lock.condition.queue.isEmpty()) {
  507
+                    server_locks.remove(lock_name);
  508
+                }
  509
+                else {
  510
+                    lock.current_owner = null;
  511
+                }
  512
+            }
  513
+        }
  514
+    }
  515
+
  516
+
  517
+    protected void handleCreateAwaitingRequest(String lock_name, Owner owner) {
  518
+        synchronized(server_locks) {
  519
+            ServerLock lock = server_locks.get(lock_name);
  520
+            if (lock == null) {
  521
+                lock = new ServerLock(lock_name);
  522
+            }
  523
+            lock.condition.queue.add(owner);
  524
+        }
  525
+    }
  526
+
  527
+
  528
+    protected void handleDeleteAwaitingRequest(String lock_name, Owner owner) {
  529
+        synchronized(server_locks) {
  530
+            ServerLock lock = server_locks.get(lock_name);
  531
+            if (lock != null) {
  532
+                synchronized (lock.condition) {
  533
+                    lock.condition.queue.remove(owner);
  534
+                    if (lock.condition.queue.isEmpty() && lock.current_owner == null) {
  535
+                        server_locks.remove(lock_name);
  536
+                    }
  537
+                }
  538
+            }
356 539
         }
357 540
     }
358 541
 
@@ -435,6 +618,27 @@ protected void notifyUnlocked(String lock_name, Owner owner) {
435 618
         }
436 619
     }
437 620
 
  621
+    protected void notifyAwaiting(String lock_name, Owner owner) {
  622
+        for(LockNotification listener: lock_listeners) {
  623
+            try {
  624
+                listener.awaiting(lock_name, owner);
  625
+            }
  626
+            catch(Throwable t) {
  627
+                log.error("failed notifying " + listener, t);
  628
+            }
  629
+        }
  630
+    }
  631
+
  632
+    protected void notifyAwaited(String lock_name, Owner owner) {
  633
+        for(LockNotification listener: lock_listeners) {
  634
+            try {
  635
+                listener.awaited(lock_name, owner);
  636
+            }
  637
+            catch(Throwable t) {
  638
+                log.error("failed notifying " + listener, t);
  639
+            }
  640
+        }
  641
+    }
438 642
 
439 643
 
440 644
 
@@ -446,14 +650,17 @@ protected void notifyUnlocked(String lock_name, Owner owner) {
446 650
         protected final String lock_name;
447 651
         protected Owner current_owner;
448 652
         protected final List<Request> queue=new ArrayList<Request>();
  653
+        protected final ServerCondition condition;
449 654
 
450 655
         public ServerLock(String lock_name) {
451 656
             this.lock_name=lock_name;
  657
+            this.condition=new ServerCondition(this);
452 658
         }
453 659
 
454 660
         protected ServerLock(String lock_name, Owner owner) {
455 661
             this.lock_name=lock_name;
456 662
             this.current_owner=owner;
  663
+            this.condition=new ServerCondition(this);
457 664
         }
458 665
 
459 666
         protected synchronized void handleRequest(Request req) {
@@ -476,6 +683,7 @@ protected synchronized void handleRequest(Request req) {
476 683
                     }
477 684
                     break;
478 685
                 case RELEASE_LOCK:
  686
+                case LOCK_AWAIT:
479 687
                     if(current_owner == null)
480 688
                         break;
481 689
                     if(current_owner.equals(req.owner))
@@ -483,6 +691,12 @@ protected synchronized void handleRequest(Request req) {
483 691
                     else
484 692
                         addToQueue(req);
485 693
                     break;
  694
+                case COND_SIG:
  695
+                    condition.signal(false);
  696
+                    break;
  697
+                case COND_SIG_ALL:
  698
+                    condition.signal(true);
  699
+                    break;
486 700
                 default:
487 701
                     throw new IllegalArgumentException("type " + req.type + " is invalid here");
488 702
             }
@@ -503,6 +717,13 @@ protected synchronized void handleView(List<Address> members) {
503 717
                 if(!members.contains(req.owner.getAddress()))
504 718
                     it.remove();
505 719
             }
  720
+            
  721
+            for(Iterator<Owner> it=condition.queue.iterator(); it.hasNext();) {
  722
+                Owner own=it.next();
  723
+                if(!members.contains(own.getAddress())) {
  724
+                    it.remove();
  725
+                }
  726
+            }
506 727
 
507 728
             processQueue();
508 729
         }
@@ -591,6 +812,60 @@ public String toString() {
591 812
         }
592 813
     }
593 814
 
  815
+    protected class ServerCondition {
  816
+        protected final ServerLock lock;
  817
+        protected final Queue<Owner> queue=new ArrayDeque<Owner>();
  818
+        
  819
+        public ServerCondition(ServerLock lock) {
  820
+            this.lock = lock;
  821
+        }
  822
+        
  823
+        public synchronized void addWaiter(Owner waiter) {
  824
+            notifyAwaiting(lock.lock_name, waiter);
  825
+            if (log.isTraceEnabled()) {
  826
+                log.trace("Waiter [" + waiter + "] was added for " + lock.lock_name);
  827
+            }
  828
+            queue.add(waiter);
  829
+        }
  830
+        
  831
+        public synchronized void removeWaiter(Owner waiter) {
  832
+            notifyAwaited(lock.lock_name, waiter);
  833
+            if (log.isTraceEnabled()) {
  834
+                log.trace("Waiter [" + waiter + "] was removed for " + lock.lock_name);
  835
+            }
  836
+            queue.remove(waiter);
  837
+        }
  838
+        
  839
+        public synchronized void signal(boolean all) {
  840
+            if (queue.isEmpty()) {
  841
+                if (log.isTraceEnabled()) {
  842
+                    log.trace("Signal for [" + lock.lock_name + 
  843
+                        "] ignored since, no one is waiting in queue.");
  844
+                }
  845
+            }
  846
+            
  847
+            Owner entry;
  848
+            if (all) {
  849
+                while ((entry = queue.poll()) != null) {
  850
+                    notifyAwaited(lock.lock_name, entry);
  851
+                    if (log.isTraceEnabled()) {
  852
+                        log.trace("Signalled " + entry + " for " + lock.lock_name);
  853
+                    }
  854
+                    sendSignalResponse(entry, lock.lock_name);
  855
+                }
  856
+            }
  857
+            else {
  858
+                entry = queue.poll();
  859
+                if (entry != null) {
  860
+                    notifyAwaited(lock.lock_name, entry);
  861
+                    if (log.isTraceEnabled()) {
  862
+                        log.trace("Signalled " + entry + " for " + lock.lock_name);
  863
+                    }
  864
+                    sendSignalResponse(entry, lock.lock_name);
  865
+                }
  866
+            }
  867
+        }
  868
+    }
594 869
 
595 870
 
596 871
 
@@ -601,9 +876,12 @@ public String toString() {
601 876
         protected volatile boolean  denied;
602 877
         protected volatile boolean  is_trylock;
603 878
         protected long              timeout;
  879
+        
  880
+        protected final ClientCondition condition;
604 881
 
605 882
         public ClientLock(String name) {
606 883
             this.name=name;
  884
+            this.condition = new ClientCondition(this);
607 885
         }
608 886
 
609 887
         public void lock() {
@@ -637,7 +915,8 @@ public synchronized void unlock() {
637 915
         }
638 916
 
639 917
         public Condition newCondition() {
640  
-            throw new UnsupportedOperationException("currently not implemented");
  918
+            // Currently only 1 condition per Lock is supported
  919
+            return condition;
641 920
         }
642 921
 
643 922
         public String toString() {
@@ -749,6 +1028,172 @@ protected synchronized boolean acquireTryLock(long timeout, boolean use_timeout)
749 1028
             return acquired && !denied;
750 1029
         }
751 1030
     }
  1031
+    
  1032
+    protected class ClientCondition implements Condition {
  1033
+
  1034
+        protected final ClientLock lock;
  1035
+        protected final AtomicBoolean signaled = new AtomicBoolean(false);
  1036
+        /**
  1037
+         * This is okay only having 1 since a client condition is 1 per 
  1038
+         * lock_name, thread id combination.
  1039
+         */
  1040
+        protected volatile AtomicReference<Thread> parker=new AtomicReference<Thread>();
  1041
+        
  1042
+        public ClientCondition(ClientLock lock) {
  1043
+            this.lock = lock;
  1044
+        }
  1045
+        
  1046
+        @Override
  1047
+        public void await() throws InterruptedException {
  1048
+            await(true);
  1049
+            lock.lockInterruptibly();
  1050
+        }
  1051
+
  1052
+        @Override
  1053
+        public void awaitUninterruptibly() {
  1054
+            try {
  1055
+                await(false);
  1056
+                lock.lock();
  1057
+            }
  1058
+            catch(InterruptedException e) {
  1059
+                // This should never happen
  1060
+            }
  1061
+        }
  1062
+
  1063
+        @Override
  1064
+        public long awaitNanos(long nanosTimeout) throws InterruptedException {
  1065
+            long value = await(nanosTimeout);
  1066
+            long begin = System.nanoTime();
  1067
+            lock.lockInterruptibly();
  1068
+            return value - System.nanoTime() + begin;
  1069
+        }
  1070
+
  1071
+        /**
  1072
+         * Note this wait will only work correctly if the converted value is less
  1073
+         * than 292 years.  This is due to the limitation in System.nano and long
  1074
+         * values that can only store up to 292 years (22<sup>63</sup> nanoseconds).
  1075
+         * 
  1076
+         * For more information please see {@link System#nanoTime()}
  1077
+         */
  1078
+        @Override
  1079
+        public boolean await(long time, TimeUnit unit)
  1080
+                throws InterruptedException {
  1081
+            return awaitNanos(unit.toNanos(time)) > 0;
  1082
+        }
  1083
+
  1084
+        @Override
  1085
+        public boolean awaitUntil(Date deadline) throws InterruptedException {
  1086
+            long waitUntilTime = deadline.getTime();
  1087
+            long currentTime = System.currentTimeMillis();
  1088
+            
  1089
+            long waitTime = waitUntilTime - currentTime;
  1090
+            if (waitTime > 0) {
  1091
+                return await(waitTime, TimeUnit.MILLISECONDS);
  1092
+            }
  1093
+            else {
  1094
+                return false;
  1095
+            }
  1096
+        }
  1097
+        
  1098
+        protected void await(boolean throwInterrupt) throws InterruptedException {
  1099
+            if(!signaled.get()) {
  1100
+                lock.acquired = false;
  1101
+                sendAwaitConditionRequest(lock.name, lock.owner);
  1102
+                boolean interrupted=false;
  1103
+                while(!signaled.get()) {
  1104
+                    parker.set(Thread.currentThread());
  1105
+                    LockSupport.park(this);
  1106
+                    
  1107
+                    if (Thread.interrupted()) {
  1108
+                        // If we were interrupted and haven't received a response yet then we try to
  1109
+                        // clean up the lock request and throw the exception
  1110
+                        if (!signaled.get()) {
  1111
+                            sendDeleteAwaitConditionRequest(lock.name, lock.owner);
  1112
+                            throw new InterruptedException();
  1113
+                        }
  1114
+                        // In the case that we were signaled and interrupted
  1115
+                        // we want to return the signal but still interrupt
  1116
+                        // our thread
  1117
+                        interrupted = true; 
  1118
+                    }
  1119
+                }
  1120
+                if(interrupted)
  1121
+                    Thread.currentThread().interrupt();
  1122
+            }
  1123
+            
  1124
+            // We set as if this signal was no released.  This way if the
  1125
+            // condition is reused again, but the client condition isn't lost
  1126
+            // we won't think we were signaled immediately
  1127
+            signaled.set(false);
  1128
+        }
  1129
+        
  1130
+        protected long await(long nanoSeconds) throws InterruptedException {
  1131
+            long target_nano=System.nanoTime() + nanoSeconds;
  1132
+            
  1133
+            long wait_nano=0;
  1134
+            
  1135
+            if(!signaled.get()) {
  1136
+                // We release the lock at the same time as waiting on the
  1137
+                // condition
  1138
+                lock.acquired = false;
  1139
+                sendAwaitConditionRequest(lock.name, lock.owner);
  1140
+                
  1141
+                boolean interrupted = false;
  1142
+                while(!signaled.get()) {
  1143
+                    wait_nano=target_nano - System.nanoTime();
  1144
+                    // If we waited max time break out
  1145
+                    if(wait_nano > 0) {
  1146
+                        parker.set(Thread.currentThread());
  1147
+                        LockSupport.parkNanos(this, wait_nano);
  1148
+                        
  1149
+                        if (Thread.interrupted()) {
  1150
+                            // If we were interrupted and haven't received a response yet then we try to
  1151
+                            // clean up the lock request and throw the exception
  1152
+                            if (!signaled.get()) {
  1153
+                                sendDeleteAwaitConditionRequest(lock.name, lock.owner);
  1154
+                                throw new InterruptedException();
  1155
+                            }
  1156
+                            // In the case that we were signaled and interrupted
  1157
+                            // we want to return the signal but still interrupt
  1158
+                            // our thread
  1159
+                            interrupted = true; 
  1160
+                        }
  1161
+                    }
  1162
+                    else {
  1163
+                        break;
  1164
+                    }
  1165
+                }
  1166
+                if(interrupted)
  1167
+                    Thread.currentThread().interrupt();
  1168
+            }
  1169
+            
  1170
+            // We set as if this signal was no released.  This way if the
  1171
+            // condition is reused again, but the client condition isn't lost
  1172
+            // we won't think we were signaled immediately
  1173
+            // If we weren't signaled then delete our request
  1174
+            if (!signaled.getAndSet(false)) {
  1175
+                sendDeleteAwaitConditionRequest(lock.name, lock.owner);
  1176
+            }
  1177
+            return wait_nano;
  1178
+        }
  1179
+
  1180
+        @Override
  1181
+        public void signal() {
  1182
+            sendSignalConditionRequest(lock.name, false);
  1183
+        }
  1184
+
  1185
+        @Override
  1186
+        public void signalAll() {
  1187
+            sendSignalConditionRequest(lock.name, true);
  1188
+        }
  1189
+        
  1190
+        protected void signaled() {
  1191
+            signaled.set(true);
  1192
+            Thread thread = parker.getAndSet(null);
  1193
+            if (thread != null)
  1194
+                LockSupport.unpark(thread);
  1195
+        }
  1196
+    }
752 1197
 
753 1198
 
754 1199
     protected static class Request implements Streamable {
28  src/org/jgroups/protocols/PEER_LOCK.java
@@ -4,15 +4,15 @@
4 4
  * @author Bela Ban
5 5
  */
6 6
 
  7
+import java.util.ArrayList;
  8
+import java.util.List;
  9
+import java.util.Map;
  10
+
7 11
 import org.jgroups.Address;
8 12
 import org.jgroups.View;
9 13
 import org.jgroups.annotations.Experimental;
10 14
 import org.jgroups.blocks.locking.Owner;
11 15
 
12  
-import java.util.ArrayList;
13  
-import java.util.List;
14  
-import java.util.Map;
15  
-
16 16
 /**
17 17
  * Implementation of a locking protocol which acquires locks by contacting <em>all</em> of the nodes of a cluster.</p>
18 18
  * Unless a total order configuration is used (e.g. {@link org.jgroups.protocols.SEQUENCER} based), lock requests for
@@ -50,6 +50,25 @@ protected void sendReleaseLockRequest(String lock_name, Owner owner) {
50 50
     }
51 51
 
52 52
 
  53
+    @Override
  54
+    protected void sendAwaitConditionRequest(String lock_name, Owner owner) {
  55
+        sendRequest(null, Type.LOCK_AWAIT, lock_name, owner, 0, false);
  56
+    }
  57
+
  58
+
  59
+    @Override
  60
+    protected void sendSignalConditionRequest(String lock_name, boolean all) {
  61
+        sendRequest(null, all ? Type.COND_SIG_ALL : Type.COND_SIG, lock_name, 
  62
+                null, 0, false);
  63
+    }
  64
+
  65
+
  66
+    @Override
  67
+    protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner) {
  68
+        sendRequest(null, Type.DELETE_LOCK_AWAIT, lock_name, owner, 0, false);
  69
+    }
  70
+
  71
+
53 72
     public void handleView(View view) {
54 73
         super.handleView(view);
55 74
         List<Address> members=view.getMembers();
@@ -91,5 +110,4 @@ protected synchronized void handleLockGrantedResponse(Owner owner, Address sende
91 110
                 lockGranted();
92 111
         }
93 112
     }
94  
-
95 113
 }
80  tests/junit/org/jgroups/blocks/LockServiceTest.java
... ...
@@ -1,18 +1,23 @@
1 1
 package org.jgroups.blocks;
2 2
 
  3
+import java.util.concurrent.BrokenBarrierException;
  4
+import java.util.concurrent.CyclicBarrier;
  5
+import java.util.concurrent.TimeUnit;
  6
+import java.util.concurrent.locks.Condition;
  7
+import java.util.concurrent.locks.Lock;
  8
+
3 9
 import org.jgroups.Global;
4 10
 import org.jgroups.JChannel;
5 11
 import org.jgroups.blocks.locking.LockService;
6 12
 import org.jgroups.protocols.CENTRAL_LOCK;
  13
+import org.jgroups.stack.Protocol;
7 14
 import org.jgroups.stack.ProtocolStack;
8 15
 import org.jgroups.tests.ChannelTestBase;
9 16
 import org.jgroups.util.Util;
10  
-import org.testng.annotations.*;
11  
-
12  
-import java.util.concurrent.BrokenBarrierException;
13  
-import java.util.concurrent.CyclicBarrier;
14  
-import java.util.concurrent.TimeUnit;
15  
-import java.util.concurrent.locks.Lock;
  17
+import org.testng.annotations.AfterClass;
  18
+import org.testng.annotations.BeforeClass;
  19
+import org.testng.annotations.BeforeMethod;
  20
+import org.testng.annotations.Test;
16 21
 
17 22
 /** Tests {@link org.jgroups.blocks.locking.LockService}
18 23
  * @author Bela Ban
@@ -158,6 +163,17 @@ public void testLockInterruptibly() throws InterruptedException {
158 163
     }
159 164
 
160 165
 
  166
+    public void testSuccessfulSignalAllTimeout() throws InterruptedException, BrokenBarrierException {
  167
+        Lock lock2=s2.getLock(LOCK); 
  168
+        Thread locker=new Signaller(true);
  169
+        boolean rc=tryLock(lock2, 5000, LOCK);
  170
+        assert rc;
  171
+        locker.start();
  172
+        assert awaitNanos(lock2.newCondition(), TimeUnit.SECONDS.toNanos(5), LOCK) > 0 : "Condition was not signalled";
  173
+        unlock(lock2, LOCK);
  174
+    }
  175
+
  176
+
161 177
     public void testSuccessfulTryLockTimeout() throws InterruptedException, BrokenBarrierException {
162 178
         final CyclicBarrier barrier=new CyclicBarrier(2);
163 179
         Thread locker=new Locker(barrier);
@@ -235,6 +251,34 @@ public void run() {
235 251
             }
236 252
         }
237 253
     }
  254
+    
  255
+    protected class Signaller extends Thread {
  256
+        protected final boolean all;
  257
+
  258
+        public Signaller(boolean all) {
  259
+            this.all=all;
  260
+        }
  261
+
  262
+        public void run() {
  263
+            lock(lock, LOCK);
  264
+            try {
  265
+                Util.sleep(500);