Skip to content

Commit

Permalink
Improve IO pipelines [5.0.5] [HZ-1884] (#24893)
Browse files Browse the repository at this point in the history
Backports (cherry-picks) #21066 #21391 #21497 to `5.0.5`.

EE part of the fix for hazelcast/hazelcast-enterprise#6181

---------

Co-authored-by: ufukyilmaz <ufuk.yilmaz@hazelcast.com>
Co-authored-by: Viliam Durina <viliam@hazelcast.com>
  • Loading branch information
3 people committed Jun 23, 2023
1 parent 054e3b3 commit 04417ad
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 87 deletions.
Expand Up @@ -123,14 +123,10 @@ protected void checkUserCancel(boolean useClient) {
instance1 = newHazelcastInstance(true);
client = newClient();

createMapping(instance1, MAP_NAME, long.class, long.class);
IMap<Long, Long> map = instance1.getMap(MAP_NAME);
map.put(1L, 1L);
map.put(2L, 2L);

HazelcastInstance target = useClient ? client : instance1;

try (SqlResult res = target.getSql().execute(query().setCursorBufferSize(1))) {
try (SqlResult res = target.getSql().execute("select * from table(generate_stream(1))")) {
sleepSeconds(1);
res.close();

try {
Expand Down
Expand Up @@ -43,7 +43,7 @@ public interface InboundPipeline {
* No verification is done if the handler is already added and a handler
* should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handlers the handlers to add
* @return this
Expand All @@ -58,7 +58,7 @@ public interface InboundPipeline {
* No verification is done if any of the handlers is already added and a
* handler should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param oldHandler the handler to replace
* @param newHandlers the new handlers to insert
Expand All @@ -71,7 +71,7 @@ public interface InboundPipeline {
/**
* Removes the given handler from the pipeline.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handler the handler to remove
* @return this
Expand Down
Expand Up @@ -28,7 +28,7 @@ public interface OutboundPipeline {
* No verification is done if the handler is already added and a handler
* should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handlers the handlers to add.
* @return this
Expand All @@ -43,7 +43,7 @@ public interface OutboundPipeline {
* No verification is done if any of the handlers is already added and a
* handler should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param oldHandler the handlers to replace
* @param newHandlers the new handlers to insert.
Expand All @@ -56,7 +56,7 @@ public interface OutboundPipeline {
/**
* Removes the given handler from the pipeline.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handler the handler to remove.
* @return this
Expand Down
Expand Up @@ -38,11 +38,19 @@ public void initChannel(Channel channel) {
OutboundHandler[] outboundHandlers = serverContext.createOutboundHandlers(EndpointQualifier.MEMBER, connection);
InboundHandler[] inboundHandlers = serverContext.createInboundHandlers(EndpointQualifier.MEMBER, connection);

SingleProtocolEncoder protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers));
OutboundHandler outboundHandler;
SingleProtocolEncoder protocolEncoder;
if (channel.isClientMode()) {
protocolEncoder = new SingleProtocolEncoder(outboundHandlers);
outboundHandler = new MemberProtocolEncoder(protocolEncoder);
} else {
protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers));
outboundHandler = protocolEncoder;
}
SingleProtocolDecoder protocolDecoder = new SingleProtocolDecoder(ProtocolType.MEMBER,
inboundHandlers, protocolEncoder, true);
inboundHandlers, protocolEncoder);

channel.outboundPipeline().addLast(protocolEncoder);
channel.outboundPipeline().addLast(outboundHandler);
channel.inboundPipeline().addLast(protocolDecoder);
}
}
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.internal.networking.OutboundHandler;
import com.hazelcast.internal.nio.ConnectionType;
import com.hazelcast.internal.server.ServerConnection;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.nio.ByteBuffer;
Expand All @@ -31,65 +32,47 @@
import static com.hazelcast.internal.nio.Protocols.PROTOCOL_LENGTH;
import static com.hazelcast.internal.util.StringUtil.stringToBytes;

/**
* Writes the member protocol header bytes (HZC) to dst buffer and replaces itself by the next {@link OutboundHandler
* OutboundHandlers}.
*/
public class MemberProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {

private final OutboundHandler[] outboundHandlers;
private volatile boolean encoderCanReplace;

private boolean clusterProtocolBuffered;

/**
* Decodes first 3 incoming bytes, validates against {@code supportedProtocol} and, when
* matching, replaces itself in the inbound pipeline with the {@code next InboundHandler}.
*
* @param next the {@link OutboundHandler} to replace this one in the outbound pipeline
* upon match of protocol bytes
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public MemberProtocolEncoder(OutboundHandler[] next) {
public MemberProtocolEncoder(OutboundHandler... next) {
this.outboundHandlers = next;
}

@Override
public void handlerAdded() {
initDstBuffer(PROTOCOL_LENGTH);
initDstBuffer(PROTOCOL_LENGTH, stringToBytes(CLUSTER));
}

@Override
public HandlerStatus onWrite() {
compactOrClear(dst);

try {
if (!clusterProtocolBuffered) {
clusterProtocolBuffered = true;
dst.put(stringToBytes(CLUSTER));
// Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol
return DIRTY;
}

if (!isProtocolBufferDrained()) {
// Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol
return DIRTY;
}

if (encoderCanReplace) {
if (isProtocolBufferDrained()) {
// replace!
ServerConnection connection = (TcpServerConnection) channel.attributeMap().get(ServerConnection.class);
connection.setConnectionType(ConnectionType.MEMBER);
channel.outboundPipeline().replace(this, outboundHandlers);
return CLEAN;
}

return CLEAN;
return DIRTY;
} finally {
dst.flip();
}
}

public void signalEncoderCanReplace() {
encoderCanReplace = true;
channel.outboundPipeline().wakeup();
}

/**
* Checks if the protocol bytes have been drained.
*
Expand Down
Expand Up @@ -50,10 +50,9 @@ public class SingleProtocolDecoder
*/
protected volatile boolean verifyProtocolCalled;
final SingleProtocolEncoder encoder;
private final boolean shouldSignalMemberProtocolEncoder;

public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next, SingleProtocolEncoder encoder) {
this(supportedProtocol, new InboundHandler[]{next}, encoder, false);
this(supportedProtocol, new InboundHandler[]{next}, encoder);
}

/**
Expand All @@ -72,19 +71,13 @@ public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next
* that will be notified when
* non-matching protocol bytes have
* been received
* @param shouldSignalMemberProtocolEncoder a boolean used to notify the
* next encoder in the pipeline
* after the {@link SingleProtocolEncoder}
* when matching protocol bytes
* have been received
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler[] next,
SingleProtocolEncoder encoder, boolean shouldSignalMemberProtocolEncoder) {
SingleProtocolEncoder encoder) {
this.supportedProtocol = supportedProtocol;
this.inboundHandlers = next;
this.encoder = encoder;
this.shouldSignalMemberProtocolEncoder = shouldSignalMemberProtocolEncoder;
this.verifyProtocolCalled = false;
}

Expand Down Expand Up @@ -122,16 +115,6 @@ public HandlerStatus onRead() {
// Initialize the connection
initConnection();
setupNextDecoder();
if (!channel.isClientMode()) {
// Set up the next encoder in the pipeline if in server mode
// This replaces SignalProtocolEncoder with next one in the pipeline
encoder.setupNextEncoder();
}

// Signal the member protocol encoder only if it's needed
if (shouldSignalMemberProtocolEncoder) {
((MemberProtocolEncoder) encoder.getFirstOutboundHandler()).signalEncoderCanReplace();
}

return CLEAN;
} finally {
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;

import static com.hazelcast.internal.networking.HandlerStatus.BLOCKED;
import static com.hazelcast.internal.networking.HandlerStatus.CLEAN;
import static com.hazelcast.internal.networking.HandlerStatus.DIRTY;
import static com.hazelcast.internal.nio.IOUtil.compactOrClear;
Expand All @@ -30,15 +31,11 @@
import static com.hazelcast.internal.util.StringUtil.stringToBytes;

/**
* Together with {@link SingleProtocolDecoder}, this encoder decoder pair is
* used for checking correct protocol is used or not. {@link
* SingleProtocolDecoder} checks if the correct protocol is received. If the
* protocol is correct, both encoder and decoder swaps itself with the next
* handler in the pipeline. If it isn't {@link SingleProtocolEncoder} throws
* {@link ProtocolException} and {@link SingleProtocolDecoder} sends {@value
* Protocols#UNEXPECTED_PROTOCOL}. Note that in client mode {@link
* SingleProtocolEncoder} has no effect, and it swaps itself with the next
* handler.
* Together with {@link SingleProtocolDecoder}, this encoder-decoder pair is used to check if correct protocol is used.
* {@link SingleProtocolDecoder} checks if the proper protocol is received. If the protocol is correct, both encoder and decoder
* are replaced by the next handlers in the pipeline. If it isn't the {@link SingleProtocolEncoder} sends
* {@link Protocols#UNEXPECTED_PROTOCOL} response and throws a {@link ProtocolException}. Note that in client mode the
* {@link SingleProtocolEncoder} allows blocking packet writes until the (member-)protocol is confirmed.
*/
public class SingleProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {
private final OutboundHandler[] outboundHandlers;
Expand All @@ -65,27 +62,25 @@ public HandlerStatus onWrite() throws Exception {
// sends anything and only swaps itself with the next encoder
try {
// First, decoder must receive the protocol
if (!isDecoderReceivedProtocol && !channel.isClientMode()) {
if (!isDecoderReceivedProtocol) {
return BLOCKED;
}
if (isDecoderVerifiedProtocol) {
// Set up the next encoder in the pipeline once the protocol is verified
setupNextEncoder();
return CLEAN;
}

// Decoder didn't verify the protocol, protocol error should be sent
if (!isDecoderVerifiedProtocol && !channel.isClientMode()) {
// Decoder received protocol bytes, but verification failed. If we are server/acceptor, then respond with the
// UNEXPECTED_PROTOCOL response bytes.
if (!channel.isClientMode()) {
if (!sendProtocol()) {
return DIRTY;
}
// UNEXPECTED_PROTOCOL is sent (or at least in the socket
// buffer). We can now throw exception in the pipeline to close
// the channel.
throw new ProtocolException(exceptionMessage);
}

if (channel.isClientMode()) {
// Set up the next encoder in the pipeline if in client mode
setupNextEncoder();
}

return CLEAN;
// Either we are in the client mode or the UNEXPECTED_PROTOCOL is sent already (or at least placed into the
// destination buffer). We can now throw exception in the pipeline to close the channel.
throw new ProtocolException(exceptionMessage);
} finally {
dst.flip();
}
Expand All @@ -103,7 +98,7 @@ private boolean sendProtocol() {
}

// Swap this encoder with the next one
protected void setupNextEncoder() {
private void setupNextEncoder() {
channel.outboundPipeline().replace(this, outboundHandlers);
}

Expand All @@ -125,17 +120,31 @@ private boolean isProtocolBufferDrained() {
// Used by SingleProtocolDecoder in order to swap
// SingleProtocolEncoder with the next encoder in the pipeline
public void signalProtocolVerified() {
// this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite
isDecoderVerifiedProtocol = true;
isDecoderReceivedProtocol = true;
channel.outboundPipeline().wakeup();
// This channel can become null when SingleProtocolEncoder is not active handler of the outbound
// pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder
// yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible
// to enter the blocked state from the path that isDecoderReceivedProtocol check is performed.
if (channel != null) {
channel.outboundPipeline().wakeup();
}
}

// Used by SingleProtocolDecoder in order to send HZX eventually
public void signalWrongProtocol(String exceptionMessage) {
// this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite
this.exceptionMessage = exceptionMessage;
isDecoderVerifiedProtocol = false;
isDecoderReceivedProtocol = true;
channel.outboundPipeline().wakeup();
// This channel can become null when SingleProtocolEncoder is not active handler of the outbound
// pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder
// yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible
// to enter the blocked state from the path that isDecoderReceivedProtocol check is performed.
if (channel != null) {
channel.outboundPipeline().wakeup();
}
}

public OutboundHandler getFirstOutboundHandler() {
Expand Down

0 comments on commit 04417ad

Please sign in to comment.