Skip to content
Permalink
Browse files
Use the Component DSL to run the whiteboard, contribute integration t…
…ests from the BRAIN-IoT prototype
  • Loading branch information
timothyjward committed Sep 18, 2020
1 parent e7a191c commit 6635fbe86d0bdd5a69522a7bd73cbcea250214bd
Showing 17 changed files with 1,734 additions and 229 deletions.
@@ -22,6 +22,18 @@

<artifactId>org.apache.aries.typedevent.bus</artifactId>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.aries.typedevent</groupId>
<artifactId>typedevent-test-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.osgi</groupId>
@@ -67,5 +79,38 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.test.junit5</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>biz.aQute.bnd</groupId>
<artifactId>bnd-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>biz.aQute.bnd</groupId>
<artifactId>bnd-resolver-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>biz.aQute.bnd</groupId>
<artifactId>bnd-testing-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>biz.aQute.bnd</groupId>
<artifactId>bnd-run-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

-tester: biz.aQute.tester.junit-platform

-runfw: org.apache.felix.framework

-runrequires: bnd.identity;id="org.apache.aries.typedevent.bus",\
bnd.identity;id="org.apache.felix.gogo.shell",\
bnd.identity;id="org.apache.felix.gogo.runtime",\
bnd.identity;id="org.apache.felix.gogo.command"


-resolve.effective: active
-runbundles: \
ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
ch.qos.logback.core;version='[1.2.3,1.2.4)',\
org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.converter;version='[1.0.14,1.0.15)',\
org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
org.osgi.util.function;version='[1.1.0,1.1.1)',\
org.osgi.util.promise;version='[1.1.1,1.1.2)',\
org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
slf4j.api;version='[1.7.30,1.7.31)',\
org.apache.felix.gogo.command;version='[1.0.2,1.0.3)',\
org.apache.felix.gogo.runtime;version='[1.0.10,1.0.11)',\
org.apache.felix.gogo.shell;version='[1.0.0,1.0.1)'
@@ -18,28 +18,36 @@
package org.apache.aries.typedevent.bus.impl;

import static java.util.function.Function.identity;
import static org.apache.aries.component.dsl.OSGi.all;
import static org.apache.aries.component.dsl.OSGi.coalesce;
import static org.apache.aries.component.dsl.OSGi.configuration;
import static org.apache.aries.component.dsl.OSGi.nothing;
import static org.apache.aries.component.dsl.OSGi.just;
import static org.apache.aries.component.dsl.OSGi.register;
import static org.apache.aries.component.dsl.OSGi.service;
import static org.apache.aries.component.dsl.OSGi.serviceReferences;

import java.util.Arrays;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.apache.aries.component.dsl.OSGi;
import org.apache.aries.component.dsl.OSGiResult;
import org.osgi.annotation.bundle.Header;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.typedevent.TypedEventBus;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.UnhandledEventHandler;
import org.osgi.service.typedevent.UntypedEventHandler;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,136 +56,87 @@ public class TypedEventBusActivator implements BundleActivator {

private static final Logger _log = LoggerFactory.getLogger(TypedEventBusActivator.class);

private TypedEventMonitorImpl monitorImpl;
private ServiceRegistration<TypedEventMonitor> monitorReg;

private TypedEventBusImpl busImpl;
private ServiceRegistration<TypedEventBus> busReg;

private ServiceTracker<TypedEventHandler<?>, TypedEventHandler<?>> typedTracker;
private ServiceTracker<UntypedEventHandler, UntypedEventHandler> untypedTracker;
private ServiceTracker<UnhandledEventHandler, UnhandledEventHandler> unhandledTracker;
OSGiResult eventBus;

@Override
public void start(BundleContext bundleContext) throws Exception {
if (_log.isDebugEnabled()) {
_log.debug("Aries Typed Event Bus Starting");
}

// TODO use Config Admin

Map<String, Object> map = new HashMap<String, Object>();

createEventBus(bundleContext, map);
eventBus = coalesce(
configuration("org.apache.aries.typedevent.bus"),
just(Hashtable::new)
)
.map(this::toConfigProps)
.flatMap(configuration -> createProgram(configuration))
.run(bundleContext);

if (_log.isDebugEnabled()) {
_log.debug("Aries Typed Event Bus Started");
}
}

private void createEventBus(BundleContext bundleContext, Map<String, ?> configuration) throws Exception {

Dictionary<String, Object> serviceProps = toServiceProps(configuration);

monitorImpl = new TypedEventMonitorImpl(configuration);
busImpl = new TypedEventBusImpl(monitorImpl, configuration);

untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {

@Override
public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
UntypedEventHandler service = bundleContext.getService(reference);
busImpl.addUntypedEventHandler(service, getServiceProps(reference));
return service;
}

@Override
public void modifiedService(ServiceReference<UntypedEventHandler> reference,
UntypedEventHandler service) {
busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
}

@Override
public void removedService(ServiceReference<UntypedEventHandler> reference,
UntypedEventHandler service) {
busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
}
});

untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {

@Override
public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
UntypedEventHandler service = bundleContext.getService(reference);
busImpl.addUntypedEventHandler(service, getServiceProps(reference));
return service;
}

@Override
public void modifiedService(ServiceReference<UntypedEventHandler> reference,
UntypedEventHandler service) {
busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
}

@Override
public void removedService(ServiceReference<UntypedEventHandler> reference,
UntypedEventHandler service) {
busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
}
});

unhandledTracker = new ServiceTracker<>(bundleContext, UnhandledEventHandler.class,
new ServiceTrackerCustomizer<UnhandledEventHandler, UnhandledEventHandler>() {

@Override
public UnhandledEventHandler addingService(ServiceReference<UnhandledEventHandler> reference) {
UnhandledEventHandler service = bundleContext.getService(reference);
busImpl.addUnhandledEventHandler(service, getServiceProps(reference));
return service;
}

@Override
public void modifiedService(ServiceReference<UnhandledEventHandler> reference,
UnhandledEventHandler service) {
}

@Override
public void removedService(ServiceReference<UnhandledEventHandler> reference,
UnhandledEventHandler service) {
busImpl.removeUnhandledEventHandler(service, getServiceProps(reference));
}
});

try {
busImpl.start();

monitorReg = bundleContext.registerService(TypedEventMonitor.class, monitorImpl, serviceProps);

typedTracker.open();
untypedTracker.open();
unhandledTracker.open();

busReg = bundleContext.registerService(TypedEventBus.class, busImpl, serviceProps);

} catch (Exception e) {
stop(bundleContext);
}
private OSGi<?> createProgram(Map<String, ?> configuration) {

Map<String, Object> serviceProps = toServiceProps(configuration);

return just(configuration)
.map(TypedEventMonitorImpl::new)
.effects(x -> { }, TypedEventMonitorImpl::destroy)
.flatMap(
temi -> register(TypedEventMonitor.class, temi, serviceProps)
.then(just(new TypedEventBusImpl(temi, configuration))
.effects(TypedEventBusImpl::start, TypedEventBusImpl::stop)))
.flatMap(
tebi -> all(
serviceReferences(TypedEventHandler.class,
csr -> {
tebi.updatedTypedEventHandler(
getServiceProps(csr.getServiceReference()));
return false;
})
.flatMap(csr -> service(csr)
.effects(
handler -> tebi.addTypedEventHandler(handler,
getServiceProps(csr.getServiceReference())),
handler -> tebi.removeTypedEventHandler(handler,
getServiceProps(csr.getServiceReference())))),
serviceReferences(UntypedEventHandler.class,
csr -> {
tebi.updatedTypedEventHandler(
getServiceProps(csr.getServiceReference()));
return false;
})
.flatMap(csr -> service(csr)
.effects(
handler -> tebi.addUntypedEventHandler(handler,
getServiceProps(csr.getServiceReference())),
handler -> tebi.removeUntypedEventHandler(handler,
getServiceProps(csr.getServiceReference())))),
serviceReferences(UnhandledEventHandler.class)
.flatMap(csr -> service(csr)
.effects(handler -> tebi.addUnhandledEventHandler(handler,
getServiceProps(csr.getServiceReference())),
handler -> tebi.removeUnhandledEventHandler(handler,
getServiceProps(csr.getServiceReference())))),
register(TypedEventBus.class, tebi, serviceProps)
.flatMap(x -> nothing())));


}

private void safeUnregister(ServiceRegistration<?> reg) {
try {
reg.unregister();
} catch (IllegalStateException ise) {
// no op
// TODO LOG this
private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
Enumeration<String> keys = config.keys();
Map<String, Object> map = new HashMap<>();
while(keys.hasMoreElements()) {
String key = keys.nextElement();
map.put(key, config.get(key));
}

return map;
}

private Dictionary<String, Object> toServiceProps(Map<String, ?> config) {
private Map<String, Object> toServiceProps(Map<String, ?> config) {
return config.entrySet().stream().filter(e -> e.getKey() != null && e.getKey().startsWith("."))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> {
throw new IllegalArgumentException("Duplicate key ");
@@ -194,30 +153,7 @@ public void stop(BundleContext context) throws Exception {
_log.debug("Aries Typed Event Bus Stopping");
}

// Order matters here
if (busReg != null) {
safeUnregister(busReg);
}

if (busImpl != null) {
busImpl.stop();
}

if (typedTracker != null) {
typedTracker.close();
}
if (untypedTracker != null) {
untypedTracker.close();
}
if (unhandledTracker != null) {
unhandledTracker.close();
}

if (monitorReg != null) {
safeUnregister(monitorReg);
}

monitorImpl.destroy();
eventBus.close();

if (_log.isDebugEnabled()) {
_log.debug("Aries Typed Event Bus Stopped");

0 comments on commit 6635fbe

Please sign in to comment.