Skip to content

Commit

Permalink
Fix concurrent issue between stop/update on leshan client
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Sep 4, 2017
1 parent a6c55c8 commit 2d57883
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
Expand Up @@ -21,7 +21,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.leshan.LwM2m;
import org.eclipse.leshan.ResponseCode;
Expand Down Expand Up @@ -66,14 +65,14 @@ public class RegistrationEngine {
private final Map<Integer, LwM2mObjectEnabler> objectEnablers;
private final BootstrapHandler bootstrapHandler;
private final LwM2mClientObserver observer;
private final AtomicBoolean started = new AtomicBoolean(false);
private boolean started = false;

// registration update
private String registrationID;
private Future<?> registerFuture;
private ScheduledFuture<?> updateFuture;
private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(2,
new NamedThreadFactory("RegistrationEngine#%d"));
private final ScheduledExecutorService schedExecutor = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("RegistrationEngine#%d"));

public RegistrationEngine(String endpoint, Map<Integer, LwM2mObjectEnabler> objectEnablers,
LwM2mRequestSender requestSender, BootstrapHandler bootstrapState, LwM2mClientObserver observer) {
Expand All @@ -87,8 +86,10 @@ public RegistrationEngine(String endpoint, Map<Integer, LwM2mObjectEnabler> obje

public void start() {
stop(false); // stop without de-register
started.set(true);
registerFuture = schedExecutor.submit(new RegistrationTask());
synchronized (this) {
started = true;
registerFuture = schedExecutor.submit(new RegistrationTask());
}
}

private boolean bootstrap() throws InterruptedException {
Expand Down Expand Up @@ -157,11 +158,10 @@ private boolean register() throws InterruptedException {

// send register request
LOG.info("Trying to register to {} ...", dmInfo.getFullUri());
RegisterResponse response = sender
.send(dmInfo.getAddress(),
dmInfo.isSecure(), new RegisterRequest(endpoint, dmInfo.lifetime, LwM2m.VERSION, dmInfo.binding,
null, LinkFormatHelper.getClientDescription(objectEnablers.values(), null), null),
null);
RegisterResponse response = sender.send(dmInfo.getAddress(), dmInfo.isSecure(),
new RegisterRequest(endpoint, dmInfo.lifetime, LwM2m.VERSION, dmInfo.binding, null,
LinkFormatHelper.getClientDescription(objectEnablers.values(), null), null),
null);
if (response == null) {
registrationID = null;
LOG.error("Registration failed: Timeout.");
Expand All @@ -170,11 +170,11 @@ private boolean register() throws InterruptedException {
}
} else if (response.isSuccess()) {
registrationID = response.getRegistrationID();
LOG.info("Registered with location '{}'.", response.getRegistrationID());

// update every lifetime period
scheduleUpdate(dmInfo);

LOG.info("Registered with location '{}'.", response.getRegistrationID());
if (observer != null) {
observer.onRegistrationSuccess(dmInfo, response.getRegistrationID());
}
Expand Down Expand Up @@ -233,8 +233,6 @@ private boolean deregister() throws InterruptedException {
}

private boolean update() throws InterruptedException {
cancelUpdateTask(false);

ServersInfo serversInfo = ServersInfoExtractor.getInfo(objectEnablers);
DmServerInfo dmInfo = serversInfo.deviceMangements.values().iterator().next();
if (dmInfo == null) {
Expand All @@ -255,8 +253,8 @@ private boolean update() throws InterruptedException {
return false;
} else if (response.getCode() == ResponseCode.CHANGED) {
// Update successful, so we reschedule new update
scheduleUpdate(dmInfo);
LOG.info("Registration update succeed.");
scheduleUpdate(dmInfo);
if (observer != null) {
observer.onUpdateSuccess(dmInfo, registrationID);
}
Expand Down Expand Up @@ -296,22 +294,22 @@ public void run() {
}
}
} catch (InterruptedException e) {
LOG.info("Registration task interrupted.");
LOG.info("Registration task interrupted. ");
} catch (RuntimeException e) {
LOG.error("Unexpected exception during update registration task", e);
}
}
}

private void scheduleRegistration() {
if (started.get()) {
private synchronized void scheduleRegistration() {
if (started) {
LOG.info("Unable to connect to any server, next retry in {}s...", BS_RETRY);
registerFuture = schedExecutor.schedule(new RegistrationTask(), BS_RETRY, TimeUnit.SECONDS);
}
}

private void scheduleUpdate(DmServerInfo dmInfo) {
if (started.get()) {
private synchronized void scheduleUpdate(DmServerInfo dmInfo) {
if (started) {
// calculate next update : lifetime - 10%
// dmInfo.lifetime is in seconds
long nextUpdate = dmInfo.lifetime * 900l;
Expand Down Expand Up @@ -356,10 +354,14 @@ private void cancelRegistrationTask() {
}

public void stop(boolean deregister) {
started.set(false);
cancelUpdateTask(true);
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
cancelRegistrationTask();
synchronized (this) {
if (!started)
return;
started = false;
cancelUpdateTask(true);
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
cancelRegistrationTask();
}
try {
if (deregister)
deregister();
Expand All @@ -368,7 +370,9 @@ public void stop(boolean deregister) {
}

public void destroy(boolean deregister) {
started.set(false);
synchronized (this) {
started = false;
}
// TODO we should manage the case where we stop in the middle of a bootstrap session ...
schedExecutor.shutdownNow();
try {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.leshan.LwM2mId;
import org.eclipse.leshan.client.LwM2mClient;
Expand Down Expand Up @@ -75,7 +76,7 @@ public class IntegrationTestHelper {
LeshanServer server;

LwM2mClient client;
String currentEndpointIdentifier;
AtomicReference<String> currentEndpointIdentifier = new AtomicReference<String>();

CountDownLatch registerLatch;
Registration last_registration;
Expand Down Expand Up @@ -109,11 +110,11 @@ protected List<ObjectModel> createObjectModels() {
}

public void initialize() {
currentEndpointIdentifier = "leshan_integration_test_" + r.nextInt();
currentEndpointIdentifier.set("leshan_integration_test_" + r.nextInt());
}

public String getCurrentEndpoint() {
return currentEndpointIdentifier;
return currentEndpointIdentifier.get();
}

public void createClient() {
Expand All @@ -137,7 +138,7 @@ public ExecuteResponse execute(int resourceid, String params) {
objects.addAll(initializer.create(2, 2000));

// Build Client
LeshanClientBuilder builder = new LeshanClientBuilder(currentEndpointIdentifier);
LeshanClientBuilder builder = new LeshanClientBuilder(currentEndpointIdentifier.get());
builder.setObjects(objects);
client = builder.build();
}
Expand All @@ -159,23 +160,23 @@ protected void setupRegistrationMonitoring() {
@Override
public void updated(RegistrationUpdate update, Registration updatedRegistration,
Registration previousRegistration) {
if (updatedRegistration.getEndpoint().equals(currentEndpointIdentifier)) {
if (updatedRegistration.getEndpoint().equals(currentEndpointIdentifier.get())) {
updateLatch.countDown();
}
}

@Override
public void unregistered(Registration registration, Collection<Observation> observations, boolean expired,
Registration newReg) {
if (registration.getEndpoint().equals(currentEndpointIdentifier)) {
if (registration.getEndpoint().equals(currentEndpointIdentifier.get())) {
deregisterLatch.countDown();
}
}

@Override
public void registered(Registration registration, Registration previousReg,
Collection<Observation> previousObsersations) {
if (registration.getEndpoint().equals(currentEndpointIdentifier)) {
if (registration.getEndpoint().equals(currentEndpointIdentifier.get())) {
last_registration = registration;
registerLatch.countDown();
}
Expand Down Expand Up @@ -238,7 +239,7 @@ public void ensureNoDeregistration(long timeInSeconds) {
}

public Registration getCurrentRegistration() {
return server.getRegistrationService().getByEndpoint(currentEndpointIdentifier);
return server.getRegistrationService().getByEndpoint(currentEndpointIdentifier.get());
}

public void deregisterClient() {
Expand All @@ -249,7 +250,7 @@ public void deregisterClient() {

public void dispose() {
deregisterClient();
currentEndpointIdentifier = null;
currentEndpointIdentifier.set(null);
}

public void assertClientRegisterered() {
Expand Down

0 comments on commit 2d57883

Please sign in to comment.