Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,10 @@ private void offerService() throws Exception {

// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
final long waitTime = scheduler.getHeartbeatWaitTime();
if (waitTime > 0) {
sleepAndLogInterrupts(waitTime, "heartbeat interrupted");
}
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
Expand Down Expand Up @@ -856,7 +859,7 @@ void register(NamespaceInfo nsInfo) throws IOException {
}


private void sleepAndLogInterrupts(int millis,
private void sleepAndLogInterrupts(long millis,
String stateString) {
try {
Thread.sleep(millis);
Expand Down Expand Up @@ -1147,17 +1150,16 @@ public void run() {
while (shouldRun()) {
try {
final long startTime = scheduler.monotonicNow();
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately() || sendHeartbeat)) {
if (!dn.areIBRDisabledForTests() && ibrManager.sendImmediately()) {
synchronized (sendIBRLock) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
}
// There is no work to do; sleep until heartbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
final long endTime = scheduler.monotonicNow();
// wait until next ibr is ready to send.
// using heart beat interval as max wait time if ibr interval is not configured.
ibrManager.waitTillNextIBR(dnConf.heartBeatInterval - (endTime - startTime));
} catch (Throwable t) {
LOG.error("Exception in IBRTaskHandler.", t);
sleepAndLogInterrupts(5000, "offering IBR service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public void testDatanodeReportMissingBlock() throws Exception {
} catch (IOException e) {
// all bad datanodes
}
cluster.triggerHeartbeats(); // IBR delete ack
// ibr thread is seperated from heart beat thread, use BlockReports to trigger deletion ibr
cluster.triggerBlockReports();
int retries = 0;
while (true) {
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
anyString(),
any(StorageReceivedDeletedBlocks[].class));

// Trigger a heartbeat, this also triggers an IBR.
DataNodeTestUtils.triggerHeartbeat(singletonDn);
// Trigger a block report, this also triggers an IBR.
DataNodeTestUtils.triggerBlockReport(singletonDn);

// Ensure that the deleted block is reported.
int retries = 0;
Expand Down