Skip to content

Commit

Permalink
Ensure that user receive either 1 response or 1 error on send
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Nov 15, 2019
1 parent 91b4340 commit 0487082
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
Expand Up @@ -24,13 +24,18 @@
public abstract class AsyncRequestObserver<T extends LwM2mResponse> extends CoapAsyncRequestObserver {

public AsyncRequestObserver(Request coapRequest, final ResponseCallback<T> responseCallback,
ErrorCallback errorCallback, long timeoutInMs) {
final ErrorCallback errorCallback, long timeoutInMs) {
super(coapRequest, null, errorCallback, timeoutInMs);
this.responseCallback = new CoapResponseCallback() {

@Override
public void onResponse(Response coapResponse) {
T lwM2mResponseT = buildResponse(coapResponse);
T lwM2mResponseT = null;
try {
lwM2mResponseT = buildResponse(coapResponse);
} catch (Exception e) {
errorCallback.onError(e);
}
if (lwM2mResponseT != null) {
responseCallback.onResponse(lwM2mResponseT);
}
Expand Down
Expand Up @@ -45,6 +45,13 @@ public class CoapAsyncRequestObserver extends AbstractRequestObserver {
private ScheduledFuture<?> cleaningTask;
private boolean cancelled = false;

// The Californium API does not ensure that message callback are exclusive
// meaning that you can get a onReponse call and a onCancel one.
// The CoapAsyncRequestObserver ensure that you will receive only one event.
// You get either 1 response or 1 error.
// This boolean is used to ensure this.
private AtomicBoolean eventRaised = new AtomicBoolean(false);

private final AtomicBoolean responseTimedOut = new AtomicBoolean(false);

public CoapAsyncRequestObserver(Request coapRequest, CoapResponseCallback responseCallback,
Expand All @@ -58,13 +65,18 @@ public CoapAsyncRequestObserver(Request coapRequest, CoapResponseCallback respon
@Override
public void onResponse(Response coapResponse) {
LOG.debug("Received coap response: {} for {}", coapResponse, coapRequest);
try {
coapRequest.removeMessageObserver(this);
if (eventRaised.compareAndSet(false, true)) {
cleaningTask.cancel(false);
responseCallback.onResponse(coapResponse);
} catch (Exception e) {
errorCallback.onError(e);
} finally {
coapRequest.removeMessageObserver(this);
try {
responseCallback.onResponse(coapResponse);
} catch (RuntimeException e) {
LOG.warn("Uncaught exception during onResponse callback");
}

} else {
LOG.debug("OnResponse callback ignored because an event was already raised for this request {}",
coapRequest);
}
}

Expand All @@ -76,30 +88,51 @@ public void onReadyToSend() {
@Override
public void onTimeout() {
cancelCleaningTask();
errorCallback.onError(new org.eclipse.leshan.core.request.exception.TimeoutException("Request %s timed out",
coapRequest.getURI()));
if (eventRaised.compareAndSet(false, true)) {
errorCallback.onError(new org.eclipse.leshan.core.request.exception.TimeoutException("Request %s timed out",
coapRequest.getURI()));
} else {
LOG.debug("OnTimeout callback ignored because an event was already raised for this request {}",
coapRequest);
}
}

@Override
public void onCancel() {
cancelCleaningTask();
if (responseTimedOut.get())
errorCallback.onError(new org.eclipse.leshan.core.request.exception.TimeoutException("Request %s timed out",
coapRequest.getURI()));
else
errorCallback.onError(new RequestCanceledException("Request %s cancelled", coapRequest.getURI()));
if (eventRaised.compareAndSet(false, true)) {
if (responseTimedOut.get()) {
errorCallback.onError(new org.eclipse.leshan.core.request.exception.TimeoutException(
"Request %s timed out", coapRequest.getURI()));
} else {
errorCallback.onError(new RequestCanceledException("Request %s cancelled", coapRequest.getURI()));
}
} else {
LOG.debug(
"OnCancel(responsetimeout={}) callback ignored because an event was already raised for this request {}",
responseTimedOut.get(), coapRequest);
}
}

@Override
public void onReject() {
cancelCleaningTask();
errorCallback.onError(new RequestRejectedException("Request %s rejected", coapRequest.getURI()));
if (eventRaised.compareAndSet(false, true)) {
errorCallback.onError(new RequestRejectedException("Request %s rejected", coapRequest.getURI()));
} else {
LOG.debug("OnReject callback ignored because an event was already raised for this request {}", coapRequest);
}
}

@Override
public void onSendError(Throwable error) {
cancelCleaningTask();
errorCallback.onError(new SendFailedException(error, "Unable to send request %s", coapRequest.getURI()));
if (eventRaised.compareAndSet(false, true)) {
errorCallback.onError(new SendFailedException(error, "Unable to send request %s", coapRequest.getURI()));
} else {
LOG.debug("onSendError callback ignored because an event was already raised for this request {}",
coapRequest);
}
}

private synchronized void scheduleCleaningTask() {
Expand Down

0 comments on commit 0487082

Please sign in to comment.