Skip to content

Commit

Permalink
Adding simulation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michał Kit committed Jan 24, 2014
1 parent 2c45348 commit bae652b
Show file tree
Hide file tree
Showing 67 changed files with 604,184 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cz.cuni.mff.d3s.deeco.publisher;

/**
* @author Michal Kit <kit@d3s.mff.cuni.cz>
*
*/
public interface IncomingKnowledgeListener {

public void knowledgeReceived(KnowledgeData knowledgeData);

}
23 changes: 23 additions & 0 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/publisher/KnowledgeData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cz.cuni.mff.d3s.deeco.publisher;

import java.io.Serializable;

import cz.cuni.mff.d3s.deeco.knowledge.ValueSet;

public class KnowledgeData implements Serializable {
private final ValueSet knowledge;
private final String ownerId;

public KnowledgeData(String ownerId, ValueSet knowledge) {
this.ownerId= ownerId;
this.knowledge = knowledge;
}

public ValueSet getKnowledge() {
return knowledge;
}

public String getOwnerId() {
return ownerId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package cz.cuni.mff.d3s.deeco.publisher;

public class PacketHelper {

}
140 changes: 140 additions & 0 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/publisher/PacketReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package cz.cuni.mff.d3s.deeco.publisher;

import static cz.cuni.mff.d3s.deeco.publisher.Serializer.deserialize;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import cz.cuni.mff.d3s.deeco.logging.Log;

public class PacketReceiver {

private final int packetSize;
private final Map<Integer, Message> messages;
private final IncomingKnowledgeListener incomingKnowledgeListener;

public PacketReceiver(int packetSize, IncomingKnowledgeListener incomingKnowledgeListener) {
this.packetSize = packetSize;
this.messages = new HashMap<Integer, Message>();
this.incomingKnowledgeListener = incomingKnowledgeListener;
}

public void packetReceived(byte[] packet) {
int messageId;
Message msg;
if (isInitialMessage(packet)) {
messageId = getInitialMessageId(packet);
int messageSize = getMessageSize(packet);
if (!messages.containsKey(messageId)) {
msg = new Message(messageSize);
messages.put(messageId, msg);
} else {
msg = messages.get(messageId);
msg.setPacketCount(messageSize);
}
} else {
messageId = getMessageId(packet);
int seqNumber = getMessageSeqNumber(packet);
if (messages.containsKey(messageId)) {
msg = messages.get(messageId);
msg.setData(seqNumber, getData(packet));
} else {
msg = new Message(seqNumber, getData(packet));
messages.put(messageId, msg);
}
}
if (msg.isComplete()) {
incomingKnowledgeListener.knowledgeReceived(msg.getKnowledgeData());
}
}

//-----------Helper methods-----------

private int getMessageId(byte [] packet) {
return ByteBuffer.wrap(Arrays.copyOfRange(packet, 0, 4)).getInt();
}

private int getInitialMessageId(byte [] packet) {
return ByteBuffer.wrap(Arrays.copyOfRange(packet, 4, 8)).getInt();
}

private int getMessageSize(byte [] packet) {
return ByteBuffer.wrap(Arrays.copyOfRange(packet, 8, 12)).getInt();
}

private int getMessageSeqNumber(byte [] packet) {
return getInitialMessageId(packet);
}

private boolean isInitialMessage(byte [] packet) {
return getMessageId(packet) == Integer.MIN_VALUE;
}

private byte [] getData(byte [] packet) {
return Arrays.copyOfRange(packet, 8, packet.length);
}


//---------Helper Class--------------

private class Message {

private Map<Integer, Object> cache;

private byte[] data;
private int messageSize;

public Message(int messageSize) {
initialize(messageSize);
}

public Message(int seqNumber, byte [] data) {
this.messageSize = -1;
this.cache = new HashMap<>();
setData(seqNumber, data);
}

public void setPacketCount(int packetCount) {
if (this.messageSize == -1) {
initialize(packetCount);
for (Integer key : cache.keySet())
setData(key, (byte[]) cache.get(key));
}
}

public void setData(int seqNumber, byte[] data) {
if (messageSize != -1) {
for (int i = seqNumber * packetSize; i < data.length; i++)
this.data[i] = data[i];
this.messageSize -= data.length;
} else {
cache.put(seqNumber, data);
}
}

public boolean isComplete() {
return messageSize == 0;
}

public KnowledgeData getKnowledgeData() {
try {
if (isComplete()) {
return (KnowledgeData) deserialize(data);
}
} catch (IOException | ClassNotFoundException e) {
Log.e("Error while deserializing data");
}
return null;
}

private void initialize(int messageSize) {
this.messageSize = messageSize;
this.data = new byte[messageSize];
}

}

}
108 changes: 108 additions & 0 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/publisher/PacketSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cz.cuni.mff.d3s.deeco.publisher;

import static cz.cuni.mff.d3s.deeco.publisher.Serializer.serialize;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import cz.cuni.mff.d3s.deeco.logging.Log;

/**
* @author Michal Kit <kit@d3s.mff.cuni.cz>
*
*/
public abstract class PacketSender {

// We reserver Integer.MIN_VALUE for distinguishing initial packets.
private static int CURRENT_MESSAGE_ID = Integer.MIN_VALUE;

public synchronized static int getNextMessageId() {
CURRENT_MESSAGE_ID++;
return CURRENT_MESSAGE_ID;
}

private final int packetSize;

/**
* Minimum fragment size is at least 12 bytes.
*
*/
public PacketSender(int packetSize) {
// At least 12 because: 4 bytes for the initial frame marker, 4 bytes
// for message id and 4 bytes for packet count.
assert packetSize >= 12;
this.packetSize = packetSize;
}

public void sendData(Object data) {
sendData(data, "");
}

public void sendData(Object data, String recipient) {
try {
byte[][] fragments = fragment(data);
int messageId = getNextMessageId();
// We need to send the message containing id and the number of
// packets.that will be sent.
sendPacket(buildInitialPacket(messageId, getDataLength(fragments)), recipient);
// Now we can send packets
for (int i = 0; i < fragments.length; i++) {
sendPacket(buildPacket(messageId, i, fragments[i]), recipient);
}
} catch (IOException e) {
Log.e("Error while serializing data: " + data);
e.printStackTrace();
}
}

protected abstract void sendPacket(byte[] packet, String recipient);

private byte[][] fragment(Object data) throws IOException {
byte[] serialized = serialize(data);
byte[][] result = new byte[(int) Math.ceil(serialized.length
/ (double) packetSize)][packetSize];
int start = 0;
for (int i = 0; i < result.length; i++) {
result[i] = Arrays.copyOfRange(serialized, start, start
+ packetSize);
start += packetSize;
}
return result;
}

private byte[] buildInitialPacket(int id, int packetCount) {
byte[] bType = ByteBuffer.allocate(4).putInt(Integer.MIN_VALUE).array();
byte[] bId = ByteBuffer.allocate(4).putInt(id).array();
byte[] bPacketCount = ByteBuffer.allocate(4).putInt(packetCount)
.array();
byte[] result = new byte[12];
for (int i = 0; i < bType.length; i++)
result[i] = bType[i];
for (int i = 0; i < bId.length; i++)
result[i + 4] = bId[i];
for (int i = 0; i < bPacketCount.length; i++)
result[i + 8] = bPacketCount[i];
return result;
}

private byte[] buildPacket(int id, int seqNumber, byte[] packetData) {
byte[] bId = ByteBuffer.allocate(4).putInt(id).array();
byte[] bSeqNumber = ByteBuffer.allocate(4).putInt(seqNumber).array();
byte[] result = new byte[8 + packetData.length];
for (int i = 0; i < bId.length; i++)
result[i] = bId[i];
for (int i = 0; i < bSeqNumber.length; i++)
result[i + 4] = bSeqNumber[i];
for (int i = 0; i < packetData.length; i++)
result[i + 8] = packetData[i];
return result;
}

private int getDataLength(byte [][] data) {
int result = 0;
for (int i = 0; i < data.length; i++)
result += data[i].length;
return result;
}
}
70 changes: 70 additions & 0 deletions jdeeco-core/src/cz/cuni/mff/d3s/deeco/publisher/Publisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cz.cuni.mff.d3s.deeco.publisher;

import java.util.LinkedList;
import java.util.List;

import cz.cuni.mff.d3s.deeco.knowledge.ValueSet;
import cz.cuni.mff.d3s.deeco.model.runtime.api.PeriodicTrigger;
import cz.cuni.mff.d3s.deeco.model.runtime.meta.RuntimeMetadataFactory;
import cz.cuni.mff.d3s.deeco.scheduler.Scheduler;

/**
* @author Michal Kit <kit@d3s.mff.cuni.cz>
*
*/
public class Publisher {

private final Scheduler scheduler;
private final List<PublisherTask> publisherTasks;
private final PeriodicTrigger periodicTrigger;
private final PacketSender packetSender;

public Publisher(Scheduler scheduler, long period, PacketSender packetSender) {
assert scheduler != null;

this.scheduler = scheduler;
this.publisherTasks = new LinkedList<>();
this.periodicTrigger = createPeriodicTrigger(period);
this.packetSender = packetSender;
}

public void addKnowledgeSource(PublisherKnowledgeSource knowledgeSource) {
if (knowledgeSource != null && findPublisherTaskForKnowledgeSource(knowledgeSource) == null)
publisherTasks.add(new PublisherTask(scheduler, this,
knowledgeSource));
}

public void removeKnowledgeSource(PublisherKnowledgeSource knowledgeSource) {
if (knowledgeSource != null) {
PublisherTask pTask = findPublisherTaskForKnowledgeSource(knowledgeSource);
if (pTask != null)
scheduler.removeTask(pTask);
}
}

public PeriodicTrigger getPeriodicTrigger() {
return periodicTrigger;
}

public void publish(String ownerId, ValueSet knowledge) {
packetSender.sendData(new KnowledgeData(ownerId, knowledge));
}

// --------Private Methods-------------

private PublisherTask findPublisherTaskForKnowledgeSource(
PublisherKnowledgeSource knowledgeSource) {
if (knowledgeSource != null)
for (PublisherTask pTask : publisherTasks)
if (pTask.getPublisherKnowledgeSource().equals(knowledgeSource))
return pTask;
return null;
}

private PeriodicTrigger createPeriodicTrigger(long period) {
RuntimeMetadataFactory factory = RuntimeMetadataFactory.eINSTANCE;
PeriodicTrigger result = factory.createPeriodicTrigger();
result.setPeriod(period);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package cz.cuni.mff.d3s.deeco.publisher;

import cz.cuni.mff.d3s.deeco.knowledge.ValueSet;

public interface PublisherKnowledgeSource {

public String getOwnerId();
public ValueSet getKnowledge();
}
Loading

0 comments on commit bae652b

Please sign in to comment.