@@ -186,50 +186,56 @@ Pooled<ByteBuffer> allocate(byte protoId) {
186186
187187 void handleIncoming (Pooled <ByteBuffer > pooledBuffer ) {
188188 boolean eof ;
189- synchronized (inputStream ) {
190- ByteBuffer buffer = pooledBuffer .getResource ();
191- final int bufRemaining = buffer .remaining ();
192- if ((inboundWindow -= bufRemaining ) < 0 ) {
193- channel .getRemoteConnection ().handleException (new IOException ("Input overrun" ));
194- }
195- if (log .isTraceEnabled ()) {
196- log .tracef ("Received message (chan %08x msg %04x) (%d-%d=%d remaining)" , Integer .valueOf (channel .getChannelId ()), Short .valueOf (messageId ), Integer .valueOf (inboundWindow + bufRemaining ), Integer .valueOf (bufRemaining ), Integer .valueOf (inboundWindow ));
197- }
198- buffer .position (buffer .position () - 1 );
199- byte flags = buffer .get ();
189+ boolean free = true ;
190+ try {
191+ synchronized (inputStream ) {
192+ ByteBuffer buffer = pooledBuffer .getResource ();
193+ final int bufRemaining = buffer .remaining ();
194+ if ((inboundWindow -= bufRemaining ) < 0 ) {
195+ channel .getRemoteConnection ().handleException (new IOException ("Input overrun" ));
196+ return ;
197+ }
198+ if (log .isTraceEnabled ()) {
199+ log .tracef ("Received message (chan %08x msg %04x) (%d-%d=%d remaining)" , Integer .valueOf (channel .getChannelId ()), Short .valueOf (messageId ), Integer .valueOf (inboundWindow + bufRemaining ), Integer .valueOf (bufRemaining ), Integer .valueOf (inboundWindow ));
200+ }
201+ buffer .position (buffer .position () - 1 );
202+ byte flags = buffer .get ();
200203
201- eof = (flags & Protocol .MSG_FLAG_EOF ) != 0 ;
202- boolean cancelled = (flags & Protocol .MSG_FLAG_CANCELLED ) != 0 ;
203- if (bufRemaining > remaining ) {
204- cancelled = true ;
205- doClose ();
206- }
207- if (cancelled ) {
208- this .cancelled = true ;
209- // make sure it goes through
210- inputStream .pushException (new MessageCancelledException ());
211- }
212- if (streamClosed ) {
213- // ignore, but keep the bits flowing
214- if (! eof && ! closeSent ) {
215- // we don't need to acknowledge if it's EOF or if we sent a close msg since no more data is coming anyway
216- buffer .position (buffer .limit ()); // "consume" everything
217- doAcknowledge (pooledBuffer );
204+ eof = (flags & Protocol .MSG_FLAG_EOF ) != 0 ;
205+ boolean cancelled = (flags & Protocol .MSG_FLAG_CANCELLED ) != 0 ;
206+ if (bufRemaining > remaining ) {
207+ cancelled = true ;
208+ doClose ();
218209 }
219- pooledBuffer .free ();
220- } else if (! cancelled ) {
221- remaining -= bufRemaining ;
222- inputStream .push (pooledBuffer );
223- }
224- if (eof ) {
225- eofReceived = true ;
226- if (!streamClosed ) {
227- inputStream .pushEof ();
210+ if (cancelled ) {
211+ this .cancelled = true ;
212+ // make sure it goes through
213+ inputStream .pushException (new MessageCancelledException ());
214+ }
215+ if (streamClosed ) {
216+ // ignore, but keep the bits flowing
217+ if (! eof && ! closeSent ) {
218+ // we don't need to acknowledge if it's EOF or if we sent a close msg since no more data is coming anyway
219+ buffer .position (buffer .limit ()); // "consume" everything
220+ doAcknowledge (pooledBuffer );
221+ }
222+ } else if (! cancelled ) {
223+ remaining -= bufRemaining ;
224+ free = false ;
225+ inputStream .push (pooledBuffer );
226+ }
227+ if (eof ) {
228+ eofReceived = true ;
229+ if (!streamClosed ) {
230+ inputStream .pushEof ();
231+ }
232+ channel .freeInboundMessage (messageId );
233+ // if the peer is old, they might reuse the ID now regardless of us; if new, we have to send the close message to acknowledge the remainder
234+ doSendCloseMessage ();
228235 }
229- channel .freeInboundMessage (messageId );
230- // if the peer is old, they might reuse the ID now regardless of us; if new, we have to send the close message to acknowledge the remainder
231- doSendCloseMessage ();
232236 }
237+ } finally {
238+ if (free ) pooledBuffer .free ();
233239 }
234240 }
235241
0 commit comments