Skip to content

Commit

Permalink
- Expanding ByteArrayDataOutputStream more conservatively (https://is…
Browse files Browse the repository at this point in the history
…sues.jboss.org/browse/JGRP-2124)

- Changed Address.size() -> serializedSize() (extending SizeStreamable)
- ByteArrayDataOutputStream has not the correct size for all SizeStreamable classes
  • Loading branch information
belaban committed Nov 4, 2016
1 parent 9619a9a commit de91ff4
Show file tree
Hide file tree
Showing 22 changed files with 72 additions and 43 deletions.
8 changes: 2 additions & 6 deletions src/org/jgroups/Address.java
@@ -1,7 +1,7 @@

package org.jgroups;

import org.jgroups.util.Streamable;
import org.jgroups.util.SizeStreamable;
import org.jgroups.util.UUID;

/**
Expand All @@ -16,16 +16,12 @@
* @see PhysicalAddress
* @see UUID
*/
public interface Address extends Streamable, Comparable<Address> {
public interface Address extends SizeStreamable, Comparable<Address> {
// flags used for marshalling
byte NULL = 1 << 0;
byte UUID_ADDR = 1 << 1;
byte SITE_UUID = 1 << 2;
byte SITE_MASTER = 1 << 3;
byte IP_ADDR = 1 << 4;
byte IP_ADDR_UUID = 1 << 5;


/** Returns serialized size of this address */
int size();
}
2 changes: 1 addition & 1 deletion src/org/jgroups/AnycastAddress.java
Expand Up @@ -65,7 +65,7 @@ private void initCollection(int estimatedSize) {
}
}

public int size() {
public int serializedSize() {
if (destinations == null) {
return Global.INT_SIZE;
}
Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/blocks/cs/NioConnection.java
Expand Up @@ -317,10 +317,12 @@ protected void setSocketParameters(Socket client_sock) throws SocketException {

protected void sendLocalAddress(Address local_addr) throws Exception {
try {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream();
int addr_size=local_addr.serializedSize();
int expected_size=cookie.length + Global.SHORT_SIZE*2 + addr_size;
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(expected_size +2);
out.write(cookie, 0, cookie.length);
out.writeShort(Version.version);
out.writeShort(local_addr.size()); // address size
out.writeShort(addr_size); // address size
local_addr.writeTo(out);
ByteBuffer buf=out.getByteBuffer();
send(buf, false);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -196,7 +196,7 @@ protected void sendLocalAddress(Address local_addr) throws Exception {

// write the version
out.writeShort(Version.version);
out.writeShort(local_addr.size()); // address size
out.writeShort(local_addr.serializedSize()); // address size
local_addr.writeTo(out);
out.flush(); // needed ?
updateLastAccessed();
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BPING.java
Expand Up @@ -101,7 +101,7 @@ protected void sendMcastDiscoveryRequest(Message msg) {
try {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)msg.size());
msg.writeTo(out);
for(int i=bind_port; i <= bind_port+port_range; i++) {
DatagramPacket packet=new DatagramPacket(out.buffer(), 0, out.position(), dest_addr, i);
Expand Down
3 changes: 2 additions & 1 deletion src/org/jgroups/protocols/COUNTER.java
Expand Up @@ -503,7 +503,8 @@ protected static Buffer responseToBuffer(Response rsp) throws Exception {
}

protected static Buffer streamableToBuffer(byte req_or_rsp, byte type, Streamable obj) throws Exception {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(100);
int expected_size=obj instanceof SizeStreamable? ((SizeStreamable)obj).serializedSize() : 100;
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(expected_size);
out.writeByte(req_or_rsp);
out.writeByte(type);
obj.writeTo(out);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_SOCK.java
Expand Up @@ -918,7 +918,7 @@ public int serializedSize() {
int ipaddr_size = 0 ;
ipaddr_size += Global.BYTE_SIZE ; // presence byte
if (sock_addr != null)
ipaddr_size += sock_addr.size(); // IpAddress size
ipaddr_size += sock_addr.serializedSize(); // IpAddress size
retval += ipaddr_size ;
retval+=Global.INT_SIZE; // mbrs size
if(mbrs != null)
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/MPING.java
Expand Up @@ -285,7 +285,7 @@ protected void sendMcastDiscoveryRequest(Message msg) {
try {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(128);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)(msg.size()+1));
msg.writeTo(out);
Buffer buf=out.getBuffer();
DatagramPacket packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port);
Expand Down
14 changes: 10 additions & 4 deletions src/org/jgroups/protocols/pbcast/GMS.java
Expand Up @@ -1166,13 +1166,19 @@ else if(view instanceof DeltaView)

protected static Buffer marshal(final View view, final Digest digest) {
try {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
int expected_size=Global.SHORT_SIZE;
if(view != null)
expected_size+=view.serializedSize();
boolean write_addrs=writeAddresses(view, digest);
if(digest != null)
expected_size=(int)digest.serializedSize(write_addrs);
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(expected_size +10);
out.writeShort(determineFlags(view, digest));
if(view != null)
view.writeTo(out);

if(digest != null)
digest.writeTo(out, writeAddresses(view, digest));
digest.writeTo(out, write_addrs);

return out.getBuffer();
}
Expand All @@ -1187,7 +1193,7 @@ public static Buffer marshal(JoinRsp join_rsp) {

protected static Buffer marshal(Collection<? extends Address> mbrs) {
try {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)Util.size(mbrs));
Util.writeAddresses(mbrs, out);
return out.getBuffer();
}
Expand All @@ -1198,7 +1204,7 @@ protected static Buffer marshal(Collection<? extends Address> mbrs) {

protected static Buffer marshal(final ViewId view_id) {
try {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(Util.size(view_id));
Util.writeViewId(view_id, out);
return out.getBuffer();
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/stack/IpAddress.java
Expand Up @@ -184,7 +184,7 @@ public void readFrom(DataInput in) throws Exception {
port=in.readUnsignedShort();
}

public int size() {
public int serializedSize() {
// length (1 bytes) + 4 bytes for port
int tmp_size=Global.BYTE_SIZE+ Global.SHORT_SIZE;
if(ip_addr != null) {
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/stack/IpAddressUUID.java
Expand Up @@ -122,8 +122,8 @@ public void readFrom(DataInput in) throws Exception {
high=in.readInt();
}

public int size() {
return super.size() + Global.LONG_SIZE + Global.INT_SIZE;
public int serializedSize() {
return super.serializedSize() + Global.LONG_SIZE + Global.INT_SIZE;
}

public String toString() {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/util/ByteArrayDataOutputStream.java
Expand Up @@ -191,7 +191,7 @@ protected void ensureCapacity(int bytes) {
int minCapacity=pos+bytes;

if(minCapacity - buf.length > 0) {
int newCapacity=this.grow_exponentially? buf.length << 1 : buf.length + bytes + 32;
int newCapacity=this.grow_exponentially? buf.length << 1 : pos + bytes + 32;
if(newCapacity - minCapacity < 0)
newCapacity=minCapacity;
if(newCapacity < 0) {
Expand Down
6 changes: 5 additions & 1 deletion src/org/jgroups/util/Digest.java
Expand Up @@ -27,7 +27,7 @@
* @author Bela Ban
*/
@Immutable
public class Digest implements Streamable, Iterable<Digest.Entry>, Constructable<Digest> {
public class Digest implements SizeStreamable, Iterable<Digest.Entry>, Constructable<Digest> {

// Stores the members corresponding to the seqnos. Example: members[2] --> hd=seqnos[4], hr=seqnos[5]
protected Address[] members;
Expand Down Expand Up @@ -175,6 +175,10 @@ public void readFrom(DataInput in, boolean read_addrs) throws Exception {
}
}

public int serializedSize() {
return (int)serializedSize(true);
}

public long serializedSize(boolean with_members) {
long retval=with_members? Util.size(members) : Global.SHORT_SIZE;
for(int i=0; i < members.length; i++)
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/util/ExtendedUUID.java
Expand Up @@ -163,8 +163,8 @@ public void readFrom(DataInput in) throws Exception {
}

/** The number of bytes required to serialize this instance */
public int size() {
return super.size() + Global.BYTE_SIZE + sizeofHashMap();
public int serializedSize() {
return super.serializedSize() + Global.BYTE_SIZE + sizeofHashMap();
}

/** The number of non-null keys */
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/util/FlagsUUID.java
Expand Up @@ -74,7 +74,7 @@ public void readFrom(DataInput in) throws Exception {
}

/** The number of bytes required to serialize this instance */
public int size() {return super.size() + Bits.size(flags);}
public int serializedSize() {return super.serializedSize() + Bits.size(flags);}
public String toString() {return String.format("%s (flags=%d)", super.toString(), flags);}


Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/util/UUID.java
Expand Up @@ -216,7 +216,7 @@ public void readFrom(DataInput in) throws Exception {
}


public int size() {
public int serializedSize() {
return SIZE;
}

Expand Down
16 changes: 9 additions & 7 deletions src/org/jgroups/util/Util.java
Expand Up @@ -534,7 +534,8 @@ public static byte[] objectToByteBuffer(Object obj) throws Exception {
}

if(obj instanceof Streamable) {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512, true);
int expected_size=obj instanceof SizeStreamable? ((SizeStreamable)obj).serializedSize() : 512;
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(expected_size, true);
out.write(TYPE_STREAMABLE);
writeGenericStreamable((Streamable)obj,out);
return Arrays.copyOf(out.buf,out.position());
Expand Down Expand Up @@ -567,7 +568,8 @@ public static Buffer objectToBuffer(Object obj) throws Exception {
}

if(obj instanceof Streamable) {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512, true);
int expected_size=obj instanceof SizeStreamable? ((SizeStreamable)obj).serializedSize() : 512;
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(expected_size, true);
out.write(TYPE_STREAMABLE);
writeGenericStreamable((Streamable)obj,out);
return out.getBuffer();
Expand Down Expand Up @@ -983,7 +985,7 @@ public static Buffer streamableToBuffer(Streamable obj) {


public static byte[] collectionToByteBuffer(Collection<Address> c) throws Exception {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)Util.size(c));
Util.writeAddresses(c,out);
return Arrays.copyOf(out.buffer(), out.position());
}
Expand Down Expand Up @@ -1373,13 +1375,13 @@ public static int size(Address addr) {
if(addr instanceof UUID) {
Class<? extends Address> clazz=addr.getClass();
if(clazz.equals(UUID.class) || clazz.equals(SiteUUID.class) || clazz.equals(SiteMaster.class))
return retval + addr.size();
return retval + addr.serializedSize();
}
if(addr instanceof IpAddress)
return retval + addr.size();
return retval + addr.serializedSize();

retval+=Global.SHORT_SIZE; // magic number
retval+=addr.size();
retval+=addr.serializedSize();
return retval;
}

Expand Down Expand Up @@ -1745,7 +1747,7 @@ public static byte[] readByteBuffer(DataInput in) throws Exception {


public static Buffer messageToByteBuffer(Message msg) throws Exception {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)msg.size()+1);

out.writeBoolean(msg != null);
if(msg != null)
Expand Down
Expand Up @@ -282,7 +282,7 @@ protected ASYM_ENCRYPT createENCRYPT() throws Exception {


protected static Buffer marshalView(final View view) throws Exception {
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(Global.SHORT_SIZE + view.serializedSize());
out.writeShort(determineFlags(view));
view.writeTo(out);
return out.getBuffer();
Expand Down
Expand Up @@ -299,7 +299,7 @@ protected static JChannel createRogue(String name) throws Exception {


protected static Buffer marshal(final View view) throws Exception {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512);
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(Util.size(view));
out.writeShort(1);
if(view != null)
view.writeTo(out);
Expand Down
Expand Up @@ -122,14 +122,32 @@ public void testExpanding2() {
out=new ByteArrayDataOutputStream(1);
out.writeBoolean(true);
out.writeBoolean(false);
assert out.position() == 2;

out=new ByteArrayDataOutputStream(1);
out.writeShort(22);
out.writeShort(23);
assert out.position() == 4;

out=new ByteArrayDataOutputStream(1);
out.writeInt(23);
out.writeInt(24);
assert out.position() == 8;
}

public void testExpanding3() {
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(1024);
out.writeInt(1);
assert out.position() == 4;
byte[] buf=new byte[1024];
out.write(buf);
System.out.println("out=" + out);
assert out.position() == 1028;

buf=new byte[512];
out.write(buf);
System.out.println("out = " + out);
assert out.position() == 1028+512;
}

public void testSkipBytes() {
Expand Down
10 changes: 5 additions & 5 deletions tests/junit-functional/org/jgroups/tests/ExtendedUUIDTest.java
Expand Up @@ -139,7 +139,7 @@ public void testMarshalling() throws Exception {
uuid.put("name", Util.objectToByteBuffer("Bela"))
.put("age",Util.objectToByteBuffer(49))
.put("bool",Util.objectToByteBuffer(true));
int size=uuid.size();
int size=uuid.serializedSize();
byte[] buffer=Util.streamableToByteBuffer(uuid);
assert size == buffer.length : "expected size of " + size + ", but got " + buffer.length;
ExtendedUUID uuid2=Util.streamableFromByteBuffer(ExtendedUUID.class, buffer);
Expand All @@ -152,7 +152,7 @@ public void testMarshalling() throws Exception {

public void testMarshallingNullHashMap() throws Exception {
ExtendedUUID uuid=ExtendedUUID.randomUUID("A");
int size=uuid.size();
int size=uuid.serializedSize();
byte[] buffer=Util.streamableToByteBuffer(uuid);
assert size == buffer.length : "expected size of " + size + ", but got " + buffer.length;
Util.streamableFromByteBuffer(ExtendedUUID.class, buffer);
Expand All @@ -163,7 +163,7 @@ public void testMarshallingLargeValues() throws Exception {
for(int i=1; i <= 5; i++)
uuid.put(String.valueOf(i), new byte[250]);
System.out.println("uuid = " + uuid);
int size=uuid.size();
int size=uuid.serializedSize();
byte[] buffer=Util.streamableToByteBuffer(uuid);
assert size == buffer.length : "expected size of " + size + ", but got " + buffer.length;
ExtendedUUID uuid2=Util.streamableFromByteBuffer(ExtendedUUID.class, buffer);
Expand All @@ -179,7 +179,7 @@ public void testmarshallingWithNullValues() throws Exception {
byte[] value=Util.objectToByteBuffer("Bela");
for(int i=1; i <= 5; i++)
uuid.put(String.valueOf(i), i % 2 == 0? value : null);
int size=uuid.size();
int size=uuid.serializedSize();
byte[] buffer=Util.streamableToByteBuffer(uuid);
assert size == buffer.length : "expected size of " + size + ", but got " + buffer.length;
ExtendedUUID uuid2=Util.streamableFromByteBuffer(ExtendedUUID.class, buffer);
Expand All @@ -206,7 +206,7 @@ public void testMarshallingWithRemoval() throws Exception {
uuid.remove(String.valueOf(i));
assert uuid.length() == 5;

int size=uuid.size();
int size=uuid.serializedSize();
byte[] buffer=Util.streamableToByteBuffer(uuid);
assert size == buffer.length : "expected size of " + size + ", but got " + buffer.length;
ExtendedUUID uuid2=Util.streamableFromByteBuffer(ExtendedUUID.class, buffer);
Expand Down
4 changes: 2 additions & 2 deletions tests/junit-functional/org/jgroups/tests/SizeTest.java
Expand Up @@ -117,7 +117,7 @@ public void testGossipData() throws Exception {
}


public static void testDigest() throws Exception {
public void testDigest() throws Exception {
Address addr=Util.createRandomAddress();
Address addr2=Util.createRandomAddress();
View view=View.create(addr, 1, addr, addr2);
Expand Down Expand Up @@ -832,7 +832,7 @@ private static void _testSize(Header hdr) throws Exception {


private static void _testSize(Address addr) throws Exception {
long size=addr.size();
long size=addr.serializedSize();
byte[] serialized_form=Util.streamableToByteBuffer(addr);
System.out.println("size=" + size + ", serialized size=" + serialized_form.length);
Assert.assertEquals(serialized_form.length, size);
Expand Down

0 comments on commit de91ff4

Please sign in to comment.