From 26c838f03f43f85b9edeeee5d46089c011eb9e94 Mon Sep 17 00:00:00 2001 From: Josef Cacek Date: Wed, 17 Aug 2022 14:18:28 +0200 Subject: [PATCH 1/3] Improve IO pipelines (5.1.z) (#21972) * [HZ-940] Improve IO pipelines (#21066) * Wait for the decoder's protocol confirmation before sending the member data [HZ-1163] (#21391) * Add null checks to SingleProtocolEncoder protocol verification signals (#21497) Co-authored-by: ufukyilmaz --- .../internal/networking/InboundPipeline.java | 6 +- .../internal/networking/OutboundPipeline.java | 6 +- .../server/tcp/MemberChannelInitializer.java | 14 +- .../server/tcp/MemberProtocolEncoder.java | 37 +--- .../server/tcp/SingleProtocolDecoder.java | 21 +- .../server/tcp/SingleProtocolEncoder.java | 61 +++--- .../server/tcp/ProtocolNegotiationTest.java | 194 ++++++++++++++++++ 7 files changed, 258 insertions(+), 81 deletions(-) create mode 100644 hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/InboundPipeline.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/InboundPipeline.java index dfa6b6ee65c56..4310656c6932b 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/InboundPipeline.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/InboundPipeline.java @@ -43,7 +43,7 @@ public interface InboundPipeline { * No verification is done if the handler is already added and a handler * should only be added once. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param handlers the handlers to add * @return this @@ -58,7 +58,7 @@ public interface InboundPipeline { * No verification is done if any of the handlers is already added and a * handler should only be added once. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param oldHandler the handler to replace * @param newHandlers the new handlers to insert @@ -71,7 +71,7 @@ public interface InboundPipeline { /** * Removes the given handler from the pipeline. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param handler the handler to remove * @return this diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/OutboundPipeline.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/OutboundPipeline.java index 94b6f4ddaa15e..d8082cb20265e 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/OutboundPipeline.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/OutboundPipeline.java @@ -28,7 +28,7 @@ public interface OutboundPipeline { * No verification is done if the handler is already added and a handler * should only be added once. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param handlers the handlers to add. * @return this @@ -43,7 +43,7 @@ public interface OutboundPipeline { * No verification is done if any of the handlers is already added and a * handler should only be added once. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param oldHandler the handlers to replace * @param newHandlers the new handlers to insert. @@ -56,7 +56,7 @@ public interface OutboundPipeline { /** * Removes the given handler from the pipeline. * - * This method should only be made on the thread 'owning' the handler. + * This method should only be made on the thread 'owning' the pipeline. * * @param handler the handler to remove. * @return this diff --git a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberChannelInitializer.java b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberChannelInitializer.java index cb1170a836173..a8b6431a2d787 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberChannelInitializer.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberChannelInitializer.java @@ -38,11 +38,19 @@ public void initChannel(Channel channel) { OutboundHandler[] outboundHandlers = serverContext.createOutboundHandlers(EndpointQualifier.MEMBER, connection); InboundHandler[] inboundHandlers = serverContext.createInboundHandlers(EndpointQualifier.MEMBER, connection); - SingleProtocolEncoder protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers)); + OutboundHandler outboundHandler; + SingleProtocolEncoder protocolEncoder; + if (channel.isClientMode()) { + protocolEncoder = new SingleProtocolEncoder(outboundHandlers); + outboundHandler = new MemberProtocolEncoder(protocolEncoder); + } else { + protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers)); + outboundHandler = protocolEncoder; + } SingleProtocolDecoder protocolDecoder = new SingleProtocolDecoder(ProtocolType.MEMBER, - inboundHandlers, protocolEncoder, true); + inboundHandlers, protocolEncoder); - channel.outboundPipeline().addLast(protocolEncoder); + channel.outboundPipeline().addLast(outboundHandler); channel.inboundPipeline().addLast(protocolDecoder); } } diff --git a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberProtocolEncoder.java b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberProtocolEncoder.java index 9e3568f9549ff..2ce7658c16b21 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberProtocolEncoder.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/MemberProtocolEncoder.java @@ -20,6 +20,7 @@ import com.hazelcast.internal.networking.OutboundHandler; import com.hazelcast.internal.nio.ConnectionType; import com.hazelcast.internal.server.ServerConnection; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.nio.ByteBuffer; @@ -31,28 +32,26 @@ import static com.hazelcast.internal.nio.Protocols.PROTOCOL_LENGTH; import static com.hazelcast.internal.util.StringUtil.stringToBytes; +/** + * Writes the member protocol header bytes (HZC) to dst buffer and replaces itself by the next {@link OutboundHandler + * OutboundHandlers}. + */ public class MemberProtocolEncoder extends OutboundHandler { private final OutboundHandler[] outboundHandlers; - private volatile boolean encoderCanReplace; - - private boolean clusterProtocolBuffered; /** - * Decodes first 3 incoming bytes, validates against {@code supportedProtocol} and, when - * matching, replaces itself in the inbound pipeline with the {@code next InboundHandler}. - * * @param next the {@link OutboundHandler} to replace this one in the outbound pipeline * upon match of protocol bytes */ @SuppressFBWarnings("EI_EXPOSE_REP2") - public MemberProtocolEncoder(OutboundHandler[] next) { + public MemberProtocolEncoder(OutboundHandler... next) { this.outboundHandlers = next; } @Override public void handlerAdded() { - initDstBuffer(PROTOCOL_LENGTH); + initDstBuffer(PROTOCOL_LENGTH, stringToBytes(CLUSTER)); } @Override @@ -60,36 +59,20 @@ public HandlerStatus onWrite() { compactOrClear(dst); try { - if (!clusterProtocolBuffered) { - clusterProtocolBuffered = true; - dst.put(stringToBytes(CLUSTER)); - // Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol - return DIRTY; - } - - if (!isProtocolBufferDrained()) { - // Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol - return DIRTY; - } - - if (encoderCanReplace) { + if (isProtocolBufferDrained()) { // replace! ServerConnection connection = (TcpServerConnection) channel.attributeMap().get(ServerConnection.class); connection.setConnectionType(ConnectionType.MEMBER); channel.outboundPipeline().replace(this, outboundHandlers); + return CLEAN; } - return CLEAN; + return DIRTY; } finally { dst.flip(); } } - public void signalEncoderCanReplace() { - encoderCanReplace = true; - channel.outboundPipeline().wakeup(); - } - /** * Checks if the protocol bytes have been drained. * diff --git a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolDecoder.java b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolDecoder.java index 8e65dc3092764..4821f15fbc418 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolDecoder.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolDecoder.java @@ -50,10 +50,9 @@ public class SingleProtocolDecoder */ protected volatile boolean verifyProtocolCalled; final SingleProtocolEncoder encoder; - private final boolean shouldSignalMemberProtocolEncoder; public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next, SingleProtocolEncoder encoder) { - this(supportedProtocol, new InboundHandler[]{next}, encoder, false); + this(supportedProtocol, new InboundHandler[]{next}, encoder); } /** @@ -72,19 +71,13 @@ public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next * that will be notified when * non-matching protocol bytes have * been received - * @param shouldSignalMemberProtocolEncoder a boolean used to notify the - * next encoder in the pipeline - * after the {@link SingleProtocolEncoder} - * when matching protocol bytes - * have been received */ @SuppressFBWarnings("EI_EXPOSE_REP2") public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler[] next, - SingleProtocolEncoder encoder, boolean shouldSignalMemberProtocolEncoder) { + SingleProtocolEncoder encoder) { this.supportedProtocol = supportedProtocol; this.inboundHandlers = next; this.encoder = encoder; - this.shouldSignalMemberProtocolEncoder = shouldSignalMemberProtocolEncoder; this.verifyProtocolCalled = false; } @@ -122,16 +115,6 @@ public HandlerStatus onRead() { // Initialize the connection initConnection(); setupNextDecoder(); - if (!channel.isClientMode()) { - // Set up the next encoder in the pipeline if in server mode - // This replaces SignalProtocolEncoder with next one in the pipeline - encoder.setupNextEncoder(); - } - - // Signal the member protocol encoder only if it's needed - if (shouldSignalMemberProtocolEncoder) { - ((MemberProtocolEncoder) encoder.getFirstOutboundHandler()).signalEncoderCanReplace(); - } return CLEAN; } finally { diff --git a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolEncoder.java b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolEncoder.java index b8ff4eba0cab5..fd382800df6e6 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolEncoder.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/server/tcp/SingleProtocolEncoder.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; +import static com.hazelcast.internal.networking.HandlerStatus.BLOCKED; import static com.hazelcast.internal.networking.HandlerStatus.CLEAN; import static com.hazelcast.internal.networking.HandlerStatus.DIRTY; import static com.hazelcast.internal.nio.IOUtil.compactOrClear; @@ -30,15 +31,11 @@ import static com.hazelcast.internal.util.StringUtil.stringToBytes; /** - * Together with {@link SingleProtocolDecoder}, this encoder decoder pair is - * used for checking correct protocol is used or not. {@link - * SingleProtocolDecoder} checks if the correct protocol is received. If the - * protocol is correct, both encoder and decoder swaps itself with the next - * handler in the pipeline. If it isn't {@link SingleProtocolEncoder} throws - * {@link ProtocolException} and {@link SingleProtocolDecoder} sends {@value - * Protocols#UNEXPECTED_PROTOCOL}. Note that in client mode {@link - * SingleProtocolEncoder} has no effect, and it swaps itself with the next - * handler. + * Together with {@link SingleProtocolDecoder}, this encoder-decoder pair is used to check if correct protocol is used. + * {@link SingleProtocolDecoder} checks if the proper protocol is received. If the protocol is correct, both encoder and decoder + * are replaced by the next handlers in the pipeline. If it isn't the {@link SingleProtocolEncoder} sends + * {@link Protocols#UNEXPECTED_PROTOCOL} response and throws a {@link ProtocolException}. Note that in client mode the + * {@link SingleProtocolEncoder} allows blocking packet writes until the (member-)protocol is confirmed. */ public class SingleProtocolEncoder extends OutboundHandler { private final OutboundHandler[] outboundHandlers; @@ -65,27 +62,25 @@ public HandlerStatus onWrite() throws Exception { // sends anything and only swaps itself with the next encoder try { // First, decoder must receive the protocol - if (!isDecoderReceivedProtocol && !channel.isClientMode()) { + if (!isDecoderReceivedProtocol) { + return BLOCKED; + } + if (isDecoderVerifiedProtocol) { + // Set up the next encoder in the pipeline once the protocol is verified + setupNextEncoder(); return CLEAN; } - // Decoder didn't verify the protocol, protocol error should be sent - if (!isDecoderVerifiedProtocol && !channel.isClientMode()) { + // Decoder received protocol bytes, but verification failed. If we are server/acceptor, then respond with the + // UNEXPECTED_PROTOCOL response bytes. + if (!channel.isClientMode()) { if (!sendProtocol()) { return DIRTY; } - // UNEXPECTED_PROTOCOL is sent (or at least in the socket - // buffer). We can now throw exception in the pipeline to close - // the channel. - throw new ProtocolException(exceptionMessage); - } - - if (channel.isClientMode()) { - // Set up the next encoder in the pipeline if in client mode - setupNextEncoder(); } - - return CLEAN; + // Either we are in the client mode or the UNEXPECTED_PROTOCOL is sent already (or at least placed into the + // destination buffer). We can now throw exception in the pipeline to close the channel. + throw new ProtocolException(exceptionMessage); } finally { dst.flip(); } @@ -103,7 +98,7 @@ private boolean sendProtocol() { } // Swap this encoder with the next one - protected void setupNextEncoder() { + private void setupNextEncoder() { channel.outboundPipeline().replace(this, outboundHandlers); } @@ -125,17 +120,31 @@ private boolean isProtocolBufferDrained() { // Used by SingleProtocolDecoder in order to swap // SingleProtocolEncoder with the next encoder in the pipeline public void signalProtocolVerified() { + // this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite isDecoderVerifiedProtocol = true; isDecoderReceivedProtocol = true; - channel.outboundPipeline().wakeup(); + // This channel can become null when SingleProtocolEncoder is not active handler of the outbound + // pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder + // yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible + // to enter the blocked state from the path that isDecoderReceivedProtocol check is performed. + if (channel != null) { + channel.outboundPipeline().wakeup(); + } } // Used by SingleProtocolDecoder in order to send HZX eventually public void signalWrongProtocol(String exceptionMessage) { + // this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite this.exceptionMessage = exceptionMessage; isDecoderVerifiedProtocol = false; isDecoderReceivedProtocol = true; - channel.outboundPipeline().wakeup(); + // This channel can become null when SingleProtocolEncoder is not active handler of the outbound + // pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder + // yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible + // to enter the blocked state from the path that isDecoderReceivedProtocol check is performed. + if (channel != null) { + channel.outboundPipeline().wakeup(); + } } public OutboundHandler getFirstOutboundHandler() { diff --git a/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java b/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java new file mode 100644 index 0000000000000..8bbe05d602892 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.internal.server.tcp; + +import static com.hazelcast.internal.nio.IOUtil.close; +import static com.hazelcast.test.HazelcastTestSupport.smallInstanceConfig; +import static java.lang.Math.max; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.ssl.SSLSocket; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.UseParametersRunnerFactory; + +import com.hazelcast.config.Config; +import com.hazelcast.config.JoinConfig; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; +import com.hazelcast.spi.properties.ClusterProperty; +import com.hazelcast.test.HazelcastParallelParametersRunnerFactory; +import com.hazelcast.test.HazelcastParametrizedRunner; +import com.hazelcast.test.TestAwareInstanceFactory; +import com.hazelcast.test.annotation.QuickTest; + +/** + * Verify that after sending member protocol header bytes (HZC) no more bytes are sent from the connection initiating member. + * (Next bytes should only follow if the protocol is confirmed by the HZC reply - not tested by this test). + */ +@RunWith(HazelcastParametrizedRunner.class) +@UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class) +@Category({ QuickTest.class }) +public class ProtocolNegotiationTest { + + private final BytesCountingServer bcServer = new BytesCountingServer(createServerSocket()); + private final TestAwareInstanceFactory factory = new TestAwareInstanceFactory(); + + @Before + public void before() { + new Thread(bcServer).start(); + } + + @After + public void after() { + factory.terminateAll(); + bcServer.stop(); + close(bcServer.serverSocket); + } + + @Parameter + public boolean advancedNetworking; + + @Parameters(name = "advancedNetworking:{0}") + public static Object[] parameters() { + return new Object[] { true, false }; + } + + /** + * Verify that only 3 header bytes are sent to a server. + */ + @Test + public void verifyOnlyTheProtocolHeaderIsSent() { + Config config = createConfig(); + assertThrows(IllegalStateException.class, () -> factory.newHazelcastInstance(config)); + bcServer.stop(); + assertEquals(3, bcServer.maxBytesReceived.get()); + } + + protected Config createConfig() { + JoinConfig joinConfig = new JoinConfig(); + joinConfig.getMulticastConfig().setEnabled(false); + joinConfig.getAutoDetectionConfig().setEnabled(false); + joinConfig.getTcpIpConfig().setEnabled(true).setConnectionTimeoutSeconds(3) + .addMember("127.0.0.1:" + bcServer.serverSocket.getLocalPort()); + Config config = smallInstanceConfig() + .setProperty(ClusterProperty.INVOCATION_MAX_RETRY_COUNT.getName(), "1") + .setProperty(ClusterProperty.INVOCATION_RETRY_PAUSE.getName(), "0") + .setProperty(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN.getName(), "0") + .setProperty(ClusterProperty.OPERATION_CALL_TIMEOUT_MILLIS.getName(), "2000") + .setProperty(ClusterProperty.MAX_JOIN_SECONDS.getName(), "2") + ; + if (advancedNetworking) { + config.getAdvancedNetworkConfig().setEnabled(true).setJoin(joinConfig); + } else { + config.getNetworkConfig().setJoin(joinConfig); + } + return config; + } + + protected ServerSocket createServerSocket() { + try { + return new ServerSocket(0); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static final class BytesCountingServer implements Runnable { + private static final ILogger LOGGER = Logger.getLogger(BytesCountingServer.class); + + final ServerSocket serverSocket; + volatile boolean shutdownRequested; + final AtomicInteger maxBytesReceived = new AtomicInteger(-1); + + BytesCountingServer(ServerSocket serverSocket) { + this.serverSocket = serverSocket; + try { + this.serverSocket.setSoTimeout(500); + } catch (SocketException e) { + throw new RuntimeException(e); + } + LOGGER.info("The server will be listening on port " + serverSocket.getLocalPort()); + } + + public void run() { + try { + while (!shutdownRequested) { + try { + Socket socket = serverSocket.accept(); + new Thread(() -> { + LOGGER.info("Socket accepted " + socket); + try { + if (socket instanceof SSLSocket) { + ((SSLSocket) socket).startHandshake(); + } + socket.setSoTimeout(100); + int count = readWithTimeout(socket.getInputStream(), 2000); + LOGGER.info("Bytes read: " + count); + maxBytesReceived.updateAndGet(c -> max(c, count)); + } catch (IOException e) { + LOGGER.warning("Reading from the socket failed", e); + } finally { + close(socket); + } + }).start(); + } catch (SocketTimeoutException e) { + // it's fine + } + } + } catch (IOException e) { + LOGGER.warning("The test server thrown an exception", e); + } finally { + close(serverSocket); + } + } + + void stop() { + shutdownRequested = true; + } + + static int readWithTimeout(InputStream is, long timeoutMillis) throws IOException { + int count = 0; + long maxTimeMillis = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < maxTimeMillis) { + try { + is.read(); + count++; + } catch (SocketTimeoutException e) { + // OK - we have the SO_TIMEOUT configured on the socket + } + } + return count; + } + } + +} From 92f3497d843041d973bb559d9e766c6b74cfdbc4 Mon Sep 17 00:00:00 2001 From: Josef Cacek Date: Fri, 23 Jun 2023 15:10:41 +0200 Subject: [PATCH 2/3] Checkstyle fix --- .../hazelcast/internal/server/tcp/ProtocolNegotiationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java b/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java index 8bbe05d602892..df945234ced5d 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/server/tcp/ProtocolNegotiationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved. + * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 27653b4d3315a3f46c489657447e3d8d4b885cfa Mon Sep 17 00:00:00 2001 From: Viliam Durina Date: Mon, 31 Jan 2022 10:52:09 +0100 Subject: [PATCH 3/3] Fix SqlErrorTest.testCancel It was a test issue. The SqlResult is automatically closed when the backing job completes (for member cursors) or when all rows are fetched (for client cursor). A sleep inserted before `res.close()` reproduced the issue (only for member, because for client the fetch size was set to 1 and the client would never fetch all rows at that point. Fixed by replacing the query with a streaming one. Fixes #20336 --- .../java/com/hazelcast/jet/sql/SqlErrorAbstractTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/SqlErrorAbstractTest.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/SqlErrorAbstractTest.java index 9f19d6d0945e8..ffd48d89a2def 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/SqlErrorAbstractTest.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/SqlErrorAbstractTest.java @@ -123,14 +123,10 @@ protected void checkUserCancel(boolean useClient) { instance1 = newHazelcastInstance(true); client = newClient(); - createMapping(instance1, MAP_NAME, long.class, long.class); - IMap map = instance1.getMap(MAP_NAME); - map.put(1L, 1L); - map.put(2L, 2L); - HazelcastInstance target = useClient ? client : instance1; - try (SqlResult res = target.getSql().execute(query().setCursorBufferSize(1))) { + try (SqlResult res = target.getSql().execute("select * from table(generate_stream(1))")) { + sleepSeconds(1); res.close(); try {