Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongUnaryOperator;

import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -952,6 +953,52 @@ public void testDoTCPCheckMemberWithUnkownStatus() throws Exception {
executeTestDoTCPCheck(GMSHealthMonitor.ERROR + 100, false);
}

/**
 * Waking up one nanosecond past the oversleep limit must produce the oversleep warning.
 */
@Test
public void heartbeatOversleepCausesWarning() {
  // Report an asleep time that exceeds the heart's limit by the smallest possible amount.
  final LongUnaryOperator oneNanoPastLimit = limitNanos -> limitNanos + 1;
  final String expectedWarning =
      "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: 1,000,000,001 nanoseconds. Period: 500,000,000 nanoseconds.";

  testHeartbeatSleepScenario(oneNanoPastLimit, expectedWarning);
}

/**
 * Waking up exactly at the oversleep limit is still considered on time: no warning is expected.
 */
@Test
public void heartbeatOnTimeWakeupCausesNoWarning() {
  // The reported asleep time equals the limit itself, so nothing should be logged (null message).
  testHeartbeatSleepScenario(LongUnaryOperator.identity(), null);
}

private void testHeartbeatSleepScenario(final LongUnaryOperator actualSleepPeriod,
    final String expectedLogWarning) {

  /*
   * An anonymous class is used as a convenient holder for the mutable state shared by the
   * callbacks handed to the heart. Without it, each piece of state would need to be an array,
   * an atomic, or some other kind of "holder object"; here they can simply be fields.
   */
  final Runnable scenario = new Runnable() {
    // the thing we're testing
    final GMSHealthMonitor.Heart heart = gmsHealthMonitor.new Heart();
    // how many times the heart has asked for the current time
    int clockReads = 0;
    // warning message (if any) generated by heart
    String warning;

    @Override
    public void run() {
      heart.sendPeriodicHeartbeats(this::skipSleep, this::fakeNanoTime, this::captureWarning);
      assertThat(warning).isEqualTo(expectedLogWarning);
    }

    // Stand-in for sleeping: returns immediately so the test never blocks.
    private void skipSleep(final long ignoredMillis) {}

    // Fake clock: the first reading is 0; every later reading stops the monitor (so the
    // heartbeat loop exits) and reports the asleep time chosen by the scenario.
    private long fakeNanoTime() {
      final boolean firstRead = clockReads++ == 0;
      if (firstRead) {
        return 0L;
      }
      gmsHealthMonitor.stop();
      return actualSleepPeriod.applyAsLong(heart.sleepLimitNanos);
    }

    // Remembers the most recent warning emitted by the heart.
    private void captureWarning(final String message) {
      warning = message;
    }
  };

  scenario.run();
}

private void executeTestDoTCPCheck(int receivedStatus, boolean expectedResult) throws Exception {
MemberIdentifier otherMember =
createGMSMember(Version.CURRENT_ORDINAL, 0, 1, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,82 +750,7 @@ private void startTcpServer(ServerSocket ssocket) {
* process
*/
private void startHeartbeatThread() {
  // Heart is the single heartbeat-sending task: it names its thread "Geode Heartbeat Sender"
  // and runs the periodic send loop. Submitting only this task avoids running a second,
  // duplicate sender alongside it.
  checkExecutor.execute(new Heart());
}

@Override
Expand Down Expand Up @@ -1530,4 +1455,117 @@ public void run() {
public MembershipStatistics getStats() {
  // Expose the statistics object this monitor updates (e.g. the heartbeats-sent counter).
  return stats;
}

/**
 * Abstraction of a blocking sleep, so the heartbeat loop can be exercised without real delays.
 * {@code Heart.run()} supplies {@code Thread::sleep}.
 */
@FunctionalInterface
interface Sleeper {
  /**
   * Pauses the calling thread.
   *
   * @param millis requested pause length in milliseconds
   * @throws InterruptedException if the pause is interrupted
   */
  void sleep(long millis) throws InterruptedException;
}

/**
 * Abstraction of a nanosecond-resolution time source, so callers can substitute a fake clock.
 * {@code Heart.run()} supplies {@code System::nanoTime}.
 */
@FunctionalInterface
interface NanoTimer {
  /**
   * @return the current reading of the time source, in nanoseconds
   */
  long nanoTime();
}

/**
 * Abstraction of a warning sink, so emitted warnings can be captured instead of logged.
 * {@code Heart.run()} supplies {@code logger::warn}.
 */
@FunctionalInterface
interface Warner {
  /**
   * @param message the warning text to report
   */
  void warn(String message);
}

/**
 * The heartbeat-generation task of the enclosing health monitor.
 *
 * <p>
 * Until the monitor is stopping or cancellation is in progress, the task repeatedly sleeps for
 * one period ({@code memberTimeout / LOGICAL_INTERVAL} milliseconds) and then sends heartbeat
 * messages to the view coordinator and to up to {@code NUM_HEARTBEATS} other members. If a
 * sleep takes longer than {@link #OVERSLEEP_WARNING_THRESHOLD_PERIODS} periods a warning is
 * emitted.
 */
class Heart implements Runnable {

  // If we sleep longer than this number of periods then log a warning
  public static final int OVERSLEEP_WARNING_THRESHOLD_PERIODS = 2;
  // Length of one sleep between heartbeat rounds, in milliseconds
  public final long sleepPeriodMillis = memberTimeout / LOGICAL_INTERVAL;
  // The same period expressed in nanoseconds, for comparison with NanoTimer readings
  public final long sleepPeriodNanos =
      TimeUnit.NANOSECONDS.convert(sleepPeriodMillis, TimeUnit.MILLISECONDS);
  // Asleep times strictly greater than this many nanoseconds trigger the oversleep warning
  public final long sleepLimitNanos = OVERSLEEP_WARNING_THRESHOLD_PERIODS * sleepPeriodNanos;

  /**
   * Names the current thread and runs the heartbeat loop with the real sleep, clock and logger.
   */
  @Override
  public void run() {
    Thread.currentThread().setName("Geode Heartbeat Sender");
    sendPeriodicHeartbeats(Thread::sleep, System::nanoTime, logger::warn);
  }

  /**
   * Runs the heartbeat loop until the monitor is stopping, cancellation is in progress, or the
   * sleep is interrupted.
   *
   * @param sleeper performs the pause between heartbeat rounds
   * @param nanoTimer time source read immediately before and after each pause
   * @param warner receives the warning message when a pause exceeds {@link #sleepLimitNanos}
   */
  @VisibleForTesting
  void sendPeriodicHeartbeats(final Sleeper sleeper,
      final NanoTimer nanoTimer,
      final Warner warner) {
    while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
      try {
        final long timeBeforeSleep = nanoTimer.nanoTime();
        sleeper.sleep(sleepPeriodMillis);
        final long timeAfterSleep = nanoTimer.nanoTime();
        warnIfOverslept(timeAfterSleep - timeBeforeSleep, warner);
      } catch (InterruptedException e) {
        // Restore the interrupt flag for whoever owns this thread, then stop sending.
        Thread.currentThread().interrupt();
        return;
      }
      // Work from a single snapshot of the view for the membership checks below.
      GMSMembershipView<ID> v = currentView;
      if (v != null) {
        List<ID> mbrs = v.getMembers();
        int index = mbrs.indexOf(localAddress);
        // Nothing to do if we are not in the view or there is no other member to notify.
        if (index < 0 || mbrs.size() < 2) {
          continue;
        }
        if (!playingDead) {
          sendHeartbeats(mbrs, index);
        }
      }
    }
  }

  /**
   * Emits the oversleep warning when {@code asleepNanos} is strictly greater than
   * {@link #sleepLimitNanos}.
   */
  private void warnIfOverslept(final long asleepNanos, final Warner warner) {
    if (asleepNanos > sleepLimitNanos) {
      // Locale.ROOT keeps the digit grouping (",") identical regardless of the JVM's default
      // locale, so the message text is stable across environments.
      warner.warn(
          String.format(java.util.Locale.ROOT,
              "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: %,d nanoseconds. Period: %,d nanoseconds.",
              asleepNanos, sleepPeriodNanos));
    }
  }

  /**
   * Sends one round of heartbeats: first to the coordinator (unless that is this member), then
   * to the members preceding this one in {@code mbrs}, wrapping around, until
   * {@code NUM_HEARTBEATS} have been sent or the walk arrives back at this member.
   *
   * @param mbrs members of the view; the caller guarantees it contains the local address
   * @param startIndex position of the local address within {@code mbrs}
   */
  private void sendHeartbeats(List<ID> mbrs, int startIndex) {
    // NOTE(review): the coordinator is read from currentView while mbrs comes from the
    // caller's snapshot; the two may differ if the view changes in between - confirm this
    // is acceptable.
    ID coordinator = currentView.getCoordinator();
    if (coordinator != null && !coordinator.equals(localAddress)) {
      if (!sendHeartbeatTo(coordinator)) {
        return;
      }
    }

    int index = startIndex;
    int numSent = 0;
    for (;;) {
      // Step backwards through the member list, wrapping from the front to the end.
      index--;
      if (index < 0) {
        index = mbrs.size() - 1;
      }
      ID mbr = mbrs.get(index);
      if (mbr.equals(localAddress)) {
        // Came all the way around to ourselves: every other member has been considered.
        break;
      }
      if (mbr.equals(coordinator)) {
        // The coordinator was already handled above.
        continue;
      }
      if (!sendHeartbeatTo(mbr)) {
        return;
      }
      numSent++;
      if (numSent >= NUM_HEARTBEATS) {
        break;
      }
    } // for (;;)
  }

  /**
   * Sends a single heartbeat to {@code recipient} and counts it in the statistics.
   *
   * @return {@code true} if the heartbeat was sent; {@code false} if the monitor is stopping
   *         or membership has been closed, in which case the caller should stop sending
   */
  private boolean sendHeartbeatTo(ID recipient) {
    if (isStopping) {
      return false;
    }
    HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
    message.setRecipient(recipient);
    try {
      services.getMessenger().sendUnreliably(message);
      GMSHealthMonitor.this.stats.incHeartbeatsSent();
      return true;
    } catch (MembershipClosedException e) {
      return false;
    }
  }
}
}