Skip to content

Commit

Permalink
InboundBuffer should call the drain handler only during a drain call …
Browse files Browse the repository at this point in the history
…- see #2851
  • Loading branch information
vietj committed Feb 25, 2019
1 parent ef5c3a2 commit 90455f8
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 129 deletions.
42 changes: 42 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Expand Up @@ -223,6 +223,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|===

[[CopyOptions]]
Expand Down Expand Up @@ -376,6 +379,18 @@ Sets the value of max worker execute time, in link.
|[[maxWorkerExecuteTimeUnit]]`@maxWorkerExecuteTimeUnit`|`link:enums.html#TimeUnit[TimeUnit]`|+++
Set the time unit of <code>maxWorkerExecuteTime</code>
+++
|[[multiThreaded]]`@multiThreaded`|`Boolean`|+++
Set whether the verticle(s) should be deployed as a multi-threaded worker verticle.
<p>
<strong>WARNING</strong>: Multi-threaded worker verticles are a deprecated feature.
<p>
Most applications will have no need for them. Because of the concurrency in these verticles you have to be
very careful to keep the verticle in a consistent state using standard Java techniques for multi-threaded
programming.
<p>
You can read the documentation that explains how you can replace this feature by the usage of custom worker
pools or <code>executeBlocking</code> calls.
+++
|[[worker]]`@worker`|`Boolean`|+++
Set whether the verticle(s) should be deployed as a worker verticle
+++
Expand Down Expand Up @@ -559,6 +574,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|===

[[FileSystemOptions]]
Expand Down Expand Up @@ -874,6 +892,9 @@ Set whether the client will offer the WebSocket per-message deflate compression
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|[[verifyHost]]`@verifyHost`|`Boolean`|+++
Set whether hostname verification is enabled
+++
Expand Down Expand Up @@ -916,6 +937,9 @@ Set the list of protocol versions to provide to the server during the Applicatio
|[[clientAuth]]`@clientAuth`|`link:enums.html#ClientAuth[ClientAuth]`|+++
Set whether client auth is required
+++
|[[clientAuthRequired]]`@clientAuthRequired`|`Boolean`|+++
Set whether client auth is required
+++
|[[compressionLevel]]`@compressionLevel`|`Number (int)`|+++
This method allows to set the compression level to be used in http1.x/2 response bodies
when compression support is turned on (@see setCompressionSupported) and the client advertises
Expand Down Expand Up @@ -1077,6 +1101,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|[[websocketAllowServerNoContext]]`@websocketAllowServerNoContext`|`Boolean`|+++
Set whether the WebSocket server will accept the <code>server_no_context_takeover</code> parameter of the per-message
deflate compression extension offered by the client.
Expand Down Expand Up @@ -1290,6 +1317,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|===

[[NetServerOptions]]
Expand All @@ -1310,6 +1340,9 @@ Set the accept back log
|[[clientAuth]]`@clientAuth`|`link:enums.html#ClientAuth[ClientAuth]`|+++
Set whether client auth is required
+++
|[[clientAuthRequired]]`@clientAuthRequired`|`Boolean`|+++
Set whether client auth is required
+++
|[[crlPaths]]`@crlPaths`|`Array of String`|+++
Add a CRL path
+++
Expand Down Expand Up @@ -1404,6 +1437,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|===

[[NetworkOptions]]
Expand Down Expand Up @@ -1929,6 +1965,9 @@ Set the trust options in jks format, aka Java truststore
|[[useAlpn]]`@useAlpn`|`Boolean`|+++
Set the ALPN usage.
+++
|[[usePooledBuffers]]`@usePooledBuffers`|`Boolean`|+++
Set whether Netty pooled buffers are enabled
+++
|===

[[VertxOptions]]
Expand Down Expand Up @@ -1985,6 +2024,9 @@ Sets the event bus configuration to configure the host, port, ssl...
|[[eventLoopPoolSize]]`@eventLoopPoolSize`|`Number (int)`|+++
Set the number of event loop threads to be used by the Vert.x instance.
+++
|[[fileResolverCachingEnabled]]`@fileResolverCachingEnabled`|`Boolean`|+++
Set whether the Vert.x file resolver uses caching for classpath resources.
+++
|[[fileSystemOptions]]`@fileSystemOptions`|`link:dataobjects.html#FileSystemOptions[FileSystemOptions]`|+++
Set the file system options
+++
Expand Down
78 changes: 39 additions & 39 deletions src/main/java/io/vertx/core/streams/impl/InboundBuffer.java
Expand Up @@ -106,13 +106,10 @@ public boolean write(E element) {
checkContext();
Handler<E> handler;
synchronized (this) {
if (emitting || demand == 0L) {
if (demand == 0L || emitting) {
pending.add(element);
boolean writable = pending.size() < highWaterMark;
overflow |= !writable;
return writable;
return checkWritable();
} else {
assert pending.size() == 0; // Try to break this...
if (demand != Long.MAX_VALUE) {
--demand;
}
Expand All @@ -124,6 +121,17 @@ public boolean write(E element) {
return emitPending();
}

private boolean checkWritable() {
if (demand == Long.MAX_VALUE) {
return true;
} else {
long actual = pending.size() - demand;
boolean writable = actual < highWaterMark;
overflow |= !writable;
return writable;
}
}

/**
* Write an {@code iterable} of {@code elements}.
*
Expand All @@ -137,10 +145,8 @@ public boolean write(Iterable<E> elements) {
for (E element : elements) {
pending.add(element);
}
if (emitting || demand == 0L) {
boolean writable = pending.size() < highWaterMark;
overflow |= !writable;
return writable;
if (demand == 0L || emitting) {
return checkWritable();
} else {
emitting = true;
}
Expand All @@ -150,41 +156,26 @@ public boolean write(Iterable<E> elements) {

private boolean emitPending() {
E element;
Handler<E> handler;
Handler<E> h;
while (true) {
synchronized (this) {
int size = pending.size();
if (demand == 0L) {
emitting = false;
boolean writable = pending.size() < highWaterMark;
boolean writable = size < highWaterMark;
overflow |= !writable;
return writable;
}
int size = pending.size();
if (size == 0) {
checkCallDrainHandler();
} else if (size == 0) {
emitting = false;
return true;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
element = pending.poll();
handler = this.handler;
h = this.handler;
}
handleEvent(handler, element);
}
}

private void checkCallDrainHandler() {
if (overflow) {
overflow = false;
context.runOnContext(v -> {
Handler<Void> drainHandler;
synchronized (InboundBuffer.this) {
drainHandler = this.drainHandler;
}
handleEvent(drainHandler, null);
});
handleEvent(h, element);
}
}

Expand All @@ -194,21 +185,29 @@ private void checkCallDrainHandler() {
* Calling this assumes {@code (demand > 0L && !pending.isEmpty()) == true}
*/
private void drain() {
Handler<Void> emptyHandler = null;
int emitted = 0;
Handler<Void> drainHandler;
Handler<Void> emptyHandler;
while (true) {
E element;
Handler<E> handler;
synchronized (this) {
int size = pending.size();
if (size == 0) {
emitting = false;
checkCallDrainHandler();
emptyHandler = this.emptyHandler;
if (overflow) {
overflow = false;
drainHandler = this.drainHandler;
} else {
drainHandler = null;
}
emptyHandler = emitted > 0 ? this.emptyHandler : null;
break;
} else if (demand == 0L) {
emitting = false;
return;
}
emitted++;
if (demand != Long.MAX_VALUE) {
demand--;
}
Expand All @@ -217,7 +216,12 @@ private void drain() {
}
handleEvent(handler, element);
}
handleEvent(emptyHandler, null);
if (drainHandler != null) {
handleEvent(drainHandler, null);
}
if (emptyHandler != null) {
handleEvent(emptyHandler, null);
}
}

private <T> void handleEvent(Handler<T> handler, T element) {
Expand Down Expand Up @@ -258,11 +262,7 @@ public InboundBuffer<E> fetch(long amount) {
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting) {
return this;
}
if (pending.isEmpty()) {
checkCallDrainHandler();
if (emitting || (pending.isEmpty() && !overflow)) {
return this;
}
emitting = true;
Expand Down

0 comments on commit 90455f8

Please sign in to comment.