Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16831. [RBF SBN] GetNamenodesForNameserviceId should shuffle Observer NameNodes every time #5098

Merged
merged 7 commits into from
Dec 23, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,53 @@ private void updateNameNodeState(final String nsId,
}
}

/**
* Try to shuffle the multiple observer namenodes if listObserversFirst is true.
* @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true,
* all observers will be placed at the front of the collection.
* @param listObserversFirst true if we need to shuffle the multiple front observer namenodes.
* @return a list of FederationNamenodeContext.
* @param <T> a subclass of FederationNamenodeContext.
*/
private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
goiri marked this conversation as resolved.
Show resolved Hide resolved
List<T> inputNameNodes, boolean listObserversFirst) {
if (!listObserversFirst) {
return inputNameNodes;
}
// Get Observers first.
List<T> observerList = new ArrayList<>();
for (T t : inputNameNodes) {
if (t.getState() == OBSERVER) {
observerList.add(t);
} else {
// The inputNameNodes are already sorted, so it can break
goiri marked this conversation as resolved.
Show resolved Hide resolved
// when the first non-observer is encountered.
break;
}
}
// Returns the inputNameNodes if no shuffle is required
if (observerList.size() <= 1) {
return inputNameNodes;
}

// Shuffle multiple Observers
Collections.shuffle(observerList);

List<T> ret = new ArrayList<>(inputNameNodes.size());
ret.addAll(observerList);
for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
ret.add(inputNameNodes.get(i));
}
return Collections.unmodifiableList(ret);
}

@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId, boolean listObserversFirst) throws IOException {

List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
if (ret != null) {
return ret;
return shuffleObserverNN(ret, listObserversFirst);
}

// Not cached, generate the value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,98 @@ public void setup() throws IOException, InterruptedException {
assertTrue(cleared);
}

@Test
public void testShuffleObserverNNs() throws Exception {
// Add an active entry to the store
NamenodeStatusReport activeReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
assertTrue(namenodeResolver.registerNamenode(activeReport));

// Add a standby entry to the store
NamenodeStatusReport standbyReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
assertTrue(namenodeResolver.registerNamenode(standbyReport));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> withoutObserver =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Get namenodes from cache.
withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Add an observer entry to the store
NamenodeStatusReport observerReport1 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport1));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Get namenodes from cache.
observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Add one new observer entry to the store
NamenodeStatusReport observerReport2 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport2));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList2 =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());

// Get namenodes from cache.
observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());

// Test shuffler
List<? extends FederationNamenodeContext> observerList3;
boolean hit = false;
for (int i = 0; i < 1000; i++) {
observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState());
if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) &&
observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) {
hit = true;
break;
}
}
assertTrue(hit);
}

@Test
public void testStateStoreDisconnected() throws Exception {

Expand Down