-
Notifications
You must be signed in to change notification settings - Fork 52
/
EBMLParser.java
366 lines (318 loc) · 14.6 KB
/
EBMLParser.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/*
Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/apache2.0/
or in the "license" file accompanying this file.
This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazonaws.kinesisvideo.parser.ebml;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Stack;
import java.util.stream.Collectors;
/**
* This class is used to parse a stream of EBML.
* It is based on the ebml specification published by the Matroska Org
* (https://github.com/Matroska-Org/ebml-specification/blob/master/specification.markdown).
*
* A new instance of this object is created for a new stream of EBML (the response stream for a GetMedia call).
* A new instance is configured with a {@link EBMLTypeInfoProvider} that provides the semantics for the EBML document
* being parsed and a {@link EBMLParserCallbacks} that receives callbacks as the parser detects different EBML elements.
* Once an instance of the EBML parser is created, the parse method is invoked repeatedly.
* A stream of EBML is encapsulated by a {@link ParserByteSource} and is an argument to the parse method.
* The parse method is non-blocking and consumes all the data passed to it in each invocation.
* As the parser detects EBML elements it invokes methods on the {@link EBMLParserCallbacks}.
* Once all the data in an EBML stream has been sent to the parser, the method closeParser is called to shut down
* the parser.
*
* TODO: add implementation details.
*
*/
@Slf4j
public class EBMLParser {
// Mask used to turn a signed byte into its unsigned int value, and as the upper bound
// when validating a single byte read from the byte source.
private static final int BYTE_MASK = 0xFF;
//TODO: have it be an argument, either constructor or method
// Value of maxContentBytesInOnePass used by the two-argument constructor.
private static final int DEFAULT_MAX_CONTENT_BYTES_IN_ONE_PASS = 8192;
// Handed to the current element (updateTypeInfo) once its id and size have been read.
private final EBMLTypeInfoProvider typeInfoProvider;
// Master elements whose contents are currently being read; the most recently started one is on top.
private final Stack<EBMLParserInternalElement> masterElements;
// Receives onStartElement/onEndElement and is asked continueParsing() before every parsing step.
private final EBMLParserCallbacks callbacks;
// Passed to the current element on every readContent call; also the capacity of skipBuffer.
private final int maxContentBytesInOnePass;
// Scratch buffer that is rewound and handed to skipContent for elements of unknown type.
private final ByteBuffer skipBuffer;
// Number given to the next element created; incremented each time a new current element is created.
private long elementCount = 0;
// Running count of bytes read from byte sources; used as the start offset of each new element.
private long totalBytesRead = 0;
// Set once a byte source reports -1 from readByte or readBytes.
@Getter(AccessLevel.PACKAGE)
private boolean endOfStream;
// Set by closeParser; further closeParser calls do not invoke callbacks again.
@Getter(AccessLevel.PUBLIC)
private boolean closed;
// Element currently being read; closeParser sets it to null after ending an element of known type.
private EBMLParserInternalElement currentElement;
// Records bytes that are read one at a time from the byte source; re-initialized at the start of
// every new element, and its ByteBuffer is passed to onStartElement.
private ReplayIdAndSizeBuffer replayIdAndSizeBuffer;
/**
 * Creates a parser that uses {@link #DEFAULT_MAX_CONTENT_BYTES_IN_ONE_PASS} as the
 * maxContentBytesInOnePass value.
 *
 * @param typeInfoProvider provider used to update the type info of each element once its id and size are read.
 * @param callbacks receiver of the start/end element callbacks raised while parsing.
 */
public EBMLParser(EBMLTypeInfoProvider typeInfoProvider, EBMLParserCallbacks callbacks) {
this(typeInfoProvider, callbacks, DEFAULT_MAX_CONTENT_BYTES_IN_ONE_PASS);
}
/**
 * Creates a parser with an explicit limit on the content bytes handled in one pass.
 *
 * The arguments are validated up front so that a misconfigured parser fails at construction
 * time instead of on the first call to parse.
 *
 * @param typeInfoProvider provider used to update the type info of each element once its id and size are read.
 * @param callbacks receiver of the start/end element callbacks raised while parsing.
 * @param maxContentBytesInOnePass value passed to the current element on every content read and used as the
 *                                 capacity of the buffer for skipping unknown elements; must be positive.
 * @throws NullPointerException if typeInfoProvider or callbacks is null.
 * @throws IllegalArgumentException if maxContentBytesInOnePass is not positive.
 */
public EBMLParser(EBMLTypeInfoProvider typeInfoProvider,
        EBMLParserCallbacks callbacks,
        int maxContentBytesInOnePass) {
    Validate.notNull(typeInfoProvider, "typeInfoProvider must not be null");
    Validate.notNull(callbacks, "callbacks must not be null");
    Validate.isTrue(maxContentBytesInOnePass > 0,
            "maxContentBytesInOnePass must be positive, was %d",
            maxContentBytesInOnePass);
    this.typeInfoProvider = typeInfoProvider;
    this.callbacks = callbacks;
    // Sized to hold the largest possible element id followed by the largest possible size field.
    this.replayIdAndSizeBuffer =
            new ReplayIdAndSizeBuffer(EBMLUtils.EBML_ID_MAX_BYTES + EBMLUtils.EBML_SIZE_MAX_BYTES);
    // Needs replayIdAndSizeBuffer to be set, since it re-initializes that buffer for the first element.
    createNewCurrentElementInfo();
    this.masterElements = new Stack<>();
    this.maxContentBytesInOnePass = maxContentBytesInOnePass;
    this.skipBuffer = ByteBuffer.allocate(maxContentBytesInOnePass);
    log.info("Creating EBMLParser with maxContentBytesInOnePass {}", this.maxContentBytesInOnePass);
}
/**
 * Advances the element state machine using the bytes offered by the given byte source.
 *
 * Each loop iteration performs one step for the current element (read id, read size, start content,
 * read or skip content, finish) and continues for as long as the per-call state reports that parsing
 * should continue. If the end of the stream was seen during this call, the parser is closed before
 * returning.
 *
 * @param byteSource source of the bytes to parse in this call.
 * @throws IllegalArgumentException if the current element is in an unexpected read state or one of
 *                                  the state validations fails.
 */
public void parse(ParserByteSource byteSource) {
    try (CallState perCall = new CallState(byteSource)) {
        while (perCall.shouldContinueParsing()) {
            log.debug("Current element read state {}", currentElement.currentElementReadState);
            switch (currentElement.currentElementReadState) {
                case NEW:
                    // Master elements whose end offset has been reached are closed before the next id is read.
                    removeMasterElementsBasedOnSizeEnd();
                    currentElement.readId(perCall);
                    break;
                case ID_DONE:
                    currentElement.readSize(perCall);
                    break;
                case SIZE_DONE:
                    announceCurrentElementAndStartContent();
                    break;
                case CONTENT_READING:
                    Validate.isTrue(currentElement.isKnownType(),
                            "We should read only from elements with known types");
                    currentElement.readContent(perCall, perCall, callbacks, maxContentBytesInOnePass);
                    break;
                case CONTENT_SKIPPING:
                    Validate.isTrue(!currentElement.isKnownType(), "We should skip data for unknown elements only");
                    skipBuffer.rewind();
                    currentElement.skipContent(perCall, perCall, skipBuffer);
                    break;
                case FINISHED:
                    completeCurrentElement();
                    break;
                default:
                    throw new IllegalArgumentException("Unexpected ElementReadState");
            }
        }
        log.debug("Stopping parsing");
        if (endOfStream) {
            closeParser();
        }
    }
}

/**
 * Handles an element whose id and size have just been read: resolves its type, closes master
 * elements that an element of equal or higher level terminates, raises the start callback for
 * known types and then begins reading or skipping the content.
 */
private void announceCurrentElementAndStartContent() {
    currentElement.updateTypeInfo(typeInfoProvider);
    removeMasterElementsBasedOnLevel();
    if (currentElement.isKnownType()) {
        log.debug("Invoking onStartElement for current element {}", currentElement);
        callbacks.onStartElement(currentElement.getMetadata(),
                currentElement.getDataSize(),
                replayIdAndSizeBuffer.getByteBuffer(),
                this::currentElementPath);
    }
    startReadingContentBasedOnType();
}

/**
 * Handles an element whose content is complete: raises its end callback, closes master elements
 * whose end offset has been reached and prepares a fresh current element.
 */
private void completeCurrentElement() {
    invokeOnEndElementCallback(currentElement);
    removeMasterElementsBasedOnSizeEnd();
    createNewCurrentElementInfo();
}
/**
 * Shuts the parser down, raising end element callbacks for everything that is still open.
 *
 * The current element gets an end callback first if it is of a known type, followed by every
 * master element left on the stack, innermost first. Calling this method again after it has
 * completed has no effect.
 */
public void closeParser() {
    if (closed) {
        return;
    }
    log.info("Closing EBMLParser");
    final boolean hasOpenKnownElement = currentElement != null && currentElement.isKnownType();
    if (hasOpenKnownElement) {
        log.info("Closing with currentElement {} still set, invoking end element callback on it",
                currentElement);
        invokeOnEndElementCallback(currentElement);
        currentElement = null;
    }
    log.info("Closing with {} master elements on stack, invoking end element callback on them",
            masterElements.size());
    //TODO: see if we need to add a flag to indicate unclean close
    while (!masterElements.isEmpty()) {
        invokeOnEndElementCallback(masterElements.pop());
    }
    closed = true;
}
/**
 * Moves the current element into its content phase once its type is known or found to be unknown.
 *
 * Unknown types are skipped, master elements are pushed on the stack of open master elements and
 * replaced by a fresh current element, and all other elements start reading their content.
 *
 * @throws IllegalArgumentException if an unknown element or a non-master element has unknown length.
 */
private void startReadingContentBasedOnType() {
    if (!currentElement.isKnownType()) {
        Validate.isTrue(!currentElement.isUnknownLength(), "Cannot skip element of unknown length");
        currentElement.startSkippingContent();
        log.warn("Will skip content for element number {} with unknown id {} datasize {}",
                currentElement.getElementCount(),
                currentElement.getId(),
                currentElement.getDataSize());
        return;
    }

    final boolean isMaster = currentElement.getTypeInfo().getType() == EBMLTypeInfo.TYPE.MASTER;
    if (!isMaster) {
        // Without a known length the parser could not find where a non-master element ends.
        Validate.isTrue(!currentElement.isUnknownLength(),
                "A non-master element should not have unknown length");
        currentElement.startReadingContent();
        return;
    }

    // A master element only contains child elements: mark it as started, keep it on the stack
    // of master elements being read and continue with a new current element for its first child.
    currentElement.startReadingContent();
    masterElements.push(currentElement);
    createNewCurrentElementInfo();
}
/**
 * Ends open master elements that the current element cannot be a child of.
 *
 * Nothing happens when the current element is of unknown type or is a global element. Otherwise
 * master elements are popped, innermost first, and get their end callback for as long as the
 * current element's level is the same as or lower than the level of the master element on top.
 * This covers master elements with a wrong size (such as segments).
 */
private void removeMasterElementsBasedOnLevel() {
    if (!currentElement.isKnownType() || currentElement.getTypeInfo().isGlobal()) {
        return;
    }
    while (!masterElements.isEmpty()) {
        final EBMLParserInternalElement innermost = masterElements.peek();
        // The current element must never be one of the master elements on the stack.
        Validate.isTrue(currentElement.getElementCount() != innermost.getElementCount());
        final boolean nestedInInnermost =
                currentElement.getTypeInfo().getLevel() > innermost.getTypeInfo().getLevel();
        if (nestedInInnermost) {
            return;
        }
        log.debug("Removing master element {} based on level of current element {}", innermost, currentElement);
        masterElements.pop();
        invokeOnEndElementCallback(innermost);
    }
}
/**
 * Ends open master elements whose end offset has been reached by the bytes read so far.
 *
 * Nothing happens when the current element is not of a known type. Otherwise master elements
 * are popped, innermost first, and get their end callback until the stack is empty or the
 * element on top has unknown length or an end offset beyond the total bytes read.
 */
private void removeMasterElementsBasedOnSizeEnd() {
    if (!currentElement.isKnownType()) {
        return;
    }
    while (!masterElements.isEmpty()) {
        final EBMLParserInternalElement innermost = masterElements.peek();
        if (innermost.isUnknownLength() || innermost.endOffSet() > totalBytesRead) {
            return;
        }
        log.debug("Removing master element {} based on size end {}", innermost, totalBytesRead);
        masterElements.pop();
        invokeOnEndElementCallback(innermost);
    }
}
/**
 * Builds the path of the element being reported from the open master elements.
 *
 * @return the metadata of every master element on the stack, in the stack's iteration order.
 */
private List<EBMLElementMetaData> currentElementPath() {
    return masterElements
            .stream()
            .map(openMaster -> openMaster.getMetadata())
            .collect(Collectors.toList());
}
/**
 * Raises the onEndElement callback for an element, provided its type is known.
 * Elements of unknown type never received a start callback and therefore get no end callback.
 *
 * @param finishedElement the element that has ended.
 */
private void invokeOnEndElementCallback(EBMLParserInternalElement finishedElement) {
    if (finishedElement.isKnownType()) {
        // The message names the callback that is actually invoked here (it used to say onStartElement).
        log.debug("Invoking onEndElement for current element {}", finishedElement);
        callbacks.onEndElement(finishedElement.getMetadata(), this::currentElementPath);
    }
}
/**
 * Replaces the current element with a fresh one that starts at the number of bytes read so far,
 * advances the element counter and re-initializes the replay buffer at that same offset.
 */
private void createNewCurrentElementInfo() {
    final long startOffset = totalBytesRead;
    currentElement = new EBMLParserInternalElement(startOffset, elementCount);
    elementCount += 1;
    replayIdAndSizeBuffer.init(startOffset);
}
/**
 * State kept for the duration of a single parse call.
 *
 * It wraps the byte source given to that call, keeps the parser's running byte count and replay
 * buffer up to date as bytes are read, and tracks whether the call should keep parsing.
 */
@RequiredArgsConstructor
private class CallState implements Closeable, TrackingReplayableIdAndSizeByteSource, ParserBulkByteSource {
    // Cleared when the byte source cannot supply the bytes asked for, or at end of stream.
    private boolean parseMore = true;
    private final ParserByteSource byteSource;
    // Offset of the next byte handed out by readByte.
    @Setter
    private long readOffsetForReplayBuffer;

    /**
     * @return the number of bytes the enclosing parser has read so far.
     */
    @Override
    public long getTotalBytesRead() {
        return totalBytesRead;
    }

    /**
     * Does nothing; there is nothing to release at the end of a parse call.
     */
    @Override
    public void close() {
    }

    /**
     * @return true while the end of stream has not been seen, this call has not run out of data
     *         and the callbacks still want parsing to continue.
     */
    boolean shouldContinueParsing() {
        if (endOfStream || !parseMore) {
            return false;
        }
        return callbacks.continueParsing();
    }

    /**
     * Checks that len bytes can be read starting at the current replay read offset, pulling
     * bytes from the byte source into the replay buffer when the buffer does not hold enough.
     *
     * When the byte source cannot supply all missing bytes, the bytes it does have are still
     * moved into the replay buffer and this call stops parsing.
     *
     * @param len number of bytes needed.
     * @return true if the bytes are available, false otherwise.
     */
    @Override
    public boolean checkAndReadIntoReplayBuffer(int len) {
        if (!parseMore) {
            return false;
        }
        final int buffered = replayIdAndSizeBuffer.availableAfter(readOffsetForReplayBuffer);
        Validate.isTrue(buffered >= 0);
        if (buffered >= len) {
            return true;
        }
        final int missing = len - buffered;
        parseMore = byteSource.available() >= missing;
        final int toRead = Math.min(missing, byteSource.available());
        for (int remaining = toRead; remaining > 0; remaining--) {
            readFromByteSourceIntoReplayBuffer();
        }
        return parseMore;
    }

    /**
     * Reads the byte at the current replay read offset, from the replay buffer when it holds
     * that offset and from the byte source otherwise, then advances the offset by one.
     *
     * @return the byte as a value between 0 and 255, or -1 at end of stream.
     */
    @Override
    public int readByte() {
        final int value;
        if (replayIdAndSizeBuffer.inReplayBuffer(readOffsetForReplayBuffer)) {
            value = replayIdAndSizeBuffer.getByteFromOffset(readOffsetForReplayBuffer) & BYTE_MASK;
        } else {
            value = readFromByteSourceIntoReplayBuffer();
        }
        readOffsetForReplayBuffer++;
        return value;
    }

    /**
     * Reads one byte from the byte source, records it in the replay buffer and counts it.
     *
     * @return the byte read, or -1 after marking the end of stream.
     */
    private int readFromByteSourceIntoReplayBuffer() {
        final int value = byteSource.readByte();
        if (value == -1) {
            markAsEndofStream();
            return -1;
        }
        Validate.inclusiveBetween(0, BYTE_MASK, value);
        replayIdAndSizeBuffer.addByte((byte) value);
        totalBytesRead++;
        return value;
    }

    /**
     * Reports how many bytes the byte source can supply; a byte source with nothing available
     * stops this call from parsing further.
     *
     * @return the number of available bytes, or 0 once this call has stopped parsing.
     */
    @Override
    public int availableForContent() {
        if (!parseMore) {
            return 0;
        }
        final int available = byteSource.available();
        // parseMore is known to be true here, so this only ever clears it.
        parseMore = available != 0;
        return available;
    }

    /**
     * Reads bytes from the byte source into dest and adds the number read to the parser's total.
     *
     * @param dest buffer receiving the bytes.
     * @param numBytes number of bytes requested.
     * @return the number of bytes read, or -1 after marking the end of stream.
     */
    @Override
    public int readBytes(ByteBuffer dest, int numBytes) {
        final int count = byteSource.readBytes(dest, numBytes);
        if (count == -1) {
            markAsEndofStream();
            return -1;
        }
        Validate.isTrue(count >= 0);
        totalBytesRead += count;
        return count;
    }

    /**
     * Records that the stream has ended and stops this call from parsing further.
     */
    private void markAsEndofStream() {
        endOfStream = true;
        parseMore = false;
    }
}
}