Skip to content

Commit

Permalink
Fixed OOME on DONT_LOOPBACK (https://issues.redhat.com/browse/JGRP-2607)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 17, 2022
1 parent 5a1ac36 commit 2ea4ef4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
7 changes: 5 additions & 2 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.function.ToIntFunction;
import java.util.stream.Stream;

import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
import static org.jgroups.Message.TransientFlag.OOB_DELIVERED;


/**
* Reliable unicast protocol using a combination of positive and negative acks. See docs/design/UNICAST3.txt for details.
Expand Down Expand Up @@ -143,8 +146,8 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
&& (!msg.isFlagSet(Message.Flag.OOB) || msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
&& !(msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && local_addr != null && local_addr.equals(msg.src()));

protected static final Predicate<Message> dont_loopback_filter=
msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
protected static final Predicate<Message> dont_loopback_filter=m -> m != null
&& (m.isTransientFlagSet(DONT_LOOPBACK) || m == DUMMY_OOB_MSG || m.isTransientFlagSet(OOB_DELIVERED));

protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;

Expand Down
9 changes: 6 additions & 3 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
import static org.jgroups.Message.TransientFlag.OOB_DELIVERED;

/**
* Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically
Expand Down Expand Up @@ -144,10 +146,11 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
// Accepts messages which are (1) non-null, (2) no DUMMY_OOB_MSGs and (3) not OOB_DELIVERED
protected final Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs= msg ->
msg != null && msg != DUMMY_OOB_MSG
&& (!msg.isFlagSet(Message.Flag.OOB) || msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
&& !(msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && this.local_addr != null && this.local_addr.equals(msg.getSrc()));
&& (!msg.isFlagSet(Message.Flag.OOB) || msg.setTransientFlagIfAbsent(OOB_DELIVERED))
&& !(msg.isTransientFlagSet(DONT_LOOPBACK) && Objects.equals(this.local_addr, msg.getSrc()));

protected static final Predicate<Message> dont_loopback_filter=msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
protected static final Predicate<Message> dont_loopback_filter=m -> m != null
&& (m.isTransientFlagSet(DONT_LOOPBACK) || m == DUMMY_OOB_MSG || m.isTransientFlagSet(OOB_DELIVERED));

protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/util/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ protected boolean _add(long seqno, T element, boolean check_if_resize_needed, Pr
size++;
if(seqno - hr > 0)
hr=seqno;
if(remove_filter != null && hd +1 == seqno) {
if(remove_filter != null && seqno-hd > 0) {
forEach(hd + 1, hr,
(seq, msg, r, c) -> {
if(msg == null || !remove_filter.test(msg))
Expand Down

0 comments on commit 2ea4ef4

Please sign in to comment.