Skip to content

Commit

Permalink
- Support for intelligent gossip (do not rebroadcast if the original
Browse files Browse the repository at this point in the history
source is too close, based on RSSI)
- tweeked constants for purging packed cache 
- basic log analysis
  • Loading branch information
keznikl committed Feb 5, 2014
1 parent 047b33a commit d068b86
Show file tree
Hide file tree
Showing 8 changed files with 38,363 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
public class KnowledgeDataManager implements KnowledgeDataReceiver,
KnowledgeDataPublisher {

// this rssi corresponds to roughly 20m distance
public static final double RSSI_20m = 5.52e-8;
// this rssi corresponds to roughly 10m distance
public static final double RSSI_10m = 3.12e-7;

/** Global version counter for all outgoing local knowledge. */
protected long localVersion;
/** Service for convenient time retrieval. */
Expand Down Expand Up @@ -100,8 +105,10 @@ public void publish() {
logPublish(data);

//TODO
knowledgeDataSender.broadcastKnowledgeData(data);
localVersion++;
if (!data.isEmpty()) {
knowledgeDataSender.broadcastKnowledgeData(data);
localVersion++;
}
}

@Override
Expand Down Expand Up @@ -162,24 +169,31 @@ List<? extends KnowledgeData> prepareKnowledgeData() {
for (KnowledgeManager km : kmContainer.getReplicas()) {
try {

KnowledgeData kd = new KnowledgeData(km.get(emptyPath), replicaMetadata.get(km));
KnowledgeMetaData kmd = replicaMetadata.get(km);
KnowledgeManager nodeKm = getNodeKnowledge();

boolean isInSomeBoundary = false;
for (EnsembleDefinition ens: ensembleDefinitions) {
// null boundary condition counts as a satisfied one
if (ens.getCommunicationBoundary() == null
|| ens.getCommunicationBoundary().eval(kd, nodeKm)) {
isInSomeBoundary = true;
break;
}

if (!satisfiesGossipCondition(kmd)) {
Log.d(String.format("Gossip condition failed (%d) at %s for %sv%d from %s with rssi %g\n",
timeProvider.getCurrentTime(), host, kmd.componentId, kmd.versionId, kmd.sender, kmd.rssi));
continue;
}
if (isInSomeBoundary) {
result.add(kd);
} else {

Log.d(String.format("Gossip condition succeeded (%d) at %s for %sv%d from %s with rssi %g\n",
timeProvider.getCurrentTime(), host, kmd.componentId, kmd.versionId, kmd.sender, kmd.rssi));

KnowledgeMetaData kmdCopy = kmd.clone();
kmdCopy.sender = host;
KnowledgeData kd = new KnowledgeData(km.get(emptyPath), kmdCopy);

if (!isInSomeBoundary(kd, nodeKm)) {
Log.d(String.format("Boundary failed (%d) at %s for %sv%d\n",
timeProvider.getCurrentTime(), host, kd.getMetaData().componentId, kd.getMetaData().versionId));
}
timeProvider.getCurrentTime(), host, kmd.componentId, kmd.versionId));
continue;
}

result.add(kd);


} catch (KnowledgeNotFoundException e) {
Log.e("prepareKnowledgeData error", e);
}
Expand All @@ -188,6 +202,25 @@ List<? extends KnowledgeData> prepareKnowledgeData() {
return result;
}

private boolean satisfiesGossipCondition(KnowledgeMetaData kmd) {
// rssi < 0 means received from IP
// the signal came from more than 10m
return kmd.rssi < 0 || kmd.rssi <= RSSI_10m;
}

private boolean isInSomeBoundary(KnowledgeData data, KnowledgeManager sender) {
boolean isInSomeBoundary = false;
for (EnsembleDefinition ens: ensembleDefinitions) {
// null boundary condition counts as a satisfied one
if (ens.getCommunicationBoundary() == null
|| ens.getCommunicationBoundary().eval(data, sender)) {
isInSomeBoundary = true;
break;
}
}
return isInSomeBoundary;
}

private KnowledgeManager getNodeKnowledge() {
// FIXME: in the future, we need to unify the knowledge of all the local KMs.
return kmContainer.getLocals().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ public KnowledgeMetaData(String componentId, long versionId, String sender, long
this.createdAt = createdAt;
this.hopCount = hopCount;
}


public KnowledgeMetaData clone() {
return new KnowledgeMetaData(componentId, versionId, sender, createdAt, hopCount);
}

@Override
public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public class PacketReceiver {

private final static int MESSAGE_WIPE_PERIOD = 1500;
private final static int MESSAGE_WIPE_PERIOD = 500;

private final int packetSize;
private final Map<Integer, Message> messages;
Expand Down Expand Up @@ -80,17 +80,22 @@ public void packetReceived(byte[] packet, double rssi) {
//TODO Possibly this should be scheduled as a task in the Scheduler.
private void clearCachedMessagesIfNecessary() {
if (timeProvider.getCurrentTime() - lastMessagesWipe >= MESSAGE_WIPE_PERIOD) {
int origCnt = messages.size();

Message message;
Iterator<Entry<Integer, Message>> it = messages.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Message> entry = it.next();
message = entry.getValue();
if (message != null && message.isStale()) {
it.remove();
it.remove();
}
}
lastMessagesWipe = timeProvider.getCurrentTime();
int currentCnt = messages.size();
Log.i(String.format("Message wipe removed %d cached packets", origCnt - currentCnt));
}

}

// -----------Helper methods-----------
Expand Down Expand Up @@ -122,7 +127,7 @@ private byte[] getPacketData(byte[] packet) {

private class Message {

private final static int MAX_MESSAGE_TIME = 2000;
private final static int MAX_MESSAGE_TIME = 1000;

private Map<Integer, Object> cache = new HashMap<>();

Expand Down
39 changes: 39 additions & 0 deletions jdeeco-simulation-demo/analysis/analyze_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import re
from numpy import average


lines = []
with open('../logs/no-boundary.log', 'r') as f:
lines = f.readlines()

hoplines = filter(lambda x: x.endswith('hops\n'), lines)
hopData = []
p = re.compile('after (\d+)ms and (\d+) hops')
for line in hoplines:
m = p.search(line)
time = m.group(1)
hops = m.group(2)
hopData.append({'time': int(time), 'hops': int(hops)})



timesPerHop = map(lambda x: x['time'] / x['hops'], hopData)
hopCounts = map(lambda x: x['hops'], hopData)
print "Average time per hop: ", average(timesPerHop), 'ms'
print "Max number of hops: ", max(hopCounts)
print "Average number of hops: ", average(hopCounts)


boundaryFailedLines = filter(lambda x: 'Boundary failed' in x, lines)
print 'Boundary failure occurred:', len(boundaryFailedLines)

publishLines = filter(lambda x: 'Publish' in x, lines)
# on each publish line, each sent KD is followed by a ',' + there is one more in the message
dataPublished = sum(map(lambda l: l.count(',') - 1, publishLines))
print 'KnowledgeData published:', dataPublished


receiveLines = filter(lambda x: ', received [' in x, lines)
# on each receive line, each received KD is followed by a ',' + there is one more in the message
dataReceived = sum(map(lambda l: l.count(',') - 1, receiveLines))
print 'KnowledgeData received:', dataReceived
Loading

0 comments on commit d068b86

Please sign in to comment.