Skip to content

Commit

Permalink
SONAR-9802 ability to execute distributed calls
Browse files Browse the repository at this point in the history
See HazelcastMember#call(DistributedCall, ...)
  • Loading branch information
Simon Brandhof committed Sep 26, 2017
1 parent e4c401f commit df53863
Show file tree
Hide file tree
Showing 15 changed files with 532 additions and 84 deletions.
Expand Up @@ -42,7 +42,7 @@ public AppStateFactory(AppSettings settings) {
public AppState create() {
if (ClusterSettings.isClusterEnabled(settings)) {
HazelcastMember hzMember = createHzMember(settings.getProps());
return new ClusterAppStateImpl(hzMember);
return new ClusterAppStateImpl(settings, hzMember);
}
return new AppStateImpl();
}
Expand Down
Expand Up @@ -28,8 +28,6 @@
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.application.cluster.ClusterAppState;
import org.sonar.application.cluster.health.SearchNodeHealthProvider;
import org.sonar.application.command.CommandFactory;
import org.sonar.application.command.EsCommand;
import org.sonar.application.command.JavaCommand;
Expand All @@ -41,10 +39,7 @@
import org.sonar.application.process.ProcessLifecycleListener;
import org.sonar.application.process.ProcessMonitor;
import org.sonar.application.process.SQProcess;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.application.cluster.health.HealthStateSharing;
import org.sonar.application.cluster.health.HealthStateSharingImpl;

public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLifecycleListener, AppStateListener {

Expand All @@ -65,7 +60,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
private final AtomicInteger stopCountDown = new AtomicInteger(0);
private StopperThread stopperThread;
private RestarterThread restarterThread;
private HealthStateSharing healthStateSharing;
private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS;

public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory,
Expand Down Expand Up @@ -105,7 +99,6 @@ public void schedule() {
}

private void tryToStartAll() {
tryToStartHealthStateSharing();
tryToStartEs();
tryToStartWeb();
tryToStartCe();
Expand Down Expand Up @@ -144,18 +137,6 @@ private void tryToStartCe() {
}
}

private void tryToStartHealthStateSharing() {
if (healthStateSharing == null
&& appState instanceof ClusterAppState
&& ClusterSettings.isLocalElasticsearchEnabled(settings)) {
ClusterAppState clusterAppState = (ClusterAppState) appState;
this.healthStateSharing = new HealthStateSharingImpl(
clusterAppState.getHazelcastMember(),
new SearchNodeHealthProvider(settings.getProps(), clusterAppState, NetworkUtils.INSTANCE));
this.healthStateSharing.start();
}
}

private boolean isEsClientStartable() {
boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings);
return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs);
Expand Down Expand Up @@ -190,7 +171,6 @@ private void stopAll() {
stopProcess(ProcessId.COMPUTE_ENGINE);
stopProcess(ProcessId.WEB_SERVER);
stopProcess(ProcessId.ELASTICSEARCH);
stopHealthStateSharing();
}

/**
Expand All @@ -204,12 +184,6 @@ private void stopProcess(ProcessId processId) {
}
}

private void stopHealthStateSharing() {
if (healthStateSharing != null) {
healthStateSharing.stop();
}
}

/**
* Blocks until all processes are stopped. Pending restart, if
* any, is disabled.
Expand Down
Expand Up @@ -39,7 +39,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.application.AppStateListener;
import org.sonar.application.cluster.health.HealthStateSharing;
import org.sonar.application.cluster.health.HealthStateSharingImpl;
import org.sonar.application.cluster.health.SearchNodeHealthProvider;
import org.sonar.application.config.AppSettings;
import org.sonar.application.config.ClusterSettings;
import org.sonar.process.MessageException;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.cluster.NodeType;
import org.sonar.process.cluster.hz.HazelcastMember;
Expand All @@ -60,14 +66,20 @@ public class ClusterAppStateImpl implements ClusterAppState {
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final String operationalProcessListenerUUID;
private final String nodeDisconnectedListenerUUID;
private HealthStateSharing healthStateSharing = null;

public ClusterAppStateImpl(HazelcastMember hzMember) {
public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember) {
this.hzMember = hzMember;

// Get or create the replicated map
operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES);
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener());

if (ClusterSettings.isLocalElasticsearchEnabled(settings)) {
this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtils.INSTANCE));
this.healthStateSharing.start();
}
}

@Override
Expand Down Expand Up @@ -184,6 +196,9 @@ public Optional<String> getLeaderHostName() {
@Override
public void close() {
if (hzMember != null) {
if (healthStateSharing != null) {
healthStateSharing.stop();
}
try {
// Removing listeners
operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
Expand Down
Expand Up @@ -19,21 +19,26 @@
*/
package org.sonar.application.cluster;

import java.net.InetAddress;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.sonar.application.AppStateListener;
import org.sonar.application.config.TestAppSettings;
import org.sonar.process.MessageException;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.cluster.NodeType;
import org.sonar.process.cluster.hz.HazelcastMember;
import org.sonar.process.cluster.hz.HazelcastMemberBuilder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.sonar.application.cluster.HazelcastTesting.newHzMember;
import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;

Expand All @@ -47,7 +52,7 @@ public class ClusterAppStateImplTest {

@Test
public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exception {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
assertThat(underTest.tryToLockWebLeader()).isEqualTo(true);
assertThat(underTest.tryToLockWebLeader()).isEqualTo(false);
}
Expand All @@ -56,7 +61,7 @@ public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exc
@Test
public void test_listeners() throws InterruptedException {
AppStateListener listener = mock(AppStateListener.class);
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.addListener(listener);

underTest.setOperational(ProcessId.ELASTICSEARCH);
Expand All @@ -72,7 +77,7 @@ public void test_listeners() throws InterruptedException {
@Test
public void registerSonarQubeVersion_publishes_version_on_first_call() {

try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.registerSonarQubeVersion("6.4.1.5");

assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get())
Expand All @@ -82,7 +87,7 @@ public void registerSonarQubeVersion_publishes_version_on_first_call() {

@Test
public void registerClusterName_publishes_clusterName_on_first_call() {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.registerClusterName("foo");

assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get())
Expand All @@ -92,7 +97,7 @@ public void registerClusterName_publishes_clusterName_on_first_call() {

@Test
public void reset_always_throws_ISE() {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("state reset is not supported in cluster mode");

Expand All @@ -103,7 +108,7 @@ public void reset_always_throws_ISE() {
@Test
public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception {
// Now launch an instance that try to be part of the hzInstance cluster
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
// Register first version
underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111");

Expand All @@ -117,7 +122,7 @@ public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different(

@Test
public void registerClusterName_throws_MessageException_if_clusterName_is_different() throws Exception {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
// Register first version
underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName");

Expand All @@ -128,4 +133,18 @@ public void registerClusterName_throws_MessageException_if_clusterName_is_differ
underTest.registerClusterName("badClusterName");
}
}

private static HazelcastMember newHzMember() {
// use loopback for support of offline builds
InetAddress loopback = InetAddress.getLoopbackAddress();

return new HazelcastMemberBuilder()
.setNodeType(NodeType.APPLICATION)
.setProcessId(ProcessId.COMPUTE_ENGINE)
.setClusterName("foo")
.setNodeName("bar")
.setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback))
.setNetworkInterface(loopback.getHostAddress())
.build();
}
}

This file was deleted.

@@ -0,0 +1,73 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.process.cluster.hz;

import com.hazelcast.core.Member;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* Answer of {@link DistributedCall}, aggregating the answers from
* all the target members.
*/
public class DistributedAnswer<T> {

private final Map<Member, T> answers = new HashMap<>();
private final Set<Member> timedOutMembers = new HashSet<>();
private final Map<Member, Exception> failedMembers = new HashMap<>();

public Optional<T> getAnswer(Member member) {
return Optional.ofNullable(answers.get(member));
}

public boolean hasTimedOut(Member member) {
return timedOutMembers.contains(member);
}

public Optional<Exception> getFailed(Member member) {
return Optional.ofNullable(failedMembers.get(member));
}

public Collection<Member> getMembers() {
List<Member> members = new ArrayList<>();
members.addAll(answers.keySet());
members.addAll(timedOutMembers);
members.addAll(failedMembers.keySet());
return members;
}

void setAnswer(Member member, T answer) {
this.answers.put(member, answer);
}

void setTimedOut(Member member) {
this.timedOutMembers.add(member);
}

void setFailed(Member member, Exception e) {
failedMembers.put(member, e);
}
}
@@ -0,0 +1,26 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.process.cluster.hz;

import java.io.Serializable;
import java.util.concurrent.Callable;

public interface DistributedCall<T> extends Callable<T>, Serializable {
}

0 comments on commit df53863

Please sign in to comment.