Skip to content

Commit

Permalink
#491 : provide a way to share an executor between several clients
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Feb 7, 2020
1 parent 8287d7f commit 9f3cb0a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 9 deletions.
Expand Up @@ -18,6 +18,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.network.config.NetworkConfig;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class LeshanClient implements LwM2mClient {
public LeshanClient(String endpoint, InetSocketAddress localAddress,
List<? extends LwM2mObjectEnabler> objectEnablers, NetworkConfig coapConfig, Builder dtlsConfigBuilder,
EndpointFactory endpointFactory, Map<String, String> additionalAttributes, final LwM2mNodeEncoder encoder,
final LwM2mNodeDecoder decoder) {
final LwM2mNodeDecoder decoder, ScheduledExecutorService sharedExecutor) {

Validate.notNull(endpoint);
Validate.notEmpty(objectEnablers);
Expand All @@ -84,6 +85,9 @@ protected Resource createRoot() {
return new org.eclipse.leshan.client.californium.RootResource(bootstrapHandler, this);
}
};
if (sharedExecutor != null) {
clientSideServer.setExecutors(sharedExecutor, sharedExecutor, true);
}

// Create CoAP resources for each lwm2m Objects.
for (LwM2mObjectEnabler enabler : objectEnablers) {
Expand Down Expand Up @@ -119,7 +123,7 @@ public void objectRemoved(LwM2mObjectEnabler object) {

// Create registration engine
engine = new RegistrationEngine(endpoint, objectTree.getObjectEnablers(), endpointsManager, requestSender,
bootstrapHandler, observers, additionalAttributes);
bootstrapHandler, observers, additionalAttributes, sharedExecutor);

RegistrationUpdateHandler registrationUpdateHandler = new RegistrationUpdateHandler(engine, bootstrapHandler);
registrationUpdateHandler.listen(objectTree);
Expand Down
Expand Up @@ -18,9 +18,11 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.core.network.config.NetworkConfig.Keys;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.EndpointContextMatcher;
import org.eclipse.californium.elements.UDPConnector;
import org.eclipse.californium.scandium.DTLSConnector;
Expand Down Expand Up @@ -61,6 +63,8 @@ public class LeshanClientBuilder {
private EndpointFactory endpointFactory;
private Map<String, String> additionalAttributes;

private ScheduledExecutorService executor;

/**
* Creates a new instance for setting the configuration options for a {@link LeshanClient} instance.
*
Expand Down Expand Up @@ -163,6 +167,25 @@ public LeshanClientBuilder setAdditionalAttributes(Map<String, String> additiona
return this;
}

/**
* Set a shared executor. This executor will be used everywhere it is possible. This is generally used when you want
* to limit the number of thread to use or if you want to simulate a lot of clients sharing the same thread pool.
* <p>
* Currently UDP and DTLS receiver and sender thread could not be share meaning that you will at least consume 2
* thread by client + the number of thread available in the shared executor (see <a
* href=https://github.com/eclipse/californium/issues/1203>californium#1203 issue</a>)
* <p>
* Executor will not be shutdown automatically on {@link LeshanClient#destroy(boolean)}, this should be done
* manually.
*
* @param executor the executor to share.
* @return the builder for fluent client creation.
*/
public LeshanClientBuilder setSharedExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}

public static NetworkConfig createDefaultNetworkConfig() {
NetworkConfig networkConfig = new NetworkConfig();
networkConfig.set(Keys.MID_TRACKER, "NULL");
Expand Down Expand Up @@ -200,6 +223,15 @@ public LeshanClient build() {
protected EndpointContextMatcher createSecuredContextMatcher() {
return null; // use default californium one.
}

@Override
protected Connector createSecuredConnector(DtlsConnectorConfig dtlsConfig) {
DTLSConnector dtlsConnector = new DTLSConnector(dtlsConfig);
if (executor != null) {
dtlsConnector.setExecutor(executor);
}
return dtlsConnector;
}
};
}

Expand Down Expand Up @@ -239,6 +271,6 @@ protected EndpointContextMatcher createSecuredContextMatcher() {
}

return new LeshanClient(endpoint, localAddress, objectEnablers, coapConfig, dtlsConfigBuilder, endpointFactory,
additionalAttributes, encoder, decoder);
additionalAttributes, encoder, decoder, executor);
}
}
Expand Up @@ -97,12 +97,12 @@ private static enum Status {
private Future<?> bootstrapFuture;
private Future<?> registerFuture;
private Future<?> updateFuture;
private final ScheduledExecutorService schedExecutor = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("RegistrationEngine#%d"));
private final ScheduledExecutorService schedExecutor;
private final boolean attachedExecutor;

public RegistrationEngine(String endpoint, Map<Integer, LwM2mObjectEnabler> objectEnablers,
EndpointsManager endpointsManager, LwM2mRequestSender requestSender, BootstrapHandler bootstrapState,
LwM2mClientObserver observer, Map<String, String> additionalAttributes) {
LwM2mClientObserver observer, Map<String, String> additionalAttributes, ScheduledExecutorService executor) {
this.endpoint = endpoint;
this.objectEnablers = objectEnablers;
this.bootstrapHandler = bootstrapState;
Expand All @@ -111,9 +111,21 @@ public RegistrationEngine(String endpoint, Map<Integer, LwM2mObjectEnabler> obje
this.additionalAttributes = additionalAttributes;
this.registeredServers = new ConcurrentHashMap<>();

if (executor == null) {
schedExecutor = createScheduledExecutor();
attachedExecutor = true;
} else {
schedExecutor = executor;
attachedExecutor = false;
}

sender = requestSender;
}

protected ScheduledExecutorService createScheduledExecutor() {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("RegistrationEngine#%d"));
}

public void start() {
stop(false); // Stop without de-register
synchronized (this) {
Expand Down Expand Up @@ -559,10 +571,17 @@ public void destroy(boolean deregister) {
wasStarted = started;
started = false;
}
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
schedExecutor.shutdownNow();
try {
schedExecutor.awaitTermination(BS_TIMEOUT, TimeUnit.SECONDS);
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
if (attachedExecutor) {
schedExecutor.shutdownNow();
schedExecutor.awaitTermination(BS_TIMEOUT, TimeUnit.SECONDS);
} else {
cancelUpdateTask(true);
cancelRegistrationTask();
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
cancelBootstrapTask();
}
if (wasStarted && deregister) {
// TODO we currently support only one dm server.
if (!registeredServers.isEmpty()) {
Expand Down

0 comments on commit 9f3cb0a

Please sign in to comment.