Skip to content

Commit

Permalink
no message
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 14, 2004
1 parent fb99ed7 commit f70e1e2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 49 deletions.
70 changes: 36 additions & 34 deletions src/org/jgroups/stack/AckSenderWindow.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: AckSenderWindow.java,v 1.3 2004/04/23 01:39:04 belaban Exp $
// $Id: AckSenderWindow.java,v 1.4 2004/05/14 16:20:21 belaban Exp $

package org.jgroups.stack;

Expand Down Expand Up @@ -119,29 +119,31 @@ public void reset() {
* threshold (<code>min_threshold</code>)
*/
public void add(long seqno, Message msg) {
Long tmp = new Long(seqno);
Long tmp=new Long(seqno);

synchronized (msgs) {
if (msgs.containsKey(tmp))
synchronized(msgs) {
if(msgs.containsKey(tmp))
return;

//System.out.println("### add: " + seqno + "(msg size=" + msgs.size() +
// ", queue size=" + msg_queue.size() +
// "). tstamp=" + System.currentTimeMillis() + ")"); // <remove>

if (!use_sliding_window) {
if(!use_sliding_window) {
addMessage(seqno, tmp, msg);
} else { // we use a sliding window
if (queueing)
}
else { // we use a sliding window
if(queueing)
addToQueue(seqno, msg);
else {
if (msgs.size() + 1 > window_size) {
queueing = true;
if(msgs.size() + 1 > window_size) {
queueing=true;
addToQueue(seqno, msg);

if(log.isTraceEnabled()) log.trace("window_size (" + window_size + ") was exceeded, " +
if(log.isTraceEnabled())
log.trace("window_size (" + window_size + ") was exceeded, " +
"starting to queue messages until window size falls under " + min_threshold);
} else {
}
else {
addMessage(seqno, tmp, msg);
}
}
Expand All @@ -157,37 +159,35 @@ public void add(long seqno, Message msg) {
* set queueing to false.
*/
public void ack(long seqno) {
Long tmp = new Long(seqno);
Long tmp=new Long(seqno);
Entry entry;

synchronized (msgs) {
synchronized(msgs) {
msgs.remove(tmp);
retransmitter.remove(seqno);

if (use_sliding_window && queueing) {
if (msgs.size() < min_threshold) { // we fell below threshold, now we can resume adding msgs

if(log.isTraceEnabled()) log.trace("number of messages in table fell " +
"under min_threshold (" + min_threshold + "): adding " +
msg_queue.size() + " messages on queue");
if(use_sliding_window && queueing) {
if(msgs.size() < min_threshold) { // we fell below threshold, now we can resume adding msgs
if(log.isTraceEnabled())
log.trace("number of messages in table fell under min_threshold (" +
min_threshold + "): adding " + msg_queue.size() + " messages on queue");


while (msgs.size() < window_size) {
if ((entry = removeFromQueue()) != null)
while(msgs.size() < window_size) {
if((entry=removeFromQueue()) != null)
addMessage(entry.seqno, new Long(entry.seqno), entry.msg);
else
break;
}

if (msgs.size() + 1 > window_size) {

if(log.isTraceEnabled()) log.trace("exceeded window_size (" + window_size +
") again, will still queue");
return; // still queueuing
} else
queueing = false; // allows add() to add messages again
if(msgs.size() + 1 > window_size) {
if(log.isTraceEnabled())
log.trace("exceeded window_size (" + window_size + ") again, will still queue");
return; // still queueing
}
else
queueing=false; // allows add() to add messages again

if(log.isTraceEnabled()) log.trace("set queueing to false (table size=" + msgs.size() + ")");
if(log.isTraceEnabled()) log.trace("set queueing to false (table size=" + msgs.size() + ")");
}
}
}
Expand Down Expand Up @@ -228,15 +228,17 @@ void addMessage(long seqno, Long tmp, Message msg) {
void addToQueue(long seqno, Message msg) {
try {
msg_queue.add(new Entry(seqno, msg));
} catch (Exception ex) {
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error("exception=" + ex);
}
}

Entry removeFromQueue() {
try {
return msg_queue.size() == 0 ? null : (Entry) msg_queue.remove();
} catch (Exception ex) {
return msg_queue.size() == 0 ? null : (Entry)msg_queue.remove();
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error("exception=" + ex);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: LogicalAddress.java,v 1.6 2003/12/31 12:59:55 belaban Exp $
// $Id: LogicalAddress1_4.java,v 1.1 2004/05/14 16:20:21 belaban Exp $

package org.jgroups.stack;

Expand All @@ -19,18 +19,18 @@
* Logical address that spans the lifetime of a member. Assigned at member (JVM) startup, and
* retained until member is shutdown. Note that the address does <em>not</em> change on
* disconnect-connect sequences. For example, when a member is shunned and subsequently
* readmitted to the group, the member's address (LogicalAddress) remains the same.<br/>
* An instance of LogicalAddress is generated by the transport protocol. Currently, only
* readmitted to the group, the member's address (LogicalAddress1_4) remains the same.<br/>
* An instance of LogicalAddress1_4 is generated by the transport protocol. Currently, only
* UDP1_4 generates LogicalAddresses.<br/>
* Note that host, timestamp and id are supposed to make LogicalAddress as unique as possible.
* Note that host, timestamp and id are supposed to make LogicalAddress1_4 as unique as possible.
* However, there is a remote chance that 2 instances started on the same machine create their
* address at exactly the same time, resulting in identical addresses (leading to problems).
* In the future, I will try to make this totally unique, by for example using the PID of the current
* process (once available though the JDK, or by locking on a common resource (e.g. /dev/random)
* to serialize creation. However, as for now, chances are you will never experience this problem.
* @author Bela Ban, Dec 23 2003
*/
public class LogicalAddress implements Address {
public class LogicalAddress1_4 implements Address {
protected static int count=1;
protected String host=null;
protected long timestamp=0;
Expand All @@ -53,12 +53,12 @@ public class LogicalAddress implements Address {


// Used only by Externalization
public LogicalAddress() {
public LogicalAddress1_4() {
}


/** Use this constructor to create an instance, not the null-constructor */
public LogicalAddress(String host_name, List physical_addrs) {
public LogicalAddress1_4(String host_name, List physical_addrs) {
init(host_name, physical_addrs);
}

Expand All @@ -79,7 +79,7 @@ protected void init(String host_name, List physical_addrs) {

timestamp=System.currentTimeMillis();

synchronized(LogicalAddress.class) {
synchronized(LogicalAddress1_4.class) {
id=count++;
}

Expand Down Expand Up @@ -174,7 +174,7 @@ public void setAdditionalData(byte[] additional_data) {
* Excludes channel_name from comparison.
* @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
*/
public int compare(LogicalAddress other) {
public int compare(LogicalAddress1_4 other) {
return compareTo(other);
}

Expand All @@ -191,9 +191,9 @@ public int compare(LogicalAddress other) {
public int compareTo(Object o) {
int rc;

if ((o == null) || !(o instanceof LogicalAddress))
throw new ClassCastException("LogicalAddress.compareTo(): comparison between different classes");
LogicalAddress other = (LogicalAddress) o;
if ((o == null) || !(o instanceof LogicalAddress1_4))
throw new ClassCastException("LogicalAddress1_4.compareTo(): comparison between different classes");
LogicalAddress1_4 other = (LogicalAddress1_4) o;

rc=this.host.compareTo(other.host);
if(rc != 0) return rc;
Expand Down Expand Up @@ -287,7 +287,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
}

public Object clone() throws CloneNotSupportedException {
LogicalAddress ret=new LogicalAddress();
LogicalAddress1_4 ret=new LogicalAddress1_4();
ret.host=host;
ret.timestamp=timestamp;
ret.id=id;
Expand All @@ -299,9 +299,9 @@ public Object clone() throws CloneNotSupportedException {
return ret;
}

public LogicalAddress copy() {
public LogicalAddress1_4 copy() {
try {
return (LogicalAddress)clone();
return (LogicalAddress1_4)clone();
}
catch(CloneNotSupportedException e) {
return null;
Expand Down

0 comments on commit f70e1e2

Please sign in to comment.