Skip to content

Commit

Permalink
pp
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 7, 2016
1 parent 67cc32e commit 35dafa7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 41 deletions.
16 changes: 8 additions & 8 deletions src/org/jgroups/JChannel.java
Expand Up @@ -652,7 +652,7 @@ public Object up(Event evt) {
switch(evt.getType()) {

case Event.MSG:
Message msg=(Message)evt.getArg();
Message msg=evt.getArg();
if(stats) {
received_msgs++;
received_bytes+=msg.getLength();
Expand All @@ -664,7 +664,7 @@ public Object up(Event evt) {
break;

case Event.VIEW_CHANGE:
View tmp=(View)evt.getArg();
View tmp=evt.getArg();
if(tmp instanceof MergeView)
view=new View(tmp.getViewId(), tmp.getMembers());
else
Expand All @@ -681,7 +681,7 @@ public Object up(Event evt) {
break;

case Event.CONFIG:
Map<String,Object> cfg=(Map<String,Object>)evt.getArg();
Map<String,Object> cfg=evt.getArg();
if(cfg != null) {
if(cfg.containsKey("state_transfer")) {
state_transfer_supported=(Boolean)cfg.get("state_transfer");
Expand All @@ -693,7 +693,7 @@ public Object up(Event evt) {
break;

case Event.GET_STATE_OK:
StateTransferResult result=(StateTransferResult)evt.getArg();
StateTransferResult result=evt.getArg();
if(up_handler != null) {
try {
Object retval=up_handler.up(evt);
Expand Down Expand Up @@ -721,7 +721,7 @@ public Object up(Event evt) {
break;

case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED:
state_promise.setResult((StateTransferResult)evt.getArg());
state_promise.setResult(evt.getArg());
break;

case Event.STATE_TRANSFER_INPUTSTREAM:
Expand All @@ -731,7 +731,7 @@ public Object up(Event evt) {
if(up_handler != null)
return up_handler.up(evt);

InputStream is=(InputStream)evt.getArg();
InputStream is=evt.getArg();
if(is != null && receiver != null) {
try {
receiver.setState(is);
Expand All @@ -745,7 +745,7 @@ public Object up(Event evt) {
case Event.STATE_TRANSFER_OUTPUTSTREAM:
if(receiver != null && evt.getArg() != null) {
try {
receiver.getState((OutputStream)evt.getArg());
receiver.getState(evt.getArg());
}
catch(Exception e) {
throw new RuntimeException("failed calling getState() in state provider", e);
Expand Down Expand Up @@ -1113,7 +1113,7 @@ protected JChannel notifyChannelClosed(JChannel c) {
protected JChannel notifyListeners(Consumer<ChannelListener> func, String msg) {
if(channel_listeners != null) {
try {
channel_listeners.forEach(func::accept);
channel_listeners.forEach(func);
}
catch(Throwable t) {
log.error(Util.getMessage("CallbackException"), msg, t);
Expand Down
51 changes: 18 additions & 33 deletions src/org/jgroups/Message.java
Expand Up @@ -12,8 +12,7 @@
/**
* A Message encapsulates data sent to members of a group. It contains among other things the
* address of the sender, the destination address, a payload (byte buffer) and a list of headers.
* Headers are added by protocols on the sender side and removed by protocols on the receiver's
* side.
* Headers are added by protocols on the sender side and removed by protocols on the receiver's side.
* <p>
* The byte buffer can point to a reference, and we can subset it using index and length. However,
* when the message is serialized, we only write the bytes between index and length.
Expand Down Expand Up @@ -61,7 +60,6 @@ public enum Flag {
INTERNAL( (short)(1 << 9)), // for internal use by JGroups only, don't use !
SKIP_BARRIER( (short)(1 << 10)); // passing messages through a closed BARRIER


final short value;
Flag(short value) {this.value=value;}

Expand Down Expand Up @@ -159,11 +157,11 @@ public Message(boolean create_headers) {

public Address getDest() {return dest_addr;}
public Address dest() {return dest_addr;}
public void setDest(Address new_dest) {dest_addr=new_dest;}
public Message setDest(Address new_dest) {dest_addr=new_dest; return this;}
public Message dest(Address new_dest) {dest_addr=new_dest; return this;}
public Address getSrc() {return src_addr;}
public Address src() {return src_addr;}
public void setSrc(Address new_src) {src_addr=new_src;}
public Message setSrc(Address new_src) {src_addr=new_src; return this;}
public Message src(Address new_src) {src_addr=new_src; return this;}

/**
Expand All @@ -178,7 +176,6 @@ public byte[] getRawBuffer() {

/**
* Returns a copy of the buffer if offset and length are used, otherwise a reference.
*
* @return byte array with a copy of the buffer.
*/
public byte[] getBuffer() {
Expand All @@ -200,13 +197,12 @@ public Buffer getBuffer2() {
}

/**
* <em>
* Sets the buffer.<p/>
* Note that the byte[] buffer passed as argument must not be modified. Reason: if we retransmit the
* message, it would still have a ref to the original byte[] buffer passed in as argument, and so we would
* retransmit a changed byte[] buffer !
* </em>
*/
final public Message setBuffer(byte[] b) {
public Message setBuffer(byte[] b) {
buf=b;
if(buf != null) {
offset=0;
Expand All @@ -229,7 +225,7 @@ final public Message setBuffer(byte[] b) {
* @param offset The initial position
* @param length The number of bytes
*/
final public Message setBuffer(byte[] b, int offset, int length) {
public Message setBuffer(byte[] b, int offset, int length) {
buf=b;
if(buf != null) {
if(offset < 0 || offset > buf.length)
Expand All @@ -245,13 +241,12 @@ final public Message setBuffer(byte[] b, int offset, int length) {
}

/**
* <em>
* Sets the buffer<p/>
* Note that the byte[] buffer passed as argument must not be modified. Reason: if we retransmit the
* message, it would still have a ref to the original byte[] buffer passed in as argument, and so we would
* retransmit a changed byte[] buffer !
* </em>
*/
public final Message setBuffer(Buffer buf) {
public Message setBuffer(Buffer buf) {
if(buf != null) {
this.buf=buf.getBuf();
this.offset=buf.getOffset();
Expand All @@ -260,20 +255,12 @@ public final Message setBuffer(Buffer buf) {
return this;
}

/**
*
* Returns the offset into the buffer at which the data starts
*
*/
/** Returns the offset into the buffer at which the data starts */
public int getOffset() {
return offset;
}

/**
*
* Returns the number of bytes in the buffer
*
*/
/** Returns the number of bytes in the buffer */
public int getLength() {
return length;
}
Expand All @@ -299,7 +286,7 @@ public int getNumHeaders() {
* message. Parameter 'obj' has to be serializable (e.g. implementing Serializable,
* Externalizable or Streamable, or be a basic type (e.g. Integer, Short etc)).
*/
final public Message setObject(Object obj) {
public Message setObject(Object obj) {
if(obj == null) return this;
if(obj instanceof byte[])
return setBuffer((byte[])obj);
Expand All @@ -314,23 +301,21 @@ final public Message setObject(Object obj) {
}


final public <T extends Object> T getObject() {
public <T extends Object> T getObject() {
return getObject(null);
}

/**
* Uses custom serialization to create an object from the buffer of the message. Note that this
* is dangerous when using your own classloader, e.g. inside of an application server ! Most
* likely, JGroups will use the system classloader to deserialize the buffer into an object,
* whereas (for example) a web application will want to use the webapp's classloader, resulting
* in a ClassCastException. The recommended way is for the application to use their own
* serialization and only pass byte[] buffer to JGroups.<p/>
* Uses custom serialization to create an object from the buffer of the message. Note that this is dangerous when
* using your own classloader, e.g. inside of an application server ! Most likely, JGroups will use the system
* classloader to deserialize the buffer into an object, whereas (for example) a web application will want to use
* the webapp's classloader, resulting in a ClassCastException. The recommended way is for the application to use
* their own serialization and only pass byte[] buffer to JGroups.<p/>
* As of 3.5, a classloader can be passed in. It will be used first to find a class, before contacting
* the other classloaders in the list. If null, the default list of classloaders will be used.
*
* @return the object
*/
final public <T extends Object> T getObject(ClassLoader loader) {
public <T extends Object> T getObject(ClassLoader loader) {
try {
return Util.objectFromByteBuffer(buf, offset, length, loader);
}
Expand Down

0 comments on commit 35dafa7

Please sign in to comment.