Skip to content

Commit

Permalink
Revisit Write Timeout Handling (nats-io#1128)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored and ajax-surovskyi-y committed May 13, 2024
1 parent 6545878 commit 22be049
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 124 deletions.
4 changes: 3 additions & 1 deletion src/examples/java/io/nats/examples/benchmark/NatsBench2.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL;

/**
* A utility class for measuring NATS performance, similar to the version in go
* and node. The various tradeoffs to make this code act/work like the other
Expand Down Expand Up @@ -279,7 +281,7 @@ public void run() {
nc.publish(subject, payload);
success = true;
} catch (IllegalStateException ex) {
if (ex.getMessage().contains("Output queue is full")) {
if (ex.getMessage().contains(OUTPUT_QUEUE_IS_FULL)) {
success = false;
Thread.sleep(1000);
} else {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/nats/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,15 +541,14 @@ enum Status {
/**
* Immediately flushes the underlying connection buffer if the connection is valid.
* @throws IOException the connection flush fails
* @throws IllegalStateException the connection is not connected
*/
void flushBuffer() throws IOException;

/**
* Forces reconnect behavior. Stops the current connection including the reading and writing,
* copies already queued outgoing messages, and then begins the reconnect logic.
* @throws IOException the force reconnected fails
* @throws InterruptedException if one is thrown, in order to propagate it up
* @throws IOException the forceReconnect fails
* @throws InterruptedException if the thread is interrupted during the reconnect, so the interruption can be propagated to the caller
*/
void forceReconnect() throws IOException, InterruptedException;

Expand Down
33 changes: 29 additions & 4 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public class Options {
*/
public static final Duration DEFAULT_SOCKET_WRITE_TIMEOUT = Duration.ofMinutes(1);

/**
* Minimum number of milliseconds by which the socket write timeout must exceed the connection timeout.
*/
public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 100;

/**
* Default server ping interval. The client will send a ping to the server on this interval to ensure liveness.
* The server may send pings to the client as well, these are handled automatically by the library,
Expand Down Expand Up @@ -1240,11 +1245,23 @@ public Builder maxControlLine(int bytes) {
* Set the timeout for connection attempts. Each server in the options is allowed this timeout
* so if 3 servers are tried with a timeout of 5s the total time could be 15s.
*
* @param time the time to wait
* @param connectionTimeout the time to wait
* @return the Builder for chaining
*/
public Builder connectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}

/**
* Set the timeout for connection attempts. Each server in the options is allowed this timeout
* so if 3 servers are tried with a timeout of 5s the total time could be 15s.
*
* @param connectionTimeoutMillis the time to wait in milliseconds
* @return the Builder for chaining
*/
public Builder connectionTimeout(Duration time) {
this.connectionTimeout = time;
public Builder connectionTimeout(long connectionTimeoutMillis) {
this.connectionTimeout = Duration.ofMillis(connectionTimeoutMillis);
return this;
}

Expand Down Expand Up @@ -1728,9 +1745,17 @@ else if (useDefaultTls) {
new DefaultThreadFactory(threadPrefix));
}

if (socketWriteTimeout != null && socketWriteTimeout.toMillis() < 1) {
if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) {
socketWriteTimeout = null;
}
else {
long swtMin = connectionTimeout.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
if (socketWriteTimeout.toMillis() < swtMin) {
throw new IllegalStateException("Socket Write Timeout must be at least "
+ MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT
+ " milliseconds greater than the Connection Timeout");
}
}

if (errorListener == null) {
errorListener = new ErrorListenerLoggerImpl();
Expand Down
80 changes: 51 additions & 29 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Predicate;

import static io.nats.client.support.NatsConstants.EMPTY_BODY;
import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL;

class MessageQueue {
protected static final int STOPPED = 0;
Expand All @@ -35,9 +36,10 @@ class MessageQueue {
protected final AtomicInteger running;
protected final boolean singleReaderMode;
protected final LinkedBlockingQueue<NatsMessage> queue;
protected final Lock filterLock;
protected final Lock editLock;
protected final int publishHighwaterMark;
protected final boolean discardWhenFull;
protected final long offerLockMillis;
protected final long offerTimeoutMillis;
protected final Duration requestCleanupInterval;

Expand All @@ -47,7 +49,11 @@ class MessageQueue {
protected final NatsMessage poisonPill;

MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
this(singleReaderMode, -1, false, requestCleanupInterval);
this(singleReaderMode, -1, false, requestCleanupInterval, null);
}

MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
this(singleReaderMode, -1, false, requestCleanupInterval, source);
}

/**
Expand All @@ -61,31 +67,40 @@ class MessageQueue {
* @param requestCleanupInterval is used to derive both the offerLockMillis and the offerTimeoutMillis
*/
MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval) {
this(singleReaderMode, publishHighwaterMark, discardWhenFull, requestCleanupInterval, null);
}

MessageQueue(boolean singleReaderMode, int publishHighwaterMark, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
this.publishHighwaterMark = publishHighwaterMark;
this.queue = publishHighwaterMark > 0 ? new LinkedBlockingQueue<>(publishHighwaterMark) : new LinkedBlockingQueue<>();
this.discardWhenFull = discardWhenFull;
this.running = new AtomicInteger(RUNNING);
this.sizeInBytes = new AtomicLong(0);
this.length = new AtomicLong(0);
this.offerTimeoutMillis = calculateOfferTimeoutMillis(requestCleanupInterval);
this.offerLockMillis = requestCleanupInterval.toMillis();
this.offerTimeoutMillis = Math.max(1, requestCleanupInterval.toMillis() * 95 / 100);

// The poisonPill is used to stop poll and accumulate when the queue is stopped
this.poisonPill = new NatsMessage("_poison", null, EMPTY_BODY);

this.filterLock = new ReentrantLock();
editLock = new ReentrantLock();

this.singleReaderMode = singleReaderMode;
this.requestCleanupInterval = requestCleanupInterval;

if (source != null) {
source.drainTo(this);
}
}

MessageQueue(MessageQueue source) {
this(source.singleReaderMode, source.publishHighwaterMark, source.discardWhenFull, source.requestCleanupInterval);
source.queue.drainTo(queue);
length.set(queue.size());
}

private static long calculateOfferTimeoutMillis(Duration requestCleanupInterval) {
return Math.max(1, requestCleanupInterval.toMillis() * 95 / 100);
/**
 * Moves every message currently held by this queue into the target queue.
 * The transfer is done while holding this queue's edit lock.
 *
 * @param target the queue that receives the drained messages
 */
void drainTo(MessageQueue target) {
    editLock.lock();
    try {
        queue.drainTo(target.queue);
        // This queue has just been drained, so the target's message count must be
        // read from the target's own queue, not from this (source) queue.
        target.length.set(target.queue.size());
        // NOTE(review): sizeInBytes is not carried over to the target here;
        // confirm whether the target's byte accounting should include the drained messages.
    } finally {
        editLock.unlock();
    }
}

boolean isSingleReaderMode() {
Expand Down Expand Up @@ -124,21 +139,36 @@ boolean push(NatsMessage msg) {
}

boolean push(NatsMessage msg, boolean internal) {
this.filterLock.lock();
long start = System.currentTimeMillis();
try {
// try to get the lock, but don't wait forever
// assuming that if we are waiting for the lock
// another push likely has the lock and
if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
}
}
catch (InterruptedException e) {
return false;
}

try {
// If we aren't running, then we need to obey the filter lock
// to avoid ordering problems
if (!internal && this.discardWhenFull) {
return this.queue.offer(msg);
}
if (!this.offer(msg)) {
throw new IllegalStateException("Output queue is full " + queue.size());

long timeoutLeft = Math.max(100, offerTimeoutMillis - (System.currentTimeMillis() - start));

if (!this.queue.offer(msg, timeoutLeft, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
}
this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
this.length.incrementAndGet();
return true;
} catch (InterruptedException ie) {
return false;
} finally {
this.filterLock.unlock();
editLock.unlock();
}
}

Expand All @@ -154,14 +184,6 @@ void poisonTheQueue() {
}
}

boolean offer(NatsMessage msg) {
try {
return this.queue.offer(msg, offerTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
return false;
}
}

NatsMessage poll(Duration timeout) throws InterruptedException {
NatsMessage msg = null;

Expand Down Expand Up @@ -289,7 +311,7 @@ long sizeInBytes() {
}

void filter(Predicate<NatsMessage> p) {
this.filterLock.lock();
editLock.lock();
try {
if (this.isRunning()) {
throw new IllegalStateException("Filter is only supported when the queue is paused");
Expand All @@ -307,7 +329,7 @@ void filter(Predicate<NatsMessage> p) {
}
this.queue.addAll(newQueue);
} finally {
this.filterLock.unlock();
editLock.unlock();
}
}
}
}
Loading

0 comments on commit 22be049

Please sign in to comment.