Skip to content

Commit

Permalink
#206 Add a way to handle exception on new notification
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jan 25, 2017
1 parent 2d0a43b commit c7c25f5
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
Expand Up @@ -329,12 +329,19 @@ private final class TestObservationListener implements ObservationListener {
private ObserveResponse response;

@Override
public void newValue(Observation observation, ObserveResponse response) {
public void onResponse(Observation observation, ObserveResponse response) {
receivedNotify.set(true);
this.response = response;
latch.countDown();
}

@Override
public void onError(Observation observation, Exception error) {
receivedNotify.set(true);
this.response = null;
latch.countDown();
}

@Override
public void cancelled(Observation observation) {
latch.countDown();
Expand Down
Expand Up @@ -43,8 +43,8 @@
import org.eclipse.leshan.server.californium.CaliforniumRegistrationStore;
import org.eclipse.leshan.server.client.Registration;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.observation.ObservationService;
import org.eclipse.leshan.server.observation.ObservationListener;
import org.eclipse.leshan.server.observation.ObservationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -212,15 +212,15 @@ public void onNotification(Request coapRequest, Response coapResponse) {

if (coapResponse.getCode() == CoAP.ResponseCode.CHANGED
|| coapResponse.getCode() == CoAP.ResponseCode.CONTENT) {
try {
// get registration Id
String regid = coapRequest.getUserContext().get(CTX_REGID);

// get observation for this request
Observation observation = registrationStore.getObservation(regid, coapResponse.getToken());
if (observation == null)
return;
// get registration Id
String regid = coapRequest.getUserContext().get(CTX_REGID);

// get observation for this request
Observation observation = registrationStore.getObservation(regid, coapResponse.getToken());
if (observation == null)
return;

try {
// get registration
Registration registration = registrationStore.getRegistration(observation.getRegistrationId());
if (registration == null)
Expand Down Expand Up @@ -252,10 +252,15 @@ public void onNotification(Request coapRequest, Response coapResponse) {

// notify all listeners
for (ObservationListener listener : listeners) {
listener.newValue(observation, response);
listener.onResponse(observation, response);
}
} catch (InvalidValueException | RuntimeException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Unable to handle notification for observation [%s]", observation), e);
}
for (ObservationListener listener : listeners) {
listener.onError(observation, e);
}
} catch (InvalidValueException e) {
LOG.debug(String.format("[%s] ([%s])", e.getMessage(), e.getPath().toString()));
}
}
}
Expand Down
Expand Up @@ -32,8 +32,8 @@
import org.eclipse.leshan.server.client.RegistrationService;
import org.eclipse.leshan.server.cluster.serialization.DownlinkRequestSerDes;
import org.eclipse.leshan.server.cluster.serialization.ResponseSerDes;
import org.eclipse.leshan.server.observation.ObservationService;
import org.eclipse.leshan.server.observation.ObservationListener;
import org.eclipse.leshan.server.observation.ObservationService;
import org.eclipse.leshan.server.response.ResponseListener;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,10 +80,18 @@ public RedisRequestResponseHandler(Pool<Jedis> p, LwM2mServer server, Registrati
this.observationService.addListener(new ObservationListener() {

@Override
public void newValue(Observation observation, ObserveResponse response) {
public void onResponse(Observation observation, ObserveResponse response) {
handleNotification(observation, response.getContent());
}

@Override
public void onError(Observation observation, Exception error) {
if (LOG.isWarnEnabled()) {
LOG.warn(String.format("Unable to handle notification of [%s:%s]", observation.getRegistrationId(),
observation.getPath()), error);
}
}

@Override
public void newObservation(Observation observation) {
}
Expand Down
Expand Up @@ -30,5 +30,13 @@ public interface ObservationListener {
* @param observation the observation for which new data are received
* @param reponse the lwm2m response received
*/
void newValue(Observation observation, ObserveResponse response);
void onResponse(Observation observation, ObserveResponse response);

/**
* Called when an error occurs on new notification.
*
* @param observation the observation for which new data are received
* @param error the exception raised when we handle the notification
*/
void onError(Observation observation, Exception error);
}
Expand Up @@ -28,10 +28,10 @@
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.server.californium.impl.LeshanServer;
import org.eclipse.leshan.server.client.Registration;
import org.eclipse.leshan.server.client.RegistrationUpdate;
import org.eclipse.leshan.server.client.RegistrationListener;
import org.eclipse.leshan.server.demo.servlet.json.RegistrationSerializer;
import org.eclipse.leshan.server.client.RegistrationUpdate;
import org.eclipse.leshan.server.demo.servlet.json.LwM2mNodeSerializer;
import org.eclipse.leshan.server.demo.servlet.json.RegistrationSerializer;
import org.eclipse.leshan.server.demo.servlet.log.CoapMessage;
import org.eclipse.leshan.server.demo.servlet.log.CoapMessageListener;
import org.eclipse.leshan.server.demo.servlet.log.CoapMessageTracer;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void cancelled(Observation observation) {
}

@Override
public void newValue(Observation observation, ObserveResponse response) {
public void onResponse(Observation observation, ObserveResponse response) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received notification from [{}] containing value [{}]", observation.getPath(),
response.getContent().toString());
Expand All @@ -116,6 +116,14 @@ public void newValue(Observation observation, ObserveResponse response) {
}
}

@Override
public void onError(Observation observation, Exception error) {
if (LOG.isWarnEnabled()) {
LOG.warn(String.format("Unable to handle notification of [%s:%s]", observation.getRegistrationId(),
observation.getPath()), error);
}
}

@Override
public void newObservation(Observation observation) {
}
Expand Down

0 comments on commit c7c25f5

Please sign in to comment.