Skip to content

Commit

Permalink
Run all pending job on removing connections.
Browse files Browse the repository at this point in the history
Prevent gaps in pendingOutboundMessagesCountdown.

Signed-off-by: Achim Kraus <achim.kraus@cloudcoap.net>
  • Loading branch information
boaks committed Sep 19, 2022
1 parent 9112803 commit 5648a0c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ public void stop() {
for (Runnable job : pending) {
try {
job.run();
} catch (Exception e) {
} catch (Throwable e) {
LOGGER.warn("Shutdown DTLS connector:", e);
}
}
Expand Down Expand Up @@ -1401,7 +1401,7 @@ public void run() {

@Override
public void run() {
if (running.get()) {
if (running.get() && connection.isExecuting()) {
processRecord(record, connection);
}
}
Expand Down Expand Up @@ -1971,7 +1971,7 @@ public void run() {
connection.getExecutor().execute(new Runnable() {
@Override
public void run() {
if (running.get()) {
if (running.get() && connections.getConnectionByAddress().isExecuting()) {
processClientHello(clientHello, record, connections);
}
}
Expand Down Expand Up @@ -2399,7 +2399,7 @@ public void send(final RawData message) {
@Override
public void run() {
try {
if (running.get()) {
if (running.get() && connection.isExecuting()) {
sendMessage(now, message, connection);
} else {
DROP_LOGGER.trace("DTLSConnector drops {} outgoing bytes to {}:{}, connector not running!", message.getSize(), message.getAddress(), message.getPort());
Expand Down Expand Up @@ -2816,7 +2816,7 @@ private void processAsynchronousHandshakeResult(final HandshakeResult handshakeR

@Override
public void run() {
if (running.get()) {
if (running.get() && connection.isExecuting()) {
Handshaker handshaker = connection.getOngoingHandshake();
if (handshaker != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,15 +534,27 @@ public boolean remove(final Connection connection) {
public synchronized boolean remove(final Connection connection, final boolean removeFromSessionCache) {
boolean removed = connections.remove(connection.getConnectionId(), connection) == connection;
if (removed) {
List<Runnable> pendings = connection.getExecutor().shutdownNow();
int pending = 0;
SerialExecutor executor = connection.getExecutor();
if (executor != null) {
List<Runnable> pendings = connection.getExecutor().shutdownNow();
for (Runnable job : pendings) {
try {
job.run();
} catch (Throwable e) {
LOG.warn("Removing connection:", e);
}
}
pending = pendings.size();
}
if (LOG.isTraceEnabled()) {
LOG.trace("{}connection: remove {} (size {}, left jobs: {})", tag, connection, connections.size(),
pendings.size(), new Throwable("connection removed!"));
} else if (pendings.isEmpty()) {
pending, new Throwable("connection removed!"));
} else if (pending == 0) {
LOG.debug("{}connection: remove {} (size {})", tag, connection, connections.size());
} else {
LOG.debug("{}connection: remove {} (size {}, left jobs: {})", tag, connection, connections.size(),
pendings.size());
pending);
}
removeFromEstablishedSessions(connection);
removeFromAddressConnections(connection);
Expand Down

0 comments on commit 5648a0c

Please sign in to comment.