Skip to content

Commit

Permalink
:fix: event topic now contains module name as well as service name (s…
Browse files Browse the repository at this point in the history
…imple) and uuid for uniqueness

Signed-off-by: dseurotech <davide.salvador@eurotech.com>
  • Loading branch information
dseurotech committed Mar 7, 2024
1 parent a09063a commit 264661a
Show file tree
Hide file tree
Showing 28 changed files with 159 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ String metricModuleName() {
return "broker-telemetry";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";

Check warning on line 57 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/AppModule.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/AppModule.java#L57

Added line #L57 was not covered by tests
}

@Provides
@Singleton
@Named("brokerHost")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void fillEvent(MethodInvocation invocation, ServiceEvent serviceEvent) {
Class<?> wrappedClass = ((KapuaService) invocation.getThis()).getClass().getSuperclass(); // this object should be not null
Class<?>[] implementedClass = wrappedClass.getInterfaces();
// assuming that the KapuaService implemented is specified by the first implementing interface
String serviceInterfaceName = implementedClass[0].getName();
String serviceInterfaceName = implementedClass[0].getSimpleName();

Check warning on line 139 in commons/src/main/java/org/eclipse/kapua/commons/event/RaiseServiceEventInterceptor.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/RaiseServiceEventInterceptor.java#L139

Added line #L139 was not covered by tests
// String splittedServiceInterfaceName[] = serviceInterfaceName.split("\\.");
// String serviceName = splittedServiceInterfaceName.length > 0 ? splittedServiceInterfaceName[splittedServiceInterfaceName.length-1] : "";
// String cleanedServiceName = serviceName.substring(0, serviceName.length()-"Service".length()).toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ protected ServiceEventClientConfiguration[] appendClientId(String clientId, Serv
return config;
} else {
// config for @ListenServiceEvent
String subscriberName = config.getClientName() + (clientId == null ? "" : "_" + clientId);
String subscriberName = config.getClientName() + (clientId == null ? "" : "-" + clientId);
LOGGER.debug("Adding config for @ListenServiceEvent - address: {}, name: {}, listener: {}", config.getAddress(), subscriberName, config.getEventListener());
return new ServiceEventClientConfiguration(config.getAddress(), subscriberName, config.getEventListener());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@
*******************************************************************************/
package org.eclipse.kapua.commons.event;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.ArrayUtils;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.event.ListenServiceEvent;
import org.eclipse.kapua.event.RaiseServiceEvent;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.service.KapuaService;

import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ServiceInspector {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceInspector.class);

private ServiceInspector() {}
private ServiceInspector() {
}

public static <T extends KapuaService> List<ServiceEventClientConfiguration> getEventBusClients(KapuaService aService, Class<T> clazz) {

Expand Down Expand Up @@ -83,26 +83,26 @@ public static <T extends KapuaService> List<ServiceEventClientConfiguration> get
KapuaRuntimeException.internalError(e1, String.format("Unable to get the annotated method: annotation %s", ListenServiceEvent.class));
}

for (ListenServiceEvent listenAnnotation:listenAnnotations) {
for (ListenServiceEvent listenAnnotation : listenAnnotations) {
final Method listenerMethod = enhancedMethod;
configurations.add(
new ServiceEventClientConfiguration(
listenAnnotation.fromAddress(),
clazz.getName(),
serviceEvent -> {
try {
listenerMethod.invoke(aService, serviceEvent);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw KapuaException.internalError(e, String.format("Error invoking method %s", method.getName()));
}
}));
new ServiceEventClientConfiguration(
listenAnnotation.fromAddress(),
clazz.getSimpleName(),
serviceEvent -> {
try {
listenerMethod.invoke(aService, serviceEvent);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw KapuaException.internalError(e, String.format("Error invoking method %s", method.getName()));
}
}));

Check warning on line 98 in commons/src/main/java/org/eclipse/kapua/commons/event/ServiceInspector.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/ServiceInspector.java#L94-L98

Added lines #L94 - L98 were not covered by tests
}
}
if (!ArrayUtils.isEmpty(raiseAnnotations)) {
configurations.add(
new ServiceEventClientConfiguration(
null,
clazz.getName(),
clazz.getSimpleName(),
null));
}
}
Expand All @@ -122,7 +122,7 @@ private static Method getMatchingMethod(Class<?> clazz, Method method) throws No
if (!candidate.getReturnType().equals(method.getReturnType())) {
continue;
}
if(!Arrays.equals(method.getParameterTypes(), candidate.getParameterTypes())) {
if (!Arrays.equals(method.getParameterTypes(), candidate.getParameterTypes())) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ synchronized void subscribe(Subscription subscription)
String subscriptionStr = String.format("$SYS/EVT/%s", subscription.getAddress());
// create a bunch of sessions to allow parallel event processing
LOGGER.info("Subscribing to address {} - name {} ...", subscriptionStr, subscription.getName());
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);

Check warning on line 307 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L306-L307

Added lines #L306 - L307 were not covered by tests
for (int i = 0; i < consumerPoolSize; i++) {
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
jmsConsumer.setMessageListener(message -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ protected void configureModule() {
String metricModuleName() {
return "web-console";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "console";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsLifecycle.CONSUMER_LIFECYCLE;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "lifecycle";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsTelemetry.CONSUMER_TELEMETRY;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ protected void configureModule() {
String metricModuleName() {
return "job-engine";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "job_engine";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ String metricModuleName() {
return "test";
}

@Provides
@Named(value = "eventsModuleName")
String eventsModuleName() {
return "test";
}

@Provides
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ protected void configureModule() {

}


@Provides
@Named("metricModuleName")
String metricModuleName() {
return "qa-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "qa_tests";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ Boolean showStackTrace(KapuaApiCoreSetting kapuaApiCoreSetting) {
String metricModuleName() {
return "unit-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "unit_tests";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ Boolean showStackTrace(KapuaApiCoreSetting kapuaApiCoreSetting) {
String metricModuleName() {
return "rest-api";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "rest_api";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ ServiceModule accountServiceModule(AccountService accountService,
EventStoreFactory eventStoreFactory,
EventStoreRecordRepository eventStoreRecordRepository,
ServiceEventBus serviceEventBus,
KapuaAccountSetting kapuaAccountSetting
KapuaAccountSetting kapuaAccountSetting,
@Named("eventsModuleName") String eventModuleName
) throws ServiceEventBusException {
return new AccountServiceModule(
accountService,
Expand All @@ -103,7 +104,8 @@ ServiceModule accountServiceModule(AccountService accountService,
txManagerFactory.create("kapua-account"),
serviceEventBus
),
serviceEventBus);
serviceEventBus,
eventModuleName);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.kapua.service.account.internal.setting.KapuaAccountSetting;
import org.eclipse.kapua.service.account.internal.setting.KapuaAccountSettingKeys;

import java.util.UUID;

/**
* {@link AccountService} {@link ServiceModule} implementation.
*
Expand All @@ -33,10 +35,12 @@ public AccountServiceModule(
AccountService accountService,
KapuaAccountSetting kapuaAccountSetting,
ServiceEventHouseKeeperFactory serviceEventHouseKeeperFactory,
ServiceEventBus serviceEventBus) {
ServiceEventBus serviceEventBus,
String eventModuleName) {
super(
ServiceInspector.getEventBusClients(accountService, AccountService.class).toArray(new ServiceEventClientConfiguration[0]),
kapuaAccountSetting.getString(KapuaAccountSettingKeys.ACCOUNT_EVENT_ADDRESS),
eventModuleName + "-" + UUID.randomUUID().toString(),
serviceEventHouseKeeperFactory,
serviceEventBus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ protected void configureModule() {

}


@Provides
@Named("metricModuleName")
String metricModuleName() {
return "qa-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "qa_tests";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsAuthentication.SERVICE_AUTHENTICATION;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "authentication";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected ServiceModule deviceConnectionEventListenerServiceModule(DeviceConnect
@Named("DeviceRegistryTransactionManager") TxManager txManager,
EventStoreFactory eventStoreFactory,
EventStoreRecordRepository eventStoreRecordRepository,
ServiceEventBus serviceEventBus
ServiceEventBus serviceEventBus,
@Named("eventsModuleName") String eventModuleName
) throws ServiceEventBusException {

String address = kapuaDeviceRegistrySettings.getString(KapuaDeviceRegistrySettingKeys.DEVICE_EVENT_ADDRESS);
Expand All @@ -69,6 +70,7 @@ protected ServiceModule deviceConnectionEventListenerServiceModule(DeviceConnect
txManager,
serviceEventBus
),
serviceEventBus);
serviceEventBus,
eventModuleName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.service.device.connection.listener.DeviceConnectionEventListenerService;

import java.util.UUID;

public class DeviceConnectionEventListenerServiceModule extends ServiceEventTransactionalModule implements ServiceModule {

public DeviceConnectionEventListenerServiceModule(DeviceConnectionEventListenerService deviceConnectionEventListenerService, String eventAddress, ServiceEventHouseKeeperFactory serviceEventTransactionalHousekeeperFactory,
ServiceEventBus serviceEventBus) {
ServiceEventBus serviceEventBus,
String eventModuleName) {
super(ServiceInspector.getEventBusClients(deviceConnectionEventListenerService, DeviceConnectionEventListenerService.class).toArray(new ServiceEventClientConfiguration[0]),
eventAddress,
eventModuleName + "-" + UUID.randomUUID().toString(),
serviceEventTransactionalHousekeeperFactory, serviceEventBus);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ ServiceModule deviceRegistryModule(DeviceConnectionService deviceConnectionServi
EventStoreRecordRepository eventStoreRecordRepository,
ServiceEventBus serviceEventBus,
KapuaDeviceRegistrySettings kapuaDeviceRegistrySettings,
KapuaJpaTxManagerFactory jpaTxManagerFactory
KapuaJpaTxManagerFactory jpaTxManagerFactory,
@Named("eventsModuleName") String eventModuleName
) throws ServiceEventBusException {
return new DeviceServiceModule(
deviceConnectionService,
Expand All @@ -151,7 +152,8 @@ ServiceModule deviceRegistryModule(DeviceConnectionService deviceConnectionServi
jpaTxManagerFactory.create("kapua-device"),
serviceEventBus
),
serviceEventBus);
serviceEventBus,
eventModuleName);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@
*******************************************************************************/
package org.eclipse.kapua.service.device.registry;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.eclipse.kapua.commons.event.ServiceEventClientConfiguration;
import org.eclipse.kapua.commons.event.ServiceEventHouseKeeperFactory;
import org.eclipse.kapua.commons.event.ServiceEventTransactionalModule;
import org.eclipse.kapua.commons.event.ServiceInspector;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.service.device.registry.connection.DeviceConnectionService;

import java.util.Arrays;
import java.util.UUID;
import java.util.stream.Collectors;

public class DeviceServiceModule extends ServiceEventTransactionalModule {

public DeviceServiceModule(DeviceConnectionService deviceConnectionService,
DeviceRegistryService deviceRegistryService,
KapuaDeviceRegistrySettings deviceRegistrySettings,
ServiceEventHouseKeeperFactory serviceEventTransactionalHousekeeperFactory,
ServiceEventBus serviceEventBus) {
ServiceEventBus serviceEventBus,
String eventModuleName) {
super(Arrays.asList(ServiceInspector.getEventBusClients(deviceRegistryService, DeviceRegistryService.class),
ServiceInspector.getEventBusClients(deviceConnectionService, DeviceConnectionService.class)
)
Expand All @@ -37,6 +39,7 @@ public DeviceServiceModule(DeviceConnectionService deviceConnectionService,
.collect(Collectors.toList())
.toArray(new ServiceEventClientConfiguration[0]),
deviceRegistrySettings.getString(KapuaDeviceRegistrySettingKeys.DEVICE_EVENT_ADDRESS),
eventModuleName + "-" + UUID.randomUUID().toString(),
serviceEventTransactionalHousekeeperFactory,
serviceEventBus);
}
Expand Down

0 comments on commit 264661a

Please sign in to comment.