Skip to content
Permalink
Browse files
IGNITE-14085 Implement network recovery protocol
  • Loading branch information
SammyVimes committed May 13, 2022
1 parent 7fc6738 commit 8c075cdd8defd5ff811c1fa68864650d22c7d06e
Showing 32 changed files with 1,419 additions and 705 deletions.
@@ -81,6 +81,7 @@ private static List<ExecutableElement> extractGetters(ProcessingEnvironment proc
.filter(e -> !typeUtils.isSameType(e.asType(), NetworkMessage.class))
.flatMap(e -> e.getEnclosedElements().stream())
.filter(e -> e.getKind() == ElementKind.METHOD)
.filter(e -> !((ExecutableElement) e).isDefault())
// use a tree map to sort getters by name and remove duplicates
.collect(Collectors.toMap(
e -> e.getSimpleName().toString(),
@@ -24,7 +24,7 @@
*/
public interface NetworkMessage {
/** Size of the message type (in bytes), used during (de-)serialization. */
static final int MSG_TYPE_SIZE_BYTES = 4;
int MSG_TYPE_SIZE_BYTES = 4;

/**
* Message type. Must be <b>distinct</b> among all messages in a <i>message group</i>. Only positive values are allowed.
@@ -51,4 +51,13 @@ default void prepareMarshal(IntSet ids, Object marshaller) throws Exception {
default void unmarshal(Object marshaller, Object descriptors) throws Exception {
// No-op.
}

/**
* Returns {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
*
* @return {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
*/
default boolean needAck() {
return true;
}
}
@@ -46,8 +46,6 @@
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -147,7 +145,7 @@ public void testReuseIncomingConnection() throws Exception {
NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);

// Ensure a handshake has finished on both sides by sending a message.
// TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
// TODO: IGNITE-16947 When the recovery protocol is implemented replace this with simple
// CompletableFuture#get called on the send future.
var messageReceivedOn2 = new CompletableFuture<Void>();

@@ -353,9 +351,8 @@ private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int
var manager = new ConnectionManager(
cfg,
new SerializationService(registry, mock(UserObjectSerializationContext.class)),
launchId,
consistentId,
() -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
() -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
bootstrapFactory
);

0 comments on commit 8c075cd

Please sign in to comment.