Skip to content

Commit

Permalink
- TcpConnection.buffer made non-volatile (only accessed by thread)
Browse files Browse the repository at this point in the history
- Streamable classes now implement SizeStreamable instead (https://issues.jboss.org/browse/JGRP-2122)
- All headers now implement SizeStreamable (changed size() -> serializedSize())
  • Loading branch information
belaban committed Nov 3, 2016
1 parent 3e8d56d commit 62c4d23
Show file tree
Hide file tree
Showing 47 changed files with 56 additions and 63 deletions.
19 changes: 6 additions & 13 deletions src/org/jgroups/Header.java
@@ -1,7 +1,7 @@

package org.jgroups;

import org.jgroups.util.Streamable;
import org.jgroups.util.SizeStreamable;

/**
* Header is a JGroups internal base class for all JGroups headers. Client normally do not need to
Expand All @@ -10,7 +10,7 @@
* @author Bela Ban
* @since 2.0
*/
public abstract class Header implements Streamable, Constructable<Header> {
public abstract class Header implements SizeStreamable, Constructable<Header> {
/** The ID of the protocol which added a header to a message. Set externally, e.g. by {@link Message#putHeader(short,Header)} */
protected short prot_id;

Expand All @@ -20,17 +20,10 @@ public abstract class Header implements Streamable, Constructable<Header> {
/** Returns the magic-ID. If defined in jg-magic-map.xml, the IDs need to be the same */
public abstract short getMagicId();


/**
* To be implemented by subclasses. Return the size of this object for the serialized version of it.
* I.e. how many bytes this object takes when flattened into a buffer. This may be different for each instance,
* or can be the same. This may also just be an estimation. E.g. FRAG uses it on Message to determine whether
* or not to fragment the message. Fragmentation itself will be accurate, because the entire message will actually
* be serialized into a byte buffer, so we can determine the exact size.
*/
public abstract int size();


/** @deprecated Headers should implement {@link SizeStreamable#serializedSize()} instead */
@Deprecated public int size() {
return serializedSize();
}

public String toString() {
return '[' + getClass().getSimpleName() + "]";
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/auth/DemoToken.java
Expand Up @@ -186,7 +186,7 @@ public void readFrom(DataInput in) throws Exception {
}
}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
switch(type) {
case CHALLENGE:
Expand Down
6 changes: 3 additions & 3 deletions src/org/jgroups/blocks/RequestCorrelator.java
Expand Up @@ -563,7 +563,7 @@ public void readFrom(DataInput in) throws Exception {
corrId=in.readShort();
}

public int size() {
public int serializedSize() {
return Global.BYTE_SIZE // type
+ Bits.size(req_id) // req_id
+ Global.SHORT_SIZE; // corrId
Expand Down Expand Up @@ -598,8 +598,8 @@ public void readFrom(DataInput in) throws Exception {
exclusion_list=Util.readAddresses(in);
}

public int size() {
return (int)(super.size() + Util.size(exclusion_list));
public int serializedSize() {
return (int)(super.serializedSize() + Util.size(exclusion_list));
}

public String toString() {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -246,7 +246,7 @@ protected Address readPeerAddress(Socket client_sock) throws Exception {
protected class Receiver implements Runnable {
protected final Thread recv;
protected volatile boolean receiving=true;
protected volatile byte[] buffer;
protected byte[] buffer; // no need to be volatile, only accessed by this thread

public Receiver(ThreadFactory f) {
recv=f.newThread(this,"Connection.Receiver [" + getSockAddress() + "]");
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/ABP.java
Expand Up @@ -183,7 +183,7 @@ public ABPHeader(Type type, byte bit) {
public Supplier<? extends Header> create() {return ABPHeader::new;}

@Override
public int size() {
public int serializedSize() {
return Global.BYTE_SIZE *2;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/AuthHeader.java
Expand Up @@ -41,7 +41,7 @@ public void readFrom(DataInput in) throws Exception {
this.token=readAuthToken(in);
}

public int size() {
public int serializedSize() {
return sizeOf(token);
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/COMPRESS.java
Expand Up @@ -195,7 +195,7 @@ public Supplier<? extends Header> create() {
return CompressHeader::new;
}

public int size() {
public int serializedSize() {
return Global.INT_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/COUNTER.java
Expand Up @@ -1092,7 +1092,7 @@ public String toString() {
public static class CounterHeader extends Header {
public Supplier<? extends Header> create() {return CounterHeader::new;}
public short getMagicId() {return 74;}
public int size() {return 0;}
public int serializedSize() {return 0;}
public void writeTo(DataOutput out) throws Exception {}
public void readFrom(DataInput in) throws Exception {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/DAISYCHAIN.java
Expand Up @@ -194,7 +194,7 @@ public void setTTL(short ttl) {

public Supplier<? extends Header> create() {return DaisyHeader::new;}

public int size() {
public int serializedSize() {
return Global.SHORT_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/EXAMPLE.java
Expand Up @@ -73,7 +73,7 @@ public static class ExampleHeader extends Header {

public Supplier<? extends Header> create() {return ExampleHeader::new;}
public short getMagicId() {return 21000;}
public int size() {
public int serializedSize() {
return 0; // return serialized size of all variables sent across the wire
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/EncryptHeader.java
Expand Up @@ -55,7 +55,7 @@ public String toString() {
return String.format("[%s version=%s]", typeToString(type), (version != null? version.length + " bytes" : "n/a"));
}

public int size() {return Global.BYTE_SIZE + Util.size(version) + Util.size(signature) /*+ Util.size(payload) */;}
public int serializedSize() {return Global.BYTE_SIZE + Util.size(version) + Util.size(signature) /*+ Util.size(payload) */;}

protected static String typeToString(byte type) {
switch(type) {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/Executing.java
Expand Up @@ -1047,7 +1047,7 @@ public static class ExecutorHeader extends Header {
public ExecutorHeader() {
}
public short getMagicId() {return 73;}
public int size() {
public int serializedSize() {
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD.java
Expand Up @@ -413,7 +413,7 @@ public String toString() {
}


public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
retval+=Util.size(mbrs);
retval+=Util.size(from);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_ALL.java
Expand Up @@ -396,7 +396,7 @@ public HeartbeatHeader() {}
public String toString() {return "heartbeat";}
public short getMagicId() {return 62;}
public Supplier<? extends Header> create() {return HeartbeatHeader::new;}
public int size() {return 0;}
public int serializedSize() {return 0;}
public void writeTo(DataOutput out) throws Exception {}
public void readFrom(DataInput in) throws Exception {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_ALL2.java
Expand Up @@ -374,7 +374,7 @@ public HeartbeatHeader() {}
public short getMagicId() {return 63;}
public Supplier<? extends Header> create() {return HeartbeatHeader::new;}
public String toString() {return "heartbeat";}
public int size() {return 0;}
public int serializedSize() {return 0;}
public void writeTo(DataOutput out) throws Exception {}
public void readFrom(DataInput in) throws Exception {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_SOCK.java
Expand Up @@ -909,7 +909,7 @@ public static String type2String(byte type) {
}


public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
retval+=Util.size(mbr);

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FORK.java
Expand Up @@ -370,7 +370,7 @@ public void setForkChannelId(String fork_channel_id) {
this.fork_channel_id=fork_channel_id;
}

public int size() {return Util.size(fork_stack_id) + Util.size(fork_channel_id);}
public int serializedSize() {return Util.size(fork_stack_id) + Util.size(fork_channel_id);}

public void writeTo(DataOutput out) throws Exception {
Bits.writeString(fork_stack_id,out);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FORWARD_TO_COORD.java
Expand Up @@ -205,7 +205,7 @@ public ForwardHeader(byte type, long id) {
public short getMagicId() {return 81;}
public long getId() {return id;}
public byte getType() {return type;}
public int size() {return Global.BYTE_SIZE + Bits.size(id);}
public int serializedSize() {return Global.BYTE_SIZE + Bits.size(id);}

public void writeTo(DataOutput out) throws Exception {
out.writeByte(type);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FcHeader.java
Expand Up @@ -31,7 +31,7 @@ public Supplier<? extends Header> create() {

public short getMagicId() {return 59;}

public int size() {
public int serializedSize() {
return Global.BYTE_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FragHeader.java
Expand Up @@ -43,7 +43,7 @@ public void writeTo(DataOutput out) throws Exception {
Bits.writeInt(num_frags, out);
}

public int size() {
public int serializedSize() {
return Bits.size(id) + Bits.size(frag_id) + Bits.size(num_frags);
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/Locking.java
Expand Up @@ -1402,7 +1402,7 @@ public Supplier<? extends Header> create() {
return LockingHeader::new;
}

public int size() {
public int serializedSize() {
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/MERGE3.java
Expand Up @@ -555,7 +555,7 @@ protected MergeHeader(Type type, ViewId view_id, String logical_name, PhysicalAd
public short getMagicId() {return 75;}
public Supplier<? extends Header> create() {return MergeHeader::new;}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // for the type
retval+=Util.size(view_id);
retval+=Global.BYTE_SIZE; // presence byte for logical_name
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/NAMING.java
Expand Up @@ -190,7 +190,7 @@ public String toString() {
return String.format("%s addr=%s name=%s", type, addr, name);
}

public int size() {
public int serializedSize() {
return Global.SHORT_SIZE + Util.size(addr) + Util.size(name);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/PERF.java
Expand Up @@ -105,7 +105,7 @@ public Supplier<? extends Header> create() {
return PerfHeader::new;
}

public int size() {
public int serializedSize() {
return Global.LONG_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/PingHeader.java
Expand Up @@ -35,7 +35,7 @@ public PingHeader() {

public Supplier<? extends Header> create() {return PingHeader::new;}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE *3; // type, cluster_name presence and initial_discovery
if(cluster_name != null)
retval += cluster_name.length() +2;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RELAY.java
Expand Up @@ -631,7 +631,7 @@ public Supplier<? extends Header> create() {
return RelayHeader::new;
}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
switch(type) {
case DISSEMINATE:
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RSVP.java
Expand Up @@ -384,7 +384,7 @@ public RsvpHeader(byte type, short id) {
public short getMagicId() {return 76;}
public Supplier<? extends Header> create() {return RsvpHeader::new;}

public int size() {
public int serializedSize() {
return Global.BYTE_SIZE + Global.SHORT_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/SEQUENCER.java
Expand Up @@ -654,7 +654,7 @@ public void readFrom(DataInput in) throws Exception {
flush_ack=in.readBoolean();
}

public int size() {
public int serializedSize() {
return Global.BYTE_SIZE + Bits.size(seqno) + Global.BYTE_SIZE; // type + seqno + flush_ack
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/SEQUENCER2.java
Expand Up @@ -474,7 +474,7 @@ public void readFrom(DataInput in) throws Exception {
}

// type + seqno + localSeqno + flush_ack
public int size() {
public int serializedSize() {
return Global.BYTE_SIZE + Bits.size(seqno) + Global.SHORT_SIZE;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/STOMP.java
Expand Up @@ -648,7 +648,7 @@ public static StompHeader createHeader(Type type, Map<String,String> headers) {



public int size() {
public int serializedSize() {
int retval=Global.INT_SIZE *2; // type + size of hashmap
for(Map.Entry<String,String> entry: headers.entrySet()) {
retval+=entry.getKey().length() +2;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/SaslHeader.java
Expand Up @@ -70,7 +70,7 @@ public void readFrom(DataInput in) throws Exception {
}

@Override
public int size() {
public int serializedSize() {
return Util.size(payload);
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TpHeader.java
Expand Up @@ -43,7 +43,7 @@ public String toString() {
return String.format("[cluster_name=%s]", new String(cluster_name));
}

public int size() {
public int serializedSize() {
return cluster_name != null? Global.SHORT_SIZE + cluster_name.length : Global.SHORT_SIZE;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/UNICAST3.java
Expand Up @@ -1304,7 +1304,7 @@ public static String type2Str(byte t) {
}
}

public final int size() {
public final int serializedSize() {
int retval=Global.BYTE_SIZE; // type
switch(type) {
case DATA:
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/VERIFY_SUSPECT.java
Expand Up @@ -381,7 +381,7 @@ public void readFrom(DataInput in) throws Exception {
from=Util.readAddress(in);
}

public int size() {
public int serializedSize() {
return Global.SHORT_SIZE + Util.size(from);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/FLUSH.java
Expand Up @@ -1014,7 +1014,7 @@ public FlushHeader(byte type, long viewID) {
public long getViewID() {return viewID;}

@Override
public int size() {
public int serializedSize() {
return Global.BYTE_SIZE + Global.LONG_SIZE; // type and viewId
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/GMS.java
Expand Up @@ -1421,7 +1421,7 @@ public void readFrom(DataInput in) throws Exception {
useFlushIfPresent=(flags & USE_FLUSH) == USE_FLUSH;
}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE // type
+ Global.SHORT_SIZE // flags
+ Util.size(mbr);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/NakAckHeader2.java
Expand Up @@ -100,7 +100,7 @@ public void readFrom(DataInput in) throws Exception {
}


public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
switch(type) {
case MSG:
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/STABLE.java
Expand Up @@ -789,7 +789,7 @@ public String toString() {
return String.format("[%s] view-id= %s", type2String(type), view_id);
}

public int size() {
public int serializedSize() {
return Global.BYTE_SIZE // type
+ Util.size(view_id);
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
Expand Up @@ -452,7 +452,7 @@ public void readFrom(DataInput in) throws Exception {
my_digest=Util.readStreamable(Digest.class, in);
}

public int size() {
public int serializedSize() {
int retval=Global.BYTE_SIZE; // type
retval+=Global.BYTE_SIZE; // presence byte for my_digest
if(my_digest != null)
Expand Down

0 comments on commit 62c4d23

Please sign in to comment.