Skip to content

Commit

Permalink
Replaced Tuple<Long,V> -> LongTuple<V> (https://issues.jboss.org/brow…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Nov 3, 2016
1 parent 62c4d23 commit c06f84b
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 73 deletions.
20 changes: 10 additions & 10 deletions src/org/jgroups/protocols/UNICAST3.java
Expand Up @@ -470,7 +470,7 @@ public void up(MessageBatch batch) {
}

int size=batch.size();
Map<Short,List<Tuple<Long,Message>>> msgs=new LinkedHashMap<>();
Map<Short,List<LongTuple<Message>>> msgs=new LinkedHashMap<>();
ReceiverEntry entry=recv_table.get(batch.sender());

for(Message msg: batch) {
Expand All @@ -484,10 +484,10 @@ public void up(MessageBatch batch) {
continue;
}

List<Tuple<Long,Message>> list=msgs.get(hdr.conn_id);
List<LongTuple<Message>> list=msgs.get(hdr.conn_id);
if(list == null)
msgs.put(hdr.conn_id, list=new ArrayList<>(size));
list.add(new Tuple<>(hdr.seqno(), msg));
list.add(new LongTuple<>(hdr.seqno(), msg));

if(hdr.first)
entry=getReceiverEntry(batch.sender(), hdr.seqno(), hdr.first, hdr.connId());
Expand All @@ -499,7 +499,7 @@ public void up(MessageBatch batch) {
else {
if(msgs.keySet().retainAll(Collections.singletonList(entry.connId()))) // remove all conn-ids that don't match
sendRequestForFirstSeqno(batch.sender());
List<Tuple<Long,Message>> list=msgs.get(entry.connId());
List<LongTuple<Message>> list=msgs.get(entry.connId());
if(list != null && !list.isEmpty())
handleBatchReceived(entry, batch.sender(), list, batch.mode() == MessageBatch.Mode.OOB);
}
Expand All @@ -511,7 +511,7 @@ public void up(MessageBatch batch) {


protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
List<Tuple<Long,Message>> list=new ArrayList<>(batch.size());
List<LongTuple<Message>> list=new ArrayList<>(batch.size());

for(Message msg: batch) {
Header hdr;
Expand All @@ -528,7 +528,7 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
batch.remove(msg);
continue;
}
list.add(new Tuple<>(hdr.seqno(), msg));
list.add(new LongTuple<>(hdr.seqno(), msg));
}

if(!list.isEmpty()) {
Expand All @@ -542,7 +542,7 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
// OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379
if(batch.mode() == MessageBatch.Mode.OOB) {
MessageBatch oob_batch=new MessageBatch(local_addr, batch.sender(), batch.clusterName(), batch.multicast(), MessageBatch.Mode.OOB, len);
for(Tuple<Long,Message> tuple: list) {
for(LongTuple<Message> tuple: list) {
long seq=tuple.getVal1();
Message msg=win.get(seq); // we *have* to get the message, because loopback means we didn't add it to win !
if(msg != null && msg.isFlagSet(Message.Flag.OOB) && msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
Expand Down Expand Up @@ -819,7 +819,7 @@ protected void processInternalMessage(final Table<Message> win, final Address se



protected void handleBatchReceived(final ReceiverEntry entry, Address sender, List<Tuple<Long,Message>> msgs, boolean oob) {
protected void handleBatchReceived(final ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob) {
if(log.isTraceEnabled())
log.trace("%s <-- DATA(%s: %s)", local_addr, sender, printMessageList(msgs));

Expand All @@ -838,7 +838,7 @@ protected void handleBatchReceived(final ReceiverEntry entry, Address sender, Li
// OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379
if(added && oob) {
MessageBatch oob_batch=new MessageBatch(local_addr, sender, null, false, MessageBatch.Mode.OOB, msgs.size());
for(Tuple<Long,Message> tuple: msgs)
for(LongTuple<Message> tuple: msgs)
oob_batch.add(tuple.getVal2());

deliverBatch(oob_batch);
Expand Down Expand Up @@ -881,7 +881,7 @@ protected void removeAndDeliver(final AtomicBoolean processing, Table<Message> w
}


protected String printMessageList(List<Tuple<Long,Message>> list) {
protected String printMessageList(List<LongTuple<Message>> list) {
StringBuilder sb=new StringBuilder();
int size=list.size();
Message first=size > 0? list.get(0).getVal2() : null, second=size > 1? list.get(size-1).getVal2() : first;
Expand Down
12 changes: 6 additions & 6 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Expand Up @@ -626,7 +626,7 @@ public Object up(Message msg) {
public void up(MessageBatch batch) {
int size=batch.size();
boolean got_retransmitted_msg=false; // if at least 1 XMIT-RSP was received
List<Tuple<Long,Message>> msgs=null; // regular or retransmitted messages
List<LongTuple<Message>> msgs=null; // regular or retransmitted messages

for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
final Message msg=it.next();
Expand All @@ -644,7 +644,7 @@ public void up(MessageBatch batch) {
case NakAckHeader2.MSG:
if(msgs == null)
msgs=new ArrayList<>(size);
msgs.add(new Tuple<>(hdr.seqno, msg));
msgs.add(new LongTuple<>(hdr.seqno, msg));
break;
case NakAckHeader2.XMIT_REQ:
try {
Expand All @@ -661,7 +661,7 @@ public void up(MessageBatch batch) {
if(xmitted_msg != null) {
if(msgs == null)
msgs=new ArrayList<>(size);
msgs.add(new Tuple<>(hdr.seqno, xmitted_msg));
msgs.add(new LongTuple<>(hdr.seqno, xmitted_msg));
got_retransmitted_msg=true;
}
break;
Expand Down Expand Up @@ -822,7 +822,7 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) {
}


protected void handleMessages(Address dest, Address sender, List<Tuple<Long,Message>> msgs, boolean oob, AsciiString cluster_name) {
protected void handleMessages(Address dest, Address sender, List<LongTuple<Message>> msgs, boolean oob, AsciiString cluster_name) {
Table<Message> buf=xmit_table.get(sender);
if(buf == null) { // discard message if there is no entry for sender
unknownMember(sender, "batch");
Expand All @@ -841,15 +841,15 @@ protected void handleMessages(Address dest, Address sender, List<Tuple<Long,Mess
if(added && oob) {
MessageBatch oob_batch=new MessageBatch(dest, sender, null, dest == null, MessageBatch.Mode.OOB, msgs.size());
if(loopback) {
for(Tuple<Long,Message> tuple: msgs) {
for(LongTuple<Message> tuple: msgs) {
long seq=tuple.getVal1();
Message msg=buf.get(seq); // we *have* to get the message, because loopback means we didn't add it to win !
if(msg != null && msg.isFlagSet(Message.Flag.OOB) && msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
oob_batch.add(msg);
}
}
else {
for(Tuple<Long,Message> tuple: msgs)
for(LongTuple<Message> tuple: msgs)
oob_batch.add(tuple.getVal2());
}
deliverBatch(oob_batch);
Expand Down
5 changes: 1 addition & 4 deletions src/org/jgroups/protocols/pbcast/STATE_SOCK.java
Expand Up @@ -115,19 +115,16 @@ protected void modifyStateResponseHeader(StateHeader hdr) {

protected Tuple<InputStream,Object> createStreamToProvider(Address provider, StateHeader hdr) throws Exception {
IpAddress address=hdr.bind_addr;
Tuple<InputStream,Object> retval=new Tuple<>(null,null);
Socket socket=null;
try {
socket=getSocketFactory().createSocket("jgroups.state_sock.sock");
retval.setVal2(socket);
socket.bind(new InetSocketAddress(bind_addr, 0));
socket.setReceiveBufferSize(buffer_size);
Util.connect(socket, new InetSocketAddress(address.getIpAddress(), address.getPort()), 0);
log.debug("%s: connected to state provider %s:%d", local_addr, address.getIpAddress(), address.getPort());
DataOutputStream out=new DataOutputStream(socket.getOutputStream());
Util.writeAddress(local_addr, out);
retval.setVal1(new BufferedInputStream(socket.getInputStream(), buffer_size));
return retval;
return new Tuple<>(new BufferedInputStream(socket.getInputStream(), buffer_size), socket);
}
catch(Throwable t) {
Util.close(socket);
Expand Down
22 changes: 22 additions & 0 deletions src/org/jgroups/util/LongTuple.java
@@ -0,0 +1,22 @@
package org.jgroups.util;

/**
* A tuple with a long (primitive) first value
* @author Bela Ban
*/
public class LongTuple<V> {
private final long val1;
private final V val2;

public LongTuple(long val1, V val2) {
this.val1=val1;
this.val2=val2;
}

public long getVal1() {return val1;}
public V getVal2() {return val2;}

public String toString() {
return val1 + " : " + val2;
}
}
18 changes: 9 additions & 9 deletions src/org/jgroups/util/Table.java
Expand Up @@ -218,7 +218,7 @@ public boolean add(long seqno, T element, Predicate<T> remove_filter) {
* @param list
* @return True if at least 1 element was added successfully
*/
public boolean add(final List<Tuple<Long,T>> list) {
public boolean add(final List<LongTuple<T>> list) {
return add(list, false);
}

Expand All @@ -228,7 +228,7 @@ public boolean add(final List<Tuple<Long,T>> list) {
* @param list
* @return True if at least 1 element was added successfully. This guarantees that the list has at least 1 element
*/
public boolean add(final List<Tuple<Long,T>> list, boolean remove_added_elements) {
public boolean add(final List<LongTuple<T>> list, boolean remove_added_elements) {
return add(list, remove_added_elements, null);
}

Expand All @@ -241,7 +241,7 @@ public boolean add(final List<Tuple<Long,T>> list, boolean remove_added_elements
* @param const_value If non-null, this value should be used rather than the values of the list tuples
* @return True if at least 1 element was added successfully, false otherwise.
*/
public boolean add(final List<Tuple<Long,T>> list, boolean remove_added_elements, T const_value) {
public boolean add(final List<LongTuple<T>> list, boolean remove_added_elements, T const_value) {
if(list == null || list.isEmpty())
return false;
boolean added=false;
Expand All @@ -252,8 +252,8 @@ public boolean add(final List<Tuple<Long,T>> list, boolean remove_added_elements
if(highest_seqno != -1 && computeRow(highest_seqno) >= matrix.length)
resize(highest_seqno);

for(Iterator<Tuple<Long,T>> it=list.iterator(); it.hasNext();) {
Tuple<Long,T> tuple=it.next();
for(Iterator<LongTuple<T>> it=list.iterator(); it.hasNext();) {
LongTuple<T> tuple=it.next();
long seqno=tuple.getVal1();
T element=const_value != null? const_value : tuple.getVal2();
if(_add(seqno, element, false, null))
Expand Down Expand Up @@ -544,11 +544,11 @@ protected boolean _add(long seqno, T element, boolean check_if_resize_needed, Pr
}

// list must not be null or empty
protected long findHighestSeqno(List<Tuple<Long,T>> list) {
protected long findHighestSeqno(List<LongTuple<T>> list) {
long seqno=-1;
for(Tuple<Long,T> tuple: list) {
Long val=tuple.getVal1();
if(val != null && val - seqno > 0)
for(LongTuple<T> tuple: list) {
long val=tuple.getVal1();
if(val - seqno > 0)
seqno=val;
}
return seqno;
Expand Down
21 changes: 4 additions & 17 deletions src/org/jgroups/util/Tuple.java
Expand Up @@ -6,29 +6,16 @@
* @author Bela Ban
*/
public class Tuple<V1,V2> {
private V1 val1;
private V2 val2;
private final V1 val1;
private final V2 val2;

public Tuple(V1 val1, V2 val2) {
this.val1=val1;
this.val2=val2;
}

public V1 getVal1() {
return val1;
}

public void setVal1(V1 val1) {
this.val1=val1;
}

public V2 getVal2() {
return val2;
}

public void setVal2(V2 val2) {
this.val2=val2;
}
public V1 getVal1() {return val1;}
public V2 getVal2() {return val2;}

public String toString() {
return val1 + " : " + val2;
Expand Down
35 changes: 16 additions & 19 deletions tests/junit-functional/org/jgroups/tests/TableTest.java
Expand Up @@ -2,10 +2,7 @@

import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.Table;
import org.jgroups.util.Tuple;
import org.jgroups.util.Util;
import org.jgroups.util.*;
import org.testng.annotations.Test;

import java.util.ArrayList;
Expand All @@ -20,7 +17,7 @@
/** Tests {@link org.jgroups.util.Table<Integer>}
* @author Bela Ban
*/
@Test(groups=Global.FUNCTIONAL,sequential=false)
@Test(groups=Global.FUNCTIONAL)
public class TableTest {

protected static final Predicate<Message> dont_loopback_filter=msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
Expand All @@ -45,7 +42,7 @@ public void testAdd() {

public void testAddList() {
Table<Integer> buf=new Table<>(3, 10, 0);
List<Tuple<Long,Integer>> msgs=createList(1,2);
List<LongTuple<Integer>> msgs=createList(1, 2);
boolean rc=buf.add(msgs);
System.out.println("buf = " + buf);
assert rc;
Expand All @@ -54,7 +51,7 @@ public void testAddList() {

public void testAddListWithConstValue() {
Table<Integer> buf=new Table<>(3, 10, 0);
List<Tuple<Long,Integer>> msgs=createList(1,2,3,4,5,6,7,8,9,10);
List<LongTuple<Integer>> msgs=createList(1,2,3,4,5,6,7,8,9,10);
final Integer DUMMY=0;
boolean rc=buf.add(msgs, false, DUMMY);
System.out.println("buf = " + buf);
Expand All @@ -69,7 +66,7 @@ public void testAddListWithConstValue() {

public void testAddListWithRemoval() {
Table<Integer> buf=new Table<>(3, 10, 0);
List<Tuple<Long,Integer>> msgs=createList(1,2,3,4,5,6,7,8,9,10);
List<LongTuple<Integer>> msgs=createList(1,2,3,4,5,6,7,8,9,10);
int size=msgs.size();
boolean added=buf.add(msgs);
System.out.println("buf = " + buf);
Expand Down Expand Up @@ -107,7 +104,7 @@ public void testAddition() {

public static void testAdditionList() {
Table<Integer> table=new Table<>(3, 10, 0);
List<Tuple<Long,Integer>> msgs=createList(0);
List<LongTuple<Integer>> msgs=createList(0);
assert !table.add(msgs);
long[] seqnos={1,5,9,10,11,19,20,29};
msgs=createList(seqnos);
Expand Down Expand Up @@ -136,7 +133,7 @@ public static void testAdditionWithOffset() {
public void testAdditionListWithOffset() {
Table<Integer> table=new Table<>(3, 10, 100);
long seqnos[]={101,105,109,110,111,119,120,129};
List<Tuple<Long,Integer>> msgs=createList(seqnos);
List<LongTuple<Integer>> msgs=createList(seqnos);
System.out.println("table: " + table.dump());
assert table.add(msgs);
assert table.size() == 8;
Expand All @@ -149,9 +146,9 @@ public void testAdditionListWithOffset() {

public static void testAddListWithResizing() {
Table<Integer> table=new Table<>(3, 5, 0);
List<Tuple<Long,Integer>> msgs=new ArrayList<>();
List<LongTuple<Integer>> msgs=new ArrayList<>();
for(int i=1; i < 100; i++)
msgs.add(new Tuple<>((long)i,i));
msgs.add(new LongTuple<>((long)i,i));
table.add(msgs, false);
System.out.println("table = " + table);
int num_resizes=table.getNumResizes();
Expand All @@ -162,9 +159,9 @@ public static void testAddListWithResizing() {
public static void testAddListWithResizingNegativeSeqnos() {
long seqno=Long.MAX_VALUE-50;
Table<Integer> table=new Table<>(3, 5, seqno);
List<Tuple<Long,Integer>> msgs=new ArrayList<>();
List<LongTuple<Integer>> msgs=new ArrayList<>();
for(int i=1; i < 100; i++)
msgs.add(new Tuple<>((long)i+seqno,i));
msgs.add(new LongTuple<>((long)i+seqno,i));
table.add(msgs, false);
System.out.println("table = " + table);
int num_resizes=table.getNumResizes();
Expand All @@ -174,9 +171,9 @@ public static void testAddListWithResizingNegativeSeqnos() {

public static void testAddListWithResizing2() {
Table<Integer> table=new Table<>(3, 500, 0);
List<Tuple<Long,Integer>> msgs=new ArrayList<>();
List<LongTuple<Integer>> msgs=new ArrayList<>();
for(int i=1; i < 100; i++)
msgs.add(new Tuple<>((long)i,i));
msgs.add(new LongTuple<>((long)i,i));
table.add(msgs, false);
System.out.println("table = " + table);
int num_resizes=table.getNumResizes();
Expand Down Expand Up @@ -1426,12 +1423,12 @@ protected static void addAndGet(Table<Integer> table, int ... seqnos) {
}
}

protected static List<Tuple<Long,Integer>> createList(long ... seqnos) {
protected static List<LongTuple<Integer>> createList(long ... seqnos) {
if(seqnos == null)
return null;
List<Tuple<Long,Integer>> msgs=new ArrayList<>(seqnos.length);
List<LongTuple<Integer>> msgs=new ArrayList<>(seqnos.length);
for(long seqno: seqnos)
msgs.add(new Tuple<>(seqno, (int)seqno));
msgs.add(new LongTuple<>(seqno, (int)seqno));
return msgs;
}

Expand Down

0 comments on commit c06f84b

Please sign in to comment.