Skip to content
Permalink
Browse files
GEODE-10340: Add new DiskStoreMXBean JMX metrics (#7726)
* GEODE-10340: Add new DiskStoreMXBean JMX metrics

The following DiskStore stats are now published via JMX:
recoveredEntryCreates, recoveredEntryUpdates and recoveredEntryDestroys.

* GEODE-10340: Added default implementation for new methods
  • Loading branch information
albertogpz committed Jul 6, 2022
1 parent d6261cc commit bb93789c16b1d2e1904a04d743add8ab29601379
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 6 deletions.
@@ -9,5 +9,9 @@
"Method org.apache.geode.cache.query.IndexStatistics.getReadLockCountLong()": "Added new methods.",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFragments()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFreedChunks()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat"
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat",
"Class org.apache.geode.management.DiskStoreMXBean": "Added new methods.",
"Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryCreates()": "Added new stat",
"Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryDestroys()": "Added new stat",
"Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryUpdates()": "Added new stat"
}
@@ -26,9 +26,12 @@

import javax.management.ObjectName;

import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
@@ -39,8 +42,10 @@
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.TombstoneService;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
import org.apache.geode.internal.process.ProcessUtils;
@@ -49,12 +54,14 @@
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.runners.GeodeParamsRunner;

/**
* Test cases to cover all test cases which pertains to disk from Management layer
*/

@SuppressWarnings({"serial", "unused"})
@RunWith(GeodeParamsRunner.class)
public class DiskManagementDUnitTest implements Serializable {

private static final String REGION_NAME =
@@ -193,6 +200,78 @@ public void testMissingMembers() throws Exception {
});
}

/**
* Checks that after a restart of a server the JMX stats
* for oplog recovery are updated accordingly.
*/
@Test
@Parameters({"true, false", "false, false", "true, true"})
@TestCaseName("{method}(useKrf={0}, expireTombstones={1})")
public void testRecoveryStats(boolean useKrf, boolean expireTombstones) throws Exception {
VM memberVM1 = memberVMs[0];

createPersistentRegionAsync(memberVM1, useKrf, expireTombstones).await();

String key1 = "key1";
String key2 = "key2";
String value11 = "value12";
String value12 = "value12";
String value2 = "value2";

putEntry(memberVM1, key1, value11);
putEntry(memberVM1, key2, value2);
updateEntry(memberVM1, key1, value11);
deleteEntry(memberVM1, key1);

if (expireTombstones) {
forceGC(memberVM1, 1);
}

memberVM1.invoke("stop server", () -> {
Cache cache = managementTestRule.getCache();
cache.close();
});

createPersistentRegionAsync(memberVM1, useKrf, expireTombstones).await();

verifyRecoveryStats(memberVM1, true);

verifyRecoveryEntriesStats(memberVM1, useKrf, expireTombstones);

// Check to make sure we recovered the old values of the entries.
memberVM1.invoke("check for the entries", () -> {
Cache cache = managementTestRule.getCache();
Region region = cache.getRegion(REGION_NAME);
assertThat(region.get(key1)).isEqualTo(null);
assertThat(region.get(key2)).isEqualTo(value2);
});
}

private void verifyRecoveryEntriesStats(VM memberVM1, boolean useKrf, boolean expireTombstones) {
memberVM1.invoke("verifyRecoveryEntriesStats", () -> {
Cache cache = managementTestRule.getCache();
Region region = cache.getRegion(REGION_NAME);
DistributedRegion distributedRegion = (DistributedRegion) region;

ManagementService service = managementTestRule.getManagementService();
DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(REGION_NAME);

int recoveredEntryCreates = expireTombstones ? 1 : 2;
int recoveredEntryUpdates = useKrf ? 0 : 2;
int recoveredEntryDestroys = expireTombstones ? 1 : 0;

assertThat(diskStoreMXBean.getTotalRecoveredEntryCreates()).isEqualTo(recoveredEntryCreates);
assertThat(diskStoreMXBean.getTotalRecoveredEntryUpdates()).isEqualTo(recoveredEntryUpdates);
assertThat(diskStoreMXBean.getTotalRecoveredEntryDestroys())
.isEqualTo(recoveredEntryDestroys);
});
}

private void forceGC(VM vm, final int count) {
vm.invoke("force GC", () -> managementTestRule.getCache().getTombstoneService()
.forceBatchExpirationForTests(count));
}

/**
* Invokes flush on the given disk store by MBean interface
*/
@@ -316,19 +395,40 @@ private void compactDiskStoresRemote(final VM managerVM, final int memberCount)
});
}

private void updateTheEntry(final VM memberVM, final String value) {
memberVM.invoke("updateTheEntry", () -> {
private void updateTheEntry(final VM memberVM, final Object value) {
updateEntry(memberVM, "A", value);
}

private void updateEntry(final VM memberVM, final Object key, final Object value) {
memberVM.invoke("updateEntry", () -> {
Cache cache = managementTestRule.getCache();
Region region = cache.getRegion(REGION_NAME);
region.put("A", value);
region.put(key, value);
});
}

private void putAnEntry(final VM memberVM) {
memberVM.invoke("putAnEntry", () -> {
putEntry(memberVM, "A", "B");
}

private void putEntry(final VM memberVM, Object key, Object value) {
memberVM.invoke("putEntry", () -> {
Cache cache = managementTestRule.getCache();
Region region = cache.getRegion(REGION_NAME);
region.put("A", "B");
region.put(key, value);
});
}

private void deleteTheEntry(final VM memberVM) {
deleteEntry(memberVM, "A");

}

private void deleteEntry(final VM memberVM, Object key) {
memberVM.invoke("deleteEntry", () -> {
Cache cache = managementTestRule.getCache();
Region region = cache.getRegion(REGION_NAME);
region.remove(key);
});
}

@@ -346,7 +446,20 @@ private void createPersistentRegion(final VM memberVM)
}

private AsyncInvocation<Void> createPersistentRegionAsync(final VM memberVM) {
return createPersistentRegionAsync(memberVM, true, false);
}

private AsyncInvocation<Void> createPersistentRegionAsync(final VM memberVM, boolean useKrf,
boolean expireTombstones) {
return memberVM.invokeAsync("createPersistentRegionAsync", () -> {
if (!useKrf) {
System.setProperty(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, "true");
}
if (expireTombstones) {
DiskStoreImpl.SET_IGNORE_PREALLOCATE = true;
TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 1;
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 1;
}
File dir = new File(diskDir, String.valueOf(ProcessUtils.identifyPid()));

Cache cache = managementTestRule.getCache();
@@ -177,6 +177,18 @@ public interface DiskStoreMXBean {
*/
int getTotalRecoveriesInProgress();

default int getTotalRecoveredEntryCreates() {
return 0;
};

default int getTotalRecoveredEntryUpdates() {
return 0;
};

default int getTotalRecoveredEntryDestroys() {
return 0;
};

/**
* Requests the DiskStore to start writing to a new op-log. The old oplog will be asynchronously
* compressed if compaction is set to true. The new op-log will be created in the next available
@@ -120,6 +120,19 @@ public int getTotalRecoveriesInProgress() {
return bridge.getTotalRecoveriesInProgress();
}

@Override
public int getTotalRecoveredEntryCreates() {
return bridge.getTotalRecoveredEntryCreates();
}

public int getTotalRecoveredEntryUpdates() {
return bridge.getTotalRecoveredEntryUpdates();
}

public int getTotalRecoveredEntryDestroys() {
return bridge.getTotalRecoveredEntryDestroys();
}

@Override
public int getWriteBufferSize() {
return bridge.getWriteBufferSize();
@@ -247,6 +247,18 @@ public int getTotalRecoveriesInProgress() {
return getDiskStoreStatistic(StatsKey.RECOVERIES_IN_PROGRESS).intValue();
}

public int getTotalRecoveredEntryCreates() {
return getDiskStoreStatistic(StatsKey.DISK_RECOVERY_ENTRIES_CREATED).intValue();
}

public int getTotalRecoveredEntryUpdates() {
return getDiskStoreStatistic(StatsKey.DISK_RECOVERY_ENTRIES_UPDATED).intValue();
}

public int getTotalRecoveredEntryDestroys() {
return getDiskStoreStatistic(StatsKey.DISK_RECOVERY_ENTRIES_DESTROYED).intValue();
}

public Number getDiskStoreStatistic(String statName) {
if (diskStoreStats != null) {
return diskStoreStats.getStats().get(statName);
@@ -169,6 +169,10 @@ public class StatsKey {
public static final String DISK_WRITEN_BYTES = "writtenBytes";

public static final String DISK_RECOVERY_ENTRIES_CREATED = "recoveredEntryCreates";

public static final String DISK_RECOVERY_ENTRIES_UPDATED = "recoveredEntryUpdates";

public static final String DISK_RECOVERY_ENTRIES_DESTROYED = "recoveredEntryDestroys";
public static final String DISK_RECOVERED_BYTES = "recoveredBytes";

public static final String BACKUPS_IN_PROGRESS = "backupsInProgress";

0 comments on commit bb93789

Please sign in to comment.