Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Limit size of Ibft future message buffer #873

Merged
merged 26 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c863d21
Limit size of ibft future message buffer and add some simple eviction…
jframe Feb 15, 2019
f6c5cf4
Config options for future messages limit and max distance from chain …
jframe Feb 15, 2019
e3379da
improve unit test
jframe Feb 15, 2019
6da82cd
java doc
jframe Feb 15, 2019
f8f0eea
test more cases
jframe Feb 15, 2019
de80a51
var renames
jframe Feb 15, 2019
7b60d63
replace LinkedList with ArrayList as per error prone suggestions
jframe Feb 15, 2019
fc11dc7
improve test around boundary cases
jframe Feb 15, 2019
4d4978e
ensure that total message size is determined correctly and limit of 0…
jframe Feb 18, 2019
4d1b6eb
move current chain height onto addMessage method to simplify constructor
jframe Feb 18, 2019
b7a9370
pass in the duplicate message tracker and future message buffer inste…
jframe Feb 18, 2019
4753dd4
remove testing constructor
jframe Feb 19, 2019
86b7f0d
ensure that latest height is evicted regardless if it comes from adde…
jframe Feb 19, 2019
1dbe40f
Merge branch 'master' into futureMessageBuffer
jframe Feb 19, 2019
2c18c4f
use putIfAbsent to simplify adding messages
jframe Feb 20, 2019
74832e2
Merge branch 'futureMessageBuffer' of github.com:jframe/pantheon into…
jframe Feb 20, 2019
159b87f
remove chain height from addMessage and move to another method
jframe Feb 20, 2019
acc3dea
discard messages for old chain heights when updating height for futur…
jframe Feb 20, 2019
73a83f5
Merge branch 'master' into futureMessageBuffer
jframe Feb 20, 2019
27413a8
pass through blockchain current chain height
jframe Feb 20, 2019
9066aa4
shouldn't discard messages higher than chain height
jframe Feb 20, 2019
c9bd2fa
ensure that controller polls future messages before updating height
jframe Feb 20, 2019
3236bad
renamed poll height arg to make it clear it's the msg chain height no…
jframe Feb 20, 2019
725aeb4
combine poll and updateChainHeight with a single func retrieveMessage…
jframe Feb 20, 2019
a4750a3
remove unneeded this
jframe Feb 20, 2019
4a08565
Merge branch 'master' into futureMessageBuffer
jframe Feb 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,8 @@ public class IbftConfigOptions {
private static final int DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
private static final int DEFAULT_FUTURE_MESSAGES_LIMIT = 1000;
private static final int DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE = 10;

private final JsonObject ibftConfigRoot;

Expand Down Expand Up @@ -56,4 +58,13 @@ public int getMessageQueueLimit() {
public int getDuplicateMessageLimit() {
return ibftConfigRoot.getInteger("duplicatemessagelimit", DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

public int getFutureMessagesLimit() {
return ibftConfigRoot.getInteger("futuremessageslimit", DEFAULT_FUTURE_MESSAGES_LIMIT);
}

public int getFutureMessagesMaxDistance() {
return ibftConfigRoot.getInteger(
"futuremessagesmaxdistance", DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}
}
Expand Up @@ -29,6 +29,8 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
private static final int EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE = 10;

@Test
public void shouldGetEpochLengthFromConfig() {
Expand Down Expand Up @@ -138,6 +140,44 @@ public void shouldGetDefaultDuplicateMessageLimitFromDefaultConfig() {
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

@Test
public void shouldGetFutureMessagesLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("FutureMessagesLimit", 50));
assertThat(config.getFutureMessagesLimit()).isEqualTo(50);
}

@Test
public void shouldFallbackToDefaultFutureMessagesLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getFutureMessagesLimit()).isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT);
}

@Test
public void shouldGetDefaultFutureMessagesLimitsFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getFutureMessagesLimit())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT);
}

@Test
public void shouldGetFutureMessagesMaxDistanceFromConfig() {
final IbftConfigOptions config =
fromConfigOptions(singletonMap("FutureMessagesMaxDistance", 50));
assertThat(config.getFutureMessagesMaxDistance()).isEqualTo(50);
}

@Test
public void shouldFallbackToDefaultFutureMessagesMaxDistance() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getFutureMessagesMaxDistance())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}

@Test
public void shouldGetDefaultFutureMessagesMaxDistanceFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getFutureMessagesMaxDistance())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}

private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))
Expand Down
Expand Up @@ -114,6 +114,8 @@ public EventMultiplexer getEventMultiplexer() {
public static final int MESSAGE_QUEUE_LIMIT = 1000;
public static final int GOSSIPED_HISTORY_LIMIT = 100;
public static final int DUPLICATE_MESSAGE_LIMIT = 100;
public static final int FUTURE_MESSAGES_MAX_DISTANCE = 10;
public static final int FUTURE_MESSAGES_LIMIT = 1000;

private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue(MESSAGE_QUEUE_LIMIT);
Expand Down Expand Up @@ -307,7 +309,9 @@ private static ControllerAndState createControllerAndFinalState(
messageValidatorFactory),
messageValidatorFactory),
gossiper,
DUPLICATE_MESSAGE_LIMIT);
DUPLICATE_MESSAGE_LIMIT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would probably prefer to see the futureMessageBuffer get created here and passed in (rather than the raw values) - why? Can't answer that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 ^^^
It's cleaner, in terms of IoC and reduces number of constructors on the controller.
I'd suggest instead of passing in values, construct & pass in the objects:
final MessageTracker duplicateMessageTracker, final FutureMessageBuffer futureMessageBuffer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is cleaner and also reduces the need the seperate constructor in the IbftController.
Done.

FUTURE_MESSAGES_MAX_DISTANCE,
FUTURE_MESSAGES_LIMIT);

final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
//////////////////////////// END IBFT PantheonController ////////////////////////////
Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import tech.pegasys.pantheon.ethereum.p2p.api.Message;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;

/**
* Buffer which holds future IBFT messages.
*
* <p>This buffer only allows messages to be added which have a chain height greater than current
* height and up to chain futureMessagesMaxDistance from the current chain height.
*
* <p>When the total number of messages is greater futureMessagesLimit then messages are evicted.
*
* <p>If there is more than one height in the buffer then all messages for the highest chain height
* are removed. Otherwise if there is only one height the oldest inserted message is removed.
*/
public class FutureMessageBuffer {
private final NavigableMap<Long, List<Message>> buffer;
private final long futureMessagesMaxDistance;
private final long futureMessagesLimit;
private final Supplier<Long> currentChainHeight;
private long totalMessages;

public FutureMessageBuffer(
final long futureMessagesMaxDistance,
final long futureMessagesLimit,
final Supplier<Long> currentChainHeight) {
this(futureMessagesMaxDistance, futureMessagesLimit, currentChainHeight, new TreeMap<>());
}

@VisibleForTesting
FutureMessageBuffer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this constructor really of use? ...looks like when it's called in the tests a new TreeMap<> is passed in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, I'm using that so I can check the state of the map is correct in the tests without exposing the map to everyone. A new TreeMap<> is passed in, but I'm checking in my tests messages are added to the map correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a tight coupling between the test and the specific implementation of the logic, I'm not convinced that's the best approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. updated so don't need special constructor

final long futureMessagesMaxDistance,
final long futureMessagesLimit,
final Supplier<Long> currentChainHeight,
final NavigableMap<Long, List<Message>> buffer) {
this.futureMessagesMaxDistance = futureMessagesMaxDistance;
this.futureMessagesLimit = futureMessagesLimit;
this.currentChainHeight = currentChainHeight;
this.buffer = buffer;
}

public void addMessage(final long msgChainHeight, final Message rawMsg) {
final long currentHeight = currentChainHeight.get();
if (!validMessageHeight(msgChainHeight, currentHeight)) {
return;
}

if (totalMessages >= futureMessagesLimit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had assumed evictMessages() MAY need to loop (i.e. what if you need to remove more than 1 height?) - should this conditional become a loop in evict?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is any need to remove any more than one height at a time. Each height must contain at least one message so if we remove a height we will have room for the message to be added.

evictMessages();
}

if (!buffer.containsKey(msgChainHeight)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could use computeIfAbsent/putIfAbsent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

buffer.put(msgChainHeight, new ArrayList<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this msgChainHeight is higher than the height removed in evictMessage? Eg. MsgChainHeight is 5, but we just removed chain-height 4 ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming that msgChainHeight is still within the max distance we would add a message for chain height 5. Which is perhaps is a bit odd. Maybe it would be better not to add in that case, essentially evicting itself as the highest chain height? Need to think about this a bit more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I'd rather see it evict itself. MAYBE you could do something like add the new message, then run the eviction process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. yeah I think it better for it to be evicted. changed the code to run the evict after after adding, avoids needing to special case this add case.

}
buffer.get(msgChainHeight).add(rawMsg);
totalMessages += 1;
}

private boolean validMessageHeight(final long msgChainHeight, final long currentHeight) {
final boolean isFutureMsg = msgChainHeight > currentHeight;
final boolean withinMaxChainHeight =
msgChainHeight <= currentHeight + futureMessagesMaxDistance;
return isFutureMsg && withinMaxChainHeight;
}

private void evictMessages() {
if (buffer.size() > 1) {
buffer.remove(buffer.lastKey());
} else {
List<Message> messages = buffer.firstEntry().getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that buffer size is 0? I guess not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am making an assumption that it isn't 0 at this point but that only holds if futureMessagesLimit isn't 0. Be better if the evict messages handled that case, so i'll update evictMessages to handle that properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. added handling of zero message limit.

messages.remove(0);
}
}

public List<Message> poll(final long chainHeight) {
final List<Message> messages = buffer.getOrDefault(chainHeight, Collections.emptyList());
buffer.remove(chainHeight);
return messages;
}
}
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
Expand All @@ -34,13 +32,9 @@
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -49,7 +43,7 @@ public class IbftController {
private final Blockchain blockchain;
private final IbftFinalState ibftFinalState;
private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory;
private final Map<Long, List<Message>> futureMessages;
private final FutureMessageBuffer futureMessageBuffer;
private BlockHeightManager currentHeightManager;
private final Gossiper gossiper;
private final MessageTracker duplicateMessageTracker;
Expand All @@ -59,14 +53,21 @@ public IbftController(
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final int duplicateMessageLimit) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
Maps.newHashMap(),
new MessageTracker(duplicateMessageLimit));
final int duplicateMessageLimit,
final int futureMessagesMaxDistance,
final int futureMessagesLimit) {
// Assigning fields instead of using other constructor so that a reference to height manager can
// be used for the future message buffer
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessageBuffer =
new FutureMessageBuffer(
futureMessagesMaxDistance,
futureMessagesLimit,
() -> currentHeightManager.getChainHeight());
this.gossiper = gossiper;
this.duplicateMessageTracker = new MessageTracker(duplicateMessageLimit);
}

@VisibleForTesting
Expand All @@ -75,12 +76,12 @@ public IbftController(
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final Map<Long, List<Message>> futureMessages,
final MessageTracker duplicateMessageTracker) {
final MessageTracker duplicateMessageTracker,
final FutureMessageBuffer futureMessageBuffer) {
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessages = futureMessages;
this.futureMessageBuffer = futureMessageBuffer;
this.gossiper = gossiper;
this.duplicateMessageTracker = duplicateMessageTracker;
}
Expand Down Expand Up @@ -211,16 +212,15 @@ private void startNewHeightManager(final BlockHeader parentHeader) {
currentHeightManager = ibftBlockHeightManagerFactory.create(parentHeader);
currentHeightManager.start();
final long newChainHeight = currentHeightManager.getChainHeight();
futureMessages.getOrDefault(newChainHeight, emptyList()).forEach(this::handleMessage);
futureMessages.remove(newChainHeight);
futureMessageBuffer.poll(newChainHeight).forEach(this::handleMessage);
}

private boolean processMessage(final IbftMessage<?> msg, final Message rawMsg) {
final ConsensusRoundIdentifier msgRoundIdentifier = msg.getRoundIdentifier();
if (isMsgForCurrentHeight(msgRoundIdentifier)) {
return isMsgFromKnownValidator(msg) && ibftFinalState.isLocalNodeValidator();
} else if (isMsgForFutureChainHeight(msgRoundIdentifier)) {
addMessageToFutureMessageBuffer(msgRoundIdentifier.getSequenceNumber(), rawMsg);
futureMessageBuffer.addMessage(msgRoundIdentifier.getSequenceNumber(), rawMsg);
} else {
LOG.debug(
"IBFT message discarded as it is from a previous block height messageType={} chainHeight={} eventHeight={}",
Expand All @@ -242,11 +242,4 @@ private boolean isMsgForCurrentHeight(final ConsensusRoundIdentifier roundIdenti
private boolean isMsgForFutureChainHeight(final ConsensusRoundIdentifier roundIdentifier) {
return roundIdentifier.getSequenceNumber() > currentHeightManager.getChainHeight();
}

private void addMessageToFutureMessageBuffer(final long chainHeight, final Message rawMsg) {
if (!futureMessages.containsKey(chainHeight)) {
futureMessages.put(chainHeight, Lists.newArrayList());
}
futureMessages.get(chainHeight).add(rawMsg);
}
}