Skip to content

Commit

Permalink
Moved UNICAST3$Header -> UnicastHeader3
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 9, 2016
1 parent dbd2fce commit dbcb4d4
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 209 deletions.
2 changes: 1 addition & 1 deletion conf/jg-magic-map.xml
Expand Up @@ -60,7 +60,7 @@
<class id="79" name="org.jgroups.protocols.tom.ToaHeader"/>
<class id="80" name="org.jgroups.protocols.relay.RELAY2$Relay2Header"/>
<class id="81" name="org.jgroups.protocols.FORWARD_TO_COORD$ForwardHeader"/>
<class id="82" name="org.jgroups.protocols.UNICAST3$Header"/>
<class id="82" name="org.jgroups.protocols.UnicastHeader3"/>
<class id="83" name="org.jgroups.protocols.FORK$ForkHeader"/>
<class id="84" name="org.jgroups.protocols.PERF$PerfHeader"/>
<class id="85" name="org.jgroups.protocols.SaslHeader"/>
Expand Down
217 changes: 23 additions & 194 deletions src/org/jgroups/protocols/UNICAST3.java

Large diffs are not rendered by default.

183 changes: 183 additions & 0 deletions src/org/jgroups/protocols/UnicastHeader3.java
@@ -0,0 +1,183 @@
package org.jgroups.protocols;

import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.util.Bits;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.function.Supplier;

/**
* Moved out of {@link UNICAST3} into separate class. Used to attach/remove headers (e.g. seqnos) from {@link UNICAST3}.
* @author Bela Ban
* @since 4.0
*/
public class UnicastHeader3 extends Header {
public static final byte DATA = 0;
public static final byte ACK = 1;
public static final byte SEND_FIRST_SEQNO = 2;
public static final byte XMIT_REQ = 3; // SeqnoList of missing message is in the message's payload
public static final byte CLOSE = 4;

byte type;
long seqno; // DATA and ACK
short conn_id; // DATA and CLOSE
boolean first; // DATA
long timestamp; // SEND_FIRST_SEQNO and ACK


public UnicastHeader3() {} // used for externalization

protected UnicastHeader3(byte type) {
this.type=type;
}

protected UnicastHeader3(byte type, long seqno) {
this.type=type;
this.seqno=seqno;
}

protected UnicastHeader3(byte type, long seqno, short conn_id, boolean first) {
this.type=type;
this.seqno=seqno;
this.conn_id=conn_id;
this.first=first;
}
public short getMagicId() {return 82;}
public Supplier<? extends Header> create() {return UnicastHeader3::new;}

public static UnicastHeader3 createDataHeader(long seqno, short conn_id, boolean first) {
return new UnicastHeader3(DATA, seqno, conn_id, first);
}

public static UnicastHeader3 createAckHeader(long seqno, short conn_id, long timestamp) {
return new UnicastHeader3(ACK, seqno, conn_id, false).timestamp(timestamp);
}

public static UnicastHeader3 createSendFirstSeqnoHeader(long timestamp) {
return new UnicastHeader3(SEND_FIRST_SEQNO).timestamp(timestamp);
}

public static UnicastHeader3 createXmitReqHeader() {
return new UnicastHeader3(XMIT_REQ);
}

public static UnicastHeader3 createCloseHeader(short conn_id) {
return new UnicastHeader3(CLOSE, 0, conn_id, false);
}

public byte type() {return type;}
public long seqno() {return seqno;}
public short connId() {return conn_id;}
public boolean first() {return first;}
public long timestamp() {return timestamp;}
public UnicastHeader3 timestamp(long ts) {timestamp=ts; return this;}

public String toString() {
StringBuilder sb=new StringBuilder();
sb.append(type2Str(type)).append(", seqno=").append(seqno);
if(conn_id != 0) sb.append(", conn_id=").append(conn_id);
if(first) sb.append(", first");
if(timestamp != 0)
sb.append(", ts=").append(timestamp);
return sb.toString();
}

public static String type2Str(byte t) {
switch(t) {
case DATA: return "DATA";
case ACK: return "ACK";
case SEND_FIRST_SEQNO: return "SEND_FIRST_SEQNO";
case XMIT_REQ: return "XMIT_REQ";
case CLOSE: return "CLOSE";
default: return "<unknown>";
}
}

public final int serializedSize() {
int retval=Global.BYTE_SIZE; // type
switch(type) {
case DATA:
retval+=Bits.size(seqno) // seqno
+ Global.SHORT_SIZE // conn_id
+ Global.BYTE_SIZE; // first
break;
case ACK:
retval+=Bits.size(seqno)
+ Global.SHORT_SIZE // conn_id
+ Bits.size(timestamp);
break;
case SEND_FIRST_SEQNO:
retval+=Bits.size(timestamp);
break;
case XMIT_REQ:
break;
case CLOSE:
retval+=Global.SHORT_SIZE; // conn-id
break;
}
return retval;
}

public UnicastHeader3 copy() {
return new UnicastHeader3(type, seqno, conn_id, first);
}

/**
* The following types and fields are serialized:
* <pre>
* | DATA | seqno | conn_id | first |
* | ACK | seqno | timestamp |
* | SEND_FIRST_SEQNO | timestamp |
* | CLOSE | conn_id |
* </pre>
*/
public void writeTo(DataOutput out) throws Exception {
out.writeByte(type);
switch(type) {
case DATA:
Bits.writeLong(seqno, out);
out.writeShort(conn_id);
out.writeBoolean(first);
break;
case ACK:
Bits.writeLong(seqno, out);
out.writeShort(conn_id);
Bits.writeLong(timestamp, out);
break;
case SEND_FIRST_SEQNO:
Bits.writeLong(timestamp, out);
break;
case XMIT_REQ:
break;
case CLOSE:
out.writeShort(conn_id);
break;
}
}

public void readFrom(DataInput in) throws Exception {
type=in.readByte();
switch(type) {
case DATA:
seqno=Bits.readLong(in);
conn_id=in.readShort();
first=in.readBoolean();
break;
case ACK:
seqno=Bits.readLong(in);
conn_id=in.readShort();
timestamp=Bits.readLong(in);
break;
case SEND_FIRST_SEQNO:
timestamp=Bits.readLong(in);
break;
case XMIT_REQ:
break;
case CLOSE:
conn_id=in.readShort();
break;
}
}
}
Expand Up @@ -5,6 +5,7 @@
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.UnicastHeader3;

/**
* @author Bela Ban
Expand All @@ -24,7 +25,7 @@ public void sendUnicast(JChannel ch) throws Exception {

// Add a UNICAST2 header
final UNICAST3 unicast=ch.getProtocolStack().findProtocol(UNICAST3.class);
UNICAST3.Header hdr=UNICAST3.Header.createDataHeader(1, (short)1, true);
UnicastHeader3 hdr=UnicastHeader3.createDataHeader(1, (short)1, true);
msg.putHeader(unicast.getId(), hdr);

new Thread() {
Expand Down
Expand Up @@ -245,7 +245,7 @@ public void testMessageToNonExistingMember(Class<? extends Protocol> unicast) th

protected Header createDataHeader(Protocol unicast, long seqno, short conn_id, boolean first) {
if(unicast instanceof UNICAST3)
return UNICAST3.Header.createDataHeader(seqno, conn_id, first);
return UnicastHeader3.createDataHeader(seqno, conn_id, first);
throw new IllegalArgumentException("protocol " + unicast.getClass().getSimpleName() + " needs to be UNICAST3");
}

Expand Down
Expand Up @@ -514,14 +514,14 @@ protected List<Message> createMessages() {
List<Message> retval=new ArrayList<>(10);

for(long seqno=1; seqno <= 5; seqno++)
retval.add(new Message(b).putHeader(UNICAST3_ID, UNICAST3.Header.createDataHeader(seqno, (short)22, false)));
retval.add(new Message(b).putHeader(UNICAST3_ID, UnicastHeader3.createDataHeader(seqno, (short)22, false)));

retval.add(new Message(b).putHeader(PING_ID, new PingHeader(PingHeader.GET_MBRS_RSP).clusterName("demo-cluster")));
retval.add(new Message(b).putHeader(FD_ID, new FD.FdHeader(org.jgroups.protocols.FD.FdHeader.HEARTBEAT)));
retval.add(new Message(b).putHeader(MERGE_ID, MERGE3.MergeHeader.createViewResponse()));

for(long seqno=6; seqno <= 10; seqno++)
retval.add(new Message(b).putHeader(UNICAST3_ID, UNICAST3.Header.createDataHeader(seqno, (short)22, false)));
retval.add(new Message(b).putHeader(UNICAST3_ID, UnicastHeader3.createDataHeader(seqno, (short)22, false)));

for(Message msg: retval)
msg.putHeader(UDP_ID, new TpHeader("demo-cluster"));
Expand Down
4 changes: 2 additions & 2 deletions tests/junit-functional/org/jgroups/tests/MessageSizeTest.java
Expand Up @@ -6,7 +6,7 @@
import org.jgroups.Message;
import org.jgroups.Version;
import org.jgroups.protocols.TpHeader;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.UnicastHeader3;
import org.jgroups.protocols.pbcast.NakAckHeader2;
import org.jgroups.util.Buffer;
import org.jgroups.util.ByteArrayDataOutputStream;
Expand Down Expand Up @@ -108,7 +108,7 @@ protected static void writeMessage(Message msg, DataOutput dos, boolean multicas
static Message createMessage(Address dest, Address src) {
Message msg=new Message(dest, "hello world").src(src);
msg.putHeader(NAKACK_ID, NakAckHeader2.createMessageHeader(322649));
msg.putHeader(UNICAST_ID, UNICAST3.Header.createDataHeader(465784, (short)23323, true));
msg.putHeader(UNICAST_ID, UnicastHeader3.createDataHeader(465784, (short)23323, true));
msg.putHeader(UDP_ID, new TpHeader("DrawDemo"));
return msg;
}
Expand Down
16 changes: 8 additions & 8 deletions tests/junit-functional/org/jgroups/tests/SizeTest.java
Expand Up @@ -194,30 +194,30 @@ public static void testFdSockHeaders() throws Exception {


public void testUnicast3Header() throws Exception {
UNICAST3.Header hdr=UNICAST3.Header.createDataHeader(322649, (short)127, false);
UnicastHeader3 hdr=UnicastHeader3.createDataHeader(322649, (short)127, false);
_testSize(hdr);
_testMarshalling(hdr);

hdr=UNICAST3.Header.createDataHeader(322649, Short.MAX_VALUE, false);
hdr=UnicastHeader3.createDataHeader(322649, Short.MAX_VALUE, false);
_testSize(hdr);
_testMarshalling(hdr);

hdr=UNICAST3.Header.createDataHeader(322649, (short)(Short.MAX_VALUE -10), true);
hdr=UnicastHeader3.createDataHeader(322649, (short)(Short.MAX_VALUE -10), true);
_testSize(hdr);
_testMarshalling(hdr);

//noinspection NumericOverflow
for(long timestamp: new long[]{0, 100, Long.MAX_VALUE -1, Long.MAX_VALUE, Long.MAX_VALUE +100}) {
hdr=UNICAST3.Header.createSendFirstSeqnoHeader((int)timestamp);
hdr=UnicastHeader3.createSendFirstSeqnoHeader((int)timestamp);
_testSize(hdr);
_testMarshalling(hdr);
}

hdr=UNICAST3.Header.createAckHeader(322649, (short)2, 500600);
hdr=UnicastHeader3.createAckHeader(322649, (short)2, 500600);
_testSize(hdr);
_testMarshalling(hdr);

hdr=UNICAST3.Header.createXmitReqHeader();
hdr=UnicastHeader3.createXmitReqHeader();
_testSize(hdr);
_testMarshalling(hdr);
}
Expand Down Expand Up @@ -801,9 +801,9 @@ public static void testRequestCorrelatorHeader() throws Exception {
}


private static void _testMarshalling(UNICAST3.Header hdr) throws Exception {
private static void _testMarshalling(UnicastHeader3 hdr) throws Exception {
byte[] buf=Util.streamableToByteBuffer(hdr);
UNICAST3.Header hdr2=Util.streamableFromByteBuffer(UNICAST3.Header.class, buf);
UnicastHeader3 hdr2=Util.streamableFromByteBuffer(UnicastHeader3.class, buf);

assert hdr.type() == hdr2.type();
assert hdr.seqno() == hdr2.seqno();
Expand Down

0 comments on commit dbcb4d4

Please sign in to comment.