Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,8 @@

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
Expand Down Expand Up @@ -338,6 +319,9 @@ public long getTotalECBlockGroups() {
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread;

/** Used to prevent the occurrence of FBR datanodeid collection. */
private Set<String> fullBRDatanodeIds = new HashSet<String>();

/**
* Store blocks {@literal ->} datanodedescriptor(s) map of corrupt replicas.
*/
Expand Down Expand Up @@ -773,6 +757,7 @@ public void close() {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
fullBRDatanodeIds.clear();
}

/** @return the datanodeManager */
Expand Down Expand Up @@ -2603,6 +2588,12 @@ public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
LOG.warn("Failed to find datanode {}", nodeReg);
return 0;
}

if (containFBRDatanode(nodeReg)) {
LOG.warn("The FBR data of {} already exists on the NameNode.", nodeReg);
return 0;
}

// Request a new block report lease. The BlockReportLeaseManager has
// its own internal locking.
long leaseId = blockReportLeaseManager.requestLease(node);
Expand Down Expand Up @@ -5283,6 +5274,31 @@ public <T> T runBlockOp(final Callable<T> action)
}
}

@VisibleForTesting
public void addFBRDatanode(String datanodeId) throws IOException {
synchronized (this) {
if (datanodeId == null || "".equals(datanodeId)) {
String msg = "Full BR cannot make DataNode empty.";
throw new IOException(msg);
}
fullBRDatanodeIds.add(datanodeId);
}
}

@VisibleForTesting
public void removeFBRDatanode(String datanodeId) {
synchronized (this) {
if (datanodeId != null && !"".equals(datanodeId)) {
fullBRDatanodeIds.remove(datanodeId);
}
}
}

@VisibleForTesting
public boolean containFBRDatanode(DatanodeRegistration nodeReg) {
return fullBRDatanodeIds.contains(nodeReg.getDatanodeUuid());
}

/**
* Notification of a successful block recovery.
* @param block for which the recovery succeeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,17 +1618,22 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
boolean noStaleStorages = false;
try {
if (bm.checkBlockReportLease(context, nodeReg)) {
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
final int index = r;
noStaleStorages = bm.runBlockOp(() ->
bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context));
try {
bm.addFBRDatanode(nodeReg.getDatanodeUuid());
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
final int index = r;
noStaleStorages = bm.runBlockOp(() ->
bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context));
}
} finally {
bm.removeFBRDatanode(nodeReg.getDatanodeUuid());
}
}
} catch (UnregisteredNodeException une) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,44 @@ public void testCheckBlockReportLease() throws Exception {
}
}

@Test
public void testCheckBlockReportLease2() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();

FSNamesystem fsn = cluster.getNamesystem();
BlockManager blockManager = fsn.getBlockManager();
BlockManager spyBlockManager = spy(blockManager);
fsn.setBlockManagerForTesting(spyBlockManager);
String poolId = cluster.getNamesystem().getBlockPoolId();
NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

// Test based on one DataNode report to Namenode
DataNode dn = cluster.getDataNodes().get(0);
DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
assertTrue(hbResponse.getFullBlockReportLeaseId() != 0);

spyBlockManager.addFBRDatanode(dnRegistration.getDatanodeUuid());
hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
assertTrue(hbResponse.getFullBlockReportLeaseId() == 0);

spyBlockManager.removeFBRDatanode(dnRegistration.getDatanodeUuid());
hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
assertTrue(hbResponse.getFullBlockReportLeaseId() != 0);
}
}

private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
int numBlocks) {
int longsPerBlock = 3;
Expand Down