diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventBusDriver.java b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventBusDriver.java index c52986f56b1..62825cf2e40 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventBusDriver.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventBusDriver.java @@ -28,4 +28,6 @@ public interface ServiceEventBusDriver { public void stop() throws ServiceEventBusException; public ServiceEventBus getEventBus(); + + public Boolean isConnected(); } diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java index 201d7661f68..c4a56c6acc8 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java @@ -136,6 +136,10 @@ private void setSession(ServiceEvent kapuaEvent) { KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId()); } + public Boolean isConnected() { + return eventBusJMSConnectionBridge.isConnected(); + } + /** * Stop the event bus * @@ -209,9 +213,15 @@ private class EventBusJMSConnectionBridge { private Connection jmsConnection; private Map senders = new HashMap<>(); private ExceptionListenerImpl exceptionListener; + private Boolean connected; public EventBusJMSConnectionBridge() { this.exceptionListener = new ExceptionListenerImpl(); + connected = Boolean.FALSE; + } + + Boolean isConnected() { + return connected; } void start() throws JMSException, NamingException, ServiceEventBusException { @@ -230,6 +240,7 @@ void start() throws JMSException, NamingException, ServiceEventBusException { jmsConnection = jmsConnectionFactory.createConnection(eventbusUsername, eventbusPassword); jmsConnection.setExceptionListener(exceptionListener); + connected = Boolean.TRUE; jmsConnection.start(); } @@ -239,6 +250,7 @@ void stop() throws ServiceEventBusException { } private void closeConnection() { + connected = Boolean.FALSE; try { if (jmsConnection != null) { exceptionListener.stop(); @@ -436,6 +448,7 @@ private class ExceptionListenerImpl implements ExceptionListener { @Override public void onException(JMSException e) { + connected = Boolean.FALSE; LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e); commonsMetric.getEventBusConnectionError().inc(); int i = 1; diff --git a/locator/guice/src/test/java/org/eclipse/kapua/locator/internal/TestModule.java b/locator/guice/src/test/java/org/eclipse/kapua/locator/internal/TestModule.java index 32b5b014baf..c27339da1f7 100644 --- a/locator/guice/src/test/java/org/eclipse/kapua/locator/internal/TestModule.java +++ b/locator/guice/src/test/java/org/eclipse/kapua/locator/internal/TestModule.java @@ -32,6 +32,9 @@ protected void configureModule() { @Singleton ServiceEventBusDriver serviceEventBusDriver() { return new ServiceEventBusDriver() { + + private Boolean connected = Boolean.FALSE; + @Override public String getType() { return "test"; @@ -39,12 +42,12 @@ public String getType() { @Override public void start() throws ServiceEventBusException { - //Nothing to do! + connected = Boolean.TRUE; } @Override public void stop() throws ServiceEventBusException { - //Nothing to do! + connected = Boolean.FALSE; } @Override @@ -61,6 +64,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event } }; } + + @Override + public Boolean isConnected() { + return connected; + } }; } } diff --git a/message/internal/src/test/java/org/eclipse/kapua/message/internal/TestModule.java b/message/internal/src/test/java/org/eclipse/kapua/message/internal/TestModule.java index 72cf04f8345..fdd64457c20 100644 --- a/message/internal/src/test/java/org/eclipse/kapua/message/internal/TestModule.java +++ b/message/internal/src/test/java/org/eclipse/kapua/message/internal/TestModule.java @@ -47,6 +47,9 @@ String eventsModuleName() { @Singleton ServiceEventBusDriver serviceEventBusDriver() { return new ServiceEventBusDriver() { + + private Boolean connected = Boolean.FALSE; + @Override public String getType() { return "test"; @@ -54,12 +57,12 @@ public String getType() { @Override public void start() throws ServiceEventBusException { - //Nothing to do! + connected = Boolean.TRUE; } @Override public void stop() throws ServiceEventBusException { - //Nothing to do! + connected = Boolean.FALSE; } @Override @@ -76,6 +79,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event } }; } + + @Override + public Boolean isConnected() { + return connected; + } }; } }