forked from fengjiachun/Jupiter
/
ProtocolDecoder.java
157 lines (139 loc) · 6.29 KB
/
ProtocolDecoder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
* Copyright (c) 2015 The Jupiter Project
*
* Licensed under the Apache License, version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jupiter.transport.netty.handler;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.jupiter.common.util.Signal;
import org.jupiter.common.util.SystemClock;
import org.jupiter.common.util.SystemPropertyUtil;
import org.jupiter.transport.JProtocolHeader;
import org.jupiter.transport.exception.IoSignals;
import org.jupiter.transport.payload.JRequestPayload;
import org.jupiter.transport.payload.JResponsePayload;
/**
 * Decodes Jupiter protocol frames from the inbound byte stream.
 *
 * <pre>
 * **************************************************************************************************
 *                                          Protocol
 *  +-------+------+--------+-----------+-----------+----------------------+
 *  |   2   |  1   |   1    |     8     |     4     |       variable       |
 *  +-------+------+--------+-----------+-----------+----------------------+
 *  | MAGIC | Sign | Status | Invoke Id | Body Size |     Body Content     |
 *  +-------+------+--------+-----------+-----------+----------------------+
 *
 * The message header has a fixed length of 16 bytes
 * = 2 // magic = (short) 0xbabe
 * + 1 // sign byte: the low 4 bits carry the message type (request/response/heartbeat, ...),
 *     //            the high 4 bits carry the serializer type
 * + 1 // status byte: carries the request/response status
 * + 8 // message id, a long; jupiter may limit the id to 48 bits in the future and
 *     //             reserve the high 16 bits as an extension field
 * + 4 // length of the message body, an int
 * </pre>
 *
 * jupiter
 * org.jupiter.transport.netty.handler
 *
 * @author jiachun.fjc
 */
public class ProtocolDecoder extends ReplayingDecoder<ProtocolDecoder.State> {

    // Upper bound for the message body size, 5 MB by default.
    private static final int MAX_BODY_SIZE =
            SystemPropertyUtil.getInt("jupiter.io.decoder.max.body.size", 1024 * 1024 * 5);

    /**
     * Cumulate {@link ByteBuf}s by add them to a CompositeByteBuf and so do no memory copy whenever possible.
     * Be aware that CompositeByteBuf use a more complex indexing implementation so depending on your use-case
     * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
     */
    private static final boolean USE_COMPOSITE_BUF =
            SystemPropertyUtil.getBoolean("jupiter.io.decoder.composite.buf", false);

    // Protocol header, reused for every frame decoded by this instance.
    private final JProtocolHeader header = new JProtocolHeader();

    /**
     * Creates a decoder whose initial state is {@link State#MAGIC}. When the
     * {@code jupiter.io.decoder.composite.buf} system property is {@code true},
     * the composite cumulator is installed instead of the default one.
     */
    public ProtocolDecoder() {
        super(State.MAGIC);
        if (USE_COMPOSITE_BUF) {
            setCumulator(COMPOSITE_CUMULATOR);
        }
    }

    /**
     * Decodes at most one frame per invocation. The header fields are read in wire
     * order, with a checkpoint recorded after each one, and the body is then handled
     * according to the message code stored in the header.
     *
     * @param ctx the channel handler context (not used by this method)
     * @param in  the buffer to read the frame from
     * @param out receives a {@link JRequestPayload} or {@link JResponsePayload};
     *            nothing is added for a heartbeat
     * @throws Exception {@link IoSignals#ILLEGAL_MAGIC} on a bad magic number,
     *                   {@link IoSignals#ILLEGAL_SIGN} on an unknown message code,
     *                   {@link IoSignals#BODY_TOO_LARGE} on an unacceptable body size
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // The header cases fall through on purpose: each one reads a single field,
        // records a checkpoint and continues with the next field.
        switch (state()) {
            case MAGIC:
                checkMagic(in.readShort());         // magic number
                checkpoint(State.SIGN);
            case SIGN:
                header.sign(in.readByte());         // sign byte
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());       // status byte
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());           // message id
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());      // body length
                checkpoint(State.BODY);
            case BODY:
                switch (header.messageCode()) {
                    case JProtocolHeader.HEARTBEAT:
                        // NOTE(review): no body bytes are consumed here, so this presumably
                        // relies on heartbeats being sent with a body size of 0 - confirm.
                        break;
                    case JProtocolHeader.REQUEST: {
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        in.readBytes(bytes);

                        JRequestPayload request = new JRequestPayload(header.id());
                        request.timestamp(SystemClock.millisClock().now());
                        request.bytes(header.serializerCode(), bytes);

                        out.add(request);
                        break;
                    }
                    case JProtocolHeader.RESPONSE: {
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        in.readBytes(bytes);

                        JResponsePayload response = new JResponsePayload(header.id());
                        response.status(header.status());
                        response.bytes(header.serializerCode(), bytes);

                        out.add(response);
                        break;
                    }
                    default:
                        throw IoSignals.ILLEGAL_SIGN;
                }
                checkpoint(State.MAGIC);
                // Without this break a completed frame would fall through into the
                // default branch below and throw after every successful decode.
                break;
            default:
                throw new Error("Shouldn't reach here.");
        }
    }

    /**
     * Verifies the magic number that starts every frame.
     *
     * @param magic the value read from the first two bytes of the frame
     * @throws Signal {@link IoSignals#ILLEGAL_MAGIC} if it differs from {@link JProtocolHeader#MAGIC}
     */
    private static void checkMagic(short magic) throws Signal {
        if (magic != JProtocolHeader.MAGIC) {
            throw IoSignals.ILLEGAL_MAGIC;
        }
    }

    /**
     * Validates the body size announced in the header before a buffer is allocated for it.
     *
     * @param size the body size read from the header
     * @return {@code size}, unchanged, when it lies within {@code [0, MAX_BODY_SIZE]}
     * @throws Signal {@link IoSignals#BODY_TOO_LARGE} if {@code size} is negative or exceeds
     *                {@code MAX_BODY_SIZE}
     */
    private static int checkBodySize(int size) throws Signal {
        // A negative value means the 4-byte length field had its top bit set; reject it
        // here rather than letting "new byte[size]" fail with NegativeArraySizeException.
        if (size < 0 || size > MAX_BODY_SIZE) {
            throw IoSignals.BODY_TOO_LARGE;
        }
        return size;
    }

    /**
     * Decoder states, one per protocol field, in the order the fields are read.
     */
    enum State {
        MAGIC,
        SIGN,
        STATUS,
        ID,
        BODY_SIZE,
        BODY
    }
}