Skip to content

Commit

Permalink
Changed SEQUENCER to replace Util.objectFromByteBuffer() with Util.st…
Browse files Browse the repository at this point in the history
…reamableToBuffer()
  • Loading branch information
belaban committed Mar 18, 2016
1 parent 9b165d1 commit 0db305e
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions src/org/jgroups/protocols/SEQUENCER.java
Expand Up @@ -344,17 +344,17 @@ protected void flushMessagesInForwardTable() {
for(Map.Entry<Long,Message> entry: forward_table.entrySet()) { for(Map.Entry<Long,Message> entry: forward_table.entrySet()) {
Long key=entry.getKey(); Long key=entry.getKey();
Message msg=entry.getValue(); Message msg=entry.getValue();
byte[] val; Buffer buf;
try { try {
val=Util.objectToByteBuffer(msg); buf=Util.streamableToBuffer(msg);
} }
catch(Exception e) { catch(Exception e) {
log.error(Util.getMessage("FlushingBroadcastingFailed"), e); log.error(Util.getMessage("FlushingBroadcastingFailed"), e);
continue; continue;
} }


SequencerHeader hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key); SequencerHeader hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key);
Message forward_msg=new Message(null, val).putHeader(this.id, hdr); Message forward_msg=new Message(null, buf).putHeader(this.id, hdr);
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace(local_addr + ": flushing (broadcasting) " + local_addr + "::" + key); log.trace(local_addr + ": flushing (broadcasting) " + local_addr + "::" + key);
down_prot.down(new Event(Event.MSG, forward_msg)); down_prot.down(new Event(Event.MSG, forward_msg));
Expand All @@ -377,10 +377,10 @@ protected void flushMessagesInForwardTable() {
Map.Entry<Long,Message> entry=forward_table.firstEntry(); Map.Entry<Long,Message> entry=forward_table.firstEntry();
final Long key=entry.getKey(); final Long key=entry.getKey();
Message msg=entry.getValue(); Message msg=entry.getValue();
byte[] val; Buffer buf;


try { try {
val=Util.objectToByteBuffer(msg); buf=Util.streamableToBuffer(msg);
} }
catch(Exception e) { catch(Exception e) {
log.error(Util.getMessage("FlushingBroadcastingFailed"), e); log.error(Util.getMessage("FlushingBroadcastingFailed"), e);
Expand All @@ -389,7 +389,7 @@ protected void flushMessagesInForwardTable() {


while(flushing && running && !forward_table.isEmpty()) { while(flushing && running && !forward_table.isEmpty()) {
SequencerHeader hdr=new SequencerHeader(SequencerHeader.FLUSH, key); SequencerHeader hdr=new SequencerHeader(SequencerHeader.FLUSH, key);
Message forward_msg=new Message(coord, val).putHeader(this.id,hdr).setFlag(Message.Flag.DONT_BUNDLE); Message forward_msg=new Message(coord, buf).putHeader(this.id,hdr).setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace(local_addr + ": flushing (forwarding) " + local_addr + "::" + key + " to coord " + coord); log.trace(local_addr + ": flushing (forwarding) " + local_addr + "::" + key + " to coord " + coord);
ack_promise.reset(); ack_promise.reset();
Expand Down Expand Up @@ -444,7 +444,7 @@ protected void forward(final Message msg, long seqno, boolean flush) {
byte type=flush? SequencerHeader.FLUSH : SequencerHeader.FORWARD; byte type=flush? SequencerHeader.FLUSH : SequencerHeader.FORWARD;
try { try {
SequencerHeader hdr=new SequencerHeader(type, seqno); SequencerHeader hdr=new SequencerHeader(type, seqno);
Message forward_msg=new Message(target, Util.objectToByteBuffer(msg)).putHeader(this.id,hdr); Message forward_msg=new Message(target, Util.streamableToBuffer(msg)).putHeader(this.id,hdr);
down_prot.down(new Event(Event.MSG, forward_msg)); down_prot.down(new Event(Event.MSG, forward_msg));
forwarded_msgs++; forwarded_msgs++;
} }
Expand Down Expand Up @@ -483,7 +483,7 @@ protected void broadcast(final Message msg, boolean copy, Address original_sende
*/ */
protected void unwrapAndDeliver(final Message msg, boolean flush_ack) { protected void unwrapAndDeliver(final Message msg, boolean flush_ack) {
try { try {
Message msg_to_deliver=(Message)Util.objectFromByteBuffer(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); Message msg_to_deliver=Util.streamableFromBuffer(Message.class, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
SequencerHeader hdr=(SequencerHeader)msg_to_deliver.getHeader(this.id); SequencerHeader hdr=(SequencerHeader)msg_to_deliver.getHeader(this.id);
if(flush_ack) if(flush_ack)
hdr.flush_ack=true; hdr.flush_ack=true;
Expand Down

0 comments on commit 0db305e

Please sign in to comment.