Skip to content

Commit

Permalink
Update to californium 2.0.0 M8.
Browse files Browse the repository at this point in the history
Adjust for modified ObservationStore API.
Remove/replace Key by Token.

Signed-off-by: Achim Kraus <achim.kraus@bosch-si.com>
  • Loading branch information
Achim Kraus authored and sbernard31 committed Mar 1, 2018
1 parent b27fb6c commit ae02eaf
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 97 deletions.
39 changes: 0 additions & 39 deletions leshan-core/src/main/java/org/eclipse/leshan/util/Key.java

This file was deleted.

Expand Up @@ -12,6 +12,7 @@
*
* Contributors:
* Sierra Wireless - initial API and implementation
* Achim Kraus (Bosch Software Innovations GmbH) - use CoapEndpointBuilder
*******************************************************************************/
package org.eclipse.leshan.server.californium;

Expand Down Expand Up @@ -252,7 +253,10 @@ public LeshanBootstrapServer build() {
if (endpointFactory != null) {
unsecuredEndpoint = endpointFactory.createUnsecuredEndpoint(localAddress, coapConfig, null);
} else {
unsecuredEndpoint = new CoapEndpoint(localAddress, coapConfig);
CoapEndpoint.CoapEndpointBuilder builder = new CoapEndpoint.CoapEndpointBuilder();
builder.setInetSocketAddress(localAddress);
builder.setNetworkConfig(coapConfig);
unsecuredEndpoint = builder.build();
}
}

Expand All @@ -261,8 +265,11 @@ public LeshanBootstrapServer build() {
if (endpointFactory != null) {
securedEndpoint = endpointFactory.createSecuredEndpoint(dtlsConfig, coapConfig, null);
} else {
securedEndpoint = new CoapEndpoint(new DTLSConnector(dtlsConfig), coapConfig, null, null,
new Lwm2mEndpointContextMatcher());
CoapEndpoint.CoapEndpointBuilder builder = new CoapEndpoint.CoapEndpointBuilder();
builder.setConnector(new DTLSConnector(dtlsConfig));
builder.setNetworkConfig(coapConfig);
builder.setEndpointContextMatcher(new Lwm2mEndpointContextMatcher());
securedEndpoint = builder.build();
}
}

Expand Down
Expand Up @@ -14,6 +14,7 @@
* Sierra Wireless - initial API and implementation
* Achim Kraus (Bosch Software Innovations GmbH) - use Lwm2mEndpointContextMatcher
* for secure endpoint.
* Achim Kraus (Bosch Software Innovations GmbH) - use CoapEndpointBuilder
*******************************************************************************/
package org.eclipse.leshan.server.californium;

Expand Down Expand Up @@ -435,7 +436,11 @@ public LeshanServer build() {
unsecuredEndpoint = endpointFactory.createUnsecuredEndpoint(localAddress, coapConfig,
registrationStore);
} else {
unsecuredEndpoint = new CoapEndpoint(localAddress, coapConfig, registrationStore);
CoapEndpoint.CoapEndpointBuilder builder = new CoapEndpoint.CoapEndpointBuilder();
builder.setInetSocketAddress(localAddress);
builder.setNetworkConfig(coapConfig);
builder.setObservationStore(registrationStore);
unsecuredEndpoint = builder.build();
}
}

Expand All @@ -444,8 +449,12 @@ public LeshanServer build() {
if (endpointFactory != null) {
securedEndpoint = endpointFactory.createSecuredEndpoint(dtlsConfig, coapConfig, registrationStore);
} else {
securedEndpoint = new CoapEndpoint(new DTLSConnector(dtlsConfig), coapConfig, registrationStore, null,
new Lwm2mEndpointContextMatcher());
CoapEndpoint.CoapEndpointBuilder builder = new CoapEndpoint.CoapEndpointBuilder();
builder.setConnector(new DTLSConnector(dtlsConfig));
builder.setNetworkConfig(coapConfig);
builder.setObservationStore(registrationStore);
builder.setEndpointContextMatcher(new Lwm2mEndpointContextMatcher());
securedEndpoint = builder.build();
}
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public static Observation createLwM2mObservation(Request request) {
context.put(ctx.getKey(), ctx.getValue());
}
}
return new Observation(request.getToken(), regId, new LwM2mPath(lwm2mPath), context);
return new Observation(request.getToken().getBytes(), regId, new LwM2mPath(lwm2mPath), context);
}

/**
Expand Down
Expand Up @@ -19,6 +19,8 @@
* setContext().
* Achim Kraus (Bosch Software Innovations GmbH) - rename CorrelationContext to
* EndpointContext
* Achim Kraus (Bosch Software Innovations GmbH) - update to modified
* ObservationStore API
*******************************************************************************/
package org.eclipse.leshan.server.californium.impl;

Expand All @@ -36,6 +38,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.observe.ObservationUtil;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.leshan.core.observation.Observation;
Expand All @@ -48,7 +51,6 @@
import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationUpdate;
import org.eclipse.leshan.server.registration.UpdatedRegistration;
import org.eclipse.leshan.util.Key;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,8 +63,8 @@ public class InMemoryRegistrationStore implements CaliforniumRegistrationStore,

// Data structure
private final Map<String /* end-point */, Registration> regsByEp = new HashMap<>();
private Map<Key, org.eclipse.californium.core.observe.Observation> obsByToken = new HashMap<>();
private Map<String, List<Key>> tokensByRegId = new HashMap<>();
private Map<Token, org.eclipse.californium.core.observe.Observation> obsByToken = new HashMap<>();
private Map<String, List<Token>> tokensByRegId = new HashMap<>();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

Expand Down Expand Up @@ -212,7 +214,7 @@ public Collection<Observation> addObservation(String registrationId, Observation
// cancel existing observations for the same path and registration id.
for (Observation obs : unsafeGetObservations(registrationId)) {
if (observation.getPath().equals(obs.getPath()) && !Arrays.equals(observation.getId(), obs.getId())) {
unsafeRemoveObservation(obs.getId());
unsafeRemoveObservation(new Token(obs.getId()));
removed.add(obs);
}
}
Expand All @@ -227,10 +229,10 @@ public Collection<Observation> addObservation(String registrationId, Observation
public Observation removeObservation(String registrationId, byte[] observationId) {
try {
lock.writeLock().lock();

Observation observation = build(unsafeGetObservation(new Key(observationId)));
Token token = new Token(observationId);
Observation observation = build(unsafeGetObservation(token));
if (observation != null && registrationId.equals(observation.getRegistrationId())) {
unsafeRemoveObservation(observationId);
unsafeRemoveObservation(token);
return observation;
}
return null;
Expand All @@ -243,7 +245,7 @@ public Observation removeObservation(String registrationId, byte[] observationId
public Observation getObservation(String registrationId, byte[] observationId) {
try {
lock.readLock().lock();
Observation observation = build(unsafeGetObservation(new Key(observationId)));
Observation observation = build(unsafeGetObservation(new Token(observationId)));
if (observation != null && registrationId.equals(observation.getRegistrationId())) {
return observation;
}
Expand Down Expand Up @@ -276,18 +278,34 @@ public Collection<Observation> removeObservations(String registrationId) {
/* *************** Californium ObservationStore API **************** */

@Override
public void add(org.eclipse.californium.core.observe.Observation obs) {
public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token, org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, true);
}

@Override
public org.eclipse.californium.core.observe.Observation put(Token token, org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, false);
}

private org.eclipse.californium.core.observe.Observation add(Token token, org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
org.eclipse.californium.core.observe.Observation previousObservation = null;
if (obs != null) {
try {
lock.writeLock().lock();

validateObservation(obs);

String registrationId = ObserveUtil.extractRegistrationId(obs);
Key token = new Key(obs.getRequest().getToken());
org.eclipse.californium.core.observe.Observation previousObservation = obsByToken.put(token, obs);
if (ifAbsent) {
if (!obsByToken.containsKey(token))
previousObservation = obsByToken.put(token, obs);
else
previousObservation = obsByToken.get(token);
} else {
previousObservation = obsByToken.put(token, obs);
}
if (!tokensByRegId.containsKey(registrationId)) {
tokensByRegId.put(registrationId, new ArrayList<Key>());
tokensByRegId.put(registrationId, new ArrayList<Token>());
}
tokensByRegId.get(registrationId).add(token);

Expand All @@ -301,34 +319,34 @@ public void add(org.eclipse.californium.core.observe.Observation obs) {
lock.writeLock().unlock();
}
}
return previousObservation;
}

@Override
public org.eclipse.californium.core.observe.Observation get(byte[] token) {
public org.eclipse.californium.core.observe.Observation get(Token token) {
try {
lock.readLock().lock();
return unsafeGetObservation(new Key(token));
return unsafeGetObservation(token);
} finally {
lock.readLock().unlock();
}
}

@Override
public void setContext(byte[] token, EndpointContext ctx) {
public void setContext(Token token, EndpointContext ctx) {
try {
lock.writeLock().lock();
Key key = new Key(token);
org.eclipse.californium.core.observe.Observation obs = obsByToken.get(key);
org.eclipse.californium.core.observe.Observation obs = obsByToken.get(token);
if (obs != null) {
obsByToken.put(key, new org.eclipse.californium.core.observe.Observation(obs.getRequest(), ctx));
obsByToken.put(token, new org.eclipse.californium.core.observe.Observation(obs.getRequest(), ctx));
}
} finally {
lock.writeLock().unlock();
}
}

@Override
public void remove(byte[] token) {
public void remove(Token token) {
try {
lock.writeLock().lock();
unsafeRemoveObservation(token);
Expand All @@ -339,19 +357,18 @@ public void remove(byte[] token) {

/* *************** Observation utility functions **************** */

private org.eclipse.californium.core.observe.Observation unsafeGetObservation(Key token) {
private org.eclipse.californium.core.observe.Observation unsafeGetObservation(Token token) {
org.eclipse.californium.core.observe.Observation obs = obsByToken.get(token);
return ObservationUtil.shallowClone(obs);
}

private void unsafeRemoveObservation(byte[] observationId) {
Key kToken = new Key(observationId);
org.eclipse.californium.core.observe.Observation removed = obsByToken.remove(kToken);
private void unsafeRemoveObservation(Token observationId) {
org.eclipse.californium.core.observe.Observation removed = obsByToken.remove(observationId);

if (removed != null) {
String registrationId = ObserveUtil.extractRegistrationId(removed);
List<Key> tokens = tokensByRegId.get(registrationId);
tokens.remove(kToken);
List<Token> tokens = tokensByRegId.get(registrationId);
tokens.remove(observationId);
if (tokens.isEmpty()) {
tokensByRegId.remove(registrationId);
}
Expand All @@ -360,9 +377,9 @@ private void unsafeRemoveObservation(byte[] observationId) {

private Collection<Observation> unsafeRemoveAllObservations(String registrationId) {
Collection<Observation> removed = new ArrayList<>();
List<Key> tokens = tokensByRegId.get(registrationId);
List<Token> tokens = tokensByRegId.get(registrationId);
if (tokens != null) {
for (Key token : tokens) {
for (Token token : tokens) {
Observation observationRemoved = build(obsByToken.remove(token));
if (observationRemoved != null) {
removed.add(observationRemoved);
Expand All @@ -375,9 +392,9 @@ private Collection<Observation> unsafeRemoveAllObservations(String registrationI

private Collection<Observation> unsafeGetObservations(String registrationId) {
Collection<Observation> result = new ArrayList<>();
List<Key> tokens = tokensByRegId.get(registrationId);
List<Token> tokens = tokensByRegId.get(registrationId);
if (tokens != null) {
for (Key token : tokens) {
for (Token token : tokens) {
Observation obs = build(unsafeGetObservation(token));
if (obs != null) {
result.add(obs);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.observe.NotificationListener;
import org.eclipse.californium.core.observe.ObservationStore;
Expand Down Expand Up @@ -139,10 +140,11 @@ public void cancelObservation(Observation observation) {
}

private void cancel(Observation observation) {
Token token = new Token(observation.getId());
if (secureEndpoint != null)
secureEndpoint.cancelObservation(observation.getId());
secureEndpoint.cancelObservation(token);
if (nonSecureEndpoint != null)
nonSecureEndpoint.cancelObservation(observation.getId());
nonSecureEndpoint.cancelObservation(token);

for (ObservationListener listener : listeners) {
listener.cancelled(observation);
Expand Down Expand Up @@ -205,10 +207,10 @@ public void onNotification(Request coapRequest, Response coapResponse) {
String regid = coapRequest.getUserContext().get(ObserveUtil.CTX_REGID);

// get observation for this request
Observation observation = registrationStore.getObservation(regid, coapResponse.getToken());
Observation observation = registrationStore.getObservation(regid, coapResponse.getToken().getBytes());
if (observation == null) {
LOG.error("Unexpected error: Unable to find observation with token {} for registration {}",
Hex.encodeHexString(coapResponse.getToken()), regid);
coapResponse.getToken(), regid);
return;
}

Expand Down
Expand Up @@ -146,9 +146,9 @@ private Observation givenAnObservation(String registrationId, LwM2mPath target)
registrationId, new ObserveRequest(target.toString()));
coapRequest.setUserContext(context);

store.add(new org.eclipse.californium.core.observe.Observation(coapRequest, null));
store.put(coapRequest.getToken(), new org.eclipse.californium.core.observe.Observation(coapRequest, null));

Observation observation = new Observation(coapRequest.getToken(), registrationId, target, null);
Observation observation = new Observation(coapRequest.getToken().getBytes(), registrationId, target, null);
observationService.addObservation(registration, observation);

return observation;
Expand Down

0 comments on commit ae02eaf

Please sign in to comment.