forked from netty/netty
/
ByteToMessageDecoderForBuffer.java
686 lines (607 loc) · 24.1 KB
/
ByteToMessageDecoderForBuffer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.util.Objects.requireNonNull;
/**
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to
* another message type.
*
* For example, here is an implementation which reads all readable bytes from
* the input {@link Buffer}, creates a new {@link Buffer} and forwards it to the next {@link ChannelHandler}
* in the {@link ChannelPipeline}.
*
* <pre>
* public class SquareDecoder extends {@link ByteToMessageDecoderForBuffer} {
* {@code @Override}
* public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
* throws {@link Exception} {
* ctx.fireChannelRead(in.readBytes(in.readableBytes()));
* }
* }
* </pre>
*
* <h3>Frame detection</h3>
* <p>
* Generally frame detection should be handled earlier in the pipeline by adding a
* {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
* or {@link LineBasedFrameDecoder}.
* <p>
* If a custom frame decoder is required, then one needs to be careful when implementing
* one with {@link ByteToMessageDecoderForBuffer}. Ensure there are enough bytes in the buffer for a
* complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes
* for a complete frame, return without modifying the reader index to allow more bytes to arrive.
* <p>
* To check for complete frames without modifying the reader index, use methods like {@link Buffer#getInt(int)}.
* One <strong>MUST</strong> use the reader index when using methods like {@link Buffer#getInt(int)}.
* For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
* is not always the case. Use <tt>in.getInt(in.readerOffset())</tt> instead.
* <h3>Pitfalls</h3>
* <p>
* Be aware that sub-classes of {@link ByteToMessageDecoderForBuffer} <strong>MUST NOT</strong>
* be annotated with {@code @Sharable}.
*/
public abstract class ByteToMessageDecoderForBuffer extends ChannelHandlerAdapter {
/**
* Cumulates {@link Buffer}s by merging them into a single {@link Buffer}, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();
/**
* Cumulates {@link Buffer}s by adding them to a {@link CompositeBuffer}, avoiding memory copies whenever possible.
* Be aware that {@link CompositeBuffer} uses a more complex indexing implementation, so depending on your use-case
* and the decoder implementation this may be slower than just using the {@link #MERGE_CUMULATOR}.
*/
public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();
// Number of channelRead(...) calls with data still buffered after which discardSomeReadBytes() is invoked.
private final int discardAfterReads = 16;
// Strategy used by channelRead(...) to combine the existing cumulation with newly received bytes.
private final Cumulator cumulator;
// Bytes received but not yet fully decoded; null while nothing is buffered.
private Buffer cumulation;
// When true, callDecode(...) leaves its loop after the first decoded message.
private boolean singleDecode;
// True when the most recent channelRead(...) started without a cumulation; discardSomeReadBytes() skips
// compaction in that case.
private boolean first;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;
// Counts channelRead(...) calls since the cumulation was last released or compacted.
private int numReads;
// Wrapper around the real ChannelHandlerContext, created in handlerAdded(...); counts fireChannelRead(...) calls.
private ByteToMessageDecoderContext context;
/**
* Creates a decoder that cumulates incoming buffers with {@link #MERGE_CUMULATOR}.
*/
protected ByteToMessageDecoderForBuffer() {
this(MERGE_CUMULATOR);
}
/**
* Creates a decoder that cumulates incoming buffers with the given {@link Cumulator}.
*
* @param cumulator the strategy used to combine buffered and newly received bytes; must not be {@code null}
* @throws NullPointerException if {@code cumulator} is {@code null}
*/
protected ByteToMessageDecoderForBuffer(Cumulator cumulator) {
this.cumulator = requireNonNull(cumulator, "cumulator");
// NOTE(review): ensureNotSharable() is defined outside this file (presumably in ChannelHandlerAdapter) and is
// expected to reject sub-classes annotated as sharable -- confirm against the super class.
ensureNotSharable();
}
/**
* If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
*
* Default is {@code false} as this has performance impacts.
*
* @param singleDecode {@code true} to stop the decode loop after the first decoded message
*/
public void setSingleDecode(boolean singleDecode) {
this.singleDecode = singleDecode;
}
/**
* If {@code true} then only one message is decoded on each
* {@link #channelRead(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*
* @return the value last passed to {@link #setSingleDecode(boolean)}, or {@code false} if it was never called
*/
public boolean isSingleDecode() {
return singleDecode;
}
/**
 * Returns the actual number of readable bytes in the internal cumulative
 * buffer of this decoder. You usually do not need to rely on this value
 * to write a decoder. Use it only when you must use it at your own risk.
 * <p>
 * {@link #internalBuffer()} returns {@code null} while nothing is buffered; in that case this method
 * returns {@code 0} instead of failing with a {@link NullPointerException}.
 *
 * @return the readable bytes of the internal buffer, or {@code 0} if there is no internal buffer
 */
protected int actualReadableBytes() {
    final Buffer buffer = internalBuffer();
    return buffer == null ? 0 : buffer.readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder if it exists, else {@code null}. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
* <p>
* The returned instance is the decoder's own cumulation field, not a copy.
*
* @return Internal {@link Buffer} if exists, else {@code null}.
*/
protected Buffer internalBuffer() {
return cumulation;
}
/**
* Wraps the given context in a {@link ByteToMessageDecoderContext}, keeps the wrapper for later event handling
* and then calls {@link #handlerAdded0(ChannelHandlerContext)} with the wrapper.
*
* @param ctx the context this handler was added to
* @throws Exception if {@link #handlerAdded0(ChannelHandlerContext)} fails
*/
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
context = new ByteToMessageDecoderContext(ctx);
handlerAdded0(context);
}
/**
* Hook called by {@link #handlerAdded(ChannelHandlerContext)} after the decoder context has been created.
* The default implementation does nothing.
*
* @param ctx the wrapping context created by {@link #handlerAdded(ChannelHandlerContext)}
* @throws Exception if an error occurs
*/
protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
}
/**
 * Hands any buffered data over to the next handler (or releases it when empty) and then invokes
 * {@link #handlerRemoved0(ChannelHandlerContext)}.
 *
 * @param ctx the context this handler was removed from
 * @throws Exception if forwarding the buffer or {@link #handlerRemoved0(ChannelHandlerContext)} fails
 */
@Override
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    final Buffer pending = cumulation;
    if (pending != null) {
        // Detach the buffer from this decoder first so no other method can touch it any more.
        cumulation = null;
        numReads = 0;
        if (pending.readableBytes() <= 0) {
            // Nothing left to pass on, just release it.
            pending.close();
        } else {
            // Undecoded bytes remain: forward them so they are not lost.
            ctx.fireChannelRead(pending);
            ctx.fireChannelReadComplete();
        }
    }
    handlerRemoved0(context);
}
/**
* Gets called after the {@link ByteToMessageDecoderForBuffer} was removed from the actual context and it doesn't
* handle events anymore. The default implementation does nothing.
*
* @param ctx the wrapping context that was created when this handler was added
* @throws Exception if an error occurs
*/
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
/**
 * Adds an inbound {@link Buffer} to the cumulation and runs the decode loop over it. Messages of any other
 * type are forwarded to the next handler unchanged.
 *
 * @param ctx the context this handler is bound to
 * @param msg the inbound message
 * @throws Exception a {@link DecoderException} raised by the decode loop, or wrapping any other exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof Buffer)) {
        ctx.fireChannelRead(msg);
        return;
    }
    try {
        final Buffer incoming = (Buffer) msg;
        first = cumulation == null;
        if (!first) {
            // Data is already buffered: let the configured strategy merge old and new bytes.
            cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, incoming);
        } else if (incoming.readOnly()) {
            // A read-only buffer is not adopted directly; a copy is wrapped and the original released.
            cumulation = CompositeBuffer.compose(ctx.bufferAllocator(), incoming.copy().send());
            incoming.close();
        } else {
            cumulation = incoming;
        }
        assert context.ctx == ctx || ctx == context;
        callDecode(context, cumulation);
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        final boolean fullyConsumed = cumulation != null && cumulation.readableBytes() == 0;
        if (fullyConsumed) {
            numReads = 0;
            if (cumulation.isAccessible()) {
                cumulation.close();
            }
            cumulation = null;
        } else if (++numReads >= discardAfterReads) {
            // Enough reads happened with data still buffered; try to discard consumed bytes so we do not
            // risk an OOME. See https://github.com/netty/netty/issues/4275
            numReads = 0;
            discardSomeReadBytes();
        }
        if (context.fireChannelReadCallCount() > 0) {
            firedChannelRead = true;
        }
        context.reset();
    }
}
/**
 * Resets the read counter, compacts the cumulation and, when no message was produced during this read
 * cycle and auto-read is disabled, requests another read before propagating the event.
 *
 * @param ctx the context this handler is bound to
 * @throws Exception if an error occurs
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    numReads = 0;
    discardSomeReadBytes();
    // Nothing reached the next handler, so nobody downstream will ask for more data on our behalf.
    final boolean mustRequestRead = !(firedChannelRead || ctx.channel().config().isAutoRead());
    if (mustRequestRead) {
        ctx.read();
    }
    firedChannelRead = false;
    ctx.fireChannelReadComplete();
}
/**
 * Compacts the cumulation to make more room in the buffer. Does nothing when there is no cumulation or when
 * the cumulation was first set by the most recent {@code channelRead(...)} call.
 */
protected final void discardSomeReadBytes() {
    if (cumulation == null || first) {
        return;
    }
    // discard some bytes if possible to make more room in the buffer.
    cumulation.compact();
}
/**
* Runs the final decode pass over any buffered data and then propagates the channel-inactive event
* (both are performed by {@code channelInputClosed(context, true)}).
*
* @param ctx the context this handler is bound to
* @throws Exception if the final decode pass fails
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// The caller must hand in either the real context or the wrapper created in handlerAdded(...).
assert context.ctx == ctx || ctx == context;
channelInputClosed(context, true);
}
/**
* Forwards the event to the next handler and, if it is a {@link ChannelInputShutdownEvent}, afterwards runs
* the final decode pass without firing a channel-inactive event.
*
* @param ctx the context this handler is bound to
* @param evt the user event
* @throws Exception if the final decode pass fails
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// Note: the event is propagated before the final decode pass runs.
ctx.fireUserEventTriggered(evt);
if (evt instanceof ChannelInputShutdownEvent) {
// The decodeLast method is invoked when a channelInactive event is encountered.
// This method is responsible for ending requests in some situations and must be called
// when the input has been shutdown.
assert context.ctx == ctx || ctx == context;
channelInputClosed(context, false);
}
}
/**
 * Runs the final decode pass, releases whatever is left in the cumulation and fires the follow-up events.
 * <p>
 * The cumulation is detached from this decoder before it is closed, and it is only closed when it is still
 * accessible. This mirrors the guard used in {@code channelRead(...)} and prevents a buffer that was already
 * released during decoding from failing the cleanup and suppressing the channel-inactive event.
 *
 * @param ctx                 the wrapping decoder context
 * @param callChannelInactive {@code true} to fire a channel-inactive event after the cleanup
 * @throws DecoderException if the final decode pass fails; other exceptions are wrapped
 */
private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
    try {
        channelInputClosed(ctx);
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        final Buffer leftover = cumulation;
        if (leftover != null) {
            // Clear the field first so the decoder never keeps a reference to a released buffer.
            cumulation = null;
            if (leftover.isAccessible()) {
                leftover.close();
            }
        }
        if (ctx.fireChannelReadCallCount() > 0) {
            ctx.reset();
            // Something was read, call fireChannelReadComplete()
            ctx.fireChannelReadComplete();
        }
        if (callChannelInactive) {
            ctx.fireChannelInactive();
        }
    }
}
/**
 * Called when the input of the channel was closed which may be because it changed to inactive or because of
 * {@link ChannelInputShutdownEvent}.
 * <p>
 * Any buffered data is run through the decode loop once more, followed by a call to
 * {@link #decodeLast(ChannelHandlerContext, Buffer)}. When there is no cumulation to pass on, a temporary
 * zero-length buffer is allocated for that call and released again afterwards.
 *
 * @param ctx the wrapping decoder context
 * @throws Exception if decoding fails
 */
void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
    if (cumulation != null) {
        callDecode(ctx, cumulation);
        // If callDecode(...) removed the handler from the pipeline we should not call decodeLast(...) as this
        // would be unexpected.
        if (ctx.isRemoved()) {
            return;
        }
        if (cumulation != null) {
            decodeLast(ctx, cumulation);
            return;
        }
        // The cumulation became null while decoding; fall through and use an empty buffer.
        // See https://github.com/netty/netty/issues/10802.
    }
    decodeLastWithEmptyBuffer(ctx);
}

/**
 * Invokes {@link #decodeLast(ChannelHandlerContext, Buffer)} with a freshly allocated zero-length buffer and
 * closes that buffer afterwards, unless it is no longer accessible by then.
 */
private void decodeLastWithEmptyBuffer(ByteToMessageDecoderContext ctx) throws Exception {
    final Buffer empty = ctx.bufferAllocator().allocate(0);
    try {
        decodeLast(ctx, empty);
    } finally {
        if (empty.isAccessible()) {
            empty.close();
        }
    }
}
/**
 * Called once data should be decoded from the given {@link Buffer}. Keeps invoking
 * {@link #decode(ChannelHandlerContext, Buffer)} for as long as decoding makes progress.
 * <p>
 * The loop ends when the buffer is drained, the handler was removed, a decode call neither consumed bytes nor
 * produced a message, or a message was produced while single-decode mode is enabled.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoderForBuffer} belongs to
 * @param in the {@link Buffer} from which to read data
 * @throws DecoderException if decoding fails, or if a message was produced without consuming any bytes
 */
void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
    try {
        while (in.readableBytes() > 0 && !ctx.isRemoved()) {
            final int readableBefore = in.readableBytes();
            final int firedBefore = ctx.fireChannelReadCallCount();
            decodeRemovalReentryProtection(ctx, in);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                return;
            }

            final boolean producedMessage = firedBefore != ctx.fireChannelReadCallCount();
            final boolean consumedBytes = readableBefore != in.readableBytes();

            if (!producedMessage) {
                if (!consumedBytes) {
                    // No progress at all: wait for more data.
                    return;
                }
                // Bytes were consumed without producing a message yet; give decode another go.
                continue;
            }
            if (!consumedBytes) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }
            if (isSingleDecode()) {
                return;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
/**
* Decodes data from the given {@link Buffer} into messages. This method is called repeatedly until either the
* input {@link Buffer} has nothing left to read when this method returns, or a call neither reads from the
* input {@link Buffer} nor produces a message.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoderForBuffer} belongs to
* @param in the {@link Buffer} from which to read data
* @throws Exception is thrown if an error occurs
*/
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
/**
* Internal entry point used by the decode loop and by {@link #decodeLast(ChannelHandlerContext, Buffer)} to
* invoke {@link #decode(ChannelHandlerContext, Buffer)}. In this class it simply delegates to
* {@code decode(ctx, in)} without any additional bookkeeping.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoderForBuffer} belongs to
* @param in the {@link Buffer} from which to read data
* @throws Exception is thrown if an error occurs
*/
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
throws Exception {
decode(ctx, in);
}
/**
 * Is called one last time when the {@link ChannelHandlerContext} goes in-active, which means the
 * {@link #channelInactive(ChannelHandlerContext)} was triggered. It is also called after a
 * {@link ChannelInputShutdownEvent}.
 *
 * By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} when the buffer still has
 * readable bytes, but sub-classes may override this for some special cleanup operation.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoderForBuffer} belongs to
 * @param in the {@link Buffer} from which to read data; may be empty
 * @throws Exception is thrown if an error occurs
 */
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
    if (in.readableBytes() <= 0) {
        // Nothing left in the buffer, so there is no point in calling decode().
        // See https://github.com/netty/netty/issues/4386
        return;
    }
    decodeRemovalReentryProtection(ctx, in);
}
/**
 * Makes room for {@code in} and appends it. A read-only cumulation is replaced by a newly allocated buffer
 * that first receives the old content; otherwise the existing cumulation is grown via
 * {@code ensureWritable(...)}. The requested size is the combined readable bytes rounded up to a power of two.
 *
 * @param alloc the allocator used when a replacement buffer is needed
 * @param oldCumulation the current cumulation; closed if it gets replaced
 * @param in the buffer whose readable bytes are appended (not closed here)
 * @return the buffer that now holds the cumulated bytes
 */
private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
    final int targetSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
    final Buffer target;
    if (oldCumulation.readOnly()) {
        target = alloc.allocate(targetSize);
    } else {
        target = oldCumulation.ensureWritable(targetSize);
    }
    final boolean replaced = target != oldCumulation;
    try {
        if (replaced) {
            target.writeBytes(oldCumulation);
        }
        target.writeBytes(in);
        return target;
    } finally {
        // NOTE(review): if a write above throws after a replacement was allocated, the old cumulation is closed
        // here while the new buffer is not -- looks like a potential leak; confirm before changing.
        if (replaced) {
            oldCumulation.close();
        }
    }
}
/**
* Strategy for cumulating {@link Buffer}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so
* call {@link Buffer#close()} if a {@link Buffer} is fully consumed.
*
* @param alloc the allocator to use if a new buffer is required
* @param cumulation the bytes cumulated so far
* @param in the newly received bytes
* @return the buffer holding the cumulated bytes; may be one of the arguments or a new buffer
*/
Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
}
// Package private so we can also make use of it in ReplayingDecoder.
/**
* {@link ChannelHandlerContext} wrapper that delegates every operation to the wrapped context and counts how
* often {@link #fireChannelRead(Object)} was called. The decoder uses that count to tell whether a decode call
* produced a message. The {@code fire*}, {@code read} and {@code flush} methods return this wrapper rather
* than the wrapped context.
*/
static final class ByteToMessageDecoderContext implements ChannelHandlerContext {
// The real context all calls are delegated to.
private final ChannelHandlerContext ctx;
// Number of fireChannelRead(...) calls since the last reset().
private int fireChannelReadCalled;
private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
/** Sets the {@code fireChannelRead(...)} call counter back to zero. */
void reset() {
fireChannelReadCalled = 0;
}
/** Returns how many times {@code fireChannelRead(...)} was called since the last {@link #reset()}. */
int fireChannelReadCallCount() {
return fireChannelReadCalled;
}
@Override
public Channel channel() {
return ctx.channel();
}
@Override
public EventExecutor executor() {
return ctx.executor();
}
@Override
public String name() {
return ctx.name();
}
@Override
public ChannelHandler handler() {
return ctx.handler();
}
@Override
public boolean isRemoved() {
return ctx.isRemoved();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
ctx.fireChannelRegistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
ctx.fireChannelUnregistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
ctx.fireChannelActive();
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
ctx.fireChannelInactive();
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
ctx.fireExceptionCaught(cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object evt) {
ctx.fireUserEventTriggered(evt);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
// Count the call before delegating, so the count is already updated while downstream handlers run.
fireChannelReadCalled ++;
ctx.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
ctx.fireChannelReadComplete();
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
ctx.fireChannelWritabilityChanged();
return this;
}
@Override
public Future<Void> register() {
return ctx.register();
}
@Override
public ChannelHandlerContext read() {
ctx.read();
return this;
}
@Override
public ChannelHandlerContext flush() {
ctx.flush();
return this;
}
@Override
public ChannelPipeline pipeline() {
return ctx.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return ctx.alloc();
}
@Override
public BufferAllocator bufferAllocator() {
return ctx.bufferAllocator();
}
@Override
@Deprecated
public <T> Attribute<T> attr(AttributeKey<T> key) {
return ctx.attr(key);
}
@Override
@Deprecated
public <T> boolean hasAttr(AttributeKey<T> key) {
return ctx.hasAttr(key);
}
@Override
public Future<Void> bind(SocketAddress localAddress) {
return ctx.bind(localAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress) {
return ctx.connect(remoteAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return ctx.connect(remoteAddress, localAddress);
}
@Override
public Future<Void> disconnect() {
return ctx.disconnect();
}
@Override
public Future<Void> close() {
return ctx.close();
}
@Override
public Future<Void> deregister() {
return ctx.deregister();
}
@Override
public Future<Void> write(Object msg) {
return ctx.write(msg);
}
@Override
public Future<Void> writeAndFlush(Object msg) {
return ctx.writeAndFlush(msg);
}
@Override
public Promise<Void> newPromise() {
return ctx.newPromise();
}
@Override
public Future<Void> newSucceededFuture() {
return ctx.newSucceededFuture();
}
@Override
public Future<Void> newFailedFuture(Throwable cause) {
return ctx.newFailedFuture(cause);
}
}
/**
* {@link Cumulator} that appends incoming buffers as additional components of a {@link CompositeBuffer}
* instead of copying their bytes into the existing cumulation.
*/
private static final class CompositeBufferCumulator implements Cumulator {
@Override
public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
if (cumulation.readableBytes() == 0) {
// Nothing buffered: release the old cumulation and continue with the incoming buffer as-is.
cumulation.close();
return in;
}
CompositeBuffer composite;
// try-with-resources closes 'in' when this block is left, on success as well as on failure.
try (in) {
if (CompositeBuffer.isComposite(cumulation)) {
CompositeBuffer tmp = (CompositeBuffer) cumulation;
// Since we are extending the composite buffer below, we have to make sure there is no space to
// write in the existing cumulation.
if (tmp.writerOffset() < tmp.capacity()) {
// NOTE(review): presumably split() returns the part up to the writer offset and leaves the unused
// writable tail in tmp, which is then released -- confirm against the CompositeBuffer docs.
composite = tmp.split();
tmp.close();
} else {
composite = tmp;
}
} else {
// A non-composite cumulation is wrapped into a new composite buffer.
composite = CompositeBuffer.compose(alloc, cumulation.send());
}
// A read-only input is copied before it is appended; otherwise the input itself is sent.
composite.extendWith((in.readOnly() ? in.copy() : in).send());
return composite;
}
}
@Override
public String toString() {
return "CompositeBufferCumulator";
}
}
/**
 * {@link Cumulator} that copies the bytes of each incoming buffer into the cumulation, growing or replacing
 * the cumulation when it has too little writable space or is read-only.
 */
private static final class MergeCumulator implements Cumulator {
    @Override
    public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
        if (cumulation.readableBytes() == 0) {
            // The cumulation holds no data: release it and carry on with the input buffer directly.
            cumulation.close();
            return in;
        }
        // The input buffer must be closed in all cases, as otherwise it may leak if writeBytes(...) throws
        // for whatever reason (for example because of an OutOfMemoryError).
        try (in) {
            final int incomingBytes = in.readableBytes();
            final boolean fitsInPlace = incomingBytes <= cumulation.writableBytes() && !cumulation.readOnly();
            if (fitsInPlace) {
                cumulation.writeBytes(in);
                return cumulation;
            }
            return expandCumulationAndWrite(alloc, cumulation, in);
        }
    }

    @Override
    public String toString() {
        return "MergeCumulator";
    }
}
}