Skip to content

Commit

Permalink
Support Observations
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Nov 7, 2016
1 parent 83fd62d commit ba16318
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public static void createAndStartServer(String clusterInstanceId, int webPort, S

// Create Clustering support
RedisTokenHandler tokenHandler = new RedisTokenHandler(jedis, clusterInstanceId);
new RedisRequestResponseHandler(jedis, lwServer, lwServer.getClientRegistry(), tokenHandler);
new RedisRequestResponseHandler(jedis, lwServer, lwServer.getClientRegistry(), tokenHandler,
lwServer.getObservationRegistry());
clientRegistry.addListener(tokenHandler);
clientRegistry.addListener(new RedisRegistrationEventPublisher(jedis));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,25 @@
*******************************************************************************/
package org.eclipse.leshan.server.cluster;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.californium.core.Utils;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.DownlinkRequest;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.server.LwM2mServer;
import org.eclipse.leshan.server.client.Client;
import org.eclipse.leshan.server.client.ClientRegistry;
import org.eclipse.leshan.server.cluster.serialization.DownlinkRequestSerDes;
import org.eclipse.leshan.server.cluster.serialization.ResponseSerDes;
import org.eclipse.leshan.server.observation.ObservationRegistry;
import org.eclipse.leshan.server.observation.ObservationRegistryListener;
import org.eclipse.leshan.server.response.ResponseListener;
import org.eclipse.leshan.util.NamedThreadFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -53,16 +62,37 @@ public class RedisRequestResponseHandler {
private final ClientRegistry clientRegistry;
private final ExecutorService excutorService;
private final RedisTokenHandler tokenHandler;
private final ObservationRegistry observationRegistry;
private final Map<KeyId, String> observatioIdToTicket = new ConcurrentHashMap<>();

public RedisRequestResponseHandler(Pool<Jedis> p, LwM2mServer server, ClientRegistry clientRegistry,
RedisTokenHandler tokenHandler) {
RedisTokenHandler tokenHandler, ObservationRegistry observationRegistry) {
// Listen LWM2M response
this.server = server;
this.clientRegistry = clientRegistry;
this.observationRegistry = observationRegistry;
this.tokenHandler = tokenHandler;
this.excutorService = Executors.newCachedThreadPool(
new NamedThreadFactory(String.format("Redis %s channel writer", RESPONSE_CHANNEL)));

// Listen LWM2M notification from client
this.observationRegistry.addListener(new ObservationRegistryListener() {

@Override
public void newValue(Observation observation, ObserveResponse response) {
handleNotification(observation, response.getContent());
}

@Override
public void newObservation(Observation observation) {
}

@Override
public void cancelled(Observation observation) {
observatioIdToTicket.remove(new KeyId(observation.getId()));
}
});

// Listen LWM2M response from client
this.server.addResponseListener(new ResponseListener() {

Expand Down Expand Up @@ -121,6 +151,22 @@ public void run() {
});
}

private void handleNotification(final Observation observation, final LwM2mNode value) {
excutorService.submit(new Runnable() {
@Override
public void run() {
String ticket = observatioIdToTicket.get(new KeyId(observation.getId()));
try {
sendNotification(ticket, value);
} catch (Throwable t) {
LOG.error("Unable to send Notification.", t);
sendError(ticket,
String.format("Expected error while sending LWM2M Notification.(%s)", t.getMessage()));
}
}
});
}

private void handlerError(String clientEndpoint, final String ticket, final Exception exception) {
excutorService.submit(new Runnable() {
@Override
Expand Down Expand Up @@ -212,13 +258,56 @@ private void sendError(String ticket, String message) {

}

private void sendNotification(String ticket, LwM2mNode value) {
try (Jedis j = pool.getResource()) {
JsonObject m = Json.object();
m.add("ticket", ticket);
m.add("rep", ResponseSerDes.jSerialize(ObserveResponse.success(value)));
j.publish(RESPONSE_CHANNEL, m.toString());
}
}

private void sendResponse(String ticket, LwM2mResponse response) {
if (response instanceof ObserveResponse) {
Observation observation = ((ObserveResponse) response).getObservation();
observatioIdToTicket.put(new KeyId(observation.getId()), ticket);
}
try (Jedis j = pool.getResource()) {
JsonObject m = Json.object();
m.add("ticket", ticket);
m.add("rep", ResponseSerDes.jSerialize(response));
j.publish(RESPONSE_CHANNEL, m.toString());
}
}

public static final class KeyId {

protected final byte[] id;
private final int hash;

public KeyId(byte[] token) {
if (token == null)
throw new NullPointerException();
this.id = token;
this.hash = Arrays.hashCode(token);
}

@Override
public int hashCode() {
return hash;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof KeyId))
return false;
KeyId key = (KeyId) o;
return Arrays.equals(id, key.id);
}

@Override
public String toString() {
return new StringBuilder("KeyId[").append(Utils.toHexString(id)).append("]").toString();
}
}
}

0 comments on commit ba16318

Please sign in to comment.