Skip to content

Commit

Permalink
Rewrite the RecordParser stream handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 23, 2018
1 parent 8035628 commit 863eaa3
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 37 deletions.
87 changes: 50 additions & 37 deletions src/main/java/io/vertx/core/parsetools/impl/RecordParserImpl.java
Expand Up @@ -15,7 +15,6 @@
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments; import io.vertx.core.impl.Arguments;
import io.vertx.core.parsetools.RecordParser; import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.queue.Queue;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;


import java.util.Objects; import java.util.Objects;
Expand All @@ -30,29 +29,26 @@ public class RecordParserImpl implements RecordParser {
private int pos; // Current position in buffer private int pos; // Current position in buffer
private int start; // Position of beginning of current record private int start; // Position of beginning of current record
private int delimPos; // Position of current match in delimiter array private int delimPos; // Position of current match in delimiter array
private int next = -1; // Position of the next matching record


private boolean delimited; private boolean delimited;
private byte[] delim; private byte[] delim;
private int recordSize; private int recordSize;
private int maxRecordSize; private int maxRecordSize;
private long demand = Long.MAX_VALUE;
private Handler<Buffer> eventHandler;
private Handler<Void> endHandler; private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler; private Handler<Throwable> exceptionHandler;


private final ReadStream<Buffer> stream; private final ReadStream<Buffer> stream;
private final Queue<Buffer> pending;


private RecordParserImpl(ReadStream<Buffer> stream) { private RecordParserImpl(ReadStream<Buffer> stream) {
this.stream = stream; this.stream = stream;
this.pending = Queue
.<Buffer>queue()
.writableHandler(v -> {
stream.resume();
});
} }


public void setOutput(Handler<Buffer> output) { public void setOutput(Handler<Buffer> output) {
Objects.requireNonNull(output, "output"); Objects.requireNonNull(output, "output");
this.pending.handler(output); eventHandler = output;
} }


/** /**
Expand Down Expand Up @@ -175,15 +171,38 @@ public RecordParser maxRecordSize(int size) {
private void handleParsing() { private void handleParsing() {
int len = buff.length(); int len = buff.length();
do { do {
Buffer event; if (next == -1) {
if (delimited) { if (delimited) {
event = parseDelimited(); next = parseDelimited();
} else { } else {
event = parseFixed(); next = parseFixed();
}
} }
if (event != null) { if (demand > 0L) {
handleEvent(event); if (next == -1) {
ReadStream<Buffer> s = stream;
if (s != null) {
s.resume();
}
break;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
Buffer event = buff.getBuffer(start, next);
start = pos;
next = -1;
Handler<Buffer> handler = eventHandler;
if (handler != null) {
handler.handle(event);
}
} else { } else {
if (next != -1) {
ReadStream<Buffer> s = stream;
if (s != null) {
s.pause();
}
}
break; break;
} }
} while (true); } while (true);
Expand All @@ -199,23 +218,15 @@ private void handleParsing() {
start = 0; start = 0;
} }


private void handleEvent(Buffer event) { private int parseDelimited() {
if (!pending.add(event) && stream != null) {
stream.pause();
}
}

private Buffer parseDelimited() {
int len = buff.length(); int len = buff.length();
for (; pos < len; pos++) { for (; pos < len; pos++) {
if (buff.getByte(pos) == delim[delimPos]) { if (buff.getByte(pos) == delim[delimPos]) {
delimPos++; delimPos++;
if (delimPos == delim.length) { if (delimPos == delim.length) {
pos++; pos++;
Buffer ret = buff.getBuffer(start, pos - delim.length);
start = pos;
delimPos = 0; delimPos = 0;
return ret; return pos - delim.length;
} }
} else { } else {
if (delimPos > 0) { if (delimPos > 0) {
Expand All @@ -224,19 +235,17 @@ private Buffer parseDelimited() {
} }
} }
} }
return null; return -1;
} }


private Buffer parseFixed() { private int parseFixed() {
int len = buff.length(); int len = buff.length();
if (len - start >= recordSize) { if (len - start >= recordSize) {
int end = start + recordSize; int end = start + recordSize;
Buffer ret = buff.getBuffer(start, end); pos = end;
start = end; return end;
pos = start;
return ret;
} }
return null; return -1;
} }


/** /**
Expand Down Expand Up @@ -276,7 +285,7 @@ public RecordParser exceptionHandler(Handler<Throwable> handler) {


@Override @Override
public RecordParser handler(Handler<Buffer> handler) { public RecordParser handler(Handler<Buffer> handler) {
pending.handler(handler); eventHandler = handler;
if (stream != null) { if (stream != null) {
if (handler != null) { if (handler != null) {
stream.endHandler(v -> end()); stream.endHandler(v -> end());
Expand All @@ -297,20 +306,24 @@ public RecordParser handler(Handler<Buffer> handler) {


@Override @Override
public RecordParser pause() { public RecordParser pause() {
pending.pause(); demand = 0L;
return this; return this;
} }


@Override @Override
public RecordParser fetch(long amount) { public RecordParser fetch(long amount) {
pending.take(amount); Arguments.require(amount > 0, "Fetch amount must be > 0");
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
handleParsing();
return this; return this;
} }


@Override @Override
public RecordParser resume() { public RecordParser resume() {
pending.resume(); return fetch(Long.MAX_VALUE);
return this;
} }


@Override @Override
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/io/vertx/core/parsetools/RecordParserTest.java
Expand Up @@ -333,11 +333,22 @@ public void testWrapReadStream() {
assertEquals("item-" + i, records.poll()); assertEquals("item-" + i, records.poll());
} }
assertNull(records.poll()); assertNull(records.poll());
assertFalse(stream.isPaused());


stream.end(); stream.end();
assertEquals(1, ends.get()); assertEquals(1, ends.get());
} }


@Test
public void testPausedStreamShouldNotPauseOnIncompleteMatch() {
FakeStream stream = new FakeStream();
RecordParser parser = RecordParser.newDelimited("\r\n", stream);
parser.handler(event -> {});
parser.pause();
stream.handle("abc");
assertFalse(stream.isPaused());
}

private void doTestDelimitedMaxRecordSize(final Buffer input, Buffer delim, Integer[] chunkSizes, int maxRecordSize, private void doTestDelimitedMaxRecordSize(final Buffer input, Buffer delim, Integer[] chunkSizes, int maxRecordSize,
Handler<Throwable> exHandler, final Buffer... expected) { Handler<Throwable> exHandler, final Buffer... expected) {
final Buffer[] results = new Buffer[expected.length]; final Buffer[] results = new Buffer[expected.length];
Expand Down

0 comments on commit 863eaa3

Please sign in to comment.