Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16831. [RBF SBN] GetNamenodesForNameserviceId should shuffle Observer NameNodes every time #5098

Merged
merged 7 commits into from
Dec 23, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,45 @@ private void updateNameNodeState(final String nsId,
}
}

private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
goiri marked this conversation as resolved.
Show resolved Hide resolved
List<T> inputNameNodes, boolean listObserversFirst) {
if (!listObserversFirst) {
return inputNameNodes;
}
// Get Observers first.
List<T> observerList = new ArrayList<>();
for (T t : inputNameNodes) {
if (t.getState() == OBSERVER) {
observerList.add(t);
} else {
// The inputNameNodes are already sorted, so it can break
goiri marked this conversation as resolved.
Show resolved Hide resolved
// when the first non-observer is encountered.
break;
}
}
// Returns the inputNameNodes if no shuffle is required
if (observerList.size() <= 1) {
return inputNameNodes;
}

// Shuffle multiple Observers
Collections.shuffle(observerList);

List<T> ret = new ArrayList<>(inputNameNodes.size());
ret.addAll(observerList);
for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
ret.add(inputNameNodes.get(i));
}
return Collections.unmodifiableList(ret);
}

@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId, boolean listObserversFirst) throws IOException {

List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
if (ret != null) {
return ret;
return shuffleObserverNN(ret, listObserversFirst);
}

// Not cached, generate the value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,83 @@ public void setup() throws IOException, InterruptedException {
assertTrue(cleared);
}

@Test
public void testShuffleObserverNNs() throws Exception {
// Add an active entry to the store
NamenodeStatusReport activeReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
assertTrue(namenodeResolver.registerNamenode(activeReport));

// Add a standby entry to the store
NamenodeStatusReport standbyReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
assertTrue(namenodeResolver.registerNamenode(standbyReport));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> withoutObserver =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Get namenodes from cache.
withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Add an observer entry to the store
NamenodeStatusReport observerReport1 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport1));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Get namenodes from cache.
observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Add one new observer entry to the store
NamenodeStatusReport observerReport2 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport2));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList2 =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());

// Get namenodes from cache.
observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
}

@Test
public void testStateStoreDisconnected() throws Exception {

Expand Down