Skip to content

Commit

Permalink
GEODE-10010: Improve accuracy of per-second Redis stats (#7358)
Browse files Browse the repository at this point in the history
Code changes:
 - Rather than a value that updates once per second, instantaneous
 operations per second and instantaneous kilobytes read per second now
 return a rolling average of those stats updated 16 times per second
 - Introduce RollingAverageStat nested class in RedisStats

Test changes:
 - Rename AbstractRedisInfoStatsIntegrationTest to match child classes
 - Rather than measuring instantaneous per second stats after
 operations have finished, sample them while operations are ongoing and
 calculate the expected value based on the number of operations
 performed in the one second prior to sampling.
 - Change test tolerances from fixed value to 12.5% of expected value

Authored-by: Donal Evans <doevans@vmware.com>
  • Loading branch information
DonalEvans committed Mar 1, 2022
1 parent 5ed8bd9 commit ea20f25
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 87 deletions.
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.NativeRedisTestRule;

public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static NativeRedisTestRule redis = new NativeRedisTestRule();

Expand Down
Expand Up @@ -15,17 +15,25 @@
package org.apache.geode.redis.internal.commands.executor.server;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.offset;
import static org.assertj.core.api.Assertions.withinPercentage;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.data.Offset;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;

Expand All @@ -34,17 +42,19 @@
import org.apache.geode.redis.RedisIntegrationTest;
import org.apache.geode.redis.RedisTestHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest {
public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest {
@Rule
public ExecutorServiceRule executor = new ExecutorServiceRule();

private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis();
private static final String EXISTING_HASH_KEY = "Existing_Hash";
private static final String EXISTING_STRING_KEY = "Existing_String";
private static final String EXISTING_SET_KEY_1 = "Existing_Set_1";
private static final String EXISTING_SET_KEY_2 = "Existing_Set_2";

private Jedis jedis;
private static long START_TIME;
private static long startTime;
private static StatisticsClock statisticsClock;

private long preTestConnectionsReceived = 0;
Expand Down Expand Up @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte
@BeforeClass
public static void beforeClass() {
statisticsClock = new EnabledStatisticsClock();
START_TIME = statisticsClock.getTime();
startTime = statisticsClock.getTime();
}

@Before
public void before() {
jedis = new Jedis("localhost", getPort(), TIMEOUT);
jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
numInfoCalled.set(0);

long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED));
Expand Down Expand Up @@ -157,27 +167,57 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() {
}

@Test
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() {
int NUMBER_SECONDS_TO_RUN = 10;
AtomicInteger numberOfCommandsExecuted = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
numberOfCommandsExecuted.getAndIncrement();
return true;
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring()
throws InterruptedException, ExecutionException, TimeoutException {
final long numberSecondsToRun = 4;
AtomicInteger totalOpsPerformed = new AtomicInteger();

final long startTime = System.currentTimeMillis();
final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
final long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set("key", "value");
totalOpsPerformed.getAndIncrement();
}
jedis2.close();
});
double reportedCommandsPerLastSecond =

// Record the total number of operations performed a second before we plan to sample. A poll
// interval less than the default of 100ms is used to increase the accuracy of the expected
// value, as the stats update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed);
final int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get();

// Calculate how many operations were performed in the last second. A poll interval less than
// the default of 100ms is used to increase the accuracy of the expected value, as the stats
// update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);

final double reportedCommandsPerLastSecond =
Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND));

long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN;
final int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling;

assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0));
assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(12.5));

// if time passes w/o operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true);
executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(1.0));
// Wait two seconds with no operations
Thread.sleep(2000);

// Confirm that instantaneous operations per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0,
offset(1.0));
}

@Test
Expand All @@ -193,39 +233,72 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() {
}

@Test
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() {

double REASONABLE_SOUNDING_OFFSET = .8;
int NUMBER_SECONDS_TO_RUN = 5;
String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n";
int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length();
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond()
throws InterruptedException, ExecutionException, TimeoutException {
final int numberSecondsToRun = 4;
final String command = "set";
final String key = "key";
final String value = "value";
final int bytesSentPerCommand =
("*3\r\n$" + command.length() + "\r\n" + command +
"\r\n$" + key.length() + "\r\n" + key +
"\r\n$" + value.length() + "\r\n" + value +
"\r\n").length();
AtomicInteger totalBytesSent = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND);
return true;
final long startTime = System.currentTimeMillis();
final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
final long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set(key, value);
totalBytesSent.addAndGet(bytesSentPerCommand);
}
jedis2.close();
});
double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));
double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000;

assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET));
// Record the total number of KB sent a second before we plan to sample. A poll interval less
// than the default of 100ms is used to increase the accuracy of the expected value, as the
// stats update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent);
final int bytesSentUntilASecondBeforeSampling = totalBytesSent.get();

// Calculate how many KB were sent in the last second. A poll interval less than the default of
// 100ms is used to increase the accuracy of the expected value, as the stats update the value
// of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);

final double reportedKBReadPerLastSecond =
Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));

final double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0;

assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(12.5));

executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

// if time passes w/o operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS)
.until(() -> true);
// Wait two seconds with no operations
Thread.sleep(2000);

// Kb/s should eventually drop to 0 or at least very close since just executing the info
// command may result in the value increasing.
assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(0.1));
// Confirm that instantaneous KB read per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0, offset(0.02));
}

// ------------------- Clients Section -------------------------- //

@Test
public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectedClients(jedis, preTestConnectedClients, 1);
Expand All @@ -237,7 +310,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {

@Test
public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectionsReceived(jedis, preTestConnectionsReceived, 1);
Expand Down Expand Up @@ -276,7 +349,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() {

// ------------------- Helper Methods ----------------------------- //
public long getStartTime() {
return START_TIME;
return startTime;
}

public long getCurrentTime() {
Expand Down
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.redis.GeodeRedisServerRule;

public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static GeodeRedisServerRule server = new GeodeRedisServerRule();

Expand All @@ -35,7 +35,5 @@ public int getExposedPort() {
}

@Override
public void configureMaxMemory(Jedis jedis) {
return;
}
public void configureMaxMemory(Jedis jedis) {}
}

0 comments on commit ea20f25

Please sign in to comment.