Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.VM.getController;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.toArray;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.io.Serializable;
import java.util.Properties;
import java.util.function.Consumer;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
Expand All @@ -51,7 +52,6 @@
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;

public class MeterSubregistryReconnectDistributedTest implements Serializable {

Expand All @@ -60,44 +60,31 @@ public class MeterSubregistryReconnectDistributedTest implements Serializable {
private static LocatorLauncher locatorLauncher;

private static InternalDistributedSystem system;
private static MeterRegistry addedSubregistry;
private static MeterRegistry discoveredSubregistry;

private VM locatorVM;
private VM server1VM;
private VM server2VM;
private VM otherServer;

private String locatorName;
private String server1Name;
private String server2Name;
private static final String LOCATOR_NAME = "locator";
private static final String OTHER_SERVER_NAME = "other-server";
private static final String NAME_OF_SERVER_TO_RECONNECT = "server-to-reconnect";

private File locatorDir;
private int locatorPort;

@Rule
public DistributedRule distributedRule = new DistributedRule();

@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();

@Rule
public SerializableTestName testName = new SerializableTestName();

@Before
public void setUp() throws Exception {
locatorName = "locator-" + testName.getMethodName();
server1Name = "server-" + testName.getMethodName() + "-1";
server2Name = "server-" + testName.getMethodName() + "-2";

locatorVM = getVM(0);
server1VM = getVM(1);
server2VM = getController();

locatorDir = temporaryFolder.newFolder(locatorName);
otherServer = getVM(1);

int locatorPort = locatorVM.invoke(() -> createLocator());

server1VM.invoke(() -> createServer(server1Name, locatorPort));
server2VM.invoke(() -> createServer(server2Name, locatorPort));
locatorDir = temporaryFolder.newFolder(LOCATOR_NAME);
locatorPort = locatorVM.invoke(this::createLocator);
otherServer.invoke(() -> createServer(OTHER_SERVER_NAME));

addIgnoredException(ForcedDisconnectException.class);
addIgnoredException("Possible loss of quorum");
Expand All @@ -111,46 +98,54 @@ public void tearDown() {
system = null;
});

for (VM vm : toArray(server1VM, server2VM)) {
vm.invoke(() -> {
system.disconnect();
system = null;
});
}
otherServer.invoke(() -> {
system.disconnect();
system = null;
});

system.disconnect();
}

@Test
public void meterSubregistryIsUsedAfterReconnect() {
locatorVM.invoke(() -> {
assertThat(system.getDistributionManager().getDistributionManagerIds()).hasSize(3);
});
public void reconnect_restoresOnlySubregistriesFromCacheFactory() throws InterruptedException {
MeterRegistry subregistryFromCacheFactory = meterRegistryFrom("CacheFactory");
MeterRegistry subregistryFromPublishingService = meterRegistryFrom("MetricsPublishingService");

server2VM.invoke(() -> {
GMSMembershipManager membershipManager = (GMSMembershipManager) getMembershipManager(system);
membershipManager.getGMSManager()
.forceDisconnect("Forcing disconnect in " + testName.getMethodName());
Consumer<CompositeMeterRegistry> addSubregistryViaPublishingService =
compositeMeterRegistry -> compositeMeterRegistry.add(subregistryFromPublishingService);

await().until(() -> system.isReconnecting());
system.waitUntilReconnected(TIMEOUT, MILLISECONDS);
assertThat(system.getReconnectedSystem()).isNotSameAs(system);
});
Consumer<CacheFactory> addSubregistryViaCacheFactory =
cacheFactory -> cacheFactory.addMeterSubregistry(subregistryFromCacheFactory);

locatorVM.invoke(() -> {
assertThat(system.getDistributionManager().getDistributionManagerIds()).hasSize(3);
});
createServer(NAME_OF_SERVER_TO_RECONNECT,
addSubregistryViaCacheFactory,
addSubregistryViaPublishingService);

server2VM.invoke(() -> {
system = (InternalDistributedSystem) system.getReconnectedSystem();
InternalCache cache = system.getCache();
CompositeMeterRegistry compositeMeterRegistry =
(CompositeMeterRegistry) cache.getMeterRegistry();
assertThat(compositeMeterRegistry.getRegistries()).containsOnly(addedSubregistry);
});
reconnect();

assertThat(cacheMeterRegistry().getRegistries())
.contains(subregistryFromCacheFactory)
.doesNotContain(subregistryFromPublishingService);
}

private CompositeMeterRegistry cacheMeterRegistry() {
InternalCache cache = system.getCache();
return (CompositeMeterRegistry) cache.getMeterRegistry();
}

private void reconnect() throws InterruptedException {
GMSMembershipManager membershipManager = (GMSMembershipManager) getMembershipManager(system);
membershipManager.getGMSManager().forceDisconnect("Forcing disconnect in test");

await().until(() -> system.isReconnecting());
system.waitUntilReconnected(TIMEOUT, MILLISECONDS);

system = (InternalDistributedSystem) system.getReconnectedSystem();
}

private int createLocator() {
LocatorLauncher.Builder builder = new LocatorLauncher.Builder();
builder.setMemberName(locatorName);
builder.setMemberName(LOCATOR_NAME);
builder.setWorkingDirectory(locatorDir.getAbsolutePath());
builder.setPort(0);
builder.set(DISABLE_AUTO_RECONNECT, "false");
Expand All @@ -166,7 +161,14 @@ private int createLocator() {
return locatorLauncher.getPort();
}

private void createServer(String serverName, int locatorPort) {
private void createServer(String serverName) {
createServer(serverName, cacheFactory -> {
}, compositeMeterRegistry -> {
});
}

private void createServer(String serverName, Consumer<CacheFactory> cacheInitializer,
Consumer<CompositeMeterRegistry> registryInitializer) {
Properties configProperties = new Properties();
configProperties.setProperty(LOCATORS, "localHost[" + locatorPort + "]");
configProperties.setProperty(DISABLE_AUTO_RECONNECT, "false");
Expand All @@ -175,21 +177,22 @@ private void createServer(String serverName, int locatorPort) {
configProperties.setProperty(MEMBER_TIMEOUT, "2000");
configProperties.setProperty(NAME, serverName);

addedSubregistry = new SimpleMeterRegistry();

CacheFactory cacheFactory = new CacheFactory(configProperties);
cacheFactory.addMeterSubregistry(addedSubregistry);
cacheInitializer.accept(cacheFactory);

InternalCache cache = (InternalCache) cacheFactory.create();

CompositeMeterRegistry compositeMeterRegistry =
(CompositeMeterRegistry) cache.getMeterRegistry();
assertThat(compositeMeterRegistry.getRegistries()).contains(addedSubregistry);

// same as a discovered ServerLoader that created a subregistry
discoveredSubregistry = new SimpleMeterRegistry();
compositeMeterRegistry.add(discoveredSubregistry);
registryInitializer.accept(compositeMeterRegistry);

system = cache.getInternalDistributedSystem();
}

private static MeterRegistry meterRegistryFrom(String registrySource) {
SimpleMeterRegistry registry = spy(SimpleMeterRegistry.class);
doReturn("Subregistry added by " + registrySource).when(registry).toString();
return registry;
}
}