Skip to content

Commit

Permalink
Caching log.isTraceEnabled() in selected protocols (e.g. UNICAST3, NA…
Browse files Browse the repository at this point in the history
…KACK2)
  • Loading branch information
belaban committed Nov 14, 2016
1 parent e7718b0 commit 7724ba0
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 58 deletions.
19 changes: 7 additions & 12 deletions src/org/jgroups/blocks/RpcDispatcher.java
Expand Up @@ -90,12 +90,10 @@ public <T> RspList<T> callRemoteMethods(Collection<Address> dests, MethodCall me
return empty_rsplist;
}

if(log.isTraceEnabled())
log.trace("dests=%s, method_call=%s, options=%s", dests, method_call, opts);

Buffer buf=methodCallToBuffer(method_call, marshaller);
RspList<T> retval=super.castMessage(dests, buf, opts);
if(log.isTraceEnabled()) log.trace("responses: %s", retval);
if(log.isTraceEnabled())
log.trace("dests=%s, method_call=%s, options=%s, responses: %s", dests, method_call, opts, retval);
return retval;
}

Expand All @@ -116,12 +114,11 @@ public <T> CompletableFuture<RspList<T>> callRemoteMethodsWithFuture(Collection<
log.trace("destination list of %s() is empty: no need to send message", method_call.methodName());
return CompletableFuture.completedFuture(empty_rsplist);
}

Buffer buf=methodCallToBuffer(method_call, marshaller);
CompletableFuture<RspList<T>> retval=super.castMessageWithFuture(dests, buf, options);
if(log.isTraceEnabled())
log.trace("dests=%s, method_call=%s, options=%s", dests, method_call, options);

Buffer buf=methodCallToBuffer(method_call, marshaller);
return super.castMessageWithFuture(dests, buf, options);
return retval;
}


Expand Down Expand Up @@ -151,12 +148,10 @@ public <T> T callRemoteMethod(Address dest, String meth, Object[] args, Class[]
* @throws Exception Thrown if the method invocation threw an exception, either at the caller or the callee
*/
public <T> T callRemoteMethod(Address dest, MethodCall call, RequestOptions options) throws Exception {
if(log.isTraceEnabled())
log.trace("dest=%s, method_call=%s, options=%s", dest, call, options);

Buffer buf=methodCallToBuffer(call, marshaller);
T retval=super.sendMessage(dest, buf, options);
if(log.isTraceEnabled()) log.trace("retval: %s", retval);
if(log.isTraceEnabled())
log.trace("dest=%s, method_call=%s, options=%s, retval: %s", dest, call, options, retval);
return retval;
}

Expand Down
6 changes: 0 additions & 6 deletions src/org/jgroups/protocols/BaseBundler.java
Expand Up @@ -12,7 +12,6 @@
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

import static org.jgroups.protocols.TP.BUNDLE_MSG;
import static org.jgroups.protocols.TP.MSG_OVERHEAD;

/**
Expand Down Expand Up @@ -67,11 +66,6 @@ public int size() {
* The map will be cleared when done.
*/
@GuardedBy("lock") protected void sendBundledMessages() {
if(log.isTraceEnabled()) {
double percentage=100.0 / transport.getMaxBundleSize() * count;
log.trace(BUNDLE_MSG, transport.localAddress(), size(), count, percentage, msgs.size(), msgs.keySet());
}

for(Map.Entry<Address,List<Message>> entry: msgs.entrySet()) {
List<Message> list=entry.getValue();
if(list.isEmpty())
Expand Down
1 change: 0 additions & 1 deletion src/org/jgroups/protocols/BasicTCP.java
Expand Up @@ -107,7 +107,6 @@ public void sendMulticast(byte[] data, int offset, int length) throws Exception
}

public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {
if(log.isTraceEnabled()) log.trace("dest=" + dest + " (" + length + " bytes)");
send(dest, data, offset, length);
}

Expand Down
1 change: 0 additions & 1 deletion src/org/jgroups/protocols/FRAG2.java
Expand Up @@ -150,7 +150,6 @@ public Object up(Message msg) {
if(hdr != null) { // needs to be defragmented
Message assembled_msg=unfragment(msg, hdr);
if(assembled_msg != null) {
if(log.isTraceEnabled()) log.trace("%s: assembled_msg is %s", local_addr, assembled_msg);
assembled_msg.setSrc(msg.getSrc()); // needed ? YES, because fragments have a null src !!
up_prot.up(assembled_msg);
}
Expand Down
2 changes: 0 additions & 2 deletions src/org/jgroups/protocols/MFC.java
Expand Up @@ -55,8 +55,6 @@ public class MFC extends FlowControl {
/** Allows to unblock a blocked sender from an external program, e.g. JMX */
@ManagedOperation(description="Unblock a sender")
public void unblock() {
if(log.isTraceEnabled())
log.trace("unblocking the sender and replenishing all members");
credits.replenishAll();
}

Expand Down
5 changes: 0 additions & 5 deletions src/org/jgroups/protocols/RingBufferBundler.java
@@ -1,10 +1,5 @@
package org.jgroups.protocols;

/**
* A bundler based on {@link org.jgroups.util.RingBuffer}
* @author Bela Ban
* @since 4.0
*/

import org.jgroups.Address;
import org.jgroups.Global;
Expand Down
19 changes: 14 additions & 5 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -57,7 +57,6 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan
public static final byte LIST=1; // we have a list of messages rather than a single message when set
public static final byte MULTICAST=2; // message is a multicast (versus a unicast) message when set
public static final int MSG_OVERHEAD=Global.SHORT_SIZE + Global.BYTE_SIZE; // version + flags
public static final String BUNDLE_MSG="%s: sending %d msgs (%d bytes (%.2f%% of max_bundle_size) to %d dests(s): %s";
protected static final long MIN_WAIT_BETWEEN_DISCOVERIES=TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); // ns
protected static final boolean can_bind_to_mcast_addr;

Expand Down Expand Up @@ -92,6 +91,9 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan
systemProperty=Global.EXTERNAL_PORT,writable=false)
protected int external_port;

@ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true)
protected boolean is_trace=log.isTraceEnabled();


@Property(description="If true, the transport should use all available interfaces to receive multicast messages")
protected boolean receive_on_all_interfaces;
Expand Down Expand Up @@ -319,6 +321,12 @@ public Object[] getJmxObjects() {
return new Object[]{msg_stats};
}

public <T extends Protocol> T setLevel(String level) {
T retval=super.setLevel(level);
is_trace=log.isTraceEnabled();
return retval;
}

/* --------------------------------------------- JMX ---------------------------------------------- */

protected final MsgStats msg_stats=new MsgStats();
Expand Down Expand Up @@ -1002,7 +1010,7 @@ public Object down(Message msg) {
setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!

Address dest=msg.getDest(), sender=msg.getSrc();
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: sending msg to %s, src=%s, headers are %s", local_addr, dest, sender, msg.printHeaders());

// Don't send if dest is local address. Instead, send it up the stack. If multicast message, loop back directly
Expand Down Expand Up @@ -1077,7 +1085,8 @@ protected Bundler createBundler(String type) {

protected void loopback(Message msg, final boolean multicast) {
final Message copy=loopback_copy? msg.copy() : msg;
if(log.isTraceEnabled()) log.trace("%s: looping back message %s", local_addr, copy);
if(is_trace)
log.trace("%s: looping back message %s", local_addr, copy);

if(!loopback_separate_thread) {
passMessageUp(copy, null, false, multicast, false);
Expand Down Expand Up @@ -1124,7 +1133,7 @@ protected void setSourceAddress(Message msg) {

protected void passMessageUp(Message msg, AsciiString cluster_name, boolean perform_cluster_name_matching,
boolean multicast, boolean discard_own_mcast) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: received %s, headers are %s", local_addr, msg, msg.printHeaders());

if(up_prot == null)
Expand All @@ -1150,7 +1159,7 @@ protected void passMessageUp(Message msg, AsciiString cluster_name, boolean perf


protected void passBatchUp(MessageBatch batch, boolean perform_cluster_name_matching, boolean discard_own_mcast) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: received message batch of %d messages from %s", local_addr, batch.size(), batch.sender());
if(up_prot == null)
return;
Expand Down
4 changes: 0 additions & 4 deletions src/org/jgroups/protocols/TransferQueueBundler.java
@@ -1,9 +1,5 @@
package org.jgroups.protocols;

/**
* @author Bela Ban
* @since x.y
*/

import org.jgroups.Message;

Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/UDP.java
Expand Up @@ -441,8 +441,7 @@ protected void bindToInterfaces(List<NetworkInterface> interfaces,
log.trace("joined %s on %s", tmp_mcast_addr, intf.getName());
}
catch(IOException e) {
if(log.isWarnEnabled())
log.warn(Util.getMessage("InterfaceJoinFailed"), tmp_mcast_addr, intf.getName());
log.warn(Util.getMessage("InterfaceJoinFailed"), tmp_mcast_addr, intf.getName());
}
}
}
Expand Down
30 changes: 19 additions & 11 deletions src/org/jgroups/protocols/UNICAST3.java
Expand Up @@ -94,6 +94,9 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
@ManagedAttribute(description="True if sending a message can block at the transport level")
protected boolean sends_can_block=true;

@ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true)
protected boolean is_trace=log.isTraceEnabled();

/* --------------------------------------------- Fields ------------------------------------------------ */


Expand Down Expand Up @@ -166,6 +169,11 @@ public int getNumConnections() {
@ManagedAttribute(description="Next seqno issued by the timestamper")
public int getTimestamper() {return timestamper.get();}

public <T extends Protocol> T setLevel(String level) {
T retval= super.setLevel(level);
is_trace=log.isTraceEnabled();
return retval;
}

@ManagedOperation
public String printConnections() {
Expand Down Expand Up @@ -406,7 +414,7 @@ public Object up(Message msg) {
Address sender=msg.getSrc();
switch(hdr.type) {
case UnicastHeader3.DATA: // received regular message
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s <-- DATA(%s: #%d, conn_id=%d%s)", local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first? ", first" : "");
if(Objects.equals(local_addr, sender))
handleDataReceivedFromSelf(sender, hdr.seqno, msg);
Expand Down Expand Up @@ -530,7 +538,7 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
}

if(!list.isEmpty()) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s <-- DATA(%s: %s)", local_addr, batch.sender(), printMessageList(list));

int len=list.size();
Expand Down Expand Up @@ -640,7 +648,7 @@ public Object down(Message msg) {
}
while(running);

if(log.isTraceEnabled()) {
if(is_trace) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" --> DATA(").append(dst).append(": #").append(seqno).
append(", conn_id=").append(send_conn_id);
Expand Down Expand Up @@ -706,7 +714,7 @@ public void removeAllConnections() {
protected void retransmit(SeqnoList missing, Address sender) {
Message xmit_msg=new Message(sender).setBuffer(Util.streamableToBuffer(missing))
.setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(id, UnicastHeader3.createXmitReqHeader());
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: sending XMIT_REQ (%s) to %s", local_addr, missing, sender);
down_prot.down(xmit_msg);
xmit_reqs_sent.addAndGet(missing.size());
Expand All @@ -715,7 +723,7 @@ protected void retransmit(SeqnoList missing, Address sender) {

/** Called by the sender to resend messages for which no ACK has been received yet */
protected void retransmit(Message msg) {
if(log.isTraceEnabled()) {
if(is_trace) {
UnicastHeader3 hdr=msg.getHeader(id);
long seqno=hdr != null? hdr.seqno : -1;
log.trace("%s --> XMIT(%s: #%d)", local_addr, msg.getDest(), seqno);
Expand Down Expand Up @@ -818,7 +826,7 @@ protected void processInternalMessage(final Table<Message> win, final Address se


protected void handleBatchReceived(final ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s <-- DATA(%s: %s)", local_addr, sender, printMessageList(msgs));

int batch_size=msgs.size();
Expand Down Expand Up @@ -967,7 +975,7 @@ protected ReceiverEntry createReceiverEntry(Address sender, long seqno, short co

/** Add the ACK to hashtable.sender.sent_msgs */
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s <-- ACK(%s: #%d, conn-id=%d, ts=%d)", local_addr, sender, seqno, conn_id, timestamp);
SenderEntry entry=send_table.get(sender);
if(entry != null && entry.connId() != conn_id) {
Expand Down Expand Up @@ -1016,7 +1024,7 @@ protected void handleResendingOfFirstMessage(Address sender, int timestamp) {


protected void handleXmitRequest(Address sender, SeqnoList missing) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s <-- XMIT(%s: #%s)", local_addr, sender, missing);

SenderEntry entry=send_table.get(sender);
Expand All @@ -1038,7 +1046,7 @@ protected void handleXmitRequest(Address sender, SeqnoList missing) {
}

protected void deliverMessage(final Message msg, final Address sender, final long seqno) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: delivering %s#%s", local_addr, sender, seqno);
try {
up_prot.up(msg);
Expand All @@ -1053,7 +1061,7 @@ protected void deliverBatch(MessageBatch batch) {
try {
if(batch.isEmpty())
return;
if(log.isTraceEnabled()) {
if(is_trace) {
Message first=batch.first(), last=batch.last();
StringBuilder sb=new StringBuilder(local_addr + ": delivering");
if(first != null && last != null) {
Expand Down Expand Up @@ -1093,7 +1101,7 @@ protected void sendAck(Address dst, long seqno, short conn_id) {
return;
Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL).
putHeader(this.id, UnicastHeader3.createAckHeader(seqno, conn_id, timestamper.incrementAndGet()));
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s --> ACK(%s: #%d)", local_addr, dst, seqno);
try {
down_prot.down(ack);
Expand Down
24 changes: 16 additions & 8 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Expand Up @@ -193,9 +193,11 @@ public void setResendLastSeqno(boolean flag) {
@ManagedAttribute(description="Whether or not the task to resend the last seqno is running (depends on resend_last_seqno)")
public boolean resendTaskRunning() {return last_seqno_resender != null;}

@ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true)
protected boolean is_trace=log.isTraceEnabled();

/* ------------------------------------------------- Fields ------------------------------------------------------------------------- */
protected volatile boolean is_server=false;
protected volatile boolean is_server;
protected Address local_addr;
protected volatile List<Address> members=new ArrayList<>();
protected volatile View view;
Expand Down Expand Up @@ -253,6 +255,12 @@ public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {
this.discard_delivered_msgs=discard_delivered_msgs;
}

public <T extends Protocol> T setLevel(String level) {
T retval=super.setLevel(level);
is_trace=log.isTraceEnabled();
return retval;
}

@ManagedAttribute(description="Actual size of the become_server_queue")
public int getBecomeServerQueueSizeActual() {
return become_server_queue != null? become_server_queue.size() : -1;
Expand Down Expand Up @@ -772,7 +780,7 @@ protected void send(Message msg) {
while(running);

// moved down_prot.down() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: sending %s#%d", local_addr, local_addr, msg_id);
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
num_messages_sent++;
Expand Down Expand Up @@ -803,7 +811,7 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) {
// removal. Else insert the real message
boolean added=loopback || buf.add(hdr.seqno, msg.isFlagSet(Message.Flag.OOB)? DUMMY_OOB_MSG : msg);

if(added && log.isTraceEnabled())
if(added && is_trace)
log.trace("%s: received %s#%d", local_addr, sender, hdr.seqno);


Expand Down Expand Up @@ -832,7 +840,7 @@ protected void handleMessages(Address dest, Address sender, List<LongTuple<Messa
boolean loopback=local_addr.equals(sender);
boolean added=loopback || buf.add(msgs, oob, oob? DUMMY_OOB_MSG : null);

if(added && log.isTraceEnabled())
if(added && is_trace)
log.trace("%s: received %s#%d-%d (%d messages)",
local_addr, sender, msgs.get(0).getVal1(), msgs.get(msgs.size() -1).getVal1(), msgs.size());

Expand Down Expand Up @@ -922,14 +930,14 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add
log.warn(Util.getMessage("MessageNotFound"), local_addr, original_sender, i);
continue;
}
if(log.isTraceEnabled())
if(is_trace)
log.trace(local_addr + ": resending " + original_sender + "::" + i);
sendXmitRsp(xmit_requester, msg);
}
}

protected void deliver(Message msg, Address sender, long seqno, String error_msg) {
if(log.isTraceEnabled())
if(is_trace)
log.trace("%s: delivering %s#%d", local_addr, sender, seqno);
try {
up_prot.up(msg);
Expand All @@ -943,7 +951,7 @@ protected void deliverBatch(MessageBatch batch) {
try {
if(batch == null || batch.isEmpty())
return;
if(log.isTraceEnabled()) {
if(is_trace) {
Message first=batch.first(), last=batch.last();
StringBuilder sb=new StringBuilder(local_addr + ": delivering " + batch.sender());
if(first != null && last != null) {
Expand Down Expand Up @@ -1303,7 +1311,7 @@ protected void setDigest(Digest digest, boolean merge) {
return;

StringBuilder sb=log.isDebugEnabled()?
new StringBuilder(merge? "\n[" + local_addr + " mergeDigest()]\n" : "\n["+local_addr + " setDigest()]\n")
new StringBuilder("\n[" + local_addr + (merge? " mergeDigest()]\n" : " setDigest()]\n"))
.append("existing digest: " + getDigest()).append("\nnew digest: " + digest) : null;

boolean set_own_seqno=false;
Expand Down

0 comments on commit 7724ba0

Please sign in to comment.