Skip to content

Commit

Permalink
fix: rework primary channel election
Browse files Browse the repository at this point in the history
  - list of candidate was containing duplicating id which could lead to wrong primary election
  - if the primary channel is also present locally, it was first primary then secondary
  - handler on connector was too late registered
  - election topic was badly configured
  • Loading branch information
guillaumelamirand committed Mar 29, 2024
1 parent bfbc8ce commit c3c0a27
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 38 deletions.
Expand Up @@ -43,19 +43,20 @@ public Maybe<ExchangeConnector> get(final String targetId) {

@Override
public Completable register(final ExchangeConnector exchangeConnector) {
return exchangeConnector
.initialize()
.doOnComplete(() -> {
log.debug("New connector successfully register for target [{}]", exchangeConnector.targetId());
return Completable
.fromRunnable(() ->
// Add custom handlers to deal with healthcheck and primary commands
exchangeConnector.addCommandHandlers(
List.of(
new GoodByeCommandHandler(exchangeConnector),
new HealthCheckCommandHandler(exchangeConnector),
new PrimaryCommandHandler(exchangeConnector)
)
);

)
)
.andThen(exchangeConnector.initialize())
.doOnComplete(() -> {
log.debug("New connector successfully register for target [{}]", exchangeConnector.targetId());
exchangeConnectors.put(exchangeConnector.targetId(), exchangeConnector);
})
.onErrorResumeNext(throwable -> {
Expand Down
Expand Up @@ -23,17 +23,18 @@
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

/**
* @author Guillaume LAMIRAND (guillaume.lamirand at graviteesource.com)
* @author GraviteeSource Team
*/
@SuperBuilder
@NoArgsConstructor
@Slf4j
public class EmbeddedExchangeConnector implements ExchangeConnector {

protected ConnectorChannel connectorChannel;
Expand Down Expand Up @@ -68,6 +69,7 @@ public boolean isPrimary() {

@Override
public void setPrimary(final boolean isPrimary) {
log.debug("Setting primary status '{}' on connector", isPrimary);
this.primary = isPrimary;
}

Expand All @@ -78,6 +80,8 @@ public Single<Reply<?>> sendCommand(final Command<?> command) {

@Override
public void addCommandHandlers(final List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers) {
this.connectorChannel.addCommandHandlers(commandHandlers);
if (this.connectorChannel != null) {
this.connectorChannel.addCommandHandlers(commandHandlers);
}
}
}
Expand Up @@ -37,18 +37,17 @@
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.http.WebSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

/**
* @author Guillaume LAMIRAND (guillaume.lamirand at graviteesource.com)
* @author GraviteeSource Team
*/
@RequiredArgsConstructor
@SuperBuilder
@Slf4j
public class WebSocketExchangeConnector extends EmbeddedExchangeConnector {
Expand All @@ -61,6 +60,24 @@ public class WebSocketExchangeConnector extends EmbeddedExchangeConnector {
private final WebSocketConnectorClientFactory webSocketConnectorClientFactory;
private final ExchangeSerDe exchangeSerDe;

public WebSocketExchangeConnector(
final ProtocolVersion protocolVersion,
final List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers,
final List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> commandAdapters,
final List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> replyAdapters,
final Vertx vertx,
final WebSocketConnectorClientFactory webSocketConnectorClientFactory,
final ExchangeSerDe exchangeSerDe
) {
this.protocolVersion = protocolVersion;
this.commandHandlers = commandHandlers != null ? new ArrayList<>(commandHandlers) : new ArrayList<>();
this.commandAdapters = commandHandlers != null ? new ArrayList<>(commandAdapters) : new ArrayList<>();
this.replyAdapters = commandHandlers != null ? new ArrayList<>(replyAdapters) : new ArrayList<>();
this.vertx = vertx;
this.webSocketConnectorClientFactory = webSocketConnectorClientFactory;
this.exchangeSerDe = exchangeSerDe;
}

@Override
public Completable initialize() {
return Completable
Expand Down Expand Up @@ -147,4 +164,19 @@ private Single<WebSocket> connect() {
});
});
}

@Override
public void addCommandHandlers(final List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers) {
if (commandHandlers != null) {
commandHandlers.forEach(commandHandler -> {
if (
this.commandHandlers.stream()
.noneMatch(newCommandHandler -> newCommandHandler.supportType().equals(commandHandler.supportType()))
) {
this.commandHandlers.add(commandHandler);
}
});
}
super.addCommandHandlers(commandHandlers);
}
}
Expand Up @@ -41,7 +41,7 @@
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -77,7 +77,8 @@ protected void doStart() throws Exception {
log.debug("[{}] Starting channel manager", this.identifyConfiguration.id());
super.doStart();
primaryChannelManager.start();
primaryChannelElectedEventTopic = clusterManager.topic(PrimaryChannelManager.PRIMARY_CHANNEL_EVENTS_ELECTED_TOPIC);
primaryChannelElectedEventTopic =
clusterManager.topic(identifyConfiguration.identifyName(PrimaryChannelManager.PRIMARY_CHANNEL_EVENTS_ELECTED_TOPIC));
primaryChannelElectedSubscriptionId =
primaryChannelElectedEventTopic.addMessageListener(message -> handlePrimaryChannelElectedEvent(message.content()));
healthCheckDisposable =
Expand Down Expand Up @@ -129,6 +130,7 @@ private void handlePrimaryChannelElectedEvent(final PrimaryChannelElectedEvent e
.andThen(
Flowable
.fromIterable(localChannelRegistry.getAllByTargetId(targetId))
.filter(controllerChannel -> !controllerChannel.id().equals(channelId))
.flatMapCompletable(controllerChannel -> sendPrimaryCommand(controllerChannel, false))
);
})
Expand Down Expand Up @@ -228,7 +230,7 @@ public Flowable<TargetMetric> targetsMetric() {
return this.primaryChannelManager.candidatesChannel()
.flatMapSingle(candidatesChannelEntries -> {
String targetId = candidatesChannelEntries.getKey();
List<String> channelIds = candidatesChannelEntries.getValue();
Set<String> channelIds = candidatesChannelEntries.getValue();
return this.primaryChannelManager.primaryChannelBy(targetId)
.defaultIfEmpty("unknown")
.flattenStreamAsFlowable(primaryChannel ->
Expand Down
Expand Up @@ -18,29 +18,28 @@
import io.gravitee.node.api.cache.Cache;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class PrimaryChannelCandidateStore {

private final Cache<String, List<String>> store;
private final Cache<String, Set<String>> store;

public Flowable<Map.Entry<String, List<String>>> rxEntries() {
public Flowable<Map.Entry<String, Set<String>>> rxEntries() {
return store.rxEntrySet();
}

public Maybe<List<String>> rxGet(final String targetId) {
public Maybe<Set<String>> rxGet(final String targetId) {
if (targetId == null) {
return Maybe.error(new IllegalArgumentException("Target id cannot be null"));
}
return store.rxGet(targetId);
}

public List<String> get(final String targetId) {
public Set<String> get(final String targetId) {
if (targetId == null) {
throw new IllegalArgumentException("Target id cannot be null");
}
Expand All @@ -58,7 +57,7 @@ public void put(final String targetId, final String channelId) {
targetId,
(key, channels) -> {
if (channels == null) {
return new ArrayList<>(List.of(channelId));
channels = new HashSet<>();
}
channels.add(channelId);
return channels;
Expand Down
Expand Up @@ -25,12 +25,12 @@
import io.gravitee.node.api.cluster.messaging.Topic;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;

/**
* @author Guillaume LAMIRAND (guillaume.lamirand at graviteesource.com)
Expand Down Expand Up @@ -98,11 +98,11 @@ protected void doStop() throws Exception {
super.doStop();
}

public Flowable<Map.Entry<String, List<String>>> candidatesChannel() {
public Flowable<Map.Entry<String, Set<String>>> candidatesChannel() {
return primaryChannelCandidateStore.rxEntries();
}

public Maybe<List<String>> candidatesChannel(final String targetId) {
public Maybe<Set<String>> candidatesChannel(final String targetId) {
return primaryChannelCandidateStore.rxGet(targetId);
}

Expand All @@ -129,7 +129,7 @@ private void handleChannelEvent(final ChannelEvent channelEvent) {

private void electPrimaryChannel(final String targetId) {
String previousPrimaryChannelId = primaryChannelCache.get(targetId);
List<String> channelIds = primaryChannelCandidateStore.get(targetId);
Set<String> channelIds = primaryChannelCandidateStore.get(targetId);

if (null == channelIds || channelIds.isEmpty()) {
log.warn(
Expand All @@ -141,13 +141,31 @@ private void electPrimaryChannel(final String targetId) {
return;
}
if (!channelIds.contains(previousPrimaryChannelId)) {
//noinspection deprecation
@SuppressWarnings("java:S1874")
String newPrimaryChannelId = channelIds.get(RandomUtils.nextInt(0, channelIds.size()));
primaryChannelCache.put(targetId, newPrimaryChannelId);
primaryChannelElectedEventTopic.publish(
PrimaryChannelElectedEvent.builder().targetId(targetId).channelId(newPrimaryChannelId).build()
);
String newPrimaryChannelId = getRandomChannel(channelIds);
if (newPrimaryChannelId != null) {
primaryChannelCache.put(targetId, newPrimaryChannelId);
primaryChannelElectedEventTopic.publish(
PrimaryChannelElectedEvent.builder().targetId(targetId).channelId(newPrimaryChannelId).build()
);
}
}
}

private String getRandomChannel(Set<String> channelIds) {
int randomIndex = ThreadLocalRandom.current().nextInt(channelIds.size());
int i = 0;
for (String channelId : channelIds) {
if (i == randomIndex) {
return channelId;
}
i++;
}
// Send first one in case random loop didn't find any match
Iterator<String> iterator = channelIds.iterator();
if (iterator.hasNext()) {
return channelIds.iterator().next();
}
// Shouldn't happen in any case
return null;
}
}
Expand Up @@ -22,7 +22,9 @@
import io.gravitee.node.api.cache.CacheConfiguration;
import io.gravitee.node.plugin.cache.common.InMemoryCache;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -33,7 +35,7 @@
*/
class PrimaryChannelCandidateStoreTest {

private Cache<String, List<String>> store;
private Cache<String, Set<String>> store;
private PrimaryChannelCandidateStore cut;

@BeforeEach
Expand All @@ -44,7 +46,7 @@ public void beforeEach() {

@Test
void should_get_from_store() {
store.put("targetId", List.of("channelId", "channelId2"));
store.put("targetId", Set.of("channelId", "channelId2"));
assertThat(cut.get("targetId")).containsOnly("channelId", "channelId2");
}

Expand Down Expand Up @@ -76,14 +78,14 @@ void should_throw_exception_when_putting_without_channel_id() {

@Test
void should_remove_from_store() {
store.put("targetId", new ArrayList<>(List.of("channelId")));
store.put("targetId", new HashSet<>(List.of("channelId")));
cut.remove("targetId", "channelId");
assertThat(store.containsKey("targetId")).isFalse();
}

@Test
void should_remove_only_one_channel_from_store() {
store.put("targetId", new ArrayList<>(List.of("channelId", "channelId2")));
store.put("targetId", new HashSet<>(List.of("channelId", "channelId2")));
cut.remove("targetId", "channelId");
assertThat(store.containsKey("targetId")).isTrue();
assertThat(store.get("targetId")).containsOnly("channelId2");
Expand Down

0 comments on commit c3c0a27

Please sign in to comment.