Skip to content

Commit

Permalink
Merge pull request #156 from AxonFramework/featlure/112-load-balancin…
Browse files Browse the repository at this point in the history
…g-properties

[#122] Enable load balancing strategy property configuration
  • Loading branch information
smcvb committed Oct 30, 2023
2 parents e0fe960 + 39bf9f0 commit af73dea
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.axonframework.extensions.multitenancy.autoconfig;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
Expand All @@ -28,8 +29,17 @@
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.axonframework.lifecycle.Phase;
import org.axonframework.lifecycle.StartHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toMap;

/**
* Multi-tenant implementation of {@link EventProcessorControlService}.
Expand All @@ -43,6 +53,8 @@ public class MultiTenantEventProcessorControlService
extends EventProcessorControlService
implements MultiTenantAwareComponent {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/**
* Initialize a {@link MultiTenantEventProcessorControlService}.
* <p>
Expand Down Expand Up @@ -101,19 +113,107 @@ public void start() {
return;
}

Map<String, AxonServerConnection> contextToConnection = new HashMap<>();
Map<String, EventProcessor> eventProcessors = eventProcessingConfiguration.eventProcessors();
eventProcessors.forEach((name, processor) -> {
Map<String, String> strategiesPerProcessor = strategiesPerProcessor(eventProcessors);

eventProcessors.forEach((processorAndContext, processor) -> {
if (processor instanceof MultiTenantEventProcessor) {
return;
}
String context = name.substring(name.indexOf("@") + 1);
ControlChannel controlChannel = axonServerConnectionManager.getConnection(context)
.controlChannel();
AxonProcessorInstructionHandler instructionHandler = new AxonProcessorInstructionHandler(processor, name);
controlChannel.registerEventProcessor(name, infoSupplier(processor), instructionHandler);

String processorName = processorNameFromCombination(processorAndContext);
String context = contextFromCombination(processorAndContext);
AxonServerConnection connection =
contextToConnection.computeIfAbsent(context, axonServerConnectionManager::getConnection);

registerInstructionHandler(connection.controlChannel(), processorAndContext, processor);
String strategyForProcessor = strategiesPerProcessor.get(processorName);
if (strategyForProcessor != null) {
setLoadBalancingStrategy(connection.adminChannel(), processorName, strategyForProcessor);
}
});
}

private Map<String, String> strategiesPerProcessor(Map<String, EventProcessor> eventProcessors) {
List<String> processorNames =
eventProcessors.entrySet()
.stream()
// Filter out MultiTenantEventProcessors as those aren't registered with Axon Server anyhow.
.filter(entry -> !(entry.getValue() instanceof MultiTenantEventProcessor))
.map(Map.Entry::getKey)
.map(MultiTenantEventProcessorControlService::processorNameFromCombination)
.collect(Collectors.toList());
return processorConfig.entrySet()
.stream()
.filter(entry -> {
if (!processorNames.contains(entry.getKey())) {
logger.info("Event Processor [{}] is not a registered. "
+ "Please check the name or register the Event Processor",
entry.getKey());
return false;
}
return true;
})
.collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getLoadBalancingStrategy()));
}

private static String contextFromCombination(String processorAndContext) {
return processorAndContext.substring(processorAndContext.indexOf("@") + 1);
}

private void registerInstructionHandler(ControlChannel controlChannel,
String processorAndContext,
EventProcessor processor) {
controlChannel.registerEventProcessor(processorAndContext,
infoSupplier(processor),
new AxonProcessorInstructionHandler(processor, processorAndContext));
}

private void setLoadBalancingStrategy(AdminChannel adminChannel, String processorName, String strategy) {
Optional<String> optionalIdentifier = tokenStoreIdentifierFor(processorName);
if (!optionalIdentifier.isPresent()) {
logger.warn("Cannot find token store identifier for processor [{}]. "
+ "Load balancing cannot be configured without this identifier.", processorName);
return;
}
String tokenStoreIdentifier = optionalIdentifier.get();

adminChannel.loadBalanceEventProcessor(processorName, tokenStoreIdentifier, strategy)
.whenComplete((r, e) -> {
if (e == null) {
logger.debug("Successfully requested to load balance processor [{}]"
+ " with strategy [{}].", processorName, strategy);
return;
}
logger.warn("Requesting to load balance processor [{}] with strategy [{}] failed.",
processorName, strategy, e);
});
if (processorConfig.get(processorName).isAutomaticBalancing()) {
adminChannel.setAutoLoadBalanceStrategy(processorName, tokenStoreIdentifier, strategy)
.whenComplete((r, e) -> {
if (e == null) {
logger.debug("Successfully requested to automatically balance processor [{}]"
+ " with strategy [{}].", processorName, strategy);
return;
}
logger.warn(
"Requesting to automatically balance processor [{}] with strategy [{}] failed.",
processorName, strategy, e
);
});
}
}

private Optional<String> tokenStoreIdentifierFor(String processorName) {
return eventProcessingConfiguration.tokenStore(processorName)
.retrieveStorageIdentifier();
}

private static String processorNameFromCombination(String processorAndContext) {
return processorAndContext.substring(0, processorAndContext.indexOf("@"));
}

@Override
public Registration registerTenant(TenantDescriptor tenantDescriptor) {
//Already registered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@

import com.google.common.collect.ImmutableMap;
import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.junit.jupiter.api.*;
import org.mockito.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.mockito.Mockito.*;

/**
Expand All @@ -37,96 +44,143 @@
*/
class MultiTenantEventProcessorControlServiceTest {

private static final String PROCESSOR_NAME = "some-processor";
private static final String TOKEN_STORE_IDENTIFIER = "token-store-identifier";
private static final String LOAD_BALANCING_STRATEGY = "some-strategy";

private AxonServerConnectionManager axonServerConnectionManager;
private EventProcessingConfiguration eventProcessingConfiguration;

private ControlChannel controlTenant1;
private AdminChannel adminTenant1;
private ControlChannel controlTenant2;
private AdminChannel adminTenant2;

private MultiTenantEventProcessorControlService testSubject;

@BeforeEach
void setUp() {
axonServerConnectionManager = mock(AxonServerConnectionManager.class);
mockConnectionManager();

eventProcessingConfiguration = mock(EventProcessingConfiguration.class);
AxonServerConfiguration axonServerConfiguration = mock(AxonServerConfiguration.class);
when(axonServerConfiguration.getEventhandling())
.thenReturn(new AxonServerConfiguration.Eventhandling());
TokenStore tokenStore = mock(TokenStore.class);
when(tokenStore.retrieveStorageIdentifier()).thenReturn(Optional.of(TOKEN_STORE_IDENTIFIER));
when(eventProcessingConfiguration.tokenStore(PROCESSOR_NAME)).thenReturn(tokenStore);

AxonServerConfiguration axonServerConfig = mock(AxonServerConfiguration.class);
mockAxonServerConfig(axonServerConfig);

testSubject = new MultiTenantEventProcessorControlService(axonServerConnectionManager,
eventProcessingConfiguration,
axonServerConfiguration);
axonServerConfig);
}

@Test
void start() {
private void mockConnectionManager() {
AxonServerConnection connectionTenant1 = mock(AxonServerConnection.class);
ControlChannel controlTenant1 = mock(ControlChannel.class);
controlTenant1 = mock(ControlChannel.class);
when(connectionTenant1.controlChannel()).thenReturn(controlTenant1);
adminTenant1 = mock(AdminChannel.class);
when(adminTenant1.loadBalanceEventProcessor(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(adminTenant1.setAutoLoadBalanceStrategy(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(connectionTenant1.adminChannel()).thenReturn(adminTenant1);
AxonServerConnection connectionTenant2 = mock(AxonServerConnection.class);
ControlChannel controlTenant2 = mock(ControlChannel.class);
controlTenant2 = mock(ControlChannel.class);
when(connectionTenant2.controlChannel()).thenReturn(controlTenant2);

adminTenant2 = mock(AdminChannel.class);
when(adminTenant2.loadBalanceEventProcessor(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(adminTenant2.setAutoLoadBalanceStrategy(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(connectionTenant2.adminChannel()).thenReturn(adminTenant2);
ArgumentCaptor<String> contextCapture = ArgumentCaptor.forClass(String.class);
when(axonServerConnectionManager.getConnection(contextCapture.capture())).thenAnswer(a -> {
if (contextCapture.getValue().equals("tenant-1")) {
return connectionTenant1;
} else {
return connectionTenant2;
}
});
when(axonServerConnectionManager.getConnection(contextCapture.capture()))
.thenAnswer(a -> contextCapture.getValue().equals("tenant-1") ? connectionTenant1 : connectionTenant2);
}

private static void mockAxonServerConfig(AxonServerConfiguration axonServerConfig) {
Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorSettings = new HashMap<>();
AxonServerConfiguration.Eventhandling eventHandling = mock(AxonServerConfiguration.Eventhandling.class);
AxonServerConfiguration.Eventhandling.ProcessorSettings tepSettings =
new AxonServerConfiguration.Eventhandling.ProcessorSettings();
tepSettings.setLoadBalancingStrategy(LOAD_BALANCING_STRATEGY);
tepSettings.setAutomaticBalancing(true);
processorSettings.put(PROCESSOR_NAME, tepSettings);
when(eventHandling.getProcessors()).thenReturn(processorSettings);
when(axonServerConfig.getEventhandling()).thenReturn(eventHandling);
}

@Test
void registersInstructionHandlersWithEachContextControlChannelOnStart() {
when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"tep@tenant-2",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.start();

verify(controlTenant1).registerEventProcessor(eq("tep@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
}

@Test
void addingNewTenantAfterStart() {
AxonServerConnection connectionTenant1 = mock(AxonServerConnection.class);
ControlChannel controlTenant1 = mock(ControlChannel.class);
when(connectionTenant1.controlChannel()).thenReturn(controlTenant1);
AxonServerConnection connectionTenant2 = mock(AxonServerConnection.class);
ControlChannel controlTenant2 = mock(ControlChannel.class);
when(connectionTenant2.controlChannel()).thenReturn(controlTenant2);

ArgumentCaptor<String> contextCapture = ArgumentCaptor.forClass(String.class);
when(axonServerConnectionManager.getConnection(contextCapture.capture())).thenAnswer(a -> {
if (contextCapture.getValue().equals("tenant-1")) {
return connectionTenant1;
} else {
return connectionTenant2;
}
});

when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.start();

verify(controlTenant1).registerEventProcessor(eq("tep@tenant-1"), any(), any());
verify(controlTenant2, times(0)).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2, times(0)).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());

when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"tep@tenant-2",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.registerAndStartTenant(TenantDescriptor.tenantWithId("tenant-2"));
verify(controlTenant2).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
}

@Test
void willSetLoadBalancingStrategyForProcessorsWithPropertiesOnStart() {
String processorNameWithoutSettings = "processor-without-load-balancing";
String expectedStrategy = "some-strategy";

// Given
// Mock Event Processor Configuration
when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
processorNameWithoutSettings + "@tenant-1", mock(EventProcessor.class),
processorNameWithoutSettings + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

// When
testSubject.start();

// Then
// Registers instruction handlers
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(processorNameWithoutSettings + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(processorNameWithoutSettings + "@tenant-2"), any(), any());
// Load balances Processors
verify(adminTenant1).loadBalanceEventProcessor(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant2).loadBalanceEventProcessor(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant1, never()).loadBalanceEventProcessor(eq(processorNameWithoutSettings), any(), any());
verify(adminTenant2, never()).loadBalanceEventProcessor(eq(processorNameWithoutSettings), any(), any());
// Enables automatic load balancing
verify(adminTenant1).setAutoLoadBalanceStrategy(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant2).setAutoLoadBalanceStrategy(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant1, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
verify(adminTenant2, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
}
}

0 comments on commit af73dea

Please sign in to comment.