-
Notifications
You must be signed in to change notification settings - Fork 13
/
EventHeader.java
288 lines (240 loc) · 10.5 KB
/
EventHeader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
/*
* Copyright 2022 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.bloomberg.bmq.impl.infr.proto;
import com.bloomberg.bmq.impl.infr.io.ByteBufferInputStream;
import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream;
import com.bloomberg.bmq.impl.infr.util.Argument;
import com.bloomberg.bmq.impl.infr.util.BitUtil;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventHeader {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// This class represents the header for all the events received by the
// broker from local clients or from peer brokers. A well-behaved event
// header will always have its fragment bit set to zero.
// EventHeader datagram [8 bytes]:
// ..
// +---------------+---------------+---------------+---------------+
// |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|
// +---------------+---------------+---------------+---------------+
// |F| Length |
// +---------------+---------------+---------------+---------------+
// |PV | Type | HeaderWords | TypeSpecific | Reserved |
// +---------------+---------------+---------------+---------------+
// F..: Fragment
// PV.: Protocol Version
//
// Fragment (F)..........: Always set to 0
// Length................: Total size (bytes) of this event
// Protocol Version (PV).: Protocol Version (up to 4 concurrent versions)
// Type..................: Type of the event (from EventType::Enum)
// HeaderWords...........: Number of words of this event header
// TypeSpecific..........: Content specific to the event's type, see below
// Reserved..............: For alignment and extension ~ must be 0
// ..
//
// TypeSpecific content:
// : o ControlMessage: represent the encoding used for that control message
// |0|1|2|3|4|5|6|7|
// +---------------+
// |CODEC| Reserved|
//
// NOTE: The HeaderWords allows to eventually put event level options
// (either by extending the EventHeader struct, or putting new struct
// after the EventHeader). For now, this is left up for future
// enhancement as there are no use-case for an event level option.
private int fragmentBitAndLength;
private byte protocolVersionAndType;
private byte headerWords;
private byte typeSpecific;
private byte reserved;
private static final int FRAGMENT_NUM_BITS = 1;
private static final int LENGTH_NUM_BITS = 31;
private static final int PROTOCOL_VERSION_NUM_BITS = 2;
private static final int TYPE_NUM_BITS = 6;
private static final int FRAGMENT_START_IDX = 31;
private static final int LENGTH_START_IDX = 0;
private static final int PROTOCOL_VERSION_START_IDX = 6;
private static final int TYPE_START_IDX = 0;
private static final int FRAGMENT_MASK = BitUtil.oneMask(FRAGMENT_START_IDX, FRAGMENT_NUM_BITS);
private static final int LENGTH_MASK = BitUtil.oneMask(LENGTH_START_IDX, LENGTH_NUM_BITS);
private static final int PROTOCOL_VERSION_MASK =
BitUtil.oneMask(PROTOCOL_VERSION_START_IDX, PROTOCOL_VERSION_NUM_BITS);
private static final int TYPE_MASK = BitUtil.oneMask(TYPE_START_IDX, TYPE_NUM_BITS);
public static final int MAX_SIZE_HARD = (1 << LENGTH_NUM_BITS) - 1;
// Maximum size (bytes) of a full event, per protocol limitations.
public static final int MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024; // 66 MB
// Maximum size (bytes) of a full event, enforced. Note that this
// constant needs to be greater than PutHeader.MAX_PAYLOAD_SIZE_SOFT.
// This is because a PUT message sent on the wire contains EventHeader
// as well as PutHeader, and some padding bytes, and
// EventHeader.MAX_SIZE_SOFT needs to be large enough to allow those
// headers as well.
public static final int MAX_TYPE = (1 << TYPE_NUM_BITS) - 1;
// Highest possible value for the type of an event.
public static final int MAX_HEADER_SIZE = ((1 << 8) - 1) * Protocol.WORD_SIZE;
// Maximum size (bytes) of an 'EventHeader'.
public static final int MIN_HEADER_SIZE = 6;
// Minimum size (bytes) of an 'EventHeader' (that is sufficient to
// capture header words). This value should *never* change.
public static final int HEADER_SIZE = 8;
// Current size (bytes) of the header.
public static final int CONTROL_EVENT_ENCODING_NUM_BITS = 3;
public static final int CONTROL_EVENT_ENCODING_START_IDX = 5;
public static final byte CONTROL_EVENT_ENCODING_MASK = (byte) 0b11100000;
// FIXME: same as MessageHeader.java
// ctor, setters & getters for each field, streamIn, streamOut, equals(),
//
public EventHeader() {
setLength(HEADER_SIZE);
setProtocolVersion((byte) Protocol.VERSION);
setType(EventType.UNDEFINED);
setHeaderWords((byte) (HEADER_SIZE / Protocol.WORD_SIZE));
typeSpecific = 0;
reserved = 0;
}
public void setLength(int value) {
fragmentBitAndLength = (fragmentBitAndLength & FRAGMENT_MASK) | (value & LENGTH_MASK);
}
public void setTypeSpecific(byte typeSpecific) {
this.typeSpecific = typeSpecific;
}
public void setProtocolVersion(byte value) {
protocolVersionAndType =
(byte)
((protocolVersionAndType & TYPE_MASK)
| (value << PROTOCOL_VERSION_START_IDX));
}
public void setType(EventType value) {
protocolVersionAndType =
(byte)
((protocolVersionAndType & PROTOCOL_VERSION_MASK)
| (value.toInt() & TYPE_MASK));
}
public void setHeaderWords(byte value) {
headerWords = value;
}
public byte typeSpecific() {
return typeSpecific;
}
public int fragmentBit() {
return (fragmentBitAndLength & FRAGMENT_MASK) >>> FRAGMENT_START_IDX;
}
public int length() {
return fragmentBitAndLength & LENGTH_MASK;
}
public byte protocolVersion() {
return (byte)
((protocolVersionAndType & PROTOCOL_VERSION_MASK)
>> // Spotbugs requires to use signed right shift here
PROTOCOL_VERSION_START_IDX);
}
public EventType type() {
return EventType.fromInt(protocolVersionAndType & TYPE_MASK);
}
public byte headerWords() {
return headerWords;
}
public boolean equals(Object obj) {
if (!(obj instanceof EventHeader)) {
return false; // RETURN
}
if (this == obj) {
return true; // RETURN
}
EventHeader that = (EventHeader) obj;
return (fragmentBit() == that.fragmentBit()
&& length() == that.length()
&& protocolVersion() == that.protocolVersion()
&& type() == that.type()
&& headerWords() == that.headerWords());
}
@Override
public int hashCode() {
Long l = (long) fragmentBitAndLength;
l <<= Integer.SIZE;
l |= (long) (headerWords & 0xff);
l <<= Byte.SIZE;
l |= (long) (protocolVersionAndType & 0xff);
l <<= Byte.SIZE;
l |= (long) (reserved & 0xff);
l <<= Byte.SIZE;
l |= (long) (typeSpecific & 0xff);
return l.hashCode();
}
public void streamIn(ByteBufferInputStream bbis) throws IOException {
fragmentBitAndLength = bbis.readInt();
protocolVersionAndType = bbis.readByte();
headerWords = bbis.readByte();
typeSpecific = bbis.readByte();
reserved = bbis.readByte();
if (headerWords() * Protocol.WORD_SIZE > HEADER_SIZE) {
// Read and ignore bytes that we don't know in the header.
int numExtraBytes = headerWords() * Protocol.WORD_SIZE - HEADER_SIZE;
byte[] headerExtra = new byte[numExtraBytes];
int readBytes = bbis.read(headerExtra);
assert readBytes == numExtraBytes;
}
}
public void streamOut(ByteBufferOutputStream bbos) throws IOException {
bbos.writeInt(fragmentBitAndLength);
bbos.writeByte(protocolVersionAndType);
bbos.writeByte(headerWords);
bbos.writeByte(typeSpecific);
bbos.writeByte(reserved);
}
public void setControlEventEncodingType(EncodingType type) {
Argument.expectCondition(type != EncodingType.UNKNOWN, "Unexpected encoding type");
if (this.type() != EventType.CONTROL) {
throw new IllegalStateException("Unexpected call");
}
byte typeSpecificUpdate = typeSpecific();
// Reset the bits for encoding type
typeSpecificUpdate &= ~CONTROL_EVENT_ENCODING_MASK;
byte typeAsByte = (byte) (type.toInt());
// Set those bits to represent 'type'
typeSpecificUpdate |= (typeAsByte << CONTROL_EVENT_ENCODING_START_IDX);
setTypeSpecific(typeSpecificUpdate);
}
public EncodingType controlEventEncodingType() {
// PRECONDITIONS
if (this.type() != EventType.CONTROL) {
throw new IllegalStateException("Unexpected call");
}
return EncodingType.fromInt(
(typeSpecific() & CONTROL_EVENT_ENCODING_MASK)
>>> CONTROL_EVENT_ENCODING_START_IDX);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[ EventHeader [")
.append(" FragmentBit=")
.append(fragmentBit())
.append(" Length=")
.append(length())
.append(" Type=")
.append(type())
.append(" HeaderWords=")
.append(headerWords())
.append(" TypeSpecific=")
.append(typeSpecific())
.append(" ] ]");
return sb.toString();
}
}