Skip to content

Commit

Permalink
Reimplement JsonParser without the queue since the underlying parser …
Browse files Browse the repository at this point in the history
…is already a buffer of its own
  • Loading branch information
vietj committed Oct 23, 2018
1 parent 84dad36 commit fe28f4c
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 83 deletions.
98 changes: 66 additions & 32 deletions src/main/java/io/vertx/core/parsetools/impl/JsonParserImpl.java
Expand Up @@ -18,14 +18,14 @@
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.VertxException; import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.DecodeException; import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json; import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.JsonEvent; import io.vertx.core.parsetools.JsonEvent;
import io.vertx.core.parsetools.JsonEventType; import io.vertx.core.parsetools.JsonEventType;
import io.vertx.core.parsetools.JsonParser; import io.vertx.core.parsetools.JsonParser;
import io.vertx.core.queue.Queue;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;


import java.io.IOException; import java.io.IOException;
Expand All @@ -38,22 +38,20 @@
public class JsonParserImpl implements JsonParser { public class JsonParserImpl implements JsonParser {


private NonBlockingJsonParser parser; private NonBlockingJsonParser parser;
private JsonToken currentToken;
private Handler<JsonToken> tokenHandler = this::handleToken; private Handler<JsonToken> tokenHandler = this::handleToken;
private final Queue<JsonEvent> pending; private Handler<JsonEvent> eventHandler;
private BufferingHandler arrayHandler; private BufferingHandler arrayHandler;
private BufferingHandler objectHandler; private BufferingHandler objectHandler;
private Handler<Throwable> exceptionHandler; private Handler<Throwable> exceptionHandler;
private String currentField; private String currentField;
private Handler<Void> endHandler; private Handler<Void> endHandler;
private long demand = Long.MAX_VALUE;
private boolean ended;
private final ReadStream<Buffer> stream; private final ReadStream<Buffer> stream;


public JsonParserImpl(ReadStream<Buffer> stream) { public JsonParserImpl(ReadStream<Buffer> stream) {
this.stream = stream; this.stream = stream;
this.pending = Queue
.<JsonEvent>queue()
.writableHandler(v -> {
stream.resume();
});
JsonFactory factory = new JsonFactory(); JsonFactory factory = new JsonFactory();
try { try {
parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser(); parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();
Expand All @@ -64,19 +62,23 @@ public JsonParserImpl(ReadStream<Buffer> stream) {


@Override @Override
public JsonParser pause() { public JsonParser pause() {
pending.pause(); demand = 0L;
return this; return this;
} }


@Override @Override
public JsonParser resume() { public JsonParser resume() {
pending.resume(); return fetch(Long.MAX_VALUE);
return this;
} }


@Override @Override
public JsonParser fetch(long amount) { public JsonParser fetch(long amount) {
pending.take(amount); Arguments.require(amount > 0L, "Fetch amount must be > 0L");
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
checkPending();
return this; return this;
} }


Expand All @@ -88,7 +90,7 @@ public JsonParser endHandler(Handler<Void> handler) {


@Override @Override
public JsonParser handler(Handler<JsonEvent> handler) { public JsonParser handler(Handler<JsonEvent> handler) {
pending.handler(handler); eventHandler = handler;
if (stream != null) { if (stream != null) {
if (handler != null) { if (handler != null) {
stream.endHandler(v -> end()); stream.endHandler(v -> end());
Expand All @@ -108,8 +110,9 @@ public JsonParser handler(Handler<JsonEvent> handler) {
} }


private void handleEvent(JsonEvent event) { private void handleEvent(JsonEvent event) {
if (!pending.add(event) && stream != null) { Handler<JsonEvent> handler = this.eventHandler;
stream.pause(); if (handler != null) {
handler.handle(event);
} }
} }


Expand Down Expand Up @@ -183,35 +186,66 @@ private void handleToken(JsonToken token) {


@Override @Override
public void handle(Buffer event) { public void handle(Buffer event) {
handle(event.getBytes()); byte[] bytes = event.getBytes();
try {
parser.feedInput(bytes, 0, bytes.length);
} catch (IOException e) {
if (exceptionHandler != null) {
exceptionHandler.handle(e);
return;
} else {
throw new DecodeException(e.getMessage());
}
}
checkPending();
} }


@Override @Override
public void end() { public void end() {
handle((byte[]) null); if (ended) {
}

private void handle(byte[] bytes) {
if (parser == null) {
throw new IllegalStateException("Parsing already done"); throw new IllegalStateException("Parsing already done");
} }
ended = true;
parser.endOfInput();
checkPending();
}

private void checkPending() {
try { try {
if (bytes != null) {
parser.feedInput(bytes, 0, bytes.length);
} else {
parser.endOfInput();
}
while (true) { while (true) {
JsonToken token = parser.nextToken(); if (currentToken == null) {
if (token == null || token == JsonToken.NOT_AVAILABLE) { JsonToken next = parser.nextToken();
if (next != null && next != JsonToken.NOT_AVAILABLE) {
currentToken = next;
}
}
if (currentToken == null) {
if (ended) {
if (endHandler != null) {
endHandler.handle(null);
}
}
break; break;
} else {
if (demand > 0L) {
if (demand != Long.MAX_VALUE) {
demand--;
}
JsonToken token = currentToken;
currentToken = null;
tokenHandler.handle(token);
} else {
break;
}
} }
tokenHandler.handle(token);
} }
if (bytes == null) { if (demand == 0L) {
parser = null; if (stream != null) {
if (endHandler != null) { stream.pause();
endHandler.handle(null); }
} else {
if (stream != null) {
stream.resume();
} }
} }
} catch (IOException e) { } catch (IOException e) {
Expand Down
92 changes: 92 additions & 0 deletions src/test/java/io/vertx/core/parsetools/FakeStream.java
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.parsetools;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.streams.ReadStream;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class FakeStream implements ReadStream<Buffer> {

private long demand = Long.MAX_VALUE;
private Handler<Buffer> eventHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;

@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}

@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
eventHandler = handler;
return this;
}

@Override
public ReadStream<Buffer> fetch(long amount) {
Arguments.require(amount > 0, "Fetch amount must be > 0L");
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
return this;
}

@Override
public ReadStream<Buffer> pause() {
demand = 0L;
return this;
}

@Override
public ReadStream<Buffer> resume() {
return fetch(Long.MAX_VALUE);
}

@Override
public ReadStream<Buffer> endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}

boolean isPaused() {
return demand == 0L;
}

void handle(String s) {
handle(Buffer.buffer(s));
}

void handle(Buffer buff) {
if (demand == 0L) {
throw new IllegalStateException();
}
if (demand != Long.MAX_VALUE) {
demand--;
}
eventHandler.handle(buff);
}

void fail(Throwable err) {
exceptionHandler.handle(err);
}

void end() {
endHandler.handle(null);
}
}
117 changes: 114 additions & 3 deletions src/test/java/io/vertx/core/parsetools/JsonParserTest.java
Expand Up @@ -17,9 +17,6 @@
import io.vertx.core.json.DecodeException; import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.JsonEvent;
import io.vertx.core.parsetools.JsonEventType;
import io.vertx.core.parsetools.JsonParser;
import io.vertx.test.core.TestUtils; import io.vertx.test.core.TestUtils;
import org.junit.Test; import org.junit.Test;


Expand Down Expand Up @@ -700,4 +697,118 @@ public void testParseLineDelimitedJSONStream() {
assertEquals(1, nullCount.get()); assertEquals(1, nullCount.get());
assertEquals(1, stringCount.get()); assertEquals(1, stringCount.get());
} }

@Test
public void testStreamHandle() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
stream.handle("{}");
assertFalse(stream.isPaused());
assertEquals(2, events.size());
}

@Test
public void testStreamPause() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
parser.pause();
stream.handle("1234");
assertTrue(stream.isPaused());
assertEquals(0, events.size());
}

@Test
public void testStreamResume() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
parser.pause();
stream.handle("{}");
parser.resume();
assertEquals(2, events.size());
assertFalse(stream.isPaused());
}

@Test
public void testStreamFetch() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
parser.pause();
stream.handle("{}");
parser.fetch(1);
assertEquals(1, events.size());
assertTrue(stream.isPaused());
}

@Test
public void testStreamPauseInHandler() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(event -> {
assertTrue(events.isEmpty());
events.add(event);
parser.pause();
});
stream.handle("{}");
assertEquals(1, events.size());
assertTrue(stream.isPaused());
}

@Test
public void testStreamFetchInHandler() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(event -> {
events.add(event);
stream.fetch(1);
});
stream.pause();
stream.fetch(1);
stream.handle("{}");
assertEquals(2, events.size());
assertFalse(stream.isPaused());
}

@Test
public void testStreamEnd() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
AtomicInteger ended = new AtomicInteger();
parser.endHandler(v -> ended.incrementAndGet());
stream.end();
assertEquals(0, events.size());
assertEquals(1, ended.get());
}

@Test
public void testStreamPausedEnd() {
FakeStream stream = new FakeStream();
JsonParser parser = JsonParser.newParser(stream);
List<JsonEvent> events = new ArrayList<>();
parser.handler(events::add);
AtomicInteger ended = new AtomicInteger();
parser.endHandler(v -> ended.incrementAndGet());
parser.pause();
stream.handle("{}");
stream.end();
assertEquals(0, ended.get());
assertEquals(0, events.size());
parser.fetch(1);
assertEquals(1, events.size());
assertEquals(0, ended.get());
parser.fetch(1);
assertEquals(2, events.size());
assertEquals(1, ended.get());
}
} }

0 comments on commit fe28f4c

Please sign in to comment.