Skip to content
Permalink
Browse files
Allow the remoting implementation to configure which listener interes…
…ts are made available remotely
  • Loading branch information
timothyjward committed Oct 1, 2020
1 parent 07b5f41 commit 413c2e8be1b140cc3f9869ca168f190da6d64258
Showing 5 changed files with 134 additions and 17 deletions.
@@ -0,0 +1,13 @@
package org.apache.aries.typedevent.remote.remoteservices.impl;

public @interface Config {

public static enum Selector {
ALL, WITH_FILTER, WITH_PROPERTY, CUSTOM;
}

public Selector listener_selection() default Selector.WITH_PROPERTY;

public String listener_selection_custom_filter();

}
@@ -19,13 +19,16 @@
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
import static org.osgi.util.converter.Converters.standardConverter;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
@@ -57,9 +60,16 @@ public class RemoteEventBusImpl implements RemoteEventBus {
private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();

private final Object lock = new Object();

public RemoteEventBusImpl(TypedEventBus eventBus) {

private final Config configuration;

public RemoteEventBusImpl(TypedEventBus eventBus, Map<String, ?> config) {
this.eventBus = eventBus;

Map<String, Object> configWithDefaults = new HashMap<String, Object>(config);
configWithDefaults.putIfAbsent("listener.selection", Config.Selector.WITH_PROPERTY);

this.configuration = standardConverter().convert(configWithDefaults).to(Config.class);
}

public void init(BundleContext ctx) {
@@ -119,18 +129,53 @@ public void notify(String topic, Map<String, Object> properties) {
* @param topics
* @param filter
*/
void updateLocalInterest(Long id, List<String> topics, Filter filter) {

boolean doUpdate = false;
void updateLocalInterest(Long id, List<String> topics, Filter filter, Map<String, ?> serviceProps) {

Map<String, Filter> newData = topics.stream()
Map<String, Filter> newData;
Supplier<Map<String, Filter>> fromTopics = () -> topics.stream()
.collect(toMap(identity(), x -> filter, (a,b) -> a));

switch(configuration.listener_selection()) {
case ALL:
newData = fromTopics.get();
break;
case CUSTOM:
String listenerFilterString = configuration.listener_selection_custom_filter();
try {
Filter listenerFilter = FrameworkUtil.createFilter(listenerFilterString);

if(listenerFilter.matches(serviceProps)) {
newData = fromTopics.get();
break;
}
} catch (InvalidSyntaxException ise) {
//TODO log that this is ignored;
}
newData = new HashMap<>();
break;
case WITH_FILTER:
newData = filter == null ? new HashMap<>() : fromTopics.get();
break;
case WITH_PROPERTY:
boolean hasProperty = Boolean.valueOf(String.valueOf(serviceProps.get(RECEIVE_REMOTE_EVENTS)));
newData = hasProperty ? fromTopics.get() : new HashMap<>();
break;
default:
newData = new HashMap<>();
break;

}

boolean doUpdate;
Map<String, Filter> updatedFilters;
synchronized(lock) {
doUpdate = true;
servicesToInterests.put(id, newData);

Map<String, Filter> tmpFilters = topicsToFilters;
topicsToFilters = getUpdatedFilters();

doUpdate = !tmpFilters.equals(topicsToFilters);

updatedFilters = topicsToFilters;
}

@@ -95,7 +95,8 @@ private OSGi<?> createProgram(Map<String, ?> configuration) {
.flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>()));

OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class)))
.map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
.map(teb -> new RemoteEventBusImpl(teb, configuration))
.effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
.flatMap(rebi -> all(
just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast)
.effects(st -> st.open(), st -> st.close()),
@@ -124,7 +125,7 @@ private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
return map;
}

private Map<String, Object> getServiceProps(ServiceReference<?> ref) {
private static Map<String, Object> getServiceProps(ServiceReference<?> ref) {
return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty));
}

@@ -185,7 +186,7 @@ public Object addingService(ServiceReference<UntypedEventHandler> reference) {
// TODO Auto-generated catch block
return reference;
}
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
return reference;
}

@@ -199,7 +200,7 @@ public void modifiedService(ServiceReference<UntypedEventHandler> reference, Obj
impl.removeLocalInterest(getServiceId(reference));
return;
}
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
}

@Override
@@ -230,7 +231,7 @@ public TypedEventHandler addingService(ServiceReference<TypedEventHandler> refer
}
List<String> topics = findTopics(reference, toReturn);
if (!topics.isEmpty()) {
impl.updateLocalInterest(getServiceId(reference), topics, filter);
impl.updateLocalInterest(getServiceId(reference), topics, filter, getServiceProps(reference));
}
return toReturn;
}
@@ -320,7 +321,7 @@ public void modifiedService(ServiceReference<TypedEventHandler> reference, Typed
if (topics.isEmpty()) {
impl.removeLocalInterest(getServiceId(reference));
} else {
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
}
}

@@ -16,13 +16,18 @@
*/
package org.apache.aries.typedevent.remote.remoteservices.impl;

import static java.lang.Boolean.TRUE;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.osgi.framework.FrameworkUtil.createFilter;

import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashMap;

import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
import org.junit.jupiter.api.AfterEach;
@@ -65,7 +70,7 @@ public void start() {
Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);

remoteImpl = new RemoteEventBusImpl(eventBusImpl);
remoteImpl = new RemoteEventBusImpl(eventBusImpl, new HashMap<>());
}


@@ -98,7 +103,8 @@ public void testEmptyStart() {
@Test
public void testStartWithDetails() throws InvalidSyntaxException {

remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));

remoteImpl.init(context);

@@ -139,7 +145,8 @@ public void testLateRegisterOfListener() throws InvalidSyntaxException {

// Add a listener to the remote

remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));

Mockito.verify(remoteReg, Mockito.times(2)).setProperties(propsCaptor.capture());

@@ -148,4 +155,28 @@ public void testLateRegisterOfListener() throws InvalidSyntaxException {
assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
}

@Test
public void testStartWithNonRemoteListener() throws InvalidSyntaxException {

remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
emptyMap());

remoteImpl.init(context);

ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class);

Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
propsCaptor.capture());

Dictionary<String, Object> props = propsCaptor.getValue();
assertNull(props);

Mockito.verify(remoteReg).setProperties(propsCaptor.capture());

props = propsCaptor.getValue();

assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
}
}
@@ -16,6 +16,7 @@
*/
package org.apache.aries.typedevent.remote.remoteservices.osgi;

import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
@@ -91,7 +92,7 @@ public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
TypedEventBus bus;

@Mock
UntypedEventHandler untypedEventHandler;
UntypedEventHandler untypedEventHandler, untypedEventHandler2;

@Mock
UnhandledEventHandler unhandledEventHandler;
@@ -330,19 +331,45 @@ public void testSendToRemoteFramework() throws InterruptedException {

props = new Hashtable<>();
props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
props.put(RECEIVE_REMOTE_EVENTS, true);
props.put(TYPED_EVENT_FILTER, "(message=boo)");

regs.add(remoteContext.registerService(UNTYPED_HANDLER,
new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));

props = new Hashtable<>();
props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
props.put(RECEIVE_REMOTE_EVENTS, false);
props.put(TYPED_EVENT_FILTER, "(message=far)");

regs.add(remoteContext.registerService(UNTYPED_HANDLER,
new EventHandlerFactory(untypedEventHandler2, UNTYPED_HANDLER), props));


bus.deliver(event);

verify(unhandledEventHandler, Mockito.after(1000).times(1))
.notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));

verify(untypedEventHandler2, Mockito.after(1000).never())
.notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));

verify(untypedEventHandler)
.notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));

event = new TestEvent();
event.message = "far";

bus.deliver(event);

verify(unhandledEventHandler, Mockito.after(100).times(1))
.notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));

verify(untypedEventHandler2, Mockito.after(1000).never())
.notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));

verify(untypedEventHandler, Mockito.after(1000).never())
.notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
}

}

0 comments on commit 413c2e8

Please sign in to comment.