Skip to content

Commit

Permalink
Rewrite the Queue, rename to InboundBuffer and make it more internal
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 23, 2018
1 parent c3a6f75 commit 38cc7f5
Show file tree
Hide file tree
Showing 14 changed files with 1,093 additions and 1,361 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
Expand Up @@ -24,7 +24,7 @@
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
Expand Down Expand Up @@ -68,7 +68,7 @@ public class AsyncFileImpl implements AsyncFile {
private int maxWrites = 128 * 1024; // TODO - we should tune this for best performance private int maxWrites = 128 * 1024; // TODO - we should tune this for best performance
private int lwm = maxWrites / 2; private int lwm = maxWrites / 2;
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private Queue<Buffer> queue; private InboundBuffer<Buffer> queue;
private Handler<Void> endHandler; private Handler<Void> endHandler;
private long readPos; private long readPos;


Expand Down Expand Up @@ -100,9 +100,9 @@ public class AsyncFileImpl implements AsyncFile {
throw new FileSystemException(e); throw new FileSystemException(e);
} }
this.context = context; this.context = context;
this.queue = Queue.queue(context, 0); this.queue = new InboundBuffer<>(context, 0);


queue.writableHandler(v -> { queue.drainHandler(v -> {
doRead(); doRead();
}); });
} }
Expand Down Expand Up @@ -137,7 +137,7 @@ public synchronized AsyncFile read(Buffer buffer, int offset, long position, int


@Override @Override
public AsyncFile fetch(long amount) { public AsyncFile fetch(long amount) {
queue.take(amount); queue.fetch(amount);
return this; return this;
} }


Expand Down Expand Up @@ -338,7 +338,7 @@ private synchronized void doRead() {
handleEnd(); handleEnd();
} else { } else {
readPos += buffer.length(); readPos += buffer.length();
if (queue.add(buffer)) { if (queue.write(buffer)) {
doRead(); doRead();
} }
} }
Expand Down
Expand Up @@ -39,7 +39,7 @@
import io.vertx.core.net.impl.NetSocketImpl; import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.VertxNetHandler; import io.vertx.core.net.impl.VertxNetHandler;
import io.vertx.core.spi.metrics.HttpClientMetrics; import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
Expand Down Expand Up @@ -195,15 +195,15 @@ private static class StreamImpl implements HttpClientStream {
private boolean requestEnded; private boolean requestEnded;
private boolean responseEnded; private boolean responseEnded;
private boolean reset; private boolean reset;
private Queue<Buffer> queue; private InboundBuffer<Buffer> queue;
private MultiMap trailers; private MultiMap trailers;
private StreamImpl next; private StreamImpl next;


StreamImpl(Http1xClientConnection conn, int id, Handler<AsyncResult<HttpClientStream>> handler) { StreamImpl(Http1xClientConnection conn, int id, Handler<AsyncResult<HttpClientStream>> handler) {
this.conn = conn; this.conn = conn;
this.fut = Future.<HttpClientStream>future().setHandler(handler); this.fut = Future.<HttpClientStream>future().setHandler(handler);
this.id = id; this.id = id;
this.queue = Queue.queue(conn.context, 0); this.queue = new InboundBuffer<>(conn.context, 5);
} }


private void append(StreamImpl s) { private void append(StreamImpl s) {
Expand Down Expand Up @@ -304,7 +304,7 @@ private void sendRequest(
} }


private boolean handleChunk(Buffer buff) { private boolean handleChunk(Buffer buff) {
return queue.add(buff); return queue.write(buff);
} }


@Override @Override
Expand Down Expand Up @@ -342,7 +342,7 @@ public void doPause() {


@Override @Override
public void doFetch(long amount) { public void doFetch(long amount) {
queue.take(amount); queue.fetch(amount);
} }


@Override @Override
Expand Down Expand Up @@ -454,7 +454,7 @@ private HttpClientResponseImpl beginResponse(HttpResponse resp) {
response.handleEnd(trailers); response.handleEnd(trailers);
} }
}); });
queue.writableHandler(v -> { queue.drainHandler(v -> {
if (!responseEnded) { if (!responseEnded) {
conn.doResume(); conn.doResume();
} }
Expand Down
Expand Up @@ -13,15 +13,13 @@


import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile; import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions; import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServerFileUpload; import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.queue.Queue;
import io.vertx.core.streams.Pump; import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.impl.InboundBuffer;


import java.nio.charset.Charset; import java.nio.charset.Charset;


Expand Down Expand Up @@ -49,7 +47,7 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
private Handler<Throwable> exceptionHandler; private Handler<Throwable> exceptionHandler;


private long size; private long size;
private Queue<Buffer> pending; private InboundBuffer<Buffer> pending;
private boolean complete; private boolean complete;
private boolean lazyCalculateSize; private boolean lazyCalculateSize;


Expand All @@ -64,7 +62,7 @@ class HttpServerFileUploadImpl implements HttpServerFileUpload {
this.contentTransferEncoding = contentTransferEncoding; this.contentTransferEncoding = contentTransferEncoding;
this.charset = charset; this.charset = charset;
this.size = size; this.size = size;
this.pending = Queue.<Buffer>queue(context) this.pending = new InboundBuffer<Buffer>(context)
.emptyHandler(v -> { .emptyHandler(v -> {
if (complete) { if (complete) {
handleComplete(); handleComplete();
Expand Down Expand Up @@ -173,7 +171,7 @@ synchronized void receiveData(Buffer data) {
} }


synchronized void doReceiveData(Buffer data) { synchronized void doReceiveData(Buffer data) {
if (!pending.add(data)) { if (!pending.write(data)) {
req.pause(); req.pause();
} }
} }
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java
Expand Up @@ -29,7 +29,7 @@
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.Metrics; import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class HttpServerRequestImpl implements HttpServerRequest {
private long bytesRead; private long bytesRead;


private boolean paused; private boolean paused;
private Queue<Buffer> pending; private InboundBuffer<Buffer> pending;


HttpServerRequestImpl(Http1xServerConnection conn, DefaultHttpRequest request) { HttpServerRequestImpl(Http1xServerConnection conn, DefaultHttpRequest request) {
this.conn = conn; this.conn = conn;
Expand All @@ -104,10 +104,10 @@ void setRequest(DefaultHttpRequest request) {
} }
} }


private Queue<Buffer> pendingQueue() { private InboundBuffer<Buffer> pendingQueue() {
if (pending == null) { if (pending == null) {
pending = Queue.queue(conn.getContext(), 8); pending = new InboundBuffer<>(conn.getContext(), 8);
pending.writableHandler(v -> conn.doResume()); pending.drainHandler(v -> conn.doResume());
pending.emptyHandler(v -> { pending.emptyHandler(v -> {
if (ended) { if (ended) {
doEnd(); doEnd();
Expand All @@ -120,7 +120,7 @@ private Queue<Buffer> pendingQueue() {


private void enqueueData(Buffer chunk) { private void enqueueData(Buffer chunk) {
// We queue requests if paused or a request is in progress to prevent responses being written in the wrong order // We queue requests if paused or a request is in progress to prevent responses being written in the wrong order
if (!pendingQueue().add(chunk)) { if (!pendingQueue().write(chunk)) {
// We only pause when we are actively called by the connection // We only pause when we are actively called by the connection
conn.doPause(); conn.doPause();
} }
Expand Down Expand Up @@ -327,7 +327,7 @@ public HttpServerRequest pause() {
@Override @Override
public HttpServerRequest fetch(long amount) { public HttpServerRequest fetch(long amount) {
synchronized (conn) { synchronized (conn) {
pendingQueue().take(amount); pendingQueue().fetch(amount);
return this; return this;
} }
} }
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
Expand Up @@ -20,7 +20,7 @@
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
Expand All @@ -35,7 +35,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
protected final ChannelHandlerContext handlerContext; protected final ChannelHandlerContext handlerContext;
protected final Http2Stream stream; protected final Http2Stream stream;


private Queue<Buffer> pending; private InboundBuffer<Buffer> pending;
private int pendingBytes; private int pendingBytes;
private MultiMap trailers; private MultiMap trailers;
private boolean writable; private boolean writable;
Expand All @@ -47,15 +47,16 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
this.stream = stream; this.stream = stream;
this.context = conn.getContext(); this.context = conn.getContext();
this.writable = writable; this.writable = writable;
this.pending = Queue.queue(context, 5); this.pending = new InboundBuffer<>(context, 5);


pending.writableHandler(v -> { pending.drainHandler(v -> {
int numBytes = pendingBytes; int numBytes = pendingBytes;
pendingBytes = 0; pendingBytes = 0;
conn.handler.consume(stream, numBytes); conn.handler.consume(stream, numBytes);
}); });


pending.handler(this::handleData); pending.handler(this::handleData);
pending.exceptionHandler(context.exceptionHandler());
pending.emptyHandler(v -> { pending.emptyHandler(v -> {
if (trailers != null) { if (trailers != null) {
handleEnd(trailers); handleEnd(trailers);
Expand All @@ -70,7 +71,7 @@ void onResetRead(long code) {
} }


boolean onDataRead(Buffer data) { boolean onDataRead(Buffer data) {
boolean read = pending.add(data); boolean read = pending.write(data);
if (!read) { if (!read) {
pendingBytes += data.length(); pendingBytes += data.length();
} }
Expand Down Expand Up @@ -110,7 +111,7 @@ public void doResume() {
} }


public void doFetch(long amount) { public void doFetch(long amount) {
pending.take(amount); pending.fetch(amount);
} }


boolean isNotWritable() { boolean isNotWritable() {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java
Expand Up @@ -23,7 +23,7 @@
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
Expand All @@ -46,7 +46,7 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
private final String binaryHandlerID; private final String binaryHandlerID;
private final int maxWebSocketFrameSize; private final int maxWebSocketFrameSize;
private final int maxWebSocketMessageSize; private final int maxWebSocketMessageSize;
private final Queue<Buffer> pending; private final InboundBuffer<Buffer> pending;
private MessageConsumer binaryHandlerRegistration; private MessageConsumer binaryHandlerRegistration;
private MessageConsumer textHandlerRegistration; private MessageConsumer textHandlerRegistration;
private String subProtocol; private String subProtocol;
Expand All @@ -69,9 +69,9 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
this.conn = conn; this.conn = conn;
this.maxWebSocketFrameSize = maxWebSocketFrameSize; this.maxWebSocketFrameSize = maxWebSocketFrameSize;
this.maxWebSocketMessageSize = maxWebSocketMessageSize; this.maxWebSocketMessageSize = maxWebSocketMessageSize;
this.pending = Queue.<Buffer>queue(conn.getContext()); this.pending = new InboundBuffer<>(conn.getContext());


pending.writableHandler(v -> { pending.drainHandler(v -> {
conn.doResume(); conn.doResume();
}); });
} }
Expand Down Expand Up @@ -284,7 +284,7 @@ void handleFrame(WebSocketFrameInternal frame) {
synchronized (conn) { synchronized (conn) {
if (frame.type() != FrameType.CLOSE) { if (frame.type() != FrameType.CLOSE) {
conn.reportBytesRead(frame.length()); conn.reportBytesRead(frame.length());
if (!pending.add(frame.binaryData())) { if (!pending.write(frame.binaryData())) {
conn.doPause(); conn.doPause();
} }
} }
Expand Down Expand Up @@ -557,7 +557,7 @@ public S resume() {
@Override @Override
public S fetch(long amount) { public S fetch(long amount) {
if (!isClosed()) { if (!isClosed()) {
pending.take(amount); pending.fetch(amount);
} }
return (S) this; return (S) this;
} }
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/net/impl/NetSocketImpl.java
Expand Up @@ -36,7 +36,7 @@
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.TCPMetrics; import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.core.queue.Queue; import io.vertx.core.streams.impl.InboundBuffer;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
private final TCPMetrics metrics; private final TCPMetrics metrics;
private Handler<Void> endHandler; private Handler<Void> endHandler;
private Handler<Void> drainHandler; private Handler<Void> drainHandler;
private Queue<Object> pending; private InboundBuffer<Object> pending;
private MessageConsumer registration; private MessageConsumer registration;
private boolean closed; private boolean closed;


Expand All @@ -87,8 +87,8 @@ public NetSocketImpl(VertxInternal vertx, ChannelHandlerContext channel, SocketA
this.writeHandlerID = "__vertx.net." + UUID.randomUUID().toString(); this.writeHandlerID = "__vertx.net." + UUID.randomUUID().toString();
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
this.metrics = metrics; this.metrics = metrics;
pending = Queue.queue(context, 0); pending = new InboundBuffer<>(context);
pending.writableHandler(v -> doResume()); pending.drainHandler(v -> doResume());
pending.handler(NULL_MSG_HANDLER); pending.handler(NULL_MSG_HANDLER);
} }


Expand Down Expand Up @@ -188,7 +188,7 @@ public synchronized NetSocket pause() {


@Override @Override
public NetSocket fetch(long amount) { public NetSocket fetch(long amount) {
pending.take(amount); pending.fetch(amount);
return this; return this;
} }


Expand Down Expand Up @@ -361,7 +361,7 @@ protected void handleClosed() {


public synchronized void handleMessage(Object msg) { public synchronized void handleMessage(Object msg) {
checkContext(); checkContext();
if (!pending.add(msg)) { if (!pending.write(msg)) {
doPause(); doPause();
} }
} }
Expand Down

0 comments on commit 38cc7f5

Please sign in to comment.