Skip to content
Permalink
Browse files
Fix region/rack aware placement police replace bookie bug (#2642)
  • Loading branch information
hangc0276 committed Mar 31, 2022
1 parent d7376c4 commit 4443c60739ad8f3ea2156c9bd7243e44a22709c9
Showing 3 changed files with 57 additions and 0 deletions.
@@ -129,6 +129,7 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
BookieNode node = createBookieNode(addr);
topology.add(node);
knownBookies.put(addr, node);
historyBookies.put(addr, node);
String region = getLocalRegion(node);
if (null == perRegionPlacement.get(region)) {
perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
@@ -62,6 +62,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
protected final Map<BookieId, BookieNode> knownBookies = new HashMap<BookieId, BookieNode>();
protected final Map<BookieId, BookieNode> historyBookies = new HashMap<BookieId, BookieNode>();
protected final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
protected Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
// Initialize to empty set
@@ -717,6 +718,7 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
BookieNode node = createBookieNode(addr);
topology.add(node);
knownBookies.put(addr, node);
historyBookies.put(addr, node);
if (this.isWeighted) {
this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
}
@@ -750,6 +752,7 @@ public void onBookieRackChange(List<BookieId> bookieAddressList) {
topology.remove(node);
topology.add(newNode);
knownBookies.put(bookieAddress, newNode);
historyBookies.put(bookieAddress, newNode);
}
} catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) {
LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e);
@@ -798,6 +801,11 @@ protected String resolveNetworkLocation(BookieId addr) {
try {
return NetUtils.resolveNetworkLocation(dnsResolver, bookieAddressResolver.resolve(addr));
} catch (BookieAddressResolver.BookieIdNotResolvedException err) {
BookieNode historyBookie = historyBookies.get(addr);
if (null != historyBookie) {
return historyBookie.getNetworkLocation();
}

LOG.error("Cannot resolve bookieId {} to a network address, resolving as {}", addr,
NetworkTopology.DEFAULT_REGION_AND_RACK, err);
return NetworkTopology.DEFAULT_REGION_AND_RACK;
@@ -1421,6 +1421,54 @@ private int getNumCoveredRegionsInWriteQuorum(List<BookieId> ensemble, int write
return numCoveredWriteQuorums;
}

@Test
public void testRecoveryOnNodeFailure() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);

// Update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region3/r3");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region3/r3");

// Update cluster
Set<BookieId> addrs = new HashSet<>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());

repp.onClusterChanged(addrs, new HashSet<>());

Set<BookieId> bookiesLeftSet = new HashSet<>();
bookiesLeftSet.add(addr1.toBookieId());
repp.handleBookiesThatLeft(bookiesLeftSet);

List<BookieId> currentEnsemble = new ArrayList<>();
currentEnsemble.add(addr1.toBookieId());
currentEnsemble.add(addr3.toBookieId());
currentEnsemble.add(addr6.toBookieId());

EnsemblePlacementPolicy.PlacementResult<BookieId> placementResult = repp.replaceBookie(3,
3, 2, null,
currentEnsemble, addr1.toBookieId(), new HashSet<>());

assertEquals(placementResult.getResult(), addr2.toBookieId());
}

@Test
public void testNodeWithFailures() throws Exception {
repp.uninitalize();

0 comments on commit 4443c60

Please sign in to comment.