Skip to content

Commit

Permalink
- Create unit test LeaveTest (https://issues.jboss.org/browse/JGRP-2293)
Browse files Browse the repository at this point in the history
- Replaced base64 conversion via javax.xml.bind with Base64 methods
- Moved GmsImpl.leaving and GmsServerImpl.leave_promise to GMS
- Added setXmitInterval() to UNICAST3
- Fixed CCME in FD_SOCK, changed suspected_mbrs to concurrent set
- Added single-test target to build.xml
- Removed -bind_addr from ViewDemo and Draw
- Removed setting of system properties external_port and bind_addr in tests
- Changed log formatting of message exchanges in NAKACK2 (https://issues.jboss.org/browse/JGRP-2326)
- Changed log formatting of message exchanges in UNICAST3 (https://issues.jboss.org/browse/JGRP-2325)
  • Loading branch information
belaban committed Jan 22, 2019
1 parent 4188ff7 commit 76362a5
Show file tree
Hide file tree
Showing 38 changed files with 783 additions and 353 deletions.
8 changes: 1 addition & 7 deletions bin/mperf.sh
@@ -1,8 +1,2 @@

# Author: Bela Ban

#!/bin/bash

JG=$HOME/JGroups

`dirname $0`/jgroups.sh org.jgroups.tests.perf.MPerf $*
jgroups.sh org.jgroups.tests.perf.MPerf $*
8 changes: 1 addition & 7 deletions bin/uperf.sh
@@ -1,8 +1,2 @@

# Author: Bela Ban

#!/bin/bash

JG=$HOME/JGroups

`dirname $0`/jgroups.sh org.jgroups.tests.perf.UPerf $*
jgroups.sh org.jgroups.tests.perf.UPerf $*
19 changes: 18 additions & 1 deletion build.xml
Expand Up @@ -751,6 +751,24 @@
</target>


<target name="single-test" description="Run a single test" depends="define-testng-task">
<fail unless="test-class" message="Property not set: 'test-class'"/>
<echo message=" -- Running single test ${test-class}"/>
<testng classpathref="jg.classpath"
outputdir="${tmp.dir}/test-results/xml"
suitename="single-test"
threadcount="1"
usedefaultlisteners="false"
listeners="org.jgroups.util.JUnitXMLReporter">
<classfileset dir="${compile.dir}" includes="**/${test-class}.class"/>

<jvmarg value="-Djgroups.bind_addr=${jgroups.bind_addr}"/>
<jvmarg value="-Djgroups.tcpping.initial_hosts=${jgroups.tcpping.initial_hosts}"/>
<jvmarg value="-Dtests.tmp.dir=${tmp.dir}"/>
<jvmarg value="-Dlog4j.configurationFile=${conf.dir}/log4j2.xml"/>
</testng>
</target>

<macrodef name="runtest" description="Runs a configuration-dependent test suite">
<attribute name="classpath" default="jg.classpath"/>
<attribute name="suitename" />
Expand Down Expand Up @@ -801,7 +819,6 @@
<jvmarg value="-Djgroups.tcpping.initial_hosts=${jgroups.tcpping.initial_hosts}"/>
<jvmarg value="-Djgroups.tunnel.gossip_router_hosts=${jgroups.tunnel.gossip_router_hosts}"/>
<jvmarg value="-Dtests.tmp.dir=${tmp.dir}"/>
<jvmarg value="-Dlog4j.configuration=file:${conf.dir}/log4j.properties"/>
<jvmarg value="-Dlog4j.configurationFile=${conf.dir}/log4j2.xml"/>
<jvmarg value="-Xms400M"/>
<jvmarg value="-Xmx800M"/>
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/auth/Krb5Token.java
Expand Up @@ -135,7 +135,7 @@ private boolean isAuthenticated() {
}

private void authenticateClientPrincipal() throws LoginException {
subject = kerb5Utils.generateSecuritySubject(JASS_SECURITY_CONFIG,client_principal_name,client_password);
subject = Krb5TokenUtils.generateSecuritySubject(JASS_SECURITY_CONFIG, client_principal_name, client_password);
}

private void generateServiceTicket() throws IOException {
Expand Down
8 changes: 4 additions & 4 deletions src/org/jgroups/auth/Krb5TokenUtils.java
Expand Up @@ -3,14 +3,14 @@
import org.ietf.jgss.*;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.Base64;
import org.jgroups.util.Bits;
import org.jgroups.util.Util;

import javax.security.auth.Subject;
import javax.security.auth.callback.*;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.xml.bind.DatatypeConverter;

import java.io.DataInput;
import java.io.DataOutput;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class Krb5TokenUtils {
}

// Authenticate against the KDC using JAAS.
public Subject generateSecuritySubject(String jassLoginConfig, String username, String password) throws LoginException {
public static Subject generateSecuritySubject(String jassLoginConfig, String username, String password) throws LoginException {

LoginContext loginCtx = null;

Expand Down Expand Up @@ -103,7 +103,7 @@ public static String validateSecurityContext(Subject subject, final byte[] servi
}

public static void encodeDataToStream(byte[] data, DataOutput out) throws Exception {
String encodedToken = DatatypeConverter.printBase64Binary(data);
String encodedToken =Base64.encodeBytes(data); // DatatypeConverter.printBase64Binary(data);

log.debug(" : Written Encoded Data: \n%s", encodedToken);

Expand All @@ -113,7 +113,7 @@ public static void encodeDataToStream(byte[] data, DataOutput out) throws Except
public static byte[] decodeDataFromStream(DataInput in) throws Exception {
String str = Bits.readString(in);
log.debug(" : Read Encoded Data: \n%s", str);
return DatatypeConverter.parseBase64Binary(str);
return Base64.decode(str); // DatatypeConverter.parseBase64Binary(str);
}

/*
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/protocols/BasicTCP.java
Expand Up @@ -75,8 +75,10 @@ protected BasicTCP() {
public boolean supportsMulticasting() {return false;}
public long getReaperInterval() {return reaper_interval;}
public void setReaperInterval(long interval) {this.reaper_interval=interval;}
public BasicTCP reaperInterval(long interval) {this.reaper_interval=interval; return this;}
public long getConnExpireTime() {return conn_expire_time;}
public void setConnExpireTime(long time) {this.conn_expire_time=time;}
public BasicTCP connExpireTime(long time) {this.conn_expire_time=time; return this;}


public void init() throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_SOCK.java
Expand Up @@ -104,7 +104,7 @@ public class FD_SOCK extends Protocol implements Runnable {

protected volatile List<Address> members=new ArrayList<>(11); // volatile eliminates the lock

protected final Set<Address> suspected_mbrs=Collections.synchronizedSet(new HashSet<>());
protected final Set<Address> suspected_mbrs=new ConcurrentSkipListSet<>();

protected final List<Address> pingable_mbrs=Collections.synchronizedList(new ArrayList<>());

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/SHARED_LOOPBACK.java
Expand Up @@ -91,7 +91,7 @@ protected void sendToSingleMember(Address dest, byte[] buf, int offset, int leng
}
SHARED_LOOPBACK target=dests.get(dest);
if(target == null) {
log.trace("destination address " + dest + " not found");
log.trace("%s: destination address %s not found", local_addr, dest);
return;
}
target.receive(local_addr, buf, offset, length);
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/TCP.java
Expand Up @@ -104,9 +104,9 @@ public void start() throws Exception {
}

public void stop() {
if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
server.stop(); //not needed, but just in case
if(log.isDebugEnabled()) log.debug("%s: closing sockets and stopping threads", local_addr);
super.stop();
server.stop(); //not needed, but just in case
}


Expand Down
39 changes: 24 additions & 15 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Expand Up @@ -84,7 +84,7 @@ public synchronized void stop() {
try {tmp.join(500);} catch(InterruptedException e) {}
}
}
queue.clear();
drain();
}


Expand All @@ -103,27 +103,15 @@ public void run() {
try {
if((msg=queue.take()) == null)
continue;
long size=msg.size();
if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages();
}
_addMessage(msg, size);
addAndSendIfSizeExceeded(msg);
while(true) {
remove_queue.clear();
int num_msgs=queue.drainTo(remove_queue);
if(num_msgs <= 0)
break;
for(int i=0; i < remove_queue.size(); i++) {
msg=remove_queue.get(i);
size=msg.size();
if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages();
}
_addMessage(msg, size);
addAndSendIfSizeExceeded(msg);
}
}
if(count > 0) {
Expand All @@ -137,6 +125,27 @@ public void run() {
}
}


protected void addAndSendIfSizeExceeded(Message msg) {
long size=msg.size();
if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages();
}
_addMessage(msg, size);
}


/** Takes all messages from the queue, adds them to the hashmap and then sends all bundled messages */
protected void drain() {
Message msg;
while((msg=queue.poll()) != null)
addAndSendIfSizeExceeded(msg);
_sendBundledMessages();
}


// This should not affect perf, as the lock is uncontended most of the time
protected void _sendBundledMessages() {
lock.lock();
Expand Down
6 changes: 3 additions & 3 deletions src/org/jgroups/protocols/UDP.java
Expand Up @@ -311,7 +311,7 @@ public void start() throws Exception {


public void stop() {
log.debug("closing sockets and stopping threads");
log.debug("%s: closing sockets and stopping threads", local_addr);
destroySockets();
stopThreads();
super.stop();
Expand Down Expand Up @@ -592,7 +592,7 @@ void closeMulticastSocket() {
}
getSocketFactory().close(mcast_sock); // this will cause the mcast receiver thread to break out of its loop
mcast_sock=null;
if(log.isDebugEnabled()) log.debug("multicast socket closed");
if(log.isDebugEnabled()) log.debug("%s: multicast socket closed", local_addr);
}
catch(IOException ex) {
}
Expand Down Expand Up @@ -711,7 +711,7 @@ public void run() {
}
catch(SocketException sock_ex) {
if(receiver_socket.isClosed()) {
log.debug("receiver socket is closed, exception=" + sock_ex);
log.debug("%s: receiver socket is closed, exception=%s", local_addr, sock_ex);
break;
}
log.error(Util.getMessage("FailedReceivingPacket"), sock_ex);
Expand Down
37 changes: 21 additions & 16 deletions src/org/jgroups/protocols/UNICAST3.java
Expand Up @@ -178,6 +178,11 @@ public <T extends Protocol> T setLevel(String level) {
return retval;
}

public <T extends UNICAST3> T setXmitInterval(long interval) {
xmit_interval=interval;
return (T)this;
}

@ManagedOperation
public String printConnections() {
StringBuilder sb=new StringBuilder();
Expand Down Expand Up @@ -377,7 +382,7 @@ public Object up(Message msg) {
switch(hdr.type) {
case UnicastHeader3.DATA: // received regular message
if(is_trace)
log.trace("%s <-- DATA(%s: #%d, conn_id=%d%s)", local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first? ", first" : "");
log.trace("%s <-- %s: DATA(#%d, conn_id=%d%s)", local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first? ", first" : "");
if(Objects.equals(local_addr, sender))
handleDataReceivedFromSelf(sender, hdr.seqno, msg);
else
Expand Down Expand Up @@ -406,7 +411,7 @@ protected void handleUpEvent(Address sender, Message msg, UnicastHeader3 hdr) {
handleXmitRequest(sender, Util.streamableFromBuffer(SeqnoList::new, msg.getRawBuffer(), msg.getOffset(), msg.getLength()));
break;
case UnicastHeader3.CLOSE:
log.trace(local_addr + "%s <-- CLOSE(%s: conn-id=%s)", local_addr, sender, hdr.conn_id);
log.trace(local_addr + "%s <-- %s: CLOSE(conn-id=%s)", local_addr, sender, hdr.conn_id);
ReceiverEntry entry=recv_table.get(sender);
if(entry != null && entry.connId() == hdr.conn_id) {
recv_table.remove(sender, entry);
Expand Down Expand Up @@ -501,7 +506,7 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {

if(!list.isEmpty()) {
if(is_trace)
log.trace("%s <-- DATA(%s: %s)", local_addr, batch.sender(), printMessageList(list));
log.trace("%s <-- %s: DATA(%s)", local_addr, batch.sender(), printMessageList(list));

int len=list.size();
Table<Message> win=entry.msgs;
Expand Down Expand Up @@ -609,7 +614,7 @@ public Object down(Message msg) {

if(is_trace) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" --> DATA(").append(dst).append(": #").append(seqno).
sb.append(local_addr).append(" --> ").append(dst).append(": DATA(").append("#").append(seqno).
append(", conn_id=").append(send_conn_id);
if(seqno == DEFAULT_FIRST_SEQNO) sb.append(", first");
sb.append(')');
Expand Down Expand Up @@ -674,7 +679,7 @@ protected void retransmit(SeqnoList missing, Address sender) {
Message xmit_msg=new Message(sender).setBuffer(Util.streamableToBuffer(missing))
.setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(id, UnicastHeader3.createXmitReqHeader());
if(is_trace)
log.trace("%s: sending XMIT_REQ (%s) to %s", local_addr, missing, sender);
log.trace("%s --> %s: XMIT_REQ(%s)", local_addr, sender, missing);
down_prot.down(xmit_msg);
xmit_reqs_sent.add(missing.size());
}
Expand All @@ -685,7 +690,7 @@ protected void retransmit(Message msg) {
if(is_trace) {
UnicastHeader3 hdr=msg.getHeader(id);
long seqno=hdr != null? hdr.seqno : -1;
log.trace("%s --> XMIT(%s: #%d)", local_addr, msg.getDest(), seqno);
log.trace("%s --> %s: resending(#%d)", local_addr, msg.getDest(), seqno);
}
down_prot.down(msg);
num_xmits++;
Expand All @@ -697,7 +702,7 @@ protected void retransmit(Message msg) {
*/
public void expired(Address key) {
if(key != null) {
log.debug("%s: removing connection to %s because it expired", local_addr, key);
log.debug("%s: removing expired connection to %s", local_addr, key);
closeConnection(key);
}
}
Expand Down Expand Up @@ -775,7 +780,7 @@ protected void processInternalMessage(final Table<Message> win, final Address se

protected void handleBatchReceived(final ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob) {
if(is_trace)
log.trace("%s <-- DATA(%s: %s)", local_addr, sender, printMessageList(msgs));
log.trace("%s <-- %s: DATA(%s)", local_addr, sender, printMessageList(msgs));

int batch_size=msgs.size();
Table<Message> win=entry.msgs;
Expand Down Expand Up @@ -825,7 +830,7 @@ protected void removeAndDeliver(Table<Message> win, Address sender) {
batch_creator, BATCH_ACCUMULATOR);
}
catch(Throwable t) {
log.error("failed removing messages from table for " + sender, t);
log.error("%s: failed removing messages from table for %s: %s", local_addr, sender, t);
}
if(!batch.isEmpty()) {
// batch is guaranteed to NOT contain any OOB messages as the drop_oob_msgs_filter above removed them
Expand Down Expand Up @@ -927,7 +932,7 @@ protected ReceiverEntry createReceiverEntry(Address sender, long seqno, short co
/** Add the ACK to hashtable.sender.sent_msgs */
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp) {
if(is_trace)
log.trace("%s <-- ACK(%s: #%d, conn-id=%d, ts=%d)", local_addr, sender, seqno, conn_id, timestamp);
log.trace("%s <-- %s: ACK(#%d, conn-id=%d, ts=%d)", local_addr, sender, seqno, conn_id, timestamp);
SenderEntry entry=send_table.get(sender);
if(entry != null && entry.connId() != conn_id) {
log.trace("%s: my conn_id (%d) != received conn_id (%d); discarding ACK", local_addr, entry.connId(), conn_id);
Expand All @@ -948,7 +953,7 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id, int
* @param sender
*/
protected void handleResendingOfFirstMessage(Address sender, int timestamp) {
log.trace("%s <-- SEND_FIRST_SEQNO(%s)", local_addr, sender);
log.trace("%s <-- %s: SEND_FIRST_SEQNO", local_addr, sender);
SenderEntry entry=send_table.get(sender);
Table<Message> win=entry != null? entry.msgs : null;
if(win == null) {
Expand Down Expand Up @@ -976,7 +981,7 @@ protected void handleResendingOfFirstMessage(Address sender, int timestamp) {

protected void handleXmitRequest(Address sender, SeqnoList missing) {
if(is_trace)
log.trace("%s <-- XMIT(%s: #%s)", local_addr, sender, missing);
log.trace("%s <-- %s: XMIT(#%s)", local_addr, sender, missing);

SenderEntry entry=send_table.get(sender);
xmit_reqs_received.add(missing.size());
Expand Down Expand Up @@ -1053,7 +1058,7 @@ protected void sendAck(Address dst, long seqno, short conn_id) {
Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL).
putHeader(this.id, UnicastHeader3.createAckHeader(seqno, conn_id, timestamper.incrementAndGet()));
if(is_trace)
log.trace("%s --> ACK(%s: #%d)", local_addr, dst, seqno);
log.trace("%s --> %s: ACK(#%d)", local_addr, dst, seqno);
try {
down_prot.down(ack);
num_acks_sent++;
Expand All @@ -1078,14 +1083,14 @@ protected void sendRequestForFirstSeqno(Address dest) {
if(last_sync_sent.addIfAbsentOrExpired(dest)) {
Message msg=new Message(dest).setFlag(Message.Flag.OOB)
.putHeader(this.id, UnicastHeader3.createSendFirstSeqnoHeader(timestamper.incrementAndGet()));
log.trace("%s --> SEND_FIRST_SEQNO(%s)", local_addr, dest);
log.trace("%s --> %s: SEND_FIRST_SEQNO", local_addr, dest);
down_prot.down(msg);
}
}

public void sendClose(Address dest, short conn_id) {
Message msg=new Message(dest).setFlag(Message.Flag.INTERNAL).putHeader(id, UnicastHeader3.createCloseHeader(conn_id));
log.trace("%s --> CLOSE(%s, conn-id=%d)", local_addr, dest, conn_id);
log.trace("%s --> %s: CLOSE(conn-id=%d)", local_addr, dest, conn_id);
down_prot.down(msg);
}

Expand Down Expand Up @@ -1197,7 +1202,7 @@ protected void update(Entry entry, int num_received) {
/** Compares 2 timestamps, handles numeric overflow */
protected static int compare(int ts1, int ts2) {
int diff=ts1 - ts2;
return diff < 0? -1 : diff > 0? 1 : 0;
return Integer.compare(diff, 0);
}

@SafeVarargs
Expand Down

0 comments on commit 76362a5

Please sign in to comment.