Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

[XNIO-141] Fix BufferedChannelInputStream.read methods and clear buff…

…er on close.

This commit contains several small fixes for the read methods, that before could throw BufferUnderflowException or even return more bytes than were actually read.
Also:
- rename "closed" field to "eof"
- clear the buffer on close
- mark eof to true if read returns -1
  • Loading branch information...
commit 71451ba94770425213c681c71056de71938f9d15 1 parent 5a1b6a9
Flavia Rainone authored June 18, 2012
88  api/src/main/java/org/xnio/streams/BufferedChannelInputStream.java
@@ -45,7 +45,7 @@
45 45
 public class BufferedChannelInputStream extends InputStream {
46 46
     private final StreamSourceChannel channel;
47 47
     private final ByteBuffer buffer;
48  
-    private volatile boolean closed;
  48
+    private volatile boolean eof;
49 49
     private volatile long timeout;
50 50
 
51 51
     /**
@@ -125,35 +125,45 @@ public void setReadTimeout(long timeout, TimeUnit unit) {
125 125
      * @throws IOException if an I/O error occurs
126 126
      */
127 127
     public int read() throws IOException {
128  
-        if (closed) return -1;
  128
+        if (eof) return -1;
129 129
         final ByteBuffer buffer = this.buffer;
  130
+        final int oldBufferPosition = buffer.position();
  131
+        final int oldBufferLimit = buffer.limit();
130 132
         final StreamSourceChannel channel = this.channel;
131 133
         final long timeout = this.timeout;
132  
-        if (timeout == 0L) {
133  
-            while (! buffer.hasRemaining()) {
134  
-                buffer.clear();
135  
-                final int res = Channels.readBlocking(channel, buffer);
136  
-                if (res == -1) {
137  
-                    return -1;
138  
-                }
139  
-                buffer.flip();
140  
-            }
141  
-        } else {
142  
-            if (! buffer.hasRemaining()) {
143  
-                long now = System.currentTimeMillis();
144  
-                final long deadline = timeout - now;
145  
-                do {
  134
+        try {
  135
+            if (timeout == 0L) {
  136
+                while (! buffer.hasRemaining()) {
146 137
                     buffer.clear();
147  
-                    if (deadline <= now) {
148  
-                        throw new ReadTimeoutException("Read timed out");
149  
-                    }
150  
-                    final int res = Channels.readBlocking(channel, buffer, deadline - now, TimeUnit.MILLISECONDS);
  138
+                    final int res = Channels.readBlocking(channel, buffer);
  139
+                    buffer.flip();
151 140
                     if (res == -1) {
  141
+                        eof = true;
152 142
                         return -1;
153 143
                     }
154  
-                    buffer.flip();
155  
-                } while (! buffer.hasRemaining());
  144
+                }
  145
+            } else {
  146
+                if (! buffer.hasRemaining()) {
  147
+                    long now = System.currentTimeMillis();
  148
+                    final long deadline = timeout + now;
  149
+                    do {
  150
+                        buffer.clear();
  151
+                        if (deadline <= System.currentTimeMillis()) {
  152
+                            buffer.flip();
  153
+                            throw new ReadTimeoutException("Read timed out");
  154
+                        }
  155
+                        final int res = Channels.readBlocking(channel, buffer, deadline - now, TimeUnit.MILLISECONDS);
  156
+                        buffer.flip();
  157
+                        if (res == -1) {
  158
+                            eof = true;
  159
+                            return -1;
  160
+                        }
  161
+                    } while (! buffer.hasRemaining());
  162
+                }
156 163
             }
  164
+        } catch (IOException e) {
  165
+            buffer.position(oldBufferPosition).limit(oldBufferLimit);
  166
+            throw e;
157 167
         }
158 168
         return buffer.get() & 0xff;
159 169
     }
@@ -173,14 +183,16 @@ public int read(final byte[] b, int off, int len) throws IOException {
173 183
         }
174 184
         int total = 0;
175 185
         final ByteBuffer buffer = this.buffer;
  186
+        final int oldBufferPosition = buffer.position();
  187
+        final int oldBufferLimit = buffer.limit();
176 188
         if (buffer.hasRemaining()) {
177 189
             final int cnt = min(buffer.remaining(), len);
178  
-            buffer.get(b, off, len);
  190
+            buffer.get(b, off, cnt);
179 191
             total += cnt;
180 192
             off += cnt;
181 193
             len -= cnt;
182 194
         }
183  
-        if (closed) return -1;
  195
+        if (eof) return -1;
184 196
         final StreamSourceChannel channel = this.channel;
185 197
         final long timeout = this.timeout;
186 198
         try {
@@ -189,9 +201,11 @@ public int read(final byte[] b, int off, int len) throws IOException {
189 201
                     final ByteBuffer dst = ByteBuffer.wrap(b, off, len);
190 202
                     int res = total > 0 ? channel.read(dst) : Channels.readBlocking(channel, dst);
191 203
                     if (res == -1) {
  204
+                        eof = true;
192 205
                         return total == 0 ? -1 : total;
193 206
                     }
194 207
                     total += res;
  208
+                    len -= res;
195 209
                     if (res == 0) {
196 210
                         break;
197 211
                     }
@@ -209,9 +223,11 @@ public int read(final byte[] b, int off, int len) throws IOException {
209 223
                         }
210 224
                     }
211 225
                     if (res == -1) {
  226
+                        eof = true;
212 227
                         return total == 0 ? -1 : total;
213 228
                     }
214 229
                     total += res;
  230
+                    len -= res;
215 231
                     if (res == 0) {
216 232
                         break;
217 233
                     }
@@ -219,6 +235,7 @@ public int read(final byte[] b, int off, int len) throws IOException {
219 235
             }
220 236
         } catch (InterruptedIOException e) {
221 237
             e.bytesTransferred = total;
  238
+            buffer.position(oldBufferPosition).limit(oldBufferLimit);
222 239
             throw e;
223 240
         }
224 241
         return total;
@@ -237,30 +254,40 @@ public long skip(long n) throws IOException {
237 254
         }
238 255
         long total = 0L;
239 256
         final ByteBuffer buffer = this.buffer;
  257
+        final int oldBufferPosition = buffer.position();
  258
+        final int oldBufferLimit = buffer.limit();
240 259
         if (buffer.hasRemaining()) {
241 260
             final int cnt = (int) min(buffer.remaining(), n);
242 261
             Buffers.skip(buffer, cnt);
243 262
             total += cnt;
244 263
             n -= cnt;
245 264
         }
246  
-        if (closed) {
  265
+        if (eof) {
247 266
             return total;
248 267
         }
249 268
         final StreamSourceChannel channel = this.channel;
250 269
         if (n > 0L) {
251  
-            // Buffer was cleared
252 270
             try {
  271
+                // Buffer was cleared
253 272
                 while (n > 0L) {
254 273
                     buffer.clear();
255 274
                     int res = total > 0L ? channel.read(buffer) : Channels.readBlocking(channel, buffer);
256 275
                     if (res <= 0) {
  276
+                        buffer.position(0).limit(0);
257 277
                         return total;
258 278
                     }
  279
+                    if (res > n) {
  280
+                        buffer.position( buffer.position() - (res - (int) n));
  281
+                        return total + n;
  282
+                    }
  283
+                    n -= res;
259 284
                     total += (long) res;
260 285
                 }
261  
-            } finally {
262  
-                buffer.position(0).limit(0);
  286
+            } catch (IOException e) {
  287
+                buffer.position(oldBufferPosition).limit(oldBufferLimit);
  288
+                throw e;
263 289
             }
  290
+            buffer.position(0).limit(0);
264 291
         }
265 292
         return total;
266 293
     }
@@ -276,7 +303,7 @@ public long skip(long n) throws IOException {
276 303
     public int available() throws IOException {
277 304
         final ByteBuffer buffer = this.buffer;
278 305
         final int rem = buffer.remaining();
279  
-        if (rem > 0 || closed) {
  306
+        if (rem > 0 || eof) {
280 307
             return rem;
281 308
         }
282 309
         buffer.clear();
@@ -296,7 +323,8 @@ public int available() throws IOException {
296 323
      * @throws IOException if an I/O error occurs
297 324
      */
298 325
     public void close() throws IOException {
299  
-        closed = true;
  326
+        eof = true;
  327
+        buffer.clear().flip();
300 328
         channel.shutdownReads();
301 329
     }
302 330
 }

0 notes on commit 71451ba

Please sign in to comment.
Something went wrong with that request. Please try again.