Skip to content

Commit

Permalink
IP support updated
Browse files Browse the repository at this point in the history
  • Loading branch information
Michał Kit committed Feb 17, 2014
1 parent 10d4e2d commit a02f710
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class KnowledgeDataManager implements KnowledgeDataReceiver,
// this rssi corresponds to max (roughly 250m) distance
public static final double RSSI_MIN = 1.11e-10;

public static final int IP_DELAY = 1;

/** Global version counter for all outgoing local knowledge. */
protected long localVersion;
/** For scheduling rebroadcasts. */
Expand All @@ -83,7 +85,8 @@ public class KnowledgeDataManager implements KnowledgeDataReceiver,
/** List of ensemble definitions whose boundary conditions should be considered. */
private final List<EnsembleDefinition> ensembleDefinitions;

private final Map<String, KnowledgeData> dataToRebroadcast;
private final Map<String, KnowledgeData> dataToRebroadcastOverMANET;
private final Map<String, KnowledgeData> dataToRebroadcastOverIP;

private final boolean useIndividualPublishing;
private final boolean checkGossipCondition;
Expand Down Expand Up @@ -126,7 +129,8 @@ public KnowledgeDataManager(
this.recipientSelectors = recipientSelectors;
this.directGossipStrategy = directGossipStrategy;

dataToRebroadcast = new HashMap<>();
dataToRebroadcastOverMANET = new HashMap<>();
dataToRebroadcastOverIP = new HashMap<>();

RuntimeMetadataFactory factory = RuntimeMetadataFactoryExt.eINSTANCE;
KnowledgePath empty = factory.createKnowledgePath();
Expand Down Expand Up @@ -166,16 +170,16 @@ public void publish() {
// we re-publish periodically only local data
List<KnowledgeData> data = prepareLocalKnowledgeData();

// int origCnt = dataToRebroadcast.size();
// filterBasedOnBoundaryCondition(dataToRebroadcast, getNodeKnowledge());
// filterBasedOnGossipCondition(dataToRebroadcast);
// int origCnt = dataToRebroadcastOverMANET.size();
// filterBasedOnBoundaryCondition(dataToRebroadcastOverMANET, getNodeKnowledge());
// filterBasedOnGossipCondition(dataToRebroadcastOverMANET);
//
// Log.d(String.format("Rebroadcasting %d out of %d received messages", dataToRebroadcast.size(), origCnt));
// Log.d(String.format("Rebroadcasting %d out of %d received messages", dataToRebroadcastOverMANET.size(), origCnt));
//
// for (KnowledgeData kd: dataToRebroadcast) {
// for (KnowledgeData kd: dataToRebroadcastOverMANET) {
// data.add(prepareForRebroadcast(kd));
// }
// dataToRebroadcast.clear();
// dataToRebroadcastOverMANET.clear();


//TODO
Expand Down Expand Up @@ -218,22 +222,26 @@ private void sendDirect(List<KnowledgeData> data) {


@Override
public void rebroacast(KnowledgeMetaData metadata) {
public void rebroacast(KnowledgeMetaData metadata, NICType nicType) {
String sig = metadata.getSignature();
// if the data was marked as not to be sent anymore (e.g., it was received again in the mean time)
if (!dataToRebroadcast.containsKey(sig)) {
if (!dataToRebroadcastOverMANET.containsKey(sig)) {
return;
}

KnowledgeData data = prepareForRebroadcast(dataToRebroadcast.get(sig));
if (!dataToRebroadcastOverIP.containsKey(sig)) {
return;
}

KnowledgeData data = prepareForRebroadcast(dataToRebroadcastOverMANET.get(sig));
logPublish(Arrays.asList(data));

knowledgeDataSender.broadcastKnowledgeData(Arrays.asList(data));
dataToRebroadcast.remove(sig);
if (nicType.equals(NICType.MANET)) {
knowledgeDataSender.broadcastKnowledgeData(Arrays.asList(data));
dataToRebroadcastOverMANET.remove(sig);
} else {
sendDirect(Arrays.asList(data));
}
Log.d(String.format("Rebroadcast finished (%d) at %s, data %s", timeProvider.getCurrentTime(), host, sig));

sendDirect(Arrays.asList(data));
}

@Override
Expand All @@ -255,10 +263,21 @@ public void receive(List<? extends KnowledgeData> knowledgeData) {
// if we get the same or newer data before we manage to rebroadcast, abort the rebroadcast
if ((currentMetadata != null)
&& (currentMetadata.versionId <= newMetadata.versionId)
&& dataToRebroadcast.containsKey(currentMetadata.getSignature())) {
dataToRebroadcast.remove(currentMetadata.getSignature());
&& dataToRebroadcastOverMANET.containsKey(currentMetadata.getSignature())) {
dataToRebroadcastOverMANET.remove(currentMetadata.getSignature());
Log.d(String.format(
"MANET: Rebroadcast aborted (%d) at %s, data %s, because of %s",
timeProvider.getCurrentTime(), host,
currentMetadata.getSignature(),
newMetadata.getSignature()));
}

if ((currentMetadata != null)
&& (currentMetadata.versionId < newMetadata.versionId)
&& dataToRebroadcastOverIP.containsKey(currentMetadata.getSignature())) {
dataToRebroadcastOverIP.remove(currentMetadata.getSignature());
Log.d(String.format(
"Rebroadcast aborted (%d) at %s, data %s, because of %s",
"IP: Rebroadcast aborted (%d) at %s, data %s, because of %s",
timeProvider.getCurrentTime(), host,
currentMetadata.getSignature(),
newMetadata.getSignature()));
Expand Down Expand Up @@ -316,10 +335,13 @@ void queueForRebroadcast(KnowledgeData kd) {
kmd.versionId, kmd.sender,
kmd.rssi, delay));

dataToRebroadcast.put(kmd.getSignature(), kd);
dataToRebroadcastOverMANET.put(kmd.getSignature(), kd);
dataToRebroadcastOverIP.put(kmd.getSignature(), kd);

// schedule a task for rebroadcast
RebroadcastTask task = new RebroadcastTask(scheduler, this, delay, kmd);
RebroadcastTask task = new RebroadcastTask(scheduler, this, IP_DELAY, kmd, NICType.IP);
scheduler.addTask(task);
task = new RebroadcastTask(scheduler, this, delay, kmd, NICType.MANET);
scheduler.addTask(task);
}

Expand Down Expand Up @@ -364,7 +386,7 @@ private int getRebroadcastDelay(KnowledgeMetaData metaData) {
// rssi < 0 means received from IP
if (metaData.rssi < 0) {
Log.d("Got data from IP. Gossip condition does not apply, rebroadcasting automatically.");
return 1;
return IP_DELAY;
}

// the further further from the source (i.e. smaller rssi) the bigger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ public interface KnowledgeDataPublisher {
/**
* Republishes the received knowledge data with the given metadata
*/
void rebroacast(KnowledgeMetaData metadata);
void rebroacast(KnowledgeMetaData metadata, NICType nicType);
}
5 changes: 5 additions & 0 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/network/NICType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package cz.cuni.mff.d3s.deeco.network;

public enum NICType {
IP,MANET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class RebroadcastTask extends Task {
private final TimeTrigger trigger;
private final KnowledgeMetaData dataToRebroadcast;
private final KnowledgeDataPublisher publisher;
private final NICType nicType;

public RebroadcastTask(Scheduler scheduler, KnowledgeDataPublisher publisher, int rebroadcastAfter, KnowledgeMetaData metadata) {
public RebroadcastTask(Scheduler scheduler, KnowledgeDataPublisher publisher, int rebroadcastAfter, KnowledgeMetaData metadata, NICType nicType) {
super(scheduler);
this.dataToRebroadcast = metadata;
this.publisher = publisher;
this.nicType = nicType;

trigger = RuntimeMetadataFactoryExt.eINSTANCE.createTimeTrigger();
trigger.setPeriod(0);
Expand All @@ -25,7 +27,7 @@ public RebroadcastTask(Scheduler scheduler, KnowledgeDataPublisher publisher, i

@Override
public void invoke(Trigger trigger) throws TaskInvocationException {
publisher.rebroacast(dataToRebroadcast);
publisher.rebroacast(dataToRebroadcast, nicType);
}

/* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;


public class AreaNetworkRegistry {
Expand All @@ -19,6 +16,10 @@ public class AreaNetworkRegistry {

private AreaNetworkRegistry() {
}

public Collection<Area> getAreas() {
return componentsByArea.keySet();
}

public synchronized static AreaNetworkRegistry getInstance() {
if (INSTANCE == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public static void main(String[] args) throws AnnotationProcessorException, IOEx
DirectGossipStrategy directGossipStrategy = new DirectGossipStrategy() {
@Override
public boolean gossipTo(String recipient) {
return new Random(areas.size()).nextInt(100) < 20;
//50% chances of sending to the given recipient
return new Random(areas.size()).nextInt(100) < 50;
}
};

Expand Down Expand Up @@ -178,7 +179,6 @@ public boolean gossipTo(String recipient) {
System.out.println("Simulation finished.");
}


private static void logSimulationParameters(int componentCnt) {
Log.d(String.format("Simulation parameters: %d components, packet size %d, publish period %d,"
+ " %s publishing, boundary %s, cache deadline %d, cache wipe period %d, maxRebroadcastDelay %d",
Expand All @@ -205,25 +205,29 @@ public void initialize(List<String> ethernetEnabled, AreaNetworkRegistry network
@Override
public Collection<String> getRecipients(KnowledgeData data,
ReadOnlyKnowledgeManager sender) {
List<String> result = new LinkedList<>();
KnowledgePath kpTeam = KnowledgePathBuilder.buildSimplePath("teamId");
String ownerTeam = (String) data.getKnowledge().getValue(kpTeam);
if (ownerTeam != null) {
//Find all areas of my team
List<Area> areas = networkRegistry.getTeamSites(ownerTeam);
//Pick one randomly
Area area = areas.get(new Random().nextInt(areas.size()));
//Get all the members in that area
List<PositionAwareComponent> recipients = networkRegistry.getMembersBelongingToTeam(ownerTeam, area);
//Randomly choose a subset of them and return those as possible message recipients
for (PositionAwareComponent c : recipients) {
if (!c.id.equals(sender.getId()) && ethernetEnabled.contains(c.id)) {
result.add(c.id);
if (networkRegistry.getAreas().size() > 1) {
List<String> result = new LinkedList<>();
KnowledgePath kpTeam = KnowledgePathBuilder.buildSimplePath("teamId");
String ownerTeam = (String) data.getKnowledge().getValue(kpTeam);
if (ownerTeam != null) {
//Find all areas of my team
List<Area> areas = networkRegistry.getTeamSites(ownerTeam);
//Pick one randomly
Area area = areas.get(new Random().nextInt(areas.size()));
//Get all the members in that area
List<PositionAwareComponent> recipients = networkRegistry.getMembersBelongingToTeam(ownerTeam, area);
//Randomly choose a subset of them and return those as possible message recipients
for (PositionAwareComponent c : recipients) {
if (!c.id.equals(sender.getId()) && ethernetEnabled.contains(c.id)) {
result.add(c.id);
}
}
}
//return result;
return new LinkedList<>();
} else {
return new LinkedList<>(ethernetEnabled);
}
//return result;
return new LinkedList<>();
}

}
Expand Down

0 comments on commit a02f710

Please sign in to comment.