From 749f41ac7bf0ecae0b4a3e28e572da438c3fc833 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Thu, 5 Oct 2017 10:07:26 +0100 Subject: [PATCH] Fix ManagementNodeStateListener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wait for rebind to complete before notifying of ‘MASTER’ --- .../api/mgmt/ha/HighAvailabilityManager.java | 2 +- .../mgmt/ha/HighAvailabilityManagerImpl.java | 14 +- .../ManagementNodeStateListenerManager.java | 70 ++++++ .../NonDeploymentManagementContext.java | 3 +- .../core/mgmt/rebind/RebindTestFixture.java | 9 +- .../core/mgmt/rebind/RebindTestUtils.java | 51 ++++- .../ManagementNodeStateListenerTest.java | 62 ++++-- .../launcher/common/BasicLauncher.java | 3 +- .../BrooklynEntityMirrorIntegrationTest.java | 2 +- ...RebindManagementNodeStateListenerTest.java | 199 ++++++++++++++++++ .../rest/testing/BrooklynRestApiTest.java | 2 +- .../rest/BrooklynRestApiLauncher.java | 5 +- .../entity/AbstractMultiDistroLiveTest.java | 2 +- ...wareProcessRebindNotRunningEntityTest.java | 6 +- 14 files changed, 389 insertions(+), 41 deletions(-) create mode 100644 launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindManagementNodeStateListenerTest.java diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ha/HighAvailabilityManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ha/HighAvailabilityManager.java index 6724a03b71..76002cd9d6 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ha/HighAvailabilityManager.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ha/HighAvailabilityManager.java @@ -67,7 +67,7 @@ public interface HighAvailabilityManager { * this is called after this HA Manager is started. 
*/ @Beta - void disabled(); + void disabled(boolean persistenceEnabled); /** Whether HA mode is operational */ boolean isRunning(); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java index 0148275973..ec00896536 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java @@ -62,6 +62,7 @@ import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode; +import org.apache.brooklyn.core.mgmt.persist.PersistMode; import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl; import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListener; @@ -145,6 +146,7 @@ public long read() { private volatile Ticker optionalRemoteTickerUtc = null; private volatile Task pollingTask; + private volatile boolean persistenceEnabled; private volatile boolean disabled; private volatile boolean running; private volatile ManagementNodeState nodeState = ManagementNodeState.INITIALIZING; @@ -250,7 +252,8 @@ public boolean isRunning() { } @Override - public void disabled() { + public void disabled(boolean persistenceEnabled) { + this.persistenceEnabled = persistenceEnabled; disabled = true; // this is notionally the master, just not running; see javadoc for more info setNodeStateTransitionComplete(true); @@ -261,6 +264,7 @@ public void disabled() { @Override public void start(HighAvailabilityMode startMode) { setNodeStateTransitionComplete(true); + persistenceEnabled = true; disabled = false; running = true; changeMode(startMode, true, true); @@ -527,8 +531,12 @@ protected void 
setInternalNodeState(ManagementNodeState newState) { nodeStateHistory.remove(nodeStateHistory.size()-1); } } - ((RebindManagerImpl)managementContext.getRebindManager()).setAwaitingInitialRebind(running && - (ManagementNodeState.isHotProxy(newState) || newState==ManagementNodeState.MASTER)); + // If no HA (i.e. a standalone server) then we will be 'disabled'. We will still be + // queried for `getNodeState()` and therefore still want to record awaiting-initial-rebind + // if this standalone server has persistence enabled. + boolean awaitingInitialRebind = (running || (disabled && persistenceEnabled)) && + (ManagementNodeState.isHotProxy(newState) || newState==ManagementNodeState.MASTER); + ((RebindManagerImpl)managementContext.getRebindManager()).setAwaitingInitialRebind(awaitingInitialRebind); this.nodeState = newState; } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java index e148c7b706..729105f9df 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementNodeStateListenerManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; @@ -61,6 +62,9 @@ public class ManagementNodeStateListenerManager implements ManagementNodeStateLi private final ManagementContextInternal mgmt; private final Object mutex = new Object(); + private final Object waitForMasterRebindMutex = new Object(); + + private volatile boolean terminated; private final List listeners = Lists.newCopyOnWriteArrayList(); private ManagementNodeState lastPublishedVal; @@ -105,6 +109,10 @@ public 
ManagementNodeStateListenerManager(ManagementContextInternal managementCo @Override public void onStateChange(ManagementNodeState state) { // Filtering out duplicates/nulls, schedule the notification of the listeners with this latest value. + // + // If transitioning to master, also ensure we have finished rebinding to the persisted state + // (see comments in waitForMasterNotRebinding). + synchronized (mutex) { if (state != null && lastPublishedVal != state) { LOG.debug("Notifying {} listener(s) of management-node state changed to {}", new Object[] {listeners.size(), state}); @@ -112,6 +120,14 @@ public void onStateChange(ManagementNodeState state) { execOnListeners(new Function() { @Override public Void apply(ManagementNodeStateListener listener) { + if (state == ManagementNodeState.MASTER) { + boolean success = waitForMasterNotRebinding(); + if (!success) { + LOG.info("Not notifying listener {} of management-node state {} because did not finish rebinding", + new Object[] {listener, state}); + return null; + } + } listener.onStateChange(state); return null; } @@ -123,7 +139,61 @@ public String toString() { } } + /** + * If we have transitioned to master, ensure that we have also finished rebinding to persisted state. + * This is important so that a listener can list the apps etc. + * + * While waiting, it will abort if we are terminated or if the state transitions away from being master + * (e.g. rebind fails). + * + * @return true if the rebind completes and we are still master; false otherwise. + */ + private boolean waitForMasterNotRebinding() { + // This approach feels very hacky, but it's hard to do better: the internal state of Brooklyn is + // buried inside HighAvailabilityManager and RebindManager; their behaviours depend on the sequence + // of calls made to each (e.g. controlled by `BasicLauncher`). + // + // For example, if the haManager promotes us to master, it will mark its state as 'MASTER' before rebind + // (i.e. 
the RebindManager will not yet have been given the persisted state, so we won't know about our + // apps/catalog yet). To work around this, the haManager calls `RebindManagerImpl.setAwaitingInitialRebind`. + // Where required, one can therefore call `RebindManager.isAwaitingInitialRebind()` to find out if we + // are properly initialized. The REST api does this in its 'HaHotCheck' filter. + // + // If HA is disabled, then `highAvailabilityManager.disabled(...)` will mark its state as 'MASTER' (because + // it will obviously be the only node in that HA-cluster). If persistence is disabled, that's fine. + // But if persistence is enabled, then the RebindManager will not yet have been given the persisted state. + // The haManager therefore also now calls setAwaitingInitialRebind in this situation. + // As for the REST api, it will report 'MASTER' before rebind is done. However, /server/healthy + // will continue to say false until rebind is complete. This is because the `BasicLauncher` will be rebinding + // to the persisted state synchronously: it will only mark the management context as 'startup-complete' after + // rebind (via the call to LocalManagementContext.noteStartupComplete). 
+ + while (true) { + boolean awaitingRebind = mgmt.getRebindManager().isAwaitingInitialRebind(); + ManagementNodeState state = mgmt.getHighAvailabilityManager().getNodeState(); + boolean abort = (state != ManagementNodeState.MASTER) || terminated || !mgmt.isRunning(); + + if (abort) { + return false; + } else if (!awaitingRebind) { + return true; + } + synchronized (waitForMasterRebindMutex) { + try { + waitForMasterRebindMutex.wait(100); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + } + } + public void terminate() { + terminated = true; + synchronized (waitForMasterRebindMutex) { + waitForMasterRebindMutex.notifyAll(); + } + // Wait for the listeners to finish + close the listeners Duration timeout = mgmt.getBrooklynProperties().getConfig(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENER_TERMINATION_TIMEOUT); if (listenerQueueSize.get() > 0) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java index b5fd0b584d..a1ab3df3a2 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java @@ -67,6 +67,7 @@ import org.apache.brooklyn.core.internal.storage.BrooklynStorage; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.ha.OsgiManager; +import org.apache.brooklyn.core.mgmt.persist.PersistMode; import org.apache.brooklyn.core.mgmt.usage.UsageManager; import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory; import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; @@ -629,7 +630,7 @@ public HighAvailabilityManager setPersister(ManagementPlaneSyncRecordPersister p throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this 
operation."); } @Override - public void disabled() { + public void disabled(boolean persistenceEnabled) { throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation."); } @Override diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java index bad753d3ef..42fdc70922 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java @@ -296,12 +296,11 @@ protected T rebind(RebindOptions options) throws Exception { return newApp; } - protected T hotStandby() throws Exception { + protected ManagementContext hotStandby() throws Exception { return hotStandby(RebindOptions.create()); } - @SuppressWarnings("unchecked") - protected T hotStandby(RebindOptions options) throws Exception { + protected ManagementContext hotStandby(RebindOptions options) throws Exception { if (newApp != null || newManagementContext != null) { throw new IllegalStateException("already rebound - use switchOriginalToNewManagementContext() if you are trying to rebind multiple times"); } @@ -323,8 +322,8 @@ protected T hotStandby(RebindOptions options) throws Exception { RebindTestUtils.stopPersistence(origApp); newManagementContext = options.newManagementContext; - newApp = (T) RebindTestUtils.rebind(options); - return newApp; + RebindTestUtils.rebind(options); + return newManagementContext; } /** diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java index 911db07c8f..428858b82f 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java @@ -31,8 +31,10 @@ import 
org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager; import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecordPersister; import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; @@ -290,10 +292,11 @@ public LocalManagementContext buildStarted() { if (persistMode == PersistMode.DISABLED) { unstarted.generateManagementPlaneId(); unstarted.getCatalogInitialization().populateInitialCatalogOnly(); + unstarted.getHighAvailabilityManager().disabled(persistMode != PersistMode.DISABLED); } else if (haMode == HighAvailabilityMode.DISABLED) { unstarted.getRebindManager().rebind(classLoader, null, ManagementNodeState.MASTER); unstarted.getRebindManager().startPersistence(); - unstarted.getHighAvailabilityManager().disabled(); + unstarted.getHighAvailabilityManager().disabled(persistMode != PersistMode.DISABLED); } else { unstarted.getHighAvailabilityManager().start(haMode); } @@ -388,7 +391,11 @@ public static Application rebind(RebindOptions options) throws Exception { } Collection newApps = rebindAll(options); if (newApps.isEmpty()) { - if (hadApps) { + if (options.haMode != null && options.haMode != HighAvailabilityMode.DISABLED) { + // will rebind async, when promoted to master. + // Don't assert here! Rely on caller to wait. 
+ return null; + } else if (hadApps) { throw new IllegalStateException("Application could not be found after rebind; serialization probably failed"); } else { // no apps before; probably testing catalog @@ -469,6 +476,7 @@ public static Collection rebindAll(RebindOptions options) throws Ex HighAvailabilityMode haMode = (options.haMode == null ? HighAvailabilityMode.DISABLED : options.haMode); RebindExceptionHandler exceptionHandler = options.exceptionHandler; boolean hasPersister = newManagementContext != null && newManagementContext.getRebindManager().getPersister() != null; + boolean hasHaPersister = newManagementContext != null && newManagementContext.getHighAvailabilityManager().getPersister() != null; boolean checkSerializable = options.checkSerializable; boolean terminateOrigManagementContext = options.terminateOrigManagementContext; Function stateTransformer = options.stateTransformer; @@ -517,12 +525,39 @@ public static Collection rebindAll(RebindOptions options) throws Ex stateTransformer.apply(persister); } - List newApps = newManagementContext.getRebindManager().rebind( - classLoader, - exceptionHandler, - (haMode == HighAvailabilityMode.DISABLED) ? ManagementNodeState.MASTER : ManagementNodeState.of(haMode).get()); - newManagementContext.getRebindManager().startPersistence(); - return newApps; + if (haMode == HighAvailabilityMode.DISABLED) { + HighAvailabilityManager haManager = newManagementContext.getHighAvailabilityManager(); + haManager.disabled(true); + + List newApps = newManagementContext.getRebindManager().rebind( + classLoader, + exceptionHandler, + (haMode == HighAvailabilityMode.DISABLED) ? 
ManagementNodeState.MASTER : ManagementNodeState.of(haMode).get()); + newManagementContext.getRebindManager().startPersistence(); + + return newApps; + + } else { + HighAvailabilityManager haManager = newManagementContext.getHighAvailabilityManager(); + + if (!hasHaPersister) { + if (hasPersister) throw new IllegalStateException("Must not supply persister for RebindManager but not for HighAvailabilityManager"); + assert objectStore != null; + + ManagementPlaneSyncRecordPersister persister = + new ManagementPlaneSyncRecordPersisterToObjectStore(newManagementContext, + objectStore, + newManagementContext.getCatalogClassLoader()); + + haManager.setPersister(persister); + } + + haManager.start(haMode); + + // TODO We'll be promoted to master asynchronously; will not yet have done our rebind. + // Could block here for rebind to complete but do any callers really need us to do that? + return ImmutableList.of(); + } } public static void waitForPersisted(Application origApp) throws InterruptedException, TimeoutException { diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java index 0c1e07bb20..e5dc961693 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/usage/ManagementNodeStateListenerTest.java @@ -27,8 +27,10 @@ import java.util.List; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; import org.apache.brooklyn.core.internal.BrooklynProperties; +import org.apache.brooklyn.core.mgmt.ManagementContextInjectable; import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport; @@ -62,7 +64,7 @@ private LocalManagementContext 
newManagementContext(BrooklynProperties brooklynP // Need to call HighAvailabilityManager explicitly; otherwise it will never publish // the ManagementNodeState. LocalManagementContext result = LocalManagementContextForTests.newInstance(brooklynProperties); - result.getHighAvailabilityManager().disabled(); + result.getHighAvailabilityManager().disabled(false); result.noteStartupComplete(); return result; } @@ -73,10 +75,11 @@ public void testAddUsageListenerInstance() throws Exception { brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, ImmutableList.of(new RecordingStaticManagementNodeStateListener())); replaceManagementContext(newManagementContext(brooklynProperties)); - assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER)); + assertEquals(RecordingStaticManagementNodeStateListener.getInstance().getManagementContext(), mgmt); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); mgmt.terminate(); - assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER, TERMINATED)); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER, TERMINATED)); } @Test @@ -85,7 +88,7 @@ public void testAddUsageListenerViaProperties() throws Exception { brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, RecordingStaticManagementNodeStateListener.class.getName()); replaceManagementContext(newManagementContext(brooklynProperties)); - assertEventsEventually(RecordingStaticManagementNodeStateListener.getInstance(), ImmutableList.of(INITIALIZING, MASTER)); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); } @Test @@ -99,8 +102,8 @@ public void testAddMultipleUsageListenersViaProperties() throws Exception { 
assertTrue(listeners.get(0) instanceof RecordingStaticManagementNodeStateListener, "listeners="+listeners); assertTrue(listeners.get(1) instanceof RecordingStaticManagementNodeStateListener, "listeners="+listeners); - assertEventsEventually(listeners.get(0), ImmutableList.of(INITIALIZING, MASTER)); - assertEventsEventually(listeners.get(1), ImmutableList.of(INITIALIZING, MASTER)); + listeners.get(0).assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); + listeners.get(1).assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); } @Test(expectedExceptions = ClassCastException.class) @@ -110,15 +113,6 @@ public void testErrorWhenConfiguredClassIsNotAListener() { replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties)); } - private void assertEventsEventually(RecordingManagementNodeStateListener listener, List expected) { - Asserts.succeedsEventually(new Runnable() { - public void run() { - List actual = listener.getEvents(); - String errMsg = "actual="+actual+"; expected="+expected; - assertEquals(actual, expected, errMsg); - }}); - } - public static class RecordingStaticManagementNodeStateListener extends RecordingManagementNodeStateListener implements ManagementNodeStateListener { private static final List STATIC_INSTANCES = Lists.newCopyOnWriteArrayList(); @@ -126,6 +120,14 @@ public static RecordingStaticManagementNodeStateListener getInstance() { return Iterables.getOnlyElement(STATIC_INSTANCES); } + public static RecordingStaticManagementNodeStateListener getInstanceEventually() { + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertTrue(STATIC_INSTANCES.size() > 0); + }}); + return getInstance(); + } + public static RecordingStaticManagementNodeStateListener getLastInstance() { return Iterables.getLast(STATIC_INSTANCES); } @@ -145,9 +147,15 @@ public RecordingStaticManagementNodeStateListener() { } } - public static class RecordingManagementNodeStateListener implements 
ManagementNodeStateListener { + public static class RecordingManagementNodeStateListener implements ManagementNodeStateListener, ManagementContextInjectable { private final List events = Lists.newCopyOnWriteArrayList(); + private ManagementContext managementContext; + @Override + public void setManagementContext(ManagementContext managementContext) { + this.managementContext = managementContext; + } + @Override public void onStateChange(ManagementNodeState state) { events.add(state); @@ -156,5 +164,27 @@ public void onStateChange(ManagementNodeState state) { public List getEvents() { return ImmutableList.copyOf(events); } + + public ManagementContext getManagementContext() { + return managementContext; + } + + public void assertEventsEventually(List expected) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + List actual = getEvents(); + String errMsg = "actual="+actual+"; expected="+expected; + assertEquals(actual, expected, errMsg); + }}); + } + + public void assertEventsContinually(List expected) { + Asserts.succeedsContinually(new Runnable() { + public void run() { + List actual = getEvents(); + String errMsg = "actual="+actual+"; expected="+expected; + assertEquals(actual, expected, errMsg); + }}); + } } } diff --git a/launcher-common/src/main/java/org/apache/brooklyn/launcher/common/BasicLauncher.java b/launcher-common/src/main/java/org/apache/brooklyn/launcher/common/BasicLauncher.java index 19b6e2025c..ddb3f8ec25 100644 --- a/launcher-common/src/main/java/org/apache/brooklyn/launcher/common/BasicLauncher.java +++ b/launcher-common/src/main/java/org/apache/brooklyn/launcher/common/BasicLauncher.java @@ -439,6 +439,7 @@ protected T startPartTwo() { } else { ((LocalManagementContext)managementContext).generateManagementPlaneId(); populateInitialCatalogNoPersistence(catalogInitialization); + managementContext.getHighAvailabilityManager().disabled(false); } markCatalogStarted(catalogInitialization); addLocations(); @@ -614,7 +615,7 @@ 
protected void startPersistence() { // Now start the HA Manager and the Rebind manager, as required if (highAvailabilityMode == HighAvailabilityMode.DISABLED) { HighAvailabilityManager haManager = managementContext.getHighAvailabilityManager(); - haManager.disabled(); + haManager.disabled(true); startPersistenceWithoutHA(); diff --git a/launcher/src/test/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorIntegrationTest.java b/launcher/src/test/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorIntegrationTest.java index 3c874f84fd..a665149c3a 100644 --- a/launcher/src/test/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorIntegrationTest.java +++ b/launcher/src/test/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorIntegrationTest.java @@ -88,7 +88,7 @@ protected void setUpServer(ManagementContext mgmt, boolean skipSecurity) { server.skipSecurity(skipSecurity); server.start(); - serverMgmt.getHighAvailabilityManager().disabled(); + serverMgmt.getHighAvailabilityManager().disabled(false); serverApp = TestApplication.Factory.newManagedInstanceForTests(serverMgmt); ((LocalManagementContextForTests)serverMgmt).noteStartupComplete(); diff --git a/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindManagementNodeStateListenerTest.java b/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindManagementNodeStateListenerTest.java new file mode 100644 index 0000000000..e417bbc268 --- /dev/null +++ b/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindManagementNodeStateListenerTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.launcher; + +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.INITIALIZING; +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.MASTER; +import static org.apache.brooklyn.api.mgmt.ha.ManagementNodeState.STANDBY; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; +import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; +import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; +import org.apache.brooklyn.core.internal.BrooklynProperties; +import org.apache.brooklyn.core.mgmt.persist.PersistMode; +import org.apache.brooklyn.core.mgmt.usage.ManagementNodeStateListenerTest.RecordingStaticManagementNodeStateListener; +import org.apache.brooklyn.core.server.BrooklynServerConfig; +import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.core.test.entity.TestApplication; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.Exceptions; 
+import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class BrooklynLauncherRebindManagementNodeStateListenerTest extends AbstractBrooklynLauncherRebindTest { + + private AtomicReference populateInitialCatalogOnlyLatch = new AtomicReference<>(); + private AtomicReference populateInitialAndPersistedCatalogLatch = new AtomicReference<>(); + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + RecordingStaticManagementNodeStateListener.clearInstances(); + populateInitialCatalogOnlyLatch.set(null); + populateInitialAndPersistedCatalogLatch.set(null); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + RecordingStaticManagementNodeStateListener.clearInstances(); + } + + @Override + protected BrooklynLauncher newLauncherForTests(PersistMode persistMode, HighAvailabilityMode haMode) { + BrooklynProperties brooklynProperties = LocalManagementContextForTests.builder(true).buildProperties(); + brooklynProperties.put(BrooklynServerConfig.MANAGEMENT_NODE_STATE_LISTENERS, RecordingStaticManagementNodeStateListener.class.getName()); + + // If latches are set, then blocks startup + CatalogInitialization catInit = new CatalogInitialization() { + @Override + public void populateInitialAndPersistedCatalog(ManagementNodeState mode, PersistedCatalogState persistedState, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) { + awaitIfNotNull(populateInitialAndPersistedCatalogLatch); + super.populateInitialAndPersistedCatalog(mode, persistedState, exceptionHandler, rebindLogger); + } + @Override + public void populateInitialCatalogOnly() { + awaitIfNotNull(populateInitialCatalogOnlyLatch); + super.populateInitialCatalogOnly(); + } + private void awaitIfNotNull(AtomicReference latchRef) 
{ + CountDownLatch latch = latchRef.get(); + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + } + }; + + return super.newLauncherForTests(persistMode, haMode) + .brooklynProperties(brooklynProperties) + .catalogInitialization(catInit); + } + + @Test + public void testNotifiesWhenPersistenceOff() throws Exception { + BrooklynLauncher launcher = newLauncherForTests(PersistMode.DISABLED, HighAvailabilityMode.DISABLED); + launcher.start(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); + } + + @Test + public void testNotifiesOnRebind() throws Exception { + // Starting with no persisted state + BrooklynLauncher launcher = newLauncherForTests(); + launcher.start(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); + RecordingStaticManagementNodeStateListener.clearInstances(); + + launcher.terminate(); + + // Starting with a populated persisted state dir + BrooklynLauncher newLauncher = newLauncherForTests(); + newLauncher.start(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, MASTER)); + } + + @Test + public void testNotifiesOnHighAvailabilityPromotion() throws Exception { + // Start master + BrooklynLauncher launcher = newLauncherForTests(PersistMode.AUTO, HighAvailabilityMode.AUTO); + launcher.start(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, STANDBY, MASTER)); + RecordingStaticManagementNodeStateListener.clearInstances(); + + // Start standby + BrooklynLauncher launcher2 = newLauncherForTests(PersistMode.AUTO, HighAvailabilityMode.AUTO); + launcher2.start(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, STANDBY)); + + // Promote standby to 
master + launcher.terminate(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(ImmutableList.of(INITIALIZING, STANDBY, MASTER)); + } + + @Test + public void testNotifiesOfMasterOnlyAfterRebindingAppsWhenHaDisabled() throws Exception { + runNotifiesOfMasterOnlyAfterRebindingApps(HighAvailabilityMode.DISABLED); + } + + @Test + public void testNotifiesOfMasterOnlyAfterRebindingAppsWhenHaEnabled() throws Exception { + runNotifiesOfMasterOnlyAfterRebindingApps(HighAvailabilityMode.AUTO); + } + + protected void runNotifiesOfMasterOnlyAfterRebindingApps(HighAvailabilityMode haMode) throws Exception { + // Populate the persisted state with an app + BrooklynLauncher origLauncher = newLauncherForTests(PersistMode.AUTO, haMode); + origLauncher.start(); + TestApplication origApp = origLauncher.getManagementContext().getEntityManager().createEntity(EntitySpec.create(TestApplication.class)); + origLauncher.terminate(); + RecordingStaticManagementNodeStateListener.clearInstances(); + + // Start an app async (causing its rebind to block until we release the latch) + populateInitialAndPersistedCatalogLatch.set(new CountDownLatch(1)); + + BrooklynLauncher launcher2 = newLauncherForTests(PersistMode.AUTO, haMode); + Thread t = new Thread() { + public void run() { + launcher2.start(); + } + }; + try { + List expectedInitialStates = (haMode == HighAvailabilityMode.DISABLED) ? + ImmutableList.of(INITIALIZING) : + ImmutableList.of(INITIALIZING, STANDBY); + List expectedFinalStates = (haMode == HighAvailabilityMode.DISABLED) ? 
+ ImmutableList.of(INITIALIZING, MASTER) : + ImmutableList.of(INITIALIZING, STANDBY, MASTER); + + t.start(); + RecordingStaticManagementNodeStateListener.getInstanceEventually().assertEventsEventually(expectedInitialStates); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsContinually(expectedInitialStates); + assertTrue(t.isAlive()); + + // Let it complete; now expect to get the callback that we are master + populateInitialAndPersistedCatalogLatch.get().countDown(); + RecordingStaticManagementNodeStateListener.getInstance().assertEventsEventually(expectedFinalStates); + Application newApp = Iterables.getOnlyElement(launcher2.getManagementContext().getApplications()); + assertEquals(newApp.getId(), origApp.getId()); + + t.join(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds()); + } finally { + t.interrupt(); + } + } +} diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/BrooklynRestApiTest.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/BrooklynRestApiTest.java index 2ae018334a..54c9384419 100644 --- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/BrooklynRestApiTest.java +++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/testing/BrooklynRestApiTest.java @@ -147,7 +147,7 @@ protected synchronized ManagementContext getManagementContext() { } else { manager = new LocalManagementContextForTests(); } - manager.getHighAvailabilityManager().disabled(); + manager.getHighAvailabilityManager().disabled(false); ((LocalManagementContext)manager).generateManagementPlaneId(); new BrooklynCampPlatformLauncherNoServer() diff --git a/rest/rest-server/src/test/java/org/apache/brooklyn/rest/BrooklynRestApiLauncher.java b/rest/rest-server/src/test/java/org/apache/brooklyn/rest/BrooklynRestApiLauncher.java index b7c6b2e492..617fb77b78 100644 --- a/rest/rest-server/src/test/java/org/apache/brooklyn/rest/BrooklynRestApiLauncher.java +++ 
b/rest/rest-server/src/test/java/org/apache/brooklyn/rest/BrooklynRestApiLauncher.java @@ -304,8 +304,9 @@ private static Server startServer(ManagementContext mgmt, ContextHandler context ((BrooklynProperties)mgmt.getConfig()).put(BrooklynWebConfig.SECURITY_PROVIDER_CLASSNAME, AnyoneSecurityProvider.class.getName()); } } - if (mgmt != null && disableHighAvailability) - mgmt.getHighAvailabilityManager().disabled(); + if (mgmt != null && disableHighAvailability) { + mgmt.getHighAvailabilityManager().disabled(false); + } InetSocketAddress bindLocation = new InetSocketAddress( secure ? Networking.ANY_NIC : Networking.LOOPBACK, Networking.nextAvailablePort(FAVOURITE_PORT)); diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/AbstractMultiDistroLiveTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/AbstractMultiDistroLiveTest.java index fa44f184ad..ff29408e36 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/AbstractMultiDistroLiveTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/AbstractMultiDistroLiveTest.java @@ -97,7 +97,7 @@ public void setUp() throws Exception { localManagementContextForTests.generateManagementPlaneId(); mgmt = localManagementContextForTests; - mgmt.getHighAvailabilityManager().disabled(); + mgmt.getHighAvailabilityManager().disabled(false); super.setUp(); } diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java index be591ef60c..6a58efd0ee 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessRebindNotRunningEntityTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import 
org.apache.brooklyn.api.entity.Application; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.entity.ImplementedBy; @@ -40,6 +41,7 @@ import org.apache.brooklyn.api.location.MachineLocation; import org.apache.brooklyn.api.location.MachineProvisioningLocation; import org.apache.brooklyn.api.location.NoMachinesAvailableException; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; @@ -319,7 +321,9 @@ public CustomResponse generate(ExecParams execParams) throws Exception { EntityAsserts.assertAttributeEquals(entity, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STARTING); // Check that the read-only hot standby does not overwrite the entity's state; it should still say "STARTING" - TestApplication newApp = hotStandby(); + ManagementContext newManagementContext = hotStandby(); + Asserts.succeedsEventually(() -> assertTrue(newManagementContext.getApplications().size() > 0)); + Application newApp = Iterables.getOnlyElement(newManagementContext.getApplications()); final VanillaSoftwareProcess newEntity = (VanillaSoftwareProcess) Iterables.find(newApp.getChildren(), Predicates.instanceOf(VanillaSoftwareProcess.class)); assertNotMarkedOnfire(newEntity, Lifecycle.STARTING);