/
CaliforniumLwM2mRequestSender.java
240 lines (207 loc) · 10.2 KB
/
CaliforniumLwM2mRequestSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/*******************************************************************************
* Copyright (c) 2013-2015 Sierra Wireless and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Sierra Wireless - initial API and implementation
* Achim Kraus (Bosch Software Innovations GmbH) - use Identity as destination
*******************************************************************************/
package org.eclipse.leshan.server.californium.impl;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.eclipse.californium.core.coap.MessageObserver;
import org.eclipse.californium.core.coap.MessageObserverAdapter;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.leshan.core.californium.AsyncRequestObserver;
import org.eclipse.leshan.core.californium.SyncRequestObserver;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.codec.LwM2mNodeDecoder;
import org.eclipse.leshan.core.node.codec.LwM2mNodeEncoder;
import org.eclipse.leshan.core.request.DownlinkRequest;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.request.LwM2mRequestSender;
import org.eclipse.leshan.util.Validate;
public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender {
private final Set<Endpoint> endpoints;
private final ObservationServiceImpl observationService;
private final LwM2mModelProvider modelProvider;
private final LwM2mNodeDecoder decoder;
private final LwM2mNodeEncoder encoder;
// A map which contains all pending CoAP requests
// This is mainly used to cancel request and avoid retransmission on de-registration
private final ConcurrentNavigableMap<String/* registrationId#requestId */, Request /* pending coap Request */> pendingRequests = new ConcurrentSkipListMap<>();
/**
* @param endpoints the CoAP endpoints to use for sending requests
* @param observationService the service for keeping track of observed resources
* @param modelProvider provides the supported objects definitions
*/
public CaliforniumLwM2mRequestSender(Set<Endpoint> endpoints, ObservationServiceImpl observationService,
LwM2mModelProvider modelProvider, LwM2mNodeEncoder encoder, LwM2mNodeDecoder decoder) {
Validate.notNull(endpoints);
Validate.notNull(observationService);
Validate.notNull(modelProvider);
this.observationService = observationService;
this.endpoints = endpoints;
this.modelProvider = modelProvider;
this.encoder = encoder;
this.decoder = decoder;
}
@Override
public <T extends LwM2mResponse> T send(final Registration destination, final DownlinkRequest<T> request,
long timeout) throws InterruptedException {
// Retrieve the objects definition
final LwM2mModel model = modelProvider.getObjectModel(destination);
// Create the CoAP request from LwM2m request
CoapRequestBuilder coapRequestBuilder = new CoapRequestBuilder(destination.getIdentity(), destination.getRootPath(),
destination.getId(), destination.getEndpoint(), model, encoder);
request.accept(coapRequestBuilder);
final Request coapRequest = coapRequestBuilder.getRequest();
// Send CoAP request synchronously
SyncRequestObserver<T> syncMessageObserver = new SyncRequestObserver<T>(coapRequest, timeout) {
@Override
public T buildResponse(Response coapResponse) {
// Build LwM2m response
LwM2mResponseBuilder<T> lwm2mResponseBuilder = new LwM2mResponseBuilder<>(coapRequest, coapResponse,
destination, model, observationService, decoder);
request.accept(lwm2mResponseBuilder);
return lwm2mResponseBuilder.getResponse();
}
};
coapRequest.addMessageObserver(syncMessageObserver);
// Store pending request to cancel it on de-registration
addPendingRequest(destination.getId(), coapRequest);
// Send CoAP request asynchronously
Endpoint endpoint = getEndpointForClient(destination);
endpoint.sendRequest(coapRequest);
// Wait for response, then return it
return syncMessageObserver.waitForResponse();
}
@Override
public <T extends LwM2mResponse> void send(final Registration destination, final DownlinkRequest<T> request,
long timeout, ResponseCallback<T> responseCallback, ErrorCallback errorCallback) {
// Retrieve the objects definition
final LwM2mModel model = modelProvider.getObjectModel(destination);
// Create the CoAP request from LwM2m request
CoapRequestBuilder coapRequestBuilder = new CoapRequestBuilder(
destination.getIdentity(), destination.getRootPath(),
destination.getId(), destination.getEndpoint(), model, encoder);
request.accept(coapRequestBuilder);
final Request coapRequest = coapRequestBuilder.getRequest();
// Add CoAP request callback
MessageObserver obs = new AsyncRequestObserver<T>(coapRequest, responseCallback, errorCallback, timeout) {
@Override
public T buildResponse(Response coapResponse) {
// Build LwM2m response
LwM2mResponseBuilder<T> lwm2mResponseBuilder = new LwM2mResponseBuilder<>(coapRequest, coapResponse,
destination, model, observationService, decoder);
request.accept(lwm2mResponseBuilder);
return lwm2mResponseBuilder.getResponse();
}
};
coapRequest.addMessageObserver(obs);
// Store pending request to cancel it on de-registration
addPendingRequest(destination.getId(), coapRequest);
// Send CoAP request asynchronously
Endpoint endpoint = getEndpointForClient(destination);
endpoint.sendRequest(coapRequest);
}
@Override
public void cancelPendingRequests(Registration registration) {
Validate.notNull(registration);
String registrationId = registration.getId();
SortedMap<String, Request> requests = pendingRequests.subMap(getFloorKey(registrationId),
getCeilingKey(registrationId));
for (Request coapRequest : requests.values()) {
coapRequest.cancel();
}
requests.clear();
}
private String getFloorKey(String registrationId) {
// The key format is regid#int, So we need a key which is always before this pattern (in natural order).
return registrationId + '#';
}
private String getCeilingKey(String registrationId) {
// The key format is regid#int, So we need a key which is always after this pattern (in natural order).
return registrationId + "#A";
}
private String getKey(String registrationId, int requestId) {
return registrationId + '#' + requestId;
}
private void addPendingRequest(String registrationId, Request coapRequest) {
Validate.notNull(registrationId);
CleanerMessageObserver observer = new CleanerMessageObserver(registrationId, coapRequest);
coapRequest.addMessageObserver(observer);
pendingRequests.put(observer.getRequestKey(), coapRequest);
}
private void removePendingRequest(String key, Request coapRequest) {
Validate.notNull(key);
pendingRequests.remove(key, coapRequest);
}
private class CleanerMessageObserver extends MessageObserverAdapter {
private final String requestKey;
private final Request coapRequest;
public CleanerMessageObserver(String registrationId, Request coapRequest) {
super();
requestKey = getKey(registrationId, hashCode());
this.coapRequest = coapRequest;
}
public String getRequestKey() {
return requestKey;
}
@Override
public void onRetransmission() {
}
@Override
public void onResponse(Response response) {
removePendingRequest(requestKey, coapRequest);
}
@Override
public void onAcknowledgement() {
// we can remove the request on acknowledgement as we only want to avoid CoAP retransmission.
removePendingRequest(requestKey, coapRequest);
}
@Override
protected void failed() {
removePendingRequest(requestKey, coapRequest);
}
@Override
public void onCancel() {
removePendingRequest(requestKey, coapRequest);
}
}
/**
* Gets the CoAP endpoint that should be used to communicate with a given client.
*
* @param registration the client
* @return the CoAP endpoint bound to the same network address and port that the client connected to during
* registration. If no such CoAP endpoint is available, the first CoAP endpoint from the list of registered
* endpoints is returned
*/
private Endpoint getEndpointForClient(Registration registration) {
for (Endpoint ep : endpoints) {
InetSocketAddress endpointAddress = ep.getAddress();
if (endpointAddress.equals(registration.getRegistrationEndpointAddress())) {
return ep;
}
}
throw new IllegalStateException(
"can't find the client endpoint for address : " + registration.getRegistrationEndpointAddress());
}
}