@@ -62,7 +62,7 @@ private static void processPendingRegistrations() {
6262 t .channel .register (selector (), SelectionKey .OP_READ , t );
6363 t .send ();
6464 } catch (IOException e ) {
65- t .f . completeExceptionally (e );
65+ t .completeExceptionally (e );
6666 }
6767 }
6868 }
@@ -71,8 +71,7 @@ private static void checkTransactionTimeouts() {
7171 for (Iterator <Transaction > it = pendingTransactions .iterator (); it .hasNext (); ) {
7272 Transaction t = it .next ();
7373 if (t .endTime - System .nanoTime () < 0 ) {
74- t .silentCloseChannel ();
75- t .f .completeExceptionally (new SocketTimeoutException ("Query timed out" ));
74+ t .completeExceptionally (new SocketTimeoutException ("Query timed out" ));
7675 it .remove ();
7776 }
7877 }
@@ -102,8 +101,7 @@ void send() throws IOException {
102101 @ Override
103102 public void processReadyKey (SelectionKey key ) {
104103 if (!key .isReadable ()) {
105- silentCloseChannel ();
106- f .completeExceptionally (new EOFException ("channel not readable" ));
104+ completeExceptionally (new EOFException ("Key for transaction " + id + " is not readable" ));
107105 pendingTransactions .remove (this );
108106 return ;
109107 }
@@ -117,8 +115,7 @@ public void processReadyKey(SelectionKey key) {
117115 throw new EOFException ();
118116 }
119117 } catch (IOException e ) {
120- silentCloseChannel ();
121- f .completeExceptionally (e );
118+ completeExceptionally (e );
122119 pendingTransactions .remove (this );
123120 return ;
124121 }
@@ -136,6 +133,11 @@ public void processReadyKey(SelectionKey key) {
136133 pendingTransactions .remove (this );
137134 }
138135
136+ private void completeExceptionally (Exception e ) {
137+ silentCloseChannel ();
138+ f .completeExceptionally (e );
139+ }
140+
139141 private void silentCloseChannel () {
140142 try {
141143 channel .disconnect ();
@@ -153,11 +155,15 @@ private void silentCloseChannel() {
153155
154156 static CompletableFuture <byte []> sendrecv (
155157 InetSocketAddress local , InetSocketAddress remote , byte [] data , int max , Duration timeout ) {
158+ long endTime = System .nanoTime () + timeout .toNanos ();
156159 CompletableFuture <byte []> f = new CompletableFuture <>();
160+ DatagramChannel channel = null ;
157161 try {
158162 final Selector selector = selector ();
159- DatagramChannel channel = DatagramChannel .open ();
163+ channel = DatagramChannel .open ();
160164 channel .configureBlocking (false );
165+
166+ Transaction t = new Transaction (data , max , endTime , channel , f );
161167 if (local == null || local .getPort () == 0 ) {
162168 boolean bound = false ;
163169 for (int i = 0 ; i < 1024 ; i ++) {
@@ -185,19 +191,23 @@ static CompletableFuture<byte[]> sendrecv(
185191 }
186192
187193 if (!bound ) {
188- channel .close ();
189- f .completeExceptionally (new IOException ("No available source port found" ));
194+ t .completeExceptionally (new IOException ("No available source port found" ));
190195 return f ;
191196 }
192197 }
193198
194199 channel .connect (remote );
195- long endTime = System .nanoTime () + timeout .toNanos ();
196- Transaction t = new Transaction (data , max , endTime , channel , f );
197200 pendingTransactions .add (t );
198201 registrationQueue .add (t );
199202 selector .wakeup ();
200203 } catch (IOException e ) {
204+ if (channel != null ) {
205+ try {
206+ channel .close ();
207+ } catch (IOException ioe ) {
208+ // ignore
209+ }
210+ }
201211 f .completeExceptionally (e );
202212 }
203213
@@ -207,7 +217,7 @@ static CompletableFuture<byte[]> sendrecv(
207217 private static void closeUdp () {
208218 registrationQueue .clear ();
209219 EOFException closing = new EOFException ("Client is closing" );
210- pendingTransactions .forEach (t -> t .f . completeExceptionally (closing ));
220+ pendingTransactions .forEach (t -> t .completeExceptionally (closing ));
211221 pendingTransactions .clear ();
212222 }
213223}
0 commit comments