From 046132f67a1da75a839d3d73c5bfbea9edf38108 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Wed, 1 Apr 2026 17:15:51 +0300 Subject: [PATCH] IGNITE-28459 Fixed unhandled NIO Session Requests during node stop. --- .../internal/util/nio/GridNioServer.java | 465 ++++++++++-------- .../internal/util/nio/GridNioWorker.java | 5 +- .../util/nio/GridSelectorNioSessionImpl.java | 6 +- 3 files changed, 263 insertions(+), 213 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index e1268673c14b4..797290677db52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -585,7 +585,12 @@ public IgniteInternalFuture close(GridNioSession ses) { NioOperationFuture fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); - impl.offerStateChange(fut); + try { + impl.offerStateChange(fut); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } return fut; } @@ -772,7 +777,12 @@ public void resend(GridNioSession ses) { ses0.procWrite.set(true); // Wake up worker. - ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0); + try { + ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to notify NIO Server while resending messages [rmtNode=" + recoveryDesc.node().id() + ']', e); + } } } @@ -804,8 +814,13 @@ private void moveSession(GridNioSession ses, int from, int to) { SessionMoveFuture fut = new SessionMoveFuture(ses0, to); - if (!ses0.offerMove(clientWorkers.get(from), fut)) - fut.onDone(false); + try { + if (!ses0.offerMove(clientWorkers.get(from), fut)) + fut.onDone(false); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } } /** @@ -813,7 +828,7 @@ private void moveSession(GridNioSession ses, int from, int to) { * @param op Operation. * @return Future for operation. */ - private IgniteInternalFuture pauseResumeReads(GridNioSession ses, NioOperation op) { + private IgniteInternalFuture pauseResumeReads(GridNioSession ses, NioOperation op) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ; @@ -874,9 +889,14 @@ public IgniteInternalFuture dumpStats(final String msg, IgnitePredicate< opFut.msg = p; - clientWorkers.get(i).offer(opFut); + try { + clientWorkers.get(i).offer(opFut); - fut.add(opFut); + fut.add(opFut); + } + catch (IgniteCheckedException e) { + fut.add(new GridFinishedFuture<>(e)); + } } fut.markInitialized(); @@ -919,9 +939,14 @@ public IgniteInternalFuture dumpNodeStats(final String msg, IgnitePredic opFut.msg = p; - clientWorkers.get(i).offer(opFut); + try { + clientWorkers.get(i).offer(opFut); - fut.add(opFut); + fut.add(opFut); + } + catch (IgniteCheckedException e) { + fut.add(new GridFinishedFuture<>(e)); + } } fut.markInitialized(); @@ -967,7 +992,7 @@ public IgniteInternalFuture createSession( return new GridFinishedFuture<>( new IgniteCheckedException("Failed to create session, server is stopped.")); } - catch (IOException e) { + catch (IgniteCheckedException | IOException e) { return new GridFinishedFuture<>(e); } } @@ -977,7 +1002,10 @@ public IgniteInternalFuture createSession( * @param meta Session meta. */ public IgniteInternalFuture cancelConnect(final SocketChannel ch, Map meta) { - if (!closed) { + if (closed) + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to cancel connection, server is stopped")); + + try { NioOperationFuture req = new NioOperationFuture<>(ch, false, meta); req.op = NioOperation.CANCEL_CONNECT; @@ -990,9 +1018,9 @@ public IgniteInternalFuture cancelConnect(final SocketChannel ch return req; } - else - return new GridFinishedFuture<>( - new IgniteCheckedException("Failed to cancel connection, server is stopped.")); + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to cancel connection", e)); + } } /** @@ -1083,7 +1111,7 @@ private Selector createSelector(@Nullable SocketAddress addr) throws IgniteCheck * @param req Request to balance. * @param meta Session metadata. */ - private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map meta) { + private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map meta) throws IgniteCheckedException { assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT : req; assert req.socketChannel() != null : req; @@ -2038,7 +2066,10 @@ private void createSelector() throws IgniteCheckedException { * * @param req Change request. */ - @Override public void offer(SessionChangeRequest req) { + @Override public void offer(SessionChangeRequest req) throws IgniteCheckedException { + if (isCancelled()) + throw new IgniteCheckedException("NIO client worker has been stopped"); + changeReqs.offer(req); if (select) @@ -2046,7 +2077,10 @@ private void createSelector() throws IgniteCheckedException { } /** {@inheritDoc} */ - @Override public void offer(Collection reqs) { + @Override public void offer(Collection reqs) throws IgniteCheckedException { + if (isCancelled()) + throw new IgniteCheckedException("NIO client worker has been stopped"); + for (SessionChangeRequest req : reqs) changeReqs.offer(req); @@ -2078,185 +2112,19 @@ private void createSelector() throws IgniteCheckedException { * * @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool. */ - @SuppressWarnings("unchecked") private void bodyInternal() throws IgniteCheckedException, InterruptedException { try { long lastIdleCheck = U.currentTimeMillis(); - mainLoop: - while (!closed && selector.isOpen()) { - SessionChangeRequest req0; + while (selector.isOpen() && !(isCancelled() && changeReqs.isEmpty())) { + SessionChangeRequest req; updateHeartbeat(); - while ((req0 = changeReqs.poll()) != null) { + while ((req = changeReqs.poll()) != null) { updateHeartbeat(); - switch (req0.operation()) { - case CONNECT: { - NioOperationFuture fut = (NioOperationFuture)req0; - - SocketChannel ch = fut.socketChannel(); - - try { - ch.register(selector, SelectionKey.OP_CONNECT, fut); - } - catch (IOException e) { - fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); - } - - break; - } - - case CANCEL_CONNECT: { - NioOperationFuture req = (NioOperationFuture)req0; - - SocketChannel ch = req.socketChannel(); - - SelectionKey key = ch.keyFor(selector); - - if (key != null) - key.cancel(); - - U.closeQuiet(ch); - - req.onDone(); - - break; - } - - case REGISTER: { - register((NioOperationFuture)req0); - - break; - } - - case MOVE: { - SessionMoveFuture f = (SessionMoveFuture)req0; - - GridSelectorNioSessionImpl ses = f.session(); - - if (idx == f.toIdx) { - assert f.movedSocketChannel() != null : f; - - boolean add = workerSessions.add(ses); - - assert add; - - ses.finishMoveSession(this); - - if (idx % 2 == 0) - readerMoveCnt.incrementAndGet(); - else - writerMoveCnt.incrementAndGet(); - - SelectionKey key = f.movedSocketChannel().register(selector, - SelectionKey.OP_READ | SelectionKey.OP_WRITE, - ses); - - ses.key(key); - - ses.procWrite.set(true); - - f.onDone(true); - } - else { - assert f.movedSocketChannel() == null : f; - - if (workerSessions.remove(ses)) { - ses.startMoveSession(this); - - SelectionKey key = ses.key(); - - assert key.channel() != null : key; - - f.movedSocketChannel((SocketChannel)key.channel()); - - key.cancel(); - commitKeyCancellation(); - - clientWorkers.get(f.toIndex()).offer(f); - } - else - f.onDone(false); - } - - break; - } - - case REQUIRE_WRITE: { - registerWrite((GridSelectorNioSessionImpl)req0.session()); - - break; - } - - case CLOSE: { - NioOperationFuture req = (NioOperationFuture)req0; - - if (close(req.session(), null)) - req.onDone(true); - else - req.onDone(false); - - break; - } - - case PAUSE_READ: { - NioOperationFuture req = (NioOperationFuture)req0; - - SelectionKey key = req.session().key(); - - if (key.isValid()) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); - - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - ses.readsPaused(true); - - req.onDone(true); - } - else - req.onDone(false); - - break; - } - - case RESUME_READ: { - NioOperationFuture req = (NioOperationFuture)req0; - - SelectionKey key = req.session().key(); - - if (key.isValid()) { - key.interestOps(key.interestOps() | SelectionKey.OP_READ); - - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - ses.readsPaused(false); - - req.onDone(true); - } - else - req.onDone(false); - - break; - } - - case DUMP_STATS: { - NioOperationFuture req = (NioOperationFuture)req0; - - IgnitePredicate p = - req.msg instanceof IgnitePredicate ? (IgnitePredicate)req.msg : null; - - StringBuilder sb = new StringBuilder(); - - try { - dumpStats(sb, p, p != null); - } - finally { - req.onDone(sb.toString()); - } - } - } + processSessionChangedRequest(req); } for (long i = 0; i < selectorSpins && selector.selectedKeys().isEmpty(); i++) { @@ -2277,7 +2145,7 @@ private void bodyInternal() throws IgniteCheckedException, InterruptedException } if (!changeReqs.isEmpty()) - continue mainLoop; + break; // Just in case we do busy selects. long now = U.currentTimeMillis(); @@ -2289,14 +2157,14 @@ private void bodyInternal() throws IgniteCheckedException, InterruptedException } if (isCancelled()) - return; + break; } // Falling to blocking select. select = true; try { - if (!changeReqs.isEmpty()) + if (!changeReqs.isEmpty() || isCancelled()) continue; blockingSectionBegin(); @@ -2370,6 +2238,180 @@ private void bodyInternal() throws IgniteCheckedException, InterruptedException } } + /** */ + private void processSessionChangedRequest(SessionChangeRequest req0) throws IgniteCheckedException, IOException { + switch (req0.operation()) { + case CONNECT: { + NioOperationFuture fut = (NioOperationFuture)req0; + + SocketChannel ch = fut.socketChannel(); + + try { + ch.register(selector, SelectionKey.OP_CONNECT, fut); + } + catch (IOException e) { + fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); + } + + break; + } + + case CANCEL_CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + SelectionKey key = ch.keyFor(selector); + + if (key != null) + key.cancel(); + + U.closeQuiet(ch); + + req.onDone(); + + break; + } + + case REGISTER: { + register((NioOperationFuture)req0); + + break; + } + + case MOVE: { + SessionMoveFuture f = (SessionMoveFuture)req0; + + GridSelectorNioSessionImpl ses = f.session(); + + if (idx == f.toIdx) { + assert f.movedSocketChannel() != null : f; + + boolean add = workerSessions.add(ses); + + assert add; + + ses.finishMoveSession(this); + + if (idx % 2 == 0) + readerMoveCnt.incrementAndGet(); + else + writerMoveCnt.incrementAndGet(); + + SelectionKey key = f.movedSocketChannel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, + ses); + + ses.key(key); + + ses.procWrite.set(true); + + f.onDone(true); + } + else { + assert f.movedSocketChannel() == null : f; + + if (workerSessions.remove(ses)) { + ses.startMoveSession(this); + + SelectionKey key = ses.key(); + + assert key.channel() != null : key; + + f.movedSocketChannel((SocketChannel)key.channel()); + + key.cancel(); + commitKeyCancellation(); + + clientWorkers.get(f.toIndex()).offer(f); + } + else + f.onDone(false); + } + + break; + } + + case REQUIRE_WRITE: { + registerWrite((GridSelectorNioSessionImpl)req0.session()); + + break; + } + + case CLOSE: { + NioOperationFuture req = (NioOperationFuture)req0; + + if (close(req.session(), null)) + req.onDone(true); + else + req.onDone(false); + + break; + } + + case PAUSE_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + + SelectionKey key = req.session().key(); + + if (key.isValid()) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); + + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + + ses.readsPaused(true); + + req.onDone(true); + } + else + req.onDone(false); + + break; + } + + case RESUME_READ: { + NioOperationFuture req = (NioOperationFuture)req0; + + SelectionKey key = req.session().key(); + + if (key.isValid()) { + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + + ses.readsPaused(false); + + req.onDone(true); + } + else + req.onDone(false); + + break; + } + + case DUMP_STATS: { + NioOperationFuture req = (NioOperationFuture)req0; + + IgnitePredicate p = + req.msg instanceof IgnitePredicate ? (IgnitePredicate)req.msg : null; + + StringBuilder sb = new StringBuilder(); + + try { + dumpStats(sb, p, p != null); + } + finally { + req.onDone(sb.toString()); + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return closed || super.isCancelled(); + } + /** * Makes sure that pending key cancellations are executed and the corresponding channels can be * re-registered with our selector without causing {@link java.nio.channels.CancelledKeyException}s. @@ -2765,32 +2807,32 @@ private void register(NioOperationFuture fut) { SelectionKey key; - if (!sockCh.isRegistered()) { - assert fut.op == NioOperation.REGISTER : fut.op; + try { + if (!sockCh.isRegistered()) { + assert fut.op == NioOperation.REGISTER : fut.op; - key = sockCh.register(selector, SelectionKey.OP_READ, ses); + key = sockCh.register(selector, SelectionKey.OP_READ, ses); - ses.key(key); + ses.key(key); - resend(ses); - } - else { - assert fut.op == NioOperation.CONNECT : fut.op; + resend(ses); + } + else { + assert fut.op == NioOperation.CONNECT : fut.op; - key = sockCh.keyFor(selector); + key = sockCh.keyFor(selector); - key.attach(ses); + key.attach(ses); - key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); - key.interestOps(key.interestOps() | SelectionKey.OP_READ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); + key.interestOps(key.interestOps() | SelectionKey.OP_READ); - ses.key(key); - } + ses.key(key); + } - sessions.add(ses); - workerSessions.add(ses); + sessions.add(ses); + workerSessions.add(ses); - try { filterChain.onSessionOpened(ses); fut.onDone(ses); @@ -3231,7 +3273,14 @@ private void processSelectedKeys(Set keys) throws IOException { * @param sockCh Socket channel to be registered on one of the selectors. */ private void addRegistrationRequest(SocketChannel sockCh) { - offerBalanced(new NioOperationFuture<>(sockCh, true, null), null); + try { + offerBalanced(new NioOperationFuture<>(sockCh, true, null), null); + } + catch (IgniteCheckedException e) { + U.warn(log, "Incoming connection was rejected [addr=" + sockCh.socket().getRemoteSocketAddress() + ']', e); + + U.close(sockCh, log); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java index 3419b4cd3079c..a2f2d4334cd94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.List; +import org.apache.ignite.IgniteCheckedException; import org.jetbrains.annotations.Nullable; /** @@ -28,12 +29,12 @@ interface GridNioWorker { /** * @param req Change request. */ - public void offer(GridNioServer.SessionChangeRequest req); + public void offer(GridNioServer.SessionChangeRequest req) throws IgniteCheckedException; /** * @param reqs Change requests. */ - public void offer(Collection reqs); + public void offer(Collection reqs) throws IgniteCheckedException; /** * @param ses Session. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index d8adbfd84d183..561941903b6b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -232,7 +232,7 @@ public void closeSocketOnSessionClose(boolean closeSocket) { * @param fut Move future. * @return {@code True} if session move was scheduled. */ - boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) { + boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) throws IgniteCheckedException { synchronized (this) { if (log.isDebugEnabled()) log.debug("Offered move [ses=" + this + ", fut=" + fut + ']'); @@ -251,7 +251,7 @@ boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) { /** * @param fut Future. */ - void offerStateChange(GridNioServer.SessionChangeRequest fut) { + void offerStateChange(GridNioServer.SessionChangeRequest fut) throws IgniteCheckedException { synchronized (this) { if (log.isDebugEnabled()) log.debug("Offered move [ses=" + this + ", fut=" + fut + ']'); @@ -295,7 +295,7 @@ void startMoveSession(GridNioWorker moveFrom) { /** * @param moveTo New session worker. */ - void finishMoveSession(GridNioWorker moveTo) { + void finishMoveSession(GridNioWorker moveTo) throws IgniteCheckedException { synchronized (this) { assert worker == null;