Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve IO pipelines [5.0.5] [HZ-1884] #24893

Merged
merged 3 commits into from Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,7 +43,7 @@ public interface InboundPipeline {
* No verification is done if the handler is already added and a handler
* should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handlers the handlers to add
* @return this
Expand All @@ -58,7 +58,7 @@ public interface InboundPipeline {
* No verification is done if any of the handlers is already added and a
* handler should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param oldHandler the handler to replace
* @param newHandlers the new handlers to insert
Expand All @@ -71,7 +71,7 @@ public interface InboundPipeline {
/**
* Removes the given handler from the pipeline.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handler the handler to remove
* @return this
Expand Down
Expand Up @@ -28,7 +28,7 @@ public interface OutboundPipeline {
* No verification is done if the handler is already added and a handler
* should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handlers the handlers to add.
* @return this
Expand All @@ -43,7 +43,7 @@ public interface OutboundPipeline {
* No verification is done if any of the handlers is already added and a
* handler should only be added once.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param oldHandler the handlers to replace
* @param newHandlers the new handlers to insert.
Expand All @@ -56,7 +56,7 @@ public interface OutboundPipeline {
/**
* Removes the given handler from the pipeline.
*
* This method should only be made on the thread 'owning' the handler.
* This method should only be made on the thread 'owning' the pipeline.
*
* @param handler the handler to remove.
* @return this
Expand Down
Expand Up @@ -38,11 +38,19 @@ public void initChannel(Channel channel) {
OutboundHandler[] outboundHandlers = serverContext.createOutboundHandlers(EndpointQualifier.MEMBER, connection);
InboundHandler[] inboundHandlers = serverContext.createInboundHandlers(EndpointQualifier.MEMBER, connection);

SingleProtocolEncoder protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers));
OutboundHandler outboundHandler;
SingleProtocolEncoder protocolEncoder;
if (channel.isClientMode()) {
protocolEncoder = new SingleProtocolEncoder(outboundHandlers);
outboundHandler = new MemberProtocolEncoder(protocolEncoder);
} else {
protocolEncoder = new SingleProtocolEncoder(new MemberProtocolEncoder(outboundHandlers));
outboundHandler = protocolEncoder;
}
SingleProtocolDecoder protocolDecoder = new SingleProtocolDecoder(ProtocolType.MEMBER,
inboundHandlers, protocolEncoder, true);
inboundHandlers, protocolEncoder);

channel.outboundPipeline().addLast(protocolEncoder);
channel.outboundPipeline().addLast(outboundHandler);
channel.inboundPipeline().addLast(protocolDecoder);
}
}
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.internal.networking.OutboundHandler;
import com.hazelcast.internal.nio.ConnectionType;
import com.hazelcast.internal.server.ServerConnection;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.nio.ByteBuffer;
Expand All @@ -31,65 +32,47 @@
import static com.hazelcast.internal.nio.Protocols.PROTOCOL_LENGTH;
import static com.hazelcast.internal.util.StringUtil.stringToBytes;

/**
* Writes the member protocol header bytes (HZC) to dst buffer and replaces itself by the next {@link OutboundHandler
* OutboundHandlers}.
*/
public class MemberProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {

private final OutboundHandler[] outboundHandlers;
private volatile boolean encoderCanReplace;

private boolean clusterProtocolBuffered;

/**
* Decodes first 3 incoming bytes, validates against {@code supportedProtocol} and, when
* matching, replaces itself in the inbound pipeline with the {@code next InboundHandler}.
*
* @param next the {@link OutboundHandler} to replace this one in the outbound pipeline
* upon match of protocol bytes
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public MemberProtocolEncoder(OutboundHandler[] next) {
public MemberProtocolEncoder(OutboundHandler... next) {
this.outboundHandlers = next;
}

@Override
public void handlerAdded() {
initDstBuffer(PROTOCOL_LENGTH);
initDstBuffer(PROTOCOL_LENGTH, stringToBytes(CLUSTER));
}

@Override
public HandlerStatus onWrite() {
compactOrClear(dst);

try {
if (!clusterProtocolBuffered) {
clusterProtocolBuffered = true;
dst.put(stringToBytes(CLUSTER));
// Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol
return DIRTY;
}

if (!isProtocolBufferDrained()) {
// Return false because ProtocolEncoder is not ready yet; but first we need to flush protocol
return DIRTY;
}

if (encoderCanReplace) {
if (isProtocolBufferDrained()) {
// replace!
ServerConnection connection = (TcpServerConnection) channel.attributeMap().get(ServerConnection.class);
connection.setConnectionType(ConnectionType.MEMBER);
channel.outboundPipeline().replace(this, outboundHandlers);
return CLEAN;
}

return CLEAN;
return DIRTY;
} finally {
dst.flip();
}
}

public void signalEncoderCanReplace() {
encoderCanReplace = true;
channel.outboundPipeline().wakeup();
}

/**
* Checks if the protocol bytes have been drained.
*
Expand Down
Expand Up @@ -50,10 +50,9 @@ public class SingleProtocolDecoder
*/
protected volatile boolean verifyProtocolCalled;
final SingleProtocolEncoder encoder;
private final boolean shouldSignalMemberProtocolEncoder;

public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next, SingleProtocolEncoder encoder) {
this(supportedProtocol, new InboundHandler[]{next}, encoder, false);
this(supportedProtocol, new InboundHandler[]{next}, encoder);
}

/**
Expand All @@ -72,19 +71,13 @@ public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next
* that will be notified when
* non-matching protocol bytes have
* been received
* @param shouldSignalMemberProtocolEncoder a boolean used to notify the
* next encoder in the pipeline
* after the {@link SingleProtocolEncoder}
* when matching protocol bytes
* have been received
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler[] next,
SingleProtocolEncoder encoder, boolean shouldSignalMemberProtocolEncoder) {
SingleProtocolEncoder encoder) {
this.supportedProtocol = supportedProtocol;
this.inboundHandlers = next;
this.encoder = encoder;
this.shouldSignalMemberProtocolEncoder = shouldSignalMemberProtocolEncoder;
this.verifyProtocolCalled = false;
}

Expand Down Expand Up @@ -122,16 +115,6 @@ public HandlerStatus onRead() {
// Initialize the connection
initConnection();
setupNextDecoder();
if (!channel.isClientMode()) {
// Set up the next encoder in the pipeline if in server mode
// This replaces SignalProtocolEncoder with next one in the pipeline
encoder.setupNextEncoder();
}

// Signal the member protocol encoder only if it's needed
if (shouldSignalMemberProtocolEncoder) {
((MemberProtocolEncoder) encoder.getFirstOutboundHandler()).signalEncoderCanReplace();
}

return CLEAN;
} finally {
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;

import static com.hazelcast.internal.networking.HandlerStatus.BLOCKED;
import static com.hazelcast.internal.networking.HandlerStatus.CLEAN;
import static com.hazelcast.internal.networking.HandlerStatus.DIRTY;
import static com.hazelcast.internal.nio.IOUtil.compactOrClear;
Expand All @@ -30,15 +31,11 @@
import static com.hazelcast.internal.util.StringUtil.stringToBytes;

/**
* Together with {@link SingleProtocolDecoder}, this encoder decoder pair is
* used for checking correct protocol is used or not. {@link
* SingleProtocolDecoder} checks if the correct protocol is received. If the
* protocol is correct, both encoder and decoder swaps itself with the next
* handler in the pipeline. If it isn't {@link SingleProtocolEncoder} throws
* {@link ProtocolException} and {@link SingleProtocolDecoder} sends {@value
* Protocols#UNEXPECTED_PROTOCOL}. Note that in client mode {@link
* SingleProtocolEncoder} has no effect, and it swaps itself with the next
* handler.
* Together with {@link SingleProtocolDecoder}, this encoder-decoder pair is used to check if correct protocol is used.
* {@link SingleProtocolDecoder} checks if the proper protocol is received. If the protocol is correct, both encoder and decoder
* are replaced by the next handlers in the pipeline. If it isn't the {@link SingleProtocolEncoder} sends
* {@link Protocols#UNEXPECTED_PROTOCOL} response and throws a {@link ProtocolException}. Note that in client mode the
* {@link SingleProtocolEncoder} allows blocking packet writes until the (member-)protocol is confirmed.
*/
public class SingleProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {
private final OutboundHandler[] outboundHandlers;
Expand All @@ -65,27 +62,25 @@ public HandlerStatus onWrite() throws Exception {
// sends anything and only swaps itself with the next encoder
try {
// First, decoder must receive the protocol
if (!isDecoderReceivedProtocol && !channel.isClientMode()) {
if (!isDecoderReceivedProtocol) {
return BLOCKED;
}
if (isDecoderVerifiedProtocol) {
// Set up the next encoder in the pipeline once the protocol is verified
setupNextEncoder();
return CLEAN;
}

// Decoder didn't verify the protocol, protocol error should be sent
if (!isDecoderVerifiedProtocol && !channel.isClientMode()) {
// Decoder received protocol bytes, but verification failed. If we are server/acceptor, then respond with the
// UNEXPECTED_PROTOCOL response bytes.
if (!channel.isClientMode()) {
if (!sendProtocol()) {
return DIRTY;
}
// UNEXPECTED_PROTOCOL is sent (or at least in the socket
// buffer). We can now throw exception in the pipeline to close
// the channel.
throw new ProtocolException(exceptionMessage);
}

if (channel.isClientMode()) {
// Set up the next encoder in the pipeline if in client mode
setupNextEncoder();
}

return CLEAN;
// Either we are in the client mode or the UNEXPECTED_PROTOCOL is sent already (or at least placed into the
// destination buffer). We can now throw exception in the pipeline to close the channel.
throw new ProtocolException(exceptionMessage);
} finally {
dst.flip();
}
Expand All @@ -103,7 +98,7 @@ private boolean sendProtocol() {
}

// Swap this encoder with the next one
protected void setupNextEncoder() {
private void setupNextEncoder() {
channel.outboundPipeline().replace(this, outboundHandlers);
}

Expand All @@ -125,17 +120,31 @@ private boolean isProtocolBufferDrained() {
// Used by SingleProtocolDecoder in order to swap
// SingleProtocolEncoder with the next encoder in the pipeline
public void signalProtocolVerified() {
// this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite
isDecoderVerifiedProtocol = true;
isDecoderReceivedProtocol = true;
channel.outboundPipeline().wakeup();
// This channel can become null when SingleProtocolEncoder is not active handler of the outbound
// pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder
// yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible
// to enter the blocked state from the path that isDecoderReceivedProtocol check is performed.
if (channel != null) {
channel.outboundPipeline().wakeup();
}
}

// Used by SingleProtocolDecoder in order to send HZX eventually
public void signalWrongProtocol(String exceptionMessage) {
// this update order below must stay in reverse order with access order in SingleProtocolEncode#onWrite
this.exceptionMessage = exceptionMessage;
isDecoderVerifiedProtocol = false;
isDecoderReceivedProtocol = true;
channel.outboundPipeline().wakeup();
// This channel can become null when SingleProtocolEncoder is not active handler of the outbound
// pipeline, when the previous MemberProtocolEncoder doesn't replace itself with SingleProtocolEncoder
// yet. In this case, this outboundPipeline().wakeup() call can be ignored since it is not possible
// to enter the blocked state from the path that isDecoderReceivedProtocol check is performed.
if (channel != null) {
channel.outboundPipeline().wakeup();
}
}

public OutboundHandler getFirstOutboundHandler() {
Expand Down