| 
13 | 13 |            java.io.InputStreamReader  | 
14 | 14 |            java.io.ByteArrayOutputStream  | 
15 | 15 |            java.util.zip.GZIPOutputStream  | 
16 |  | -           java.net.ServerSocket))  | 
 | 16 | +           java.net.ServerSocket  | 
 | 17 | +           [java.util.concurrent ConcurrentLinkedQueue]))  | 
17 | 18 | 
 
  | 
18 | 19 | (def ^:dynamic state nil)  | 
 | 20 | +(def connq (ConcurrentLinkedQueue.))  | 
 | 21 | +(def promiseq (ConcurrentLinkedQueue.))  | 
19 | 22 | 
 
  | 
20 | 23 | (defn connection  | 
21 |  | -  "Promise to return a connection when one is available. If a  | 
22 |  | -  connection is not available, store the promise in server/state."  | 
 | 24 | +  "Promise to return a connection when one is available. If no connection is  | 
 | 25 | +   available put the promise into FIFO queue to get the next available  | 
 | 26 | +   connection."  | 
23 | 27 |   []  | 
24 |  | -  (let [p    (promise)  | 
25 |  | -        conn (:connection @state)]  | 
 | 28 | +  (let [p (promise)  | 
 | 29 | +        conn (.poll connq)]  | 
26 | 30 |     (if (and conn (not (.isClosed conn)))  | 
27 |  | -      (do  | 
28 |  | -        (deliver p conn)  | 
29 |  | -        p)  | 
30 |  | -      (do  | 
31 |  | -        (swap! state (fn [old] (assoc old :promised-conn p)))  | 
32 |  | -        p))))  | 
 | 31 | +      (deliver p conn)  | 
 | 32 | +      (.offer promiseq p))  | 
 | 33 | +    p))  | 
33 | 34 | 
 
  | 
34 | 35 | (defn set-connection  | 
35 |  | -  "Given a new available connection, either use it to deliver the  | 
36 |  | -  connection which was promised or store the connection for later  | 
37 |  | -  use."  | 
 | 36 | +  "Given a new available connection, poll the promise queue for and deliver  | 
 | 37 | +   the connection. Otherwise put the connection into a FIFO queue."  | 
38 | 38 |   [conn]  | 
39 |  | -  (if-let [promised-conn (:promised-conn @state)]  | 
40 |  | -    (do  | 
41 |  | -      (swap! state  | 
42 |  | -        (fn [old]  | 
43 |  | -          (-> old  | 
44 |  | -            (assoc :connection nil)  | 
45 |  | -            (assoc :promised-conn nil))))  | 
46 |  | -      (deliver promised-conn conn))  | 
47 |  | -    (swap! state (fn [old] (assoc old :connection conn)))))  | 
 | 39 | +  (if-let [p (.poll promiseq)]  | 
 | 40 | +    (deliver p conn)  | 
 | 41 | +    (.offer connq conn)))  | 
48 | 42 | 
 
  | 
49 | 43 | (defonce handlers (atom {}))  | 
50 | 44 | 
 
  | 
 | 
0 commit comments