Fix OnJoinOp handling during batched cluster joining (#25114) [5.3.z] (#25153)

Backport of: #25114

After the changes introduced in December 2022
(#22942), it became possible
for the `OnJoinRegistrationOperation` used in the `EventServiceImpl` not
to be executed correctly across the cluster.

That commit changed `EventServiceImpl#getPostJoinOperation` to
return null permanently, replacing this functionality with an `OnJoinOp`
that each joining member passes to the cluster's master for execution.
This solution works under the ideal circumstances of a sequential cluster
startup; however, when several members try to join the master within
`ClusterProperty#WAIT_SECONDS_BEFORE_JOIN` of startup, their joins are
batched together. When such a batched join occurs, currently only
the "triggering" (final) member's `OnJoinOp` is executed, while all
others are discarded. As a result, registrations are not distributed
across the cluster as expected, which can lead to inconsistent
query results on Hazelcast.
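
For reference, the pre-join window behind this batching is controlled by the
`hazelcast.wait.seconds.before.join` system property (the property backing
`ClusterProperty#WAIT_SECONDS_BEFORE_JOIN`). A minimal sketch of a setup that
exercises the batched path, with a hypothetical window value and member count
chosen purely for illustration:

```java
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;

public class BatchedJoinSketch {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // Hypothetical value: widen the pre-join window so that concurrently
        // starting members are batched into a single join round on the master.
        config.setProperty("hazelcast.wait.seconds.before.join", "5");

        // The first instance started becomes the cluster master.
        Hazelcast.newHazelcastInstance(config);

        // Start several members at roughly the same time; their join requests
        // arrive within the wait window and are processed as one batch.
        Thread[] starters = new Thread[3];
        for (int i = 0; i < starters.length; i++) {
            starters[i] = new Thread(() -> Hazelcast.newHazelcastInstance(config));
            starters[i].start();
        }
        for (Thread starter : starters) {
            starter.join();
        }

        // Before this fix, only the last joiner's OnJoinOp was executed on the
        // master, so some EventService registrations could be missing cluster-wide.
        Hazelcast.shutdownAll();
    }
}
```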

This commit resolves the issue by storing every passed `OnJoinOp`
alongside its `MemberInfo`, mapped in `ClusterJoinManager`; this
allows us to execute _all_ passed `OnJoinOp`s within
`ClusterJoinManager#startJoin`, ensuring that no registrations are missed.
I opted to use a `BiTuple` to store this operation rather than creating a
`MemberInfo` extension to ensure we do not retain the `OnJoinOp`
object longer than necessary (which could happen if such an extension were
retained elsewhere), and because this operation only has value during the
join process and is needed only once. I also broadcast these
operations only to members not included in the batched join itself,
to avoid additional complexity.
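
Condensed from the `ClusterJoinManager` diff below (a sketch only; locking,
the membership-view update, and error handling are omitted), the master-side
flow is roughly:

```java
// Each joining member's OnJoinOp is now stored next to its MemberInfo.
private final Map<Address, BiTuple<MemberInfo, OnJoinOp>> joiningMembers = new LinkedHashMap<>();

private void startJoinRequest(MemberInfo memberInfo, OnJoinOp preJoinOperation) {
    // Remember the member together with its pre-join operation instead of
    // keeping only the operation of the last (triggering) joiner.
    joiningMembers.put(memberInfo.getAddress(), BiTuple.of(memberInfo, preJoinOperation));
}

private void startJoin() {
    // Once the batch window closes, run every stored OnJoinOp on the master,
    // skipping the broadcast to the addresses that are part of this same batch.
    for (BiTuple<MemberInfo, OnJoinOp> tuple : joiningMembers.values()) {
        if (tuple.element2() != null) {
            tuple.element2().runWithoutBroadcastTo(joiningMembers.keySet());
        }
    }
}
```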

This commit also includes a regression test that focuses on the most
common way of observing this inconsistency: examining
`ProxyServiceImpl` registrations across a 6-member cluster in which some
non-master members start simultaneously and others start sequentially.

Fixes https://hazelcast.atlassian.net/browse/HZ-2797
JamesHazelcast committed Aug 7, 2023
1 parent b361a10 commit a593ca0
Showing 4 changed files with 130 additions and 58 deletions.
ClusterJoinManager.java
@@ -42,6 +42,7 @@
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.partition.PartitionRuntimeState;
import com.hazelcast.internal.server.ServerConnection;
import com.hazelcast.internal.util.BiTuple;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.logging.ILogger;
@@ -65,6 +66,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;
import static com.hazelcast.instance.EndpointQualifier.MEMBER;
@@ -100,7 +102,8 @@ public class ClusterJoinManager {
private final ClusterClockImpl clusterClock;
private final ClusterStateManager clusterStateManager;

private final Map<Address, MemberInfo> joiningMembers = new LinkedHashMap<>();
// Map of members batched to join, along with their passed OnJoinOp pre-join operation
private final Map<Address, BiTuple<MemberInfo, OnJoinOp>> joiningMembers = new LinkedHashMap<>();
private final Map<UUID, Long> recentlyJoinedMemberUuids = new HashMap<>();

/**
@@ -457,22 +460,31 @@ private void startJoinRequest(MemberInfo memberInfo, OnJoinOp preJoinOperation)
firstJoinRequest = now;
}

final MemberInfo existing = joiningMembers.put(memberInfo.getAddress(), memberInfo);
// Store the OnJoinOp passed in joiningMembers map to execute later; otherwise when we batch
// join requests, only the final joiner's OnJoinOp is executed - we want to execute them all!
final BiTuple<MemberInfo, OnJoinOp> existing = joiningMembers.put(memberInfo.getAddress(),
BiTuple.of(memberInfo, preJoinOperation));
if (existing == null) {
sendMasterAnswer(memberInfo.getAddress());
if (now - firstJoinRequest < maxWaitMillisBeforeJoin) {
timeToStartJoin = now + waitMillisBeforeJoin;
}
} else if (!existing.getUuid().equals(memberInfo.getUuid())) {
} else if (!existing.element1().getUuid().equals(memberInfo.getUuid())) {
logger.warning("Received a new join request from " + memberInfo.getAddress()
+ " with a new UUID " + memberInfo.getUuid()
+ ". Previous UUID was " + existing.getUuid());
+ ". Previous UUID was " + existing.element1().getUuid());
}
if (now >= timeToStartJoin) {
startJoin(preJoinOperation);

if (!isBatchingJoins(now)) {
startJoin();
}
}

// Accessible for testing
public boolean isBatchingJoins(long now) {
return now < timeToStartJoin;
}

/**
* Send join request to {@code toAddress}.
*
@@ -767,10 +779,8 @@ void setMastershipClaimInProgress() {

/**
* Starts join process on master member.
*
* @param preJoinOperation joining member's preJoinOperation, not master's
*/
private void startJoin(OnJoinOp preJoinOperation) {
private void startJoin() {
logger.fine("Starting join...");
clusterServiceLock.lock();
try {
@@ -783,7 +793,8 @@ private void startJoin(OnJoinOp preJoinOperation) {
partitionService.pauseMigration();
MemberMap memberMap = clusterService.getMembershipManager().getMemberMap();

MembersView newMembersView = MembersView.cloneAdding(memberMap.toMembersView(), joiningMembers.values());
MembersView newMembersView = MembersView.cloneAdding(memberMap.toMembersView(),
joiningMembers.values().stream().map(BiTuple::element1).collect(Collectors.toList()));

long time = clusterClock.getClusterTime();

@@ -794,11 +805,15 @@
return;
}

OnJoinOp preJoinOp = preparePreJoinOps();
// Run all joining members' provided pre join operations now, but only
// execute them locally and on existing members of this cluster (do
// not broadcast to other members joining within this batch)
runProvidedPostJoinOpsWithoutBroadcast();

if (preJoinOperation != null) {
nodeEngine.getOperationService().run(preJoinOperation);
}
// Prepare our normal pre-join operations, which will be broadcast remotely;
// this must be done AFTER pre-join ops from all joining members are applied
// to master, via #runProvidedPostJoinOpsWithoutBroadcast() above
OnJoinOp preJoinOp = preparePreJoinOps();

// post join operations must be lock free, that means no locks at all:
// no partition locks, no key-based locks, no service level locks!
@@ -807,7 +822,8 @@
// this is the current partition assignment state, not taking into account the
// currently joining members
PartitionRuntimeState partitionRuntimeState = partitionService.createPartitionState();
for (MemberInfo member : joiningMembers.values()) {
for (BiTuple<MemberInfo, OnJoinOp> tuple : joiningMembers.values()) {
MemberInfo member = tuple.element1();
if (isMemberRestartingWithPersistence(member.getAttributes())
&& isMemberRejoining(memberMap, member.getAddress(), member.getUuid())) {
logger.info(member + " is rejoining the cluster");
@@ -842,6 +858,20 @@ && isMemberRejoining(memberMap, member.getAddress(), member.getUuid())) {
}
}

private void runProvidedPostJoinOpsWithoutBroadcast() {
for (BiTuple<MemberInfo, OnJoinOp> tuple : joiningMembers.values()) {
if (tuple.element2() != null) {
OnJoinOp onJoinOp = tuple.element2();
try {
onJoinOp.beforeRun();
} catch (Exception e) {
throw new RuntimeException(e);
}
onJoinOp.runWithoutBroadcastTo(joiningMembers.keySet());
}
}
}

private OnJoinOp preparePostJoinOp() {
Collection<Operation> postJoinOps = nodeEngine.getPostJoinOperations();
return (postJoinOps != null && !postJoinOps.isEmpty()) ? new OnJoinOp(postJoinOps) : null;
OnJoinOp.java
@@ -21,6 +21,7 @@
import com.hazelcast.config.SecurityConfig;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.impl.ClusterDataSerializerHook;
import com.hazelcast.internal.cluster.impl.ClusterJoinManager;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.management.operation.UpdatePermissionConfigOperation;
import com.hazelcast.cluster.Address;
@@ -36,6 +37,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Set;

import static com.hazelcast.internal.serialization.impl.SerializationUtil.readCollection;
import static com.hazelcast.internal.serialization.impl.SerializationUtil.writeCollection;
@@ -81,6 +83,21 @@ public void beforeRun() throws Exception {

@Override
public void run() throws Exception {
run(null);
}

/**
* Used in {@link ClusterJoinManager#startJoin()} to prevent
* quadratic complexity during batched cluster joining
*
* @param skippedForBroadcast Set of {@link Address} to skip broadcasting
* to if we are master of a cluster
*/
public void runWithoutBroadcastTo(Set<Address> skippedForBroadcast) {
run(skippedForBroadcast);
}

private void run(Set<Address> skippedForBroadcast) {
if (!operations.isEmpty()) {
SecurityConfig securityConfig = getNodeEngine().getConfig().getSecurityConfig();
boolean runPermissionUpdates = securityConfig.getOnJoinPermissionOperation() == OnJoinPermissionOperationName.RECEIVE;
@@ -101,6 +118,9 @@ public void run() throws Exception {
if (clusterService.isMaster()) {
final OperationService operationService = getNodeEngine().getOperationService();
for (Member member : clusterService.getMembers()) {
if (skippedForBroadcast != null && skippedForBroadcast.contains(member.getAddress())) {
continue;
}
if (!member.localMember() && !member.getUuid().equals(getCallerUuid())) {
OnJoinOp operation = new OnJoinOp(operations);
operationService.invokeOnTarget(getServiceName(), operation, member.getAddress());
ClusterProperty.java
@@ -543,7 +543,7 @@ private int getWhenNoSSLDetected() {
* This parameter defines time that the master node will wait since the last
* received join request (a pre-join window) before it starts processing the
* join requests and forming a cluster.
* Alternatively, if the pre-join phase has laster for over
* Alternatively, if the pre-join phase has lasted for over
* {@link #MAX_WAIT_SECONDS_BEFORE_JOIN} seconds, the master node will proceed
* with processing the join requests and forming the cluster, regardless of the
* time elapsed since the last join request.
EventRegistrationTest.java
@@ -18,78 +18,100 @@

import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.FutureUtil;
import com.hazelcast.spi.impl.eventservice.EventRegistration;
import com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.OverridePropertyRule;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.test.Accessors.getNodeEngineImpl;
import static java.util.Collections.synchronizedList;
import static com.hazelcast.test.OverridePropertyRule.set;
import static com.hazelcast.test.TestEnvironment.HAZELCAST_TEST_USE_NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class EventRegistrationTest extends HazelcastTestSupport {

private final ILogger logger = Logger.getLogger(getClass());
@Rule
public final OverridePropertyRule overridePropertyRule = set(HAZELCAST_TEST_USE_NETWORK, "true");
private final HazelcastInstance[] batchedMembers = new HazelcastInstance[6];

@After
public void tearDown() throws InterruptedException {
Hazelcast.shutdownAll();
}
@Before
public void prepare() throws InterruptedException {
Config config = smallInstanceConfigWithoutJetAndMetrics();
final TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory();

@Test
public void test_eventRegistrations_OnStartup() {
assertEventRegistrations(3, startInstances(3));
// Create a cluster that goes through a batched join, starting with a master instance (always index 0)
batchedMembers[0] = factory.newHazelcastInstance(config);
// Start 3 members simultaneously (so their join requests are batched)
List<Future<?>> futures = new ArrayList<>(3);
for (int k = 1; k < 4; k++) {
int finalK = k;
futures.add(spawn(() -> batchedMembers[finalK] = factory.newHazelcastInstance(config)));
}

// Wait for all batched member instances to be created
FutureUtil.waitWithDeadline(futures, 30, TimeUnit.SECONDS);

// Ensure the batched join period has elapsed
ClusterJoinManager joinManager = ((ClusterServiceImpl) getNodeEngineImpl(batchedMembers[0])
.getClusterService()).getClusterJoinManager();
assertTrueEventually(() -> assertFalse(joinManager.isBatchingJoins(Clock.currentTimeMillis())));

// Then start 2 members sequentially (so their join requests are not batched)
for (int k = 4; k < 6; k++) {
batchedMembers[k] = factory.newHazelcastInstance(config);
}

// Ensure cluster size is 6
assertClusterSizeEventually(6, batchedMembers);
}

private HazelcastInstance[] startInstances(int nodeCount) {
List<HazelcastInstance> instancesList = synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(nodeCount);
TestHazelcastInstanceFactory instanceFactory = createHazelcastInstanceFactory(3);

for (int i = 0; i < nodeCount; ++i) {
new Thread(() -> {
try {
Address address = instanceFactory.nextAddress();
HazelcastInstance instance = instanceFactory.newHazelcastInstance(address, new Config());
instancesList.add(instance);
} catch (Throwable e) {
logger.severe(e);
} finally {
latch.countDown();
}
}, "Start thread for node " + i).start();
/**
* <a href="https://hazelcast.atlassian.net/browse/HZ-2797">Related to HZ-2797</a>
*/
@Test
public void testEventRegistrationsForAllMembers_AfterJoin() {
// Gather a set of expected addresses to use in comparisons
Set<Address> expectedAddresses = new HashSet<>(batchedMembers.length);
for (HazelcastInstance member : batchedMembers) {
expectedAddresses.add(getNodeEngineImpl(member).getThisAddress());
}

assertOpenEventually(latch);
// Iterate over all members of the cluster and assert that it has listeners registered for
// every member of the cluster (including the local member itself)
Set<Address> localAddresses = new HashSet<>(batchedMembers.length);
for (HazelcastInstance member : batchedMembers) {
Collection<EventRegistration> registrations = getNodeEngineImpl(member).getEventService().getRegistrations(
ProxyServiceImpl.SERVICE_NAME, ProxyServiceImpl.SERVICE_NAME);

return instancesList.toArray(new HazelcastInstance[0]);
}
localAddresses.clear();
registrations.forEach(registration -> localAddresses.add(registration.getSubscriber()));

private static void assertEventRegistrations(int expected, HazelcastInstance... instances) {
assertTrueEventually(() -> {
for (HazelcastInstance instance : instances) {
Collection<EventRegistration> regs = getNodeEngineImpl(instance).getEventService().getRegistrations(
ProxyServiceImpl.SERVICE_NAME, ProxyServiceImpl.SERVICE_NAME);
assertEquals(instance + ": " + regs, expected, regs.size());
}
});
assertEquals(String.format("Expected: %s, Actual: %s", expectedAddresses, localAddresses),
expectedAddresses.size(), localAddresses.size());
assertContainsAll(localAddresses, expectedAddresses);
}
}
}
