Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Causal Ordering ?

  • Loading branch information...
commit e0205896dcaf58568def9b4880a717b2d299102d 1 parent 373ef27
forkloop authored
View
179 src/us/forkloop/sockettalk/ListenService.java
@@ -12,8 +12,10 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import android.app.IntentService;
import android.content.Intent;
@@ -42,10 +44,17 @@ protected void onHandleIntent(Intent intent) {
int id = SocketTalkActivity.id;
Log.i("log", "Emulator: " + id);
+
+ // For Causal Ordering
+ SocketChannel[] skc = new SocketChannel[5];
+ LinkedList<Msg>[] hold_hold = (LinkedList<Msg>[]) new LinkedList[5];
- HashMap<Integer, Integer> delivery = new HashMap<Integer, Integer>();
- for(int j=5554; j<5564; j+=2)
- delivery.put(j, 0);
+ for (int i=0; i<5; i++) {
+ SocketTalkActivity.receive[i] = 0;
+ skc[i] = null;
+ hold_hold[i] = new LinkedList<Msg>();
+ }
+
// Connect to previous emulators first
try {
@@ -100,6 +109,7 @@ protected void onHandleIntent(Intent intent) {
else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
Log.i("log", "Receive a new message...");
+ int num_conn = SocketTalkActivity.out.size();
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer bb = ByteBuffer.allocate(1000);
sc.read(bb);
@@ -115,49 +125,127 @@ else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
if (msg_type.equals("us.forkloop.sockettalk.Msg")) {
Msg mmsg = (Msg) msg;
- PropSeq pmsg = new PropSeq();
- // Determine proposed #
- SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Pmax, SocketTalkActivity.Amax) + 1;
- // Used to sort the messages
- mmsg.msg_seq = SocketTalkActivity.Pmax;
- // Add to hold back queue
- SocketTalkActivity.hold_back.add(mmsg);
- Log.i("log", "Proposed # is " + SocketTalkActivity.Pmax);
- pmsg.msg_id = mmsg.msg_id;
- pmsg.msg_seq = SocketTalkActivity.Pmax;
-
- byte[] bytes = null;
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(pmsg);
- oos.flush();
- oos.close();
- bos.close();
- bytes = bos.toByteArray();
- } catch (IOException e) {
- Log.i("log", e.toString());
+ boolean vefy = true;
+ int tmp_id = (mmsg.send_id - 5554)/2;
+ if ( skc[tmp_id] != null ) {
+ skc[tmp_id] = sc;
}
- sc.write(ByteBuffer.wrap(bytes));
-
- //---------------------------------------------------
- if (SocketTalkActivity.TEST_TWO_FLAG && SocketTalkActivity.TEST_TWO_SENT ) {
+ for (int i = 0;i <=num_conn; i++) {
+ if (i == tmp_id ) {
+ if (SocketTalkActivity.receive[i] != mmsg.recv[i]-1) {
+ vefy = false;
+ break;
+ }
+ }
+ else {
+ if (SocketTalkActivity.receive[i]<mmsg.recv[i]) {
+ vefy = false;
+ break;
+ }
+ }
+ }
+ if (vefy) {
+ SocketTalkActivity.receive[tmp_id]++;
+ PropSeq pmsg = new PropSeq();
+ // Determine proposed #
+ SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Pmax, SocketTalkActivity.Amax) + 1;
+ // Used to sort the messages
+ mmsg.msg_seq = SocketTalkActivity.Pmax;
+ // Add to hold back queue
+ SocketTalkActivity.hold_back.add(mmsg);
+ Log.i("log", "Proposed # is " + SocketTalkActivity.Pmax);
+ pmsg.msg_id = mmsg.msg_id;
+ pmsg.msg_seq = SocketTalkActivity.Pmax;
+
+ byte[] bytes = null;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(pmsg);
+ oos.flush();
+ oos.close();
+ bos.close();
+ bytes = bos.toByteArray();
+ } catch (IOException e) {
+ Log.i("log", e.toString());
+ }
+ sc.write(ByteBuffer.wrap(bytes));
- int prev = ((id - mmsg.send_id)/2+SocketTalkActivity.out.size())%SocketTalkActivity.out.size();
- Log.i("log", "prev is " + prev);
- if (prev == 1) {
- SocketTalkActivity.TEST_TWO_SENT = false;
- int i = 1;
- String m = id + ":" + i;
- SocketTalkActivity.stat.add(1);
- SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Amax, SocketTalkActivity.Pmax) + 1;
- SocketTalkActivity.seq.add(SocketTalkActivity.Pmax);
+ //-----------------------
+ // FIXME SORT THEM
+ for (int i=0; i< 5; i++) {
+ int pos = 0;
+ for (Msg m : hold_hold[i]) {
+ boolean rflag = true;
+ for (int j=0; j<5; j++) {
+ if (tmp_id == i && i == j) {
+ if (m.msg_id != SocketTalkActivity.receive[j]+1) {
+ rflag = false;
+ break;
+ }
+ }
+ else {
+ if (m.recv[j] > SocketTalkActivity.receive[j]) {
+ rflag = false;
+ break;
+ }
+ }
+ }
+ if (rflag) {
+ Msg nm = hold_hold[i].remove(pos);
+
+ PropSeq pro = new PropSeq();
+ // Determine proposed #
+ SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Pmax, SocketTalkActivity.Amax) + 1;
+ // Used to sort the messages
+ nm.msg_seq = SocketTalkActivity.Pmax;
+ // Add to hold back queue
+ SocketTalkActivity.hold_back.add(nm);
+ Log.i("log", "Proposed # is " + SocketTalkActivity.Pmax);
+ pro.msg_id = nm.msg_id;
+ pro.msg_seq = SocketTalkActivity.Pmax;
+
+ byte[] bbytes = null;
+ ByteArrayOutputStream bbos = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream ooos = new ObjectOutputStream(bbos);
+ ooos.writeObject(pro);
+ ooos.flush();
+ ooos.close();
+ bbos.close();
+ bbytes = bbos.toByteArray();
+ } catch (IOException e) {
+ Log.i("log", e.toString());
+ }
+ skc[i].write(ByteBuffer.wrap(bbytes));
+ }
+ pos++;
+ }
+ }
+ //---------------------------------------------------
+ if ( mmsg.test && SocketTalkActivity.TEST_TWO_SENT ) {
- Intent send_intent = new Intent(this, SendService.class);
- send_intent.putExtra("text", m);
- startService(send_intent);
+ SocketTalkActivity.TEST_TWO_FLAG = true;
+ int prev = ((id - mmsg.send_id)/2+SocketTalkActivity.out.size()+1)%(SocketTalkActivity.out.size()+1);
+ Log.i("log", "prev is " + prev);
+ if (prev == 1) {
+ SocketTalkActivity.TEST_TWO_SENT = false;
+ int i = 1;
+ String m = id + ":" + i;
+ SocketTalkActivity.stat.add(1);
+ SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Amax, SocketTalkActivity.Pmax) + 1;
+ SocketTalkActivity.seq.add(SocketTalkActivity.Pmax);
+
+ Intent send_intent = new Intent(this, SendService.class);
+ send_intent.putExtra("text", m);
+ startService(send_intent);
+ }
}
}
+ else {
+
+ hold_hold[tmp_id].add(mmsg);
+ }
}
else if (msg_type.equals("us.forkloop.sockettalk.AgreeSeq")) {
@@ -166,7 +254,7 @@ else if (msg_type.equals("us.forkloop.sockettalk.AgreeSeq")) {
SocketTalkActivity.Amax = Math.max(SocketTalkActivity.Amax, amsg.msg_seq);
int k = 0, pos = -1;
- int no = (Integer) delivery.get(amsg.send_id);
+// int no = SocketTalkActivity.receive[amsg.send_id];
int sm_seq, sm_ind = -1;
sm_seq = amsg.msg_seq;
@@ -174,16 +262,11 @@ else if (msg_type.equals("us.forkloop.sockettalk.AgreeSeq")) {
for (Msg m : SocketTalkActivity.hold_back) {
-// if (m.msg_seq < sm_seq || (m.msg_seq == sm_seq && m.send_id < m.send_id)) {
-// flag = false;
-// }
if (m.msg_id == amsg.msg_id && m.send_id == amsg.send_id ) {
- //&& (m.msg_id == no+1)) {
- //XXX Forget the FIFO first
m.d_flag = true;
m.msg_seq = amsg.msg_seq;
pos = k;
- delivery.put(amsg.msg_id, no+1);
+// SocketTalkActivity.receive[amsg.msg_id] = no+1;
}
else {
if (m.msg_seq < sm_seq || (m.msg_seq == sm_seq && m.send_id < m.send_id)) {
View
2  src/us/forkloop/sockettalk/Msg.java
@@ -15,6 +15,8 @@
String msg_content;
int msg_id;
boolean d_flag; // Ready for delivery flag
+ int[] recv;
+ boolean test;
}
/* class for proposed sequence # */
View
9 src/us/forkloop/sockettalk/SendService.java
@@ -26,13 +26,20 @@ protected void onHandleIntent (Intent intent) {
Msg msg = new Msg();
msg.msg_content = intent.getStringExtra("text");
- //Log.i("log", intent.getStringExtra("text"));
msg.msg_id = ++SocketTalkActivity.sent_count;
+ SocketTalkActivity.receive[(SocketTalkActivity.id-5554)/2] = SocketTalkActivity.sent_count;
Log.i("log", "# of msgs sent is " + SocketTalkActivity.sent_count);
// SocketTalkActivity.Pmax = Math.max(SocketTalkActivity.Pmax, SocketTalkActivity.Amax) + 1;
msg.msg_seq = SocketTalkActivity.Pmax;
msg.send_id = SocketTalkActivity.id;
msg.d_flag = false;
+ msg.recv = SocketTalkActivity.receive;
+ if (SocketTalkActivity.TEST_TWO_FLAG) {
+ msg.test = true;
+ } else {
+ msg.test = false;
+ }
+
SocketTalkActivity.hold_back.add(msg);
byte[] bytes = null;
View
4 src/us/forkloop/sockettalk/SocketTalkActivity.java
@@ -42,7 +42,7 @@
// FOR TEST
- static boolean TEST_TWO_FLAG=true;
+ static boolean TEST_TWO_FLAG=false;
static boolean TEST_TWO_SENT=true;
// Used to break tie when two msgs have the same seq.#
static int id;
@@ -52,6 +52,7 @@
// # of messages sent by me
static int sent_count = 0;
+ static int[] receive = new int[5];
////////////////////
static int Amax;
static int Pmax;
@@ -303,6 +304,7 @@ public void Test1() {
public void Test2() {
+ TEST_TWO_FLAG = true;
Intent i = new Intent(this, TestTwo.class);
startService(i);
}
Please sign in to comment.
Something went wrong with that request. Please try again.