Skip to content

Commit

Permalink
Test new Observe API at coap client side (lwm2m server side)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jun 19, 2023
1 parent 10ab042 commit 7210459
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.eclipse.leshan.transport.javacoap.server.endpoint;

import java.net.URI;
import java.util.Collection;
import java.util.SortedMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -33,21 +32,13 @@

import org.eclipse.leshan.core.endpoint.EndpointUriUtil;
import org.eclipse.leshan.core.endpoint.Protocol;
import org.eclipse.leshan.core.observation.CompositeObservation;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.ObservationIdentifier;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.request.DownlinkRequest;
import org.eclipse.leshan.core.request.ObserveCompositeRequest;
import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.exception.RequestCanceledException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.TimeoutException.Type;
import org.eclipse.leshan.core.response.AbstractLwM2mResponse;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ObserveCompositeResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.core.util.Validate;
Expand All @@ -57,30 +48,17 @@
import org.eclipse.leshan.server.profile.ClientProfile;
import org.eclipse.leshan.server.registration.RegistrationStore;
import org.eclipse.leshan.server.request.LowerLayerConfig;
import org.eclipse.leshan.transport.javacoap.request.RandomTokenGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mbed.coap.client.ObservationConsumer;
import com.mbed.coap.exception.CoapTimeoutException;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;
import com.mbed.coap.server.CoapServer;

public class JavaCoapServerEndpoint implements LwM2mServerEndpoint {

private static final Logger LOG = LoggerFactory.getLogger(JavaCoapServerEndpoint.class);

// TODO : this should be configurable
// https://github.com/open-coap/java-coap/issues/27#issuecomment-1516277799
private final RandomTokenGenerator tokenGenerator = new RandomTokenGenerator(8);

private final CoapServer coapServer;
private final ServerCoapMessageTranslator translator;
private final ServerEndpointToolbox toolbox;
private final LwM2mNotificationReceiver notificationReceiver;
private final RegistrationStore registrationStore;

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("Leshan Async Request timeout"));
Expand All @@ -98,9 +76,6 @@ public JavaCoapServerEndpoint(CoapServer coapServer, ServerCoapMessageTranslator
this.coapServer = coapServer;
this.translator = translator;
this.toolbox = toolbox;
this.notificationReceiver = notificationReceiver;
this.registrationStore = registrationStore;

}

@Override
Expand Down Expand Up @@ -178,138 +153,23 @@ protected <T extends LwM2mResponse> CompletableFuture<T> sendLwM2mRequest(Client
DownlinkRequest<T> lwm2mRequest, LowerLayerConfig lowerLayerConfig) {

CompletableFuture<T> lwm2mResponseFuture;
if (lwm2mRequest instanceof ObserveRequest || lwm2mRequest instanceof ObserveCompositeRequest) {
lwm2mResponseFuture = sendObserveRequest(destination, lwm2mRequest, lowerLayerConfig);
} else {
// Create Coap Request to send from LWM2M Request
CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox);

// Apply Users customization
applyUserConfig(lowerLayerConfig, coapRequest);

// Send CoAP Request
CompletableFuture<CoapResponse> coapResponseFuture = coapServer.clientService().apply(coapRequest);

// On response, create LWM2M Response from CoAP response
lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator
.createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, null));
}

// store ongoing request
addOngoingRequest(destination.getRegistrationId(), lwm2mResponseFuture);

return lwm2mResponseFuture;
}

protected <T extends LwM2mResponse> CompletableFuture<T> sendObserveRequest(ClientProfile destination,
DownlinkRequest<T> lwm2mRequest, LowerLayerConfig lowerLayerConfig) {
// Create Coap Request to send from LWM2M Request
CoapRequest coapRequest = translator.createCoapRequest(destination, lwm2mRequest, toolbox);

// TODO HACK as we can not get token from coap response.
Opaque token = tokenGenerator.createToken();
final CoapRequest hackedCoapRequest = coapRequest.token(token);

// Apply Users customization
applyUserConfig(lowerLayerConfig, coapRequest);

// Send CoAP Request
CompletableFuture<CoapResponse> coapResponseFuture = coapServer.clientService().apply(hackedCoapRequest);

// Handle Notifications
// --------------------
coapResponseFuture.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> {
// Check if we have observe relation in store for this notification
Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(),
new ObservationIdentifier(token.getBytes()));

if (observeRelation == null) {
// We haven't observe relation so stop this observation.
return false;
} else {
// We have an observe relation notify upper layer
try {
// Create LWM2M response
AbstractLwM2mResponse lwm2mResponse = translator.createLwM2mResponseForNotification(observeRelation,
notification, toolbox, destination);

// Get observation
Observation observation;
if (lwm2mRequest instanceof ObserveRequest && lwm2mResponse instanceof ObserveResponse) {
observation = ((ObserveResponse) lwm2mResponse).getObservation();
} else if (lwm2mRequest instanceof ObserveCompositeRequest
&& lwm2mResponse instanceof ObserveCompositeResponse) {
observation = ((ObserveCompositeResponse) lwm2mResponse).getObservation();
} else {
throw new IllegalStateException(String.format("Unexpected response %s for request %s",
lwm2mResponse.getClass().getSimpleName(), lwm2mRequest.getClass().getSimpleName()));
}

// Check observation created from notification is the same than the one stored.
if (!observeRelation.equals(observation)) {
notificationReceiver.onError(observeRelation, destination,
new IllegalStateException(String.format(
"Store Observation is to equals to observation created from notification: [%s] != [%s]",
observeRelation, observation)));
// TODO should we stop observation and remove it from store in this case ?
return true;
}

// Notify upper layer
if (observation instanceof SingleObservation) {
notificationReceiver.onNotification((SingleObservation) observation, destination,
(ObserveResponse) lwm2mResponse);
} else if (observation instanceof CompositeObservation) {
notificationReceiver.onNotification((CompositeObservation) observation, destination,
(ObserveCompositeResponse) lwm2mResponse);
} else {
throw new IllegalStateException(
"Unsupported Observer Relation :" + observeRelation.getClass().getSimpleName());
}
return true;
} catch (Exception e) {
notificationReceiver.onError(observeRelation, destination, e);
// TODO should we stop observation and remove it from store in this case ?
return true;
}
}
}));
CompletableFuture<CoapResponse> coapResponseFuture = coapServer.clientService().apply(coapRequest);

// On response, create LWM2M Response from CoAP response
CompletableFuture<T> lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> {
// create LWM2M response
T lwm2mResponse = translator.createLwM2mResponse(destination, lwm2mRequest, coapResponse, toolbox, token);

// Handle Observation Relation
// ----------------------------
// Add Observation to the store if relation is established
if (lwm2mResponse.isSuccess()) {
Observation observation;
if (lwm2mRequest instanceof ObserveRequest && lwm2mResponse instanceof ObserveResponse) {
observation = ((ObserveResponse) lwm2mResponse).getObservation();
} else if (lwm2mRequest instanceof ObserveCompositeRequest
&& lwm2mResponse instanceof ObserveCompositeResponse) {
observation = ((ObserveCompositeResponse) lwm2mResponse).getObservation();
} else {
throw new IllegalStateException(String.format("Unexpected response %s for request %s",
lwm2mResponse.getClass().getSimpleName(), lwm2mRequest.getClass().getSimpleName()));
}
lwm2mResponseFuture = coapResponseFuture.thenApply(coapResponse -> translator.createLwM2mResponse(destination,
lwm2mRequest, coapResponse, coapRequest, toolbox));

// TODO should we handle conflict ?
Collection<Observation> previousRelation = registrationStore
.addObservation(destination.getRegistrationId(), observation, false);
if (!previousRelation.isEmpty()) {
// TODO log that a relation is override.
LOG.warn("Observation conflict ? {} is replaced by {}", previousRelation, observation);
}
// notify upper layer that new relation is established
notificationReceiver.newObservation(observation, destination.getRegistration());
}
return lwm2mResponse;
});
// store ongoing request
addOngoingRequest(destination.getRegistrationId(), lwm2mResponseFuture);

return lwm2mResponseFuture;

}

public void timeoutAfter(CompletableFuture<?> future, long timeoutInMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.eclipse.leshan.server.observation.LwM2mNotificationReceiver;
import org.eclipse.leshan.server.request.UplinkRequestReceiver;
import org.eclipse.leshan.server.security.ServerSecurityInfo;
import org.eclipse.leshan.transport.javacoap.server.observation.CoapNotificationReceiver;
import org.eclipse.leshan.transport.javacoap.server.observation.LwM2mObservationsStore;
import org.eclipse.leshan.transport.javacoap.server.resource.RegistrationResource;
import org.eclipse.leshan.transport.javacoap.server.resource.SendResource;

Expand All @@ -53,13 +55,14 @@ public JavaCoapServerEndpointsProvider(InetSocketAddress localAddress) {
}

@Override
public void createEndpoints(UplinkRequestReceiver requestReceiver, LwM2mNotificationReceiver notificationReceiver,
ServerEndpointToolbox toolbox, ServerSecurityInfo serverSecurityInfo, LeshanServer server) {
public void createEndpoints(UplinkRequestReceiver requestReceiver,
final LwM2mNotificationReceiver notificationReceiver, final ServerEndpointToolbox toolbox,
ServerSecurityInfo serverSecurityInfo, LeshanServer server) {

// HACK to be able to get local URI in resource, need to discuss about it with java-coap.
EndpointUriProvider endpointUriProvider = new EndpointUriProvider(Protocol.COAP);

// create Resources / Routes
// Create Resources / Routes
RegistrationResource registerResource = new RegistrationResource(requestReceiver, toolbox.getLinkParser(),
endpointUriProvider);
Service<CoapRequest, CoapResponse> resources = RouterService.builder() //
Expand All @@ -69,7 +72,15 @@ public void createEndpoints(UplinkRequestReceiver requestReceiver, LwM2mNotifica
new SendResource(requestReceiver, toolbox.getDecoder(), toolbox.getProfileProvider(),
endpointUriProvider))//
.build();
coapServer = createCoapServer().transport(new DatagramSocketTransport(localAddress)).route(resources).build();

// Create CoAP Server
coapServer = createCoapServer() //
.transport(new DatagramSocketTransport(localAddress)) //
.route(resources) //
.notificationsReceiver(new CoapNotificationReceiver(coapServer, notificationReceiver,
server.getRegistrationStore(), server.getModelProvider(), toolbox.getDecoder())) //
.observationsStore(new LwM2mObservationsStore(server.getRegistrationStore(), notificationReceiver)) //
.build();
endpointUriProvider.setCoapServer(coapServer);

lwm2mEndpoint = new JavaCoapServerEndpoint(coapServer, new ServerCoapMessageTranslator(), toolbox,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,35 @@
*******************************************************************************/
package org.eclipse.leshan.transport.javacoap.server.endpoint;

import java.util.List;
import java.util.Map;

import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.TimestampedLwM2mNode;
import org.eclipse.leshan.core.observation.CompositeObservation;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.request.DownlinkRequest;
import org.eclipse.leshan.core.response.AbstractLwM2mResponse;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ObserveCompositeResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.server.endpoint.ServerEndpointToolbox;
import org.eclipse.leshan.server.profile.ClientProfile;
import org.eclipse.leshan.transport.javacoap.request.ResponseCodeUtil;
import org.eclipse.leshan.transport.javacoap.server.request.CoapRequestBuilder;
import org.eclipse.leshan.transport.javacoap.server.request.LwM2mResponseBuilder;

import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;

public class ServerCoapMessageTranslator {

public CoapRequest createCoapRequest(ClientProfile clientProfile,
DownlinkRequest<? extends LwM2mResponse> lwm2mRequest,
ServerEndpointToolbox toolbox /* , IdentityHandler identityHandler */) {

CoapRequestBuilder builder = new CoapRequestBuilder(clientProfile.getIdentity(), clientProfile.getRootPath(),
clientProfile.getModel(), toolbox.getEncoder());
CoapRequestBuilder builder = new CoapRequestBuilder(clientProfile.getRegistration(),
clientProfile.getIdentity(), clientProfile.getRootPath(), clientProfile.getModel(),
toolbox.getEncoder());
lwm2mRequest.accept(builder);
return builder.getRequest();
}

public <T extends LwM2mResponse> T createLwM2mResponse(ClientProfile clientProfile, DownlinkRequest<T> lwm2mRequest,
CoapResponse coapResponse, ServerEndpointToolbox toolbox,
/* TODO HACK https://github.com/open-coap/java-coap/issues/27#issuecomment-1516277799 */ Opaque token) {
CoapResponse coapResponse, CoapRequest coapRequest, ServerEndpointToolbox toolbox) {

LwM2mResponseBuilder<T> builder = new LwM2mResponseBuilder<T>(coapResponse, clientProfile.getEndpoint(),
clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(),
clientProfile.getRegistrationId(), token);
LwM2mResponseBuilder<T> builder = new LwM2mResponseBuilder<T>(coapResponse, coapRequest,
clientProfile.getEndpoint(), clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser());
lwm2mRequest.accept(builder);
return builder.getResponse();
}

public AbstractLwM2mResponse createLwM2mResponseForNotification(Observation observation, CoapResponse coapResponse,
ServerEndpointToolbox toolbox, ClientProfile profile) {

ResponseCode responseCode = ResponseCodeUtil.toLwM2mResponseCode(coapResponse.getCode());

if (observation instanceof SingleObservation) {
SingleObservation singleObservation = (SingleObservation) observation;

List<TimestampedLwM2mNode> timestampedNodes = toolbox.getDecoder().decodeTimestampedData(
coapResponse.getPayload().getBytes(), singleObservation.getContentFormat(),
singleObservation.getPath(), profile.getModel());

// create lwm2m response
if (timestampedNodes.size() == 1 && !timestampedNodes.get(0).isTimestamped()) {
return new ObserveResponse(responseCode, timestampedNodes.get(0).getNode(), null, singleObservation,
null, coapResponse);
} else {
return new ObserveResponse(responseCode, null, timestampedNodes, singleObservation, null, coapResponse);
}
} else if (observation instanceof CompositeObservation) {
CompositeObservation compositeObservation = (CompositeObservation) observation;

Map<LwM2mPath, LwM2mNode> nodes = toolbox.getDecoder().decodeNodes(coapResponse.getPayload().getBytes(),
compositeObservation.getResponseContentFormat(), compositeObservation.getPaths(),
profile.getModel());

return new ObserveCompositeResponse(responseCode, nodes, null, coapResponse, compositeObservation);
}
return null;
}
}
Loading

0 comments on commit 7210459

Please sign in to comment.