diff --git a/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java b/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java index a2b9cbd7d700..0498f2e41005 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java +++ b/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java @@ -42,7 +42,7 @@ public AppStateFactory(AppSettings settings) { public AppState create() { if (ClusterSettings.isClusterEnabled(settings)) { HazelcastMember hzMember = createHzMember(settings.getProps()); - return new ClusterAppStateImpl(hzMember); + return new ClusterAppStateImpl(settings, hzMember); } return new AppStateImpl(); } diff --git a/server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java b/server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java index 4862cfa89dea..8589e29414a9 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java +++ b/server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java @@ -28,8 +28,6 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonar.application.cluster.ClusterAppState; -import org.sonar.application.cluster.health.SearchNodeHealthProvider; import org.sonar.application.command.CommandFactory; import org.sonar.application.command.EsCommand; import org.sonar.application.command.JavaCommand; @@ -41,10 +39,7 @@ import org.sonar.application.process.ProcessLifecycleListener; import org.sonar.application.process.ProcessMonitor; import org.sonar.application.process.SQProcess; -import org.sonar.process.NetworkUtils; import org.sonar.process.ProcessId; -import org.sonar.application.cluster.health.HealthStateSharing; -import org.sonar.application.cluster.health.HealthStateSharingImpl; public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLifecycleListener, AppStateListener { @@ -65,7 +60,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi private final AtomicInteger stopCountDown = new AtomicInteger(0); private StopperThread stopperThread; private RestarterThread restarterThread; - private HealthStateSharing healthStateSharing; private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS; public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory, @@ -105,7 +99,6 @@ public void schedule() { } private void tryToStartAll() { - tryToStartHealthStateSharing(); tryToStartEs(); tryToStartWeb(); tryToStartCe(); @@ -144,18 +137,6 @@ private void tryToStartCe() { } } - private void tryToStartHealthStateSharing() { - if (healthStateSharing == null - && appState instanceof ClusterAppState - && ClusterSettings.isLocalElasticsearchEnabled(settings)) { - ClusterAppState clusterAppState = (ClusterAppState) appState; - this.healthStateSharing = new HealthStateSharingImpl( - clusterAppState.getHazelcastMember(), - new SearchNodeHealthProvider(settings.getProps(), clusterAppState, NetworkUtils.INSTANCE)); - this.healthStateSharing.start(); - } - } - private boolean isEsClientStartable() { boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings); return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs); @@ -190,7 +171,6 @@ private void stopAll() { stopProcess(ProcessId.COMPUTE_ENGINE); stopProcess(ProcessId.WEB_SERVER); stopProcess(ProcessId.ELASTICSEARCH); - stopHealthStateSharing(); } /** @@ -204,12 +184,6 @@ private void stopProcess(ProcessId processId) { } } - private void stopHealthStateSharing() { - if (healthStateSharing != null) { - healthStateSharing.stop(); - } - } - /** * Blocks until all processes are stopped. Pending restart, if * any, is disabled. diff --git a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java index 80940e96b389..3c23a5501eb9 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java +++ b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java @@ -39,7 +39,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonar.application.AppStateListener; +import org.sonar.application.cluster.health.HealthStateSharing; +import org.sonar.application.cluster.health.HealthStateSharingImpl; +import org.sonar.application.cluster.health.SearchNodeHealthProvider; +import org.sonar.application.config.AppSettings; +import org.sonar.application.config.ClusterSettings; import org.sonar.process.MessageException; +import org.sonar.process.NetworkUtils; import org.sonar.process.ProcessId; import org.sonar.process.cluster.NodeType; import org.sonar.process.cluster.hz.HazelcastMember; @@ -60,14 +66,20 @@ public class ClusterAppStateImpl implements ClusterAppState { private final ReplicatedMap operationalProcesses; private final String operationalProcessListenerUUID; private final String nodeDisconnectedListenerUUID; + private HealthStateSharing healthStateSharing = null; - public ClusterAppStateImpl(HazelcastMember hzMember) { + public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember) { this.hzMember = hzMember; // Get or create the replicated map operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES); operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener()); nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener()); + + if (ClusterSettings.isLocalElasticsearchEnabled(settings)) { + this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtils.INSTANCE)); + this.healthStateSharing.start(); + } } @Override @@ -184,6 +196,9 @@ public Optional getLeaderHostName() { @Override public void close() { if (hzMember != null) { + if (healthStateSharing != null) { + healthStateSharing.stop(); + } try { // Removing listeners operationalProcesses.removeEntryListener(operationalProcessListenerUUID); diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java index 72e117924a2b..bf93a7b88c5b 100644 --- a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java +++ b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java @@ -19,6 +19,7 @@ */ package org.sonar.application.cluster; +import java.net.InetAddress; import org.junit.Rule; import org.junit.Test; import org.junit.rules.DisableOnDebug; @@ -26,14 +27,18 @@ import org.junit.rules.TestRule; import org.junit.rules.Timeout; import org.sonar.application.AppStateListener; +import org.sonar.application.config.TestAppSettings; import org.sonar.process.MessageException; +import org.sonar.process.NetworkUtils; import org.sonar.process.ProcessId; +import org.sonar.process.cluster.NodeType; +import org.sonar.process.cluster.hz.HazelcastMember; +import org.sonar.process.cluster.hz.HazelcastMemberBuilder; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import static org.sonar.application.cluster.HazelcastTesting.newHzMember; import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME; import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION; @@ -47,7 +52,7 @@ public class ClusterAppStateImplTest { @Test public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exception { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { assertThat(underTest.tryToLockWebLeader()).isEqualTo(true); assertThat(underTest.tryToLockWebLeader()).isEqualTo(false); } @@ -56,7 +61,7 @@ public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exc @Test public void test_listeners() throws InterruptedException { AppStateListener listener = mock(AppStateListener.class); - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { underTest.addListener(listener); underTest.setOperational(ProcessId.ELASTICSEARCH); @@ -72,7 +77,7 @@ public void test_listeners() throws InterruptedException { @Test public void registerSonarQubeVersion_publishes_version_on_first_call() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { underTest.registerSonarQubeVersion("6.4.1.5"); assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get()) @@ -82,7 +87,7 @@ public void registerSonarQubeVersion_publishes_version_on_first_call() { @Test public void registerClusterName_publishes_clusterName_on_first_call() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { underTest.registerClusterName("foo"); assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get()) @@ -92,7 +97,7 @@ public void registerClusterName_publishes_clusterName_on_first_call() { @Test public void reset_always_throws_ISE() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("state reset is not supported in cluster mode"); @@ -103,7 +108,7 @@ public void reset_always_throws_ISE() { @Test public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception { // Now launch an instance that try to be part of the hzInstance cluster - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { // Register first version underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111"); @@ -117,7 +122,7 @@ public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different( @Test public void registerClusterName_throws_MessageException_if_clusterName_is_different() throws Exception { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { // Register first version underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName"); @@ -128,4 +133,18 @@ public void registerClusterName_throws_MessageException_if_clusterName_is_differ underTest.registerClusterName("badClusterName"); } } + + private static HazelcastMember newHzMember() { + // use loopback for support of offline builds + InetAddress loopback = InetAddress.getLoopbackAddress(); + + return new HazelcastMemberBuilder() + .setNodeType(NodeType.APPLICATION) + .setProcessId(ProcessId.COMPUTE_ENGINE) + .setClusterName("foo") + .setNodeName("bar") + .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback)) + .setNetworkInterface(loopback.getHostAddress()) + .build(); + } } diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java deleted file mode 100644 index cf8fea2bb26f..000000000000 --- a/server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SonarQube - * Copyright (C) 2009-2017 SonarSource SA - * mailto:info AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.application.cluster; - -import java.net.InetAddress; -import org.sonar.process.NetworkUtils; -import org.sonar.process.ProcessId; -import org.sonar.process.cluster.NodeType; -import org.sonar.process.cluster.hz.HazelcastMember; -import org.sonar.process.cluster.hz.HazelcastMemberBuilder; - -public class HazelcastTesting { - - private HazelcastTesting() { - // do not instantiate - } - - public static HazelcastMember newHzMember() { - // use loopback for support of offline builds - InetAddress loopback = InetAddress.getLoopbackAddress(); - - return new HazelcastMemberBuilder() - .setNodeType(NodeType.APPLICATION) - .setProcessId(ProcessId.COMPUTE_ENGINE) - .setClusterName("foo") - .setNodeName("bar") - .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback)) - .setNetworkInterface(loopback.getHostAddress()) - .build(); - } -} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java new file mode 100644 index 000000000000..25c60a3c712a --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java @@ -0,0 +1,73 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.core.Member; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Answer of {@link DistributedCall}, aggregating the answers from + * all the target members. + */ +public class DistributedAnswer { + + private final Map answers = new HashMap<>(); + private final Set timedOutMembers = new HashSet<>(); + private final Map failedMembers = new HashMap<>(); + + public Optional getAnswer(Member member) { + return Optional.ofNullable(answers.get(member)); + } + + public boolean hasTimedOut(Member member) { + return timedOutMembers.contains(member); + } + + public Optional getFailed(Member member) { + return Optional.ofNullable(failedMembers.get(member)); + } + + public Collection getMembers() { + List members = new ArrayList<>(); + members.addAll(answers.keySet()); + members.addAll(timedOutMembers); + members.addAll(failedMembers.keySet()); + return members; + } + + void setAnswer(Member member, T answer) { + this.answers.put(member, answer); + } + + void setTimedOut(Member member) { + this.timedOutMembers.add(member); + } + + void setFailed(Member member, Exception e) { + failedMembers.put(member, e); + } +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java new file mode 100644 index 000000000000..c55668e7c93f --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java @@ -0,0 +1,26 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.io.Serializable; +import java.util.concurrent.Callable; + +public interface DistributedCall extends Callable, Serializable { +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java index 7521fc9c4ea5..0a70741d0206 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java @@ -21,6 +21,7 @@ import com.hazelcast.core.Cluster; import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.MemberSelector; import java.util.List; import java.util.Map; import java.util.Set; @@ -98,6 +99,19 @@ interface Attribute { Cluster getCluster(); + /** + * Runs a distributed query on a set of Hazelcast members. + * + * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes + * that are not available in classpath of target members. + * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors} + * for utilities. + * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then + * the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer} + */ + DistributedAnswer call(DistributedCall callable, MemberSelector memberSelector, long timeoutMs) + throws InterruptedException; + @Override void close(); } diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java index 606c58928003..118b4e6ee1d4 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java @@ -23,10 +23,16 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.IExecutorService; import com.hazelcast.core.Member; +import com.hazelcast.core.MemberSelector; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.slf4j.LoggerFactory; @@ -89,6 +95,32 @@ public Cluster getCluster() { return hzInstance.getCluster(); } + @Override + public DistributedAnswer call(DistributedCall callable, MemberSelector memberSelector, long timeoutMs) + throws InterruptedException { + + IExecutorService executor = hzInstance.getExecutorService("default"); + Map> futures = executor.submitToMembers(callable, memberSelector); + try { + DistributedAnswer distributedAnswer = new DistributedAnswer<>(); + long maxTime = System.currentTimeMillis() + timeoutMs; + for (Map.Entry> entry : futures.entrySet()) { + long remainingMs = Math.max(maxTime - System.currentTimeMillis(), 5L); + try { + T answer = entry.getValue().get(remainingMs, TimeUnit.MILLISECONDS); + distributedAnswer.setAnswer(entry.getKey(), answer); + } catch (ExecutionException e) { + distributedAnswer.setFailed(entry.getKey(), e); + } catch (TimeoutException e) { + distributedAnswer.setTimedOut(entry.getKey()); + } + } + return distributedAnswer; + } finally { + futures.values().forEach(f -> f.cancel(true)); + } + } + @Override public void close() { try { diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java new file mode 100644 index 000000000000..6c9d8270a19c --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java @@ -0,0 +1,84 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.core.Member; +import java.io.IOException; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class DistributedAnswerTest { + + private Member member = newMember("member1"); + private DistributedAnswer underTest = new DistributedAnswer(); + + @Test + public void test_call_with_unknown_member() { + assertThat(underTest.getAnswer(member)).isEmpty(); + assertThat(underTest.hasTimedOut(member)).isFalse(); + assertThat(underTest.getFailed(member)).isEmpty(); + } + + @Test + public void test_setAnswer() { + underTest.setAnswer(member, "foo"); + + assertThat(underTest.getAnswer(member)).hasValue("foo"); + assertThat(underTest.hasTimedOut(member)).isFalse(); + } + + @Test + public void test_setTimedOut() { + underTest.setTimedOut(member); + + assertThat(underTest.getAnswer(member)).isEmpty(); + assertThat(underTest.hasTimedOut(member)).isTrue(); + } + + @Test + public void test_setFailed() { + IOException e = new IOException(); + underTest.setFailed(member, e); + + assertThat(underTest.getFailed(member)).hasValue(e); + } + + @Test + public void member_can_be_referenced_multiple_times() { + underTest.setTimedOut(member); + underTest.setAnswer(member, "foo"); + IOException exception = new IOException(); + underTest.setFailed(member, exception); + + assertThat(underTest.hasTimedOut(member)).isTrue(); + assertThat(underTest.getAnswer(member)).hasValue("foo"); + assertThat(underTest.getFailed(member)).hasValue(exception); + } + + private static Member newMember(String uuid) { + Member member = mock(Member.class); + when(member.getUuid()).thenReturn(uuid); + return member; + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java new file mode 100644 index 000000000000..03a13d6fda5a --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java @@ -0,0 +1,37 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +public class FailedDistributedCall implements DistributedCall { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + long value = COUNTER.getAndIncrement(); + if (value == 1L) { + // only the second call fails + throw new IOException("BOOM"); + } + return value; + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java new file mode 100644 index 000000000000..72087914a8ec --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java @@ -0,0 +1,146 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.cluster.memberselector.MemberSelectors; +import com.hazelcast.core.Member; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.sonar.process.NetworkUtils; +import org.sonar.process.ProcessId; +import org.sonar.process.cluster.NodeType; + +import static org.assertj.core.api.Assertions.assertThat; + +public class HazelcastMemberImplTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule + public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60)); + + // use loopback for support of offline builds + private static InetAddress loopback = InetAddress.getLoopbackAddress(); + private static HazelcastMember member1; + private static HazelcastMember member2; + private static HazelcastMember member3; + + @BeforeClass + public static void setUp() throws Exception { + int port1 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + int port2 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + int port3 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + member1 = newHzMember(port1, port2, port3); + member2 = newHzMember(port2, port1, port3); + member3 = newHzMember(port3, port1, port2); + } + + @AfterClass + public static void tearDown() throws Exception { + member1.close(); + member2.close(); + member3.close(); + } + + @Test + public void call_executes_query_on_members() throws Exception { + SuccessfulDistributedCall.COUNTER.set(0L); + DistributedCall call = new SuccessfulDistributedCall(); + + DistributedAnswer answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 30_000L); + + assertThat(answer.getMembers()).extracting(Member::getUuid).containsOnlyOnce(member1.getUuid(), member2.getUuid(), member3.getUuid()); + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 1L, 2L); + } + + @Test + public void timed_out_calls_do_not_break_other_answers() throws InterruptedException { + // member 1 and 3 success, member 2 times-out + TimedOutDistributedCall.COUNTER.set(0L); + DistributedCall call = new TimedOutDistributedCall(); + DistributedAnswer answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); + + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); + + assertThat(extractTimeOuts(answer)).containsExactlyInAnyOrder(false, false, true); + } + + @Test + public void failed_calls_do_not_break_other_answers() throws InterruptedException { + // member 1 and 3 success, member 2 fails + FailedDistributedCall.COUNTER.set(0L); + DistributedCall call = new FailedDistributedCall(); + DistributedAnswer answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); + + // 2 successful answers + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); + + // 1 failure + List failures = extractFailures(answer); + assertThat(failures).hasSize(1); + assertThat(failures.get(0)).hasMessageContaining("BOOM"); + } + + private static HazelcastMember newHzMember(int port, int... otherPorts) { + return new HazelcastMemberBuilder() + .setNodeType(NodeType.APPLICATION) + .setProcessId(ProcessId.COMPUTE_ENGINE) + .setClusterName("foo") + .setNodeName("name" + port) + .setPort(port) + .setNetworkInterface(loopback.getHostAddress()) + .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList())) + .build(); + } + + private static Set extractAnswers(DistributedAnswer answer) { + return answer.getMembers().stream() + .map(answer::getAnswer) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toSet()); + } + + private static List extractFailures(DistributedAnswer answer) { + return answer.getMembers().stream() + .map(answer::getFailed) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + private static List extractTimeOuts(DistributedAnswer answer) { + return answer.getMembers().stream() + .map(answer::hasTimedOut) + .collect(Collectors.toList()); + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java new file mode 100644 index 000000000000..515965617b98 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.util.concurrent.atomic.AtomicLong; + +public class SuccessfulDistributedCall implements DistributedCall { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + return COUNTER.getAndIncrement(); + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java new file mode 100644 index 000000000000..05136aea2c63 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java @@ -0,0 +1,36 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.util.concurrent.atomic.AtomicLong; + +public class TimedOutDistributedCall implements DistributedCall { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + long value = COUNTER.getAndIncrement(); + if (value == 1L) { + // only the second call times out + Thread.sleep(30_000L); + } + return value; + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/cluster/StartableHazelcastMember.java b/server/sonar-server/src/main/java/org/sonar/server/cluster/StartableHazelcastMember.java index f9347fd55bff..e746c21435dc 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/cluster/StartableHazelcastMember.java +++ b/server/sonar-server/src/main/java/org/sonar/server/cluster/StartableHazelcastMember.java @@ -21,6 +21,7 @@ import com.hazelcast.core.Cluster; import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.MemberSelector; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; @@ -33,6 +34,8 @@ import org.sonar.process.ProcessId; import org.sonar.process.ProcessProperties; import org.sonar.process.cluster.NodeType; +import org.sonar.process.cluster.hz.DistributedAnswer; +import org.sonar.process.cluster.hz.DistributedCall; import org.sonar.process.cluster.hz.HazelcastMember; import org.sonar.process.cluster.hz.HazelcastMemberBuilder; @@ -110,6 +113,12 @@ public Cluster getCluster() { return nonNullMember().getCluster(); } + @Override + public DistributedAnswer call(DistributedCall callable, MemberSelector memberSelector, long timeoutMs) + throws InterruptedException { + return nonNullMember().call(callable, memberSelector, timeoutMs); + } + private HazelcastMember nonNullMember() { return requireNonNull(member, "Hazelcast member not started"); }