Skip to content

Commit

Permalink
Use the registering bundle to load the event type
Browse files Browse the repository at this point in the history
  • Loading branch information
timothyjward committed Sep 7, 2021
1 parent 7af44f9 commit 43eb522
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 71 deletions.
Expand Up @@ -93,18 +93,21 @@ private OSGi<?> createProgram(Map<String, ?> configuration) {
serviceReferences(TypedEventHandler.class,
csr -> {
tebi.updatedTypedEventHandler(
csr.getServiceReference().getBundle(),
getServiceProps(csr.getServiceReference()));
return false;
})
.flatMap(csr -> service(csr)
.effects(
handler -> tebi.addTypedEventHandler(handler,
handler -> tebi.addTypedEventHandler(
csr.getServiceReference().getBundle(),
handler,
getServiceProps(csr.getServiceReference())),
handler -> tebi.removeTypedEventHandler(handler,
getServiceProps(csr.getServiceReference())))),
serviceReferences(UntypedEventHandler.class,
csr -> {
tebi.updatedTypedEventHandler(
tebi.updatedUntypedEventHandler(
getServiceProps(csr.getServiceReference()));
return false;
})
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.stream.Stream;

import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.Bundle;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
Expand Down Expand Up @@ -118,20 +119,20 @@ public TypedEventBusImpl(TypedEventMonitorImpl monitorImpl, Map<String, ?> confi
this.monitorImpl = monitorImpl;
}

void addTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
void addTypedEventHandler(Bundle registeringBundle, TypedEventHandler<?> handler, Map<String, Object> properties) {
Class<?> clazz = discoverTypeForTypedHandler(registeringBundle, handler, properties);

String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");

doAddEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, properties);
}

private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
private Class<?> discoverTypeForTypedHandler(Bundle registeringBundle, TypedEventHandler<?> handler, Map<String, Object> properties) {
Class<?> clazz = null;
Object type = properties.get(TypedEventConstants.TYPED_EVENT_TYPE);
if (type != null) {
try {
clazz = handler.getClass().getClassLoader().loadClass(String.valueOf(type));
clazz = registeringBundle.loadClass(String.valueOf(type));
} catch (ClassNotFoundException e) {
// TODO Blow up
e.printStackTrace();
Expand Down Expand Up @@ -293,14 +294,14 @@ private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, Map<Long, T
}
}

void updatedTypedEventHandler(Map<String, Object> properties) {
void updatedTypedEventHandler(Bundle registeringBundle, Map<String, Object> properties) {
Long serviceId = getServiceId(properties);
TypedEventHandler<?> handler;
synchronized (lock) {
handler = knownTypedHandlers.get(serviceId);
}

Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
Class<?> clazz = discoverTypeForTypedHandler(registeringBundle, handler, properties);

String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");

Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.osgi.framework.Bundle;
import org.osgi.framework.Constants;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.UnhandledEventHandler;
Expand All @@ -62,6 +63,9 @@ public static class SpecialTestEvent extends TestEvent {

}

@Mock(lenient = true)
Bundle registeringBundle;

@Mock(lenient = true)
TypedEventHandler<Object> handlerA, handlerB;

Expand All @@ -80,10 +84,13 @@ public static class SpecialTestEvent extends TestEvent {
private AutoCloseable mocks;

@BeforeEach
public void start() {
public void start() throws ClassNotFoundException {

mocks = MockitoAnnotations.openMocks(this);

Mockito.doAnswer(i -> TestEvent.class.getClassLoader().loadClass(i.getArgument(0, String.class)))
.when(registeringBundle).loadClass(Mockito.anyString());

Mockito.doAnswer(i -> {
semA.release();
return null;
Expand Down Expand Up @@ -139,15 +146,15 @@ public void testEventSending() throws InterruptedException {
serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handlerA, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerA, serviceProperties);

serviceProperties = new HashMap<>();

serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
serviceProperties.put(SERVICE_ID, 43L);

impl.addTypedEventHandler(handlerB, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerB, serviceProperties);

serviceProperties = new HashMap<>();

Expand Down Expand Up @@ -211,20 +218,20 @@ public void testGenericTypeInference() throws InterruptedException {
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handler, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handler, serviceProperties);

serviceProperties = new HashMap<>();

serviceProperties.put(TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
serviceProperties.put(SERVICE_ID, 43L);

impl.addTypedEventHandler(handler2, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handler2, serviceProperties);

serviceProperties = new HashMap<>();

serviceProperties.put(SERVICE_ID, 44L);

impl.addTypedEventHandler(handler3, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handler3, serviceProperties);

impl.deliver(event);

Expand Down Expand Up @@ -265,15 +272,15 @@ public void testUntypedEventSending() throws InterruptedException {
serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handlerA, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerA, serviceProperties);

serviceProperties = new HashMap<>();

serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace('.', '/'));
serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
serviceProperties.put(SERVICE_ID, 43L);

impl.addTypedEventHandler(handlerB, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerB, serviceProperties);

serviceProperties = new HashMap<>();

Expand Down Expand Up @@ -322,7 +329,7 @@ public void testEventFiltering() throws InterruptedException {
serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handlerA, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerA, serviceProperties);

serviceProperties = new HashMap<>();

Expand All @@ -331,7 +338,7 @@ public void testEventFiltering() throws InterruptedException {
serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)");
serviceProperties.put(SERVICE_ID, 43L);

impl.addTypedEventHandler(handlerB, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerB, serviceProperties);

serviceProperties = new HashMap<>();

Expand Down Expand Up @@ -403,7 +410,7 @@ public void testEventFilteringWithEmptyStringFilter() throws InterruptedExceptio
serviceProperties.put(TYPED_EVENT_FILTER, "");
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handlerA, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerA, serviceProperties);

TestEvent event = new TestEvent();
event.message = "foo";
Expand All @@ -428,7 +435,7 @@ public void testUnhandledEventHandlers() throws InterruptedException {
serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
serviceProperties.put(SERVICE_ID, 42L);

impl.addTypedEventHandler(handlerA, serviceProperties);
impl.addTypedEventHandler(registeringBundle, handlerA, serviceProperties);

serviceProperties = new HashMap<>();

Expand Down
48 changes: 24 additions & 24 deletions org.apache.aries.typedevent.bus/test.bndrun
Expand Up @@ -27,27 +27,27 @@

-resolve.effective: active
-runbundles: \
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.1,1.1.2)',\
org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
slf4j.api;version='[1.7.30,1.7.31)',\
junit-jupiter-api;version='[5.6.2,5.6.3)',\
junit-platform-commons;version='[1.6.2,1.6.3)',\
net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
org.objenesis;version='[3.1.0,3.1.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
org.osgi.test.common;version='[0.9.0,0.9.1)',\
org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
junit-jupiter-engine;version='[5.6.2,5.6.3)'
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.1,1.1.2)',\
org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
slf4j.api;version='[1.7.30,1.7.31)',\
junit-jupiter-api;version='[5.6.2,5.6.3)',\
junit-platform-commons;version='[1.6.2,1.6.3)',\
net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
org.objenesis;version='[3.1.0,3.1.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
org.osgi.test.common;version='[0.9.0,0.9.1)',\
org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
junit-jupiter-engine;version='[5.6.2,5.6.3)'
Expand Up @@ -27,30 +27,30 @@

-resolve.effective: active
-runbundles: \
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.1,1.1.2)',\
org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
slf4j.api;version='[1.7.30,1.7.31)',\
junit-jupiter-api;version='[5.6.2,5.6.3)',\
junit-platform-commons;version='[1.6.2,1.6.3)',\
net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
org.objenesis;version='[3.1.0,3.1.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
org.osgi.test.common;version='[0.9.0,0.9.1)',\
org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
junit-jupiter-engine;version='[5.6.2,5.6.3)',\
org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.1,1.1.2)',\
org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
slf4j.api;version='[1.7.30,1.7.31)',\
junit-jupiter-api;version='[5.6.2,5.6.3)',\
junit-platform-commons;version='[1.6.2,1.6.3)',\
net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
org.objenesis;version='[3.1.0,3.1.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
org.osgi.test.common;version='[0.9.0,0.9.1)',\
org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
junit-platform-engine;version='[1.6.2,1.6.3)',\
junit-platform-launcher;version='[1.6.2,1.6.3)',\
junit-jupiter-engine;version='[5.6.2,5.6.3)',\
org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'

0 comments on commit 43eb522

Please sign in to comment.