Skip to content

Commit

Permalink
Add new send API (async with ticket)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bala Azhagappan authored and sbernard31 committed Jul 1, 2016
1 parent 9479dde commit aeb7586
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package org.eclipse.leshan.server.californium.impl;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

Expand All @@ -34,6 +36,7 @@
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.observation.ObservationRegistry;
import org.eclipse.leshan.server.request.LwM2mRequestSender;
import org.eclipse.leshan.server.response.ResponseListener;
import org.eclipse.leshan.util.Validate;

public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender {
Expand All @@ -44,6 +47,7 @@ public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender {
// A map which contains all pending CoAP requests
// This is mainly used to cancel request and avoid retransmission on de-registration
private final ConcurrentNavigableMap<String/* registrationId#requestId */, Request /* pending coap Request */> pendingRequests = new ConcurrentSkipListMap<>();
private final Collection<ResponseListener> responseListeners = new ConcurrentLinkedQueue<>();

/**
* @param endpoints the CoAP endpoints to use for sending requests
Expand Down Expand Up @@ -87,7 +91,7 @@ public T buildResponse(final Response coapResponse) {
};
coapRequest.addMessageObserver(syncMessageObserver);

// Store pending request to cancel it on deregistration
// Store pending request to cancel it on de-registration
addPendingRequest(destination.getRegistrationId(), coapRequest);

// Send CoAP request asynchronously
Expand Down Expand Up @@ -123,14 +127,80 @@ public T buildResponse(final Response coapResponse) {
}
});

// Store pending request to cancel it on deregistration
// Store pending request to cancel it on de-registration
addPendingRequest(destination.getRegistrationId(), coapRequest);

// Send CoAP request asynchronously
final Endpoint endpoint = getEndpointForClient(destination);
endpoint.sendRequest(coapRequest);
}

@SuppressWarnings("unchecked") // Java generic usage and strict type has to be removed from Leshan
@Override
public <T extends LwM2mResponse> void send(final Client destination, final String requestTicket,
final DownlinkRequest<T> request) {
send(destination, request, (ResponseCallback<T>) getResponseCallback(destination.getEndpoint(), requestTicket),
getErrorCallback(destination.getEndpoint(), requestTicket));
}

/**
* callback that correlates the response to the given requestTicket
*
* @parm requestTicket to correlate the response to a request.
* @return callback which is invoked on getting a response from LWM2M Client.
*/
private <T extends LwM2mResponse> ResponseCallback<T> getResponseCallback(final String clientEndpoint,
final String requestTicket) {
return new ResponseCallback<T>() {
@Override
public void onResponse(T response) {
for (ResponseListener listener : responseListeners) {
listener.onResponse(clientEndpoint, requestTicket, response);
}
}
};
}

/**
* callback that correlates the error from LWM2M client to the given requestTicket
*
* @param requestTicket to correlate the error to a request.
* @return
*/
private ErrorCallback getErrorCallback(final String clientEndpoint, final String requestTicket) {
return new ErrorCallback() {
@Override
public void onError(Exception e) {
for (ResponseListener listener : responseListeners) {
listener.onError(clientEndpoint, requestTicket, e);
}
}
};
}

/*
* (non-Javadoc)
*
* @see org.eclipse.leshan.server.request.LwM2mRequestSender#addResponseListener(java.lang.String,
* org.eclipse.leshan.server.response.ResponseListener)
*/
@Override
public void addResponseListener(ResponseListener listener) {
responseListeners.add(listener);
}

/*
* (non-Javadoc)
*
* @see
* org.eclipse.leshan.server.request.LwM2mRequestSender#removeResponseListener(org.eclipse.leshan.server.response.
* ResponseListener)
*/
@Override
public void removeResponseListener(ResponseListener listener) {
responseListeners.remove(listener);
}

private String getFloorKey(String registrationId) {
// The key format is regid#int, So we need a key which is always before this pattern (in natural order).
return registrationId + '#';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.eclipse.leshan.server.queue.QueuedRequest;
import org.eclipse.leshan.server.queue.QueuedRequestFactory;
import org.eclipse.leshan.server.request.LwM2mRequestSender;
import org.eclipse.leshan.server.response.ResponseListener;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.eclipse.leshan.util.Validate;
import org.slf4j.Logger;
Expand Down Expand Up @@ -144,6 +145,24 @@ public ErrorCallback getErrorCallback() {
}
}

@Override
public <T extends LwM2mResponse> void send(Client destination, String requestTicket, DownlinkRequest<T> request) {
// TODO Auto-generated method stub

}

@Override
public void addResponseListener(ResponseListener listener) {
// TODO Auto-generated method stub

}

@Override
public void removeResponseListener(ResponseListener listener) {
// TODO Auto-generated method stub

}

@Override
public void stop() {
clientRegistry.removeListener(queueModeClientRegistryListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*
* Contributors:
* Sierra Wireless - initial API and implementation
* Bosch Software Innovations GmbH - extension of ticket based asynchronous call.
*******************************************************************************/
package org.eclipse.leshan.server.request;

Expand All @@ -20,21 +21,49 @@
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.client.Client;
import org.eclipse.leshan.server.response.ResponseListener;

public interface LwM2mRequestSender {

/**
* Send a Lightweight M2M request synchronously. Will block until a response is received from the remote client.
*
* @return the LWM2M response. The response can be <code>null</code> if the timeout (given parameter or CoAP
* timeout) expires.
* @Deprecated Synchronous send of a message will not be supported in the future. It is replaced by
* {@link #send(Client, String, DownlinkRequest)}
*/
@Deprecated
<T extends LwM2mResponse> T send(Client destination, DownlinkRequest<T> request, Long timeout)
throws InterruptedException;

/**
* Send a Lightweight M2M request asynchronously.
* @Deprecated Asynchronous send of a message with a callback will not be supported in the future. It is replaced by
* {@link #send(Client, String, DownlinkRequest)}
*/
@Deprecated
<T extends LwM2mResponse> void send(Client destination, DownlinkRequest<T> request,
ResponseCallback<T> responseCallback, ErrorCallback errorCallback);

/**
* sends a Lightweight M2M request asynchronously and uses the requestTicket to correlate the response from a LWM2M
* Client.
*
* @param destination registration meta data of a LWM2M client.
* @param requestTicket a globally unique identifier for correlating the response
* @param request an instance of downlink request.
* @param <T> instance of LwM2mResponse
*/
<T extends LwM2mResponse> void send(Client destination, String requestTicket, DownlinkRequest<T> request);

/**
* adds the listener for the given LWM2M client. This method shall be used to re-register a listener for already
* sent messages or pending messages.
*
* @param listener global listener for handling the responses from a LWM2M client.
*/
void addResponseListener(ResponseListener listener);

/**
* removes the given instance of response listener from LWM2M Sender's list of response listeners.
*
* @param listener target listener to be removed.
*/
void removeResponseListener(ResponseListener listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*******************************************************************************
* Copyright (c) 2016 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch Software Innovations GmbH - initial API and implementation
*******************************************************************************/
package org.eclipse.leshan.server.response;

import org.eclipse.leshan.core.response.LwM2mResponse;

/**
* The listener is responsible for handling all the success and error responses for a given requestTicket.
*/
public interface ResponseListener {

/**
* this method is invoked when a response is received from LWM2M Client correlated by the request ticket.
*
* @param clientEndpoint unique identifier of the LWM2M client
* @param requestTicket globally unique identifier used to correlate the response to the orginial request
* @param response from LWM2M client
*/
void onResponse(String clientEndpoint, String requestTicket, LwM2mResponse response);

/**
* this method is invoked when a an error response is received from LWM2M Client correlated by the request ticket.
*
* @param clientEndpoint unique identifier of the LWM2M client
* @param requestTicket globally unique identifier used to correlate the response to the orginial request
* @param exception error from LWM2M client
*/
void onError(String clientEndpoint, String requestTicket, Exception exception);
}

0 comments on commit aeb7586

Please sign in to comment.