Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entry write support local node region aware placement policy #4063

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,20 @@ public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQu
remainingEnsembleBeforeIteration = remainingEnsemble;
int regionsToAllocate = numRemainingRegions;
int startRegionIndex = lastRegionIndex % numRegionsAvailable;
int localRegionIndex = -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a more clear way to do this is to remove the localRegion from the availableRegions then everything happen smoothly with original logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not. If so, the localRegion node can't pick again. If we want to pick 5 nodes, and there are only two regions. If we remove localRegion, then we can only pick nodes from the remaining one region.

if (myRegion != null && !UNKNOWN_REGION.equals(myRegion)) {
localRegionIndex = availableRegions.indexOf(myRegion);
}
String region = myRegion;
for (int i = 0; i < numRegionsAvailable; ++i) {
String region = availableRegions.get(startRegionIndex % numRegionsAvailable);
startRegionIndex++;
// select the local region first, and for the rest region select, use round-robin selection.
if (i > 0 || localRegionIndex == -1) {
if (startRegionIndex % numRegionsAvailable == localRegionIndex) {
startRegionIndex++;
}
region = availableRegions.get(startRegionIndex % numRegionsAvailable);
startRegionIndex++;
}
final Pair<Integer, Integer> currentAllocation = regionsWiseAllocation.get(region);
TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
if (!regionsReachedMaxAllocation.contains(region)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,4 +1880,119 @@ public void testNotifyRackChangeWithNewRegion() throws Exception {
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
}


@Test
public void testNewEnsemblePickLocalRegionBookies()
throws Exception {
repp.uninitalize();
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.10", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region3/r3");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region4/r4");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/region5/r5");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/region1/r2");
StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/region1/r2");


updateMyRack("/region1/r2");
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());
addrs.add(addr7.toBookieId());
addrs.add(addr8.toBookieId());
addrs.add(addr9.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;

Set<BookieId> excludeBookies = new HashSet<>();

int bookie1Count = 0;
int bookie8Count = 0;
int bookie9Count = 0;
for (int i = 0; i < 100; ++i) {
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
List<BookieId> ensemble = ensembleResponse.getResult();
if (ensemble.contains(addr1.toBookieId())) {
bookie1Count++;
}
if (ensemble.contains(addr8.toBookieId())) {
bookie8Count++;
}
if (ensemble.contains(addr9.toBookieId())) {
bookie9Count++;
}

if (!ensemble.contains(addr8.toBookieId()) && !ensemble.contains(addr9.toBookieId())) {
fail("Failed to select bookie located on the same region and rack with bookie client");
}
if (ensemble.contains(addr2.toBookieId()) && ensemble.contains(addr3.toBookieId())) {
fail("addr2 and addr3 is same rack.");
}
}
LOG.info("Bookie1 Count: {}, Bookie8 Count: {}, Bookie9 Count: {}", bookie1Count, bookie8Count, bookie9Count);

//shutdown all the bookies located in the same region and rack with local node
// to test new ensemble should contain addr1
addrs.remove(addr8.toBookieId());
addrs.remove(addr9.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());
bookie1Count = 0;
bookie8Count = 0;
bookie9Count = 0;
for (int i = 0; i < 100; ++i) {
try {
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
List<BookieId> ensemble = ensembleResponse.getResult();
if (ensemble.contains(addr1.toBookieId())) {
bookie1Count++;
}
if (ensemble.contains(addr8.toBookieId())) {
bookie8Count++;
}
if (ensemble.contains(addr9.toBookieId())) {
bookie9Count++;
}
if (!ensemble.contains(addr1.toBookieId())) {
fail("Failed to select bookie located on the same region with bookie client");
}
if (ensemble.contains(addr8.toBookieId()) || ensemble.contains(addr9.toBookieId())) {
fail("Selected the shutdown bookies");
}
} catch (BKNotEnoughBookiesException e) {
fail("Failed to select the ensemble.");
}
}
LOG.info("Bookie1 Count: {}, Bookie8 Count: {}, Bookie9 Count: {}", bookie1Count, bookie8Count, bookie9Count);

}
}
Loading