Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Limit size of Ibft future message buffer (#873)
Browse files Browse the repository at this point in the history
  • Loading branch information
jframe committed Feb 20, 2019
1 parent 28ea0a2 commit 7efd040
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class IbftConfigOptions {
private static final int DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
private static final int DEFAULT_FUTURE_MESSAGES_LIMIT = 1000;
private static final int DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE = 10;

private final JsonObject ibftConfigRoot;

Expand Down Expand Up @@ -56,4 +58,13 @@ public int getMessageQueueLimit() {
public int getDuplicateMessageLimit() {
return ibftConfigRoot.getInteger("duplicatemessagelimit", DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

public int getFutureMessagesLimit() {
return ibftConfigRoot.getInteger("futuremessageslimit", DEFAULT_FUTURE_MESSAGES_LIMIT);
}

public int getFutureMessagesMaxDistance() {
return ibftConfigRoot.getInteger(
"futuremessagesmaxdistance", DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
private static final int EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE = 10;

@Test
public void shouldGetEpochLengthFromConfig() {
Expand Down Expand Up @@ -138,6 +140,44 @@ public void shouldGetDefaultDuplicateMessageLimitFromDefaultConfig() {
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

@Test
public void shouldGetFutureMessagesLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("FutureMessagesLimit", 50));
assertThat(config.getFutureMessagesLimit()).isEqualTo(50);
}

@Test
public void shouldFallbackToDefaultFutureMessagesLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getFutureMessagesLimit()).isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT);
}

@Test
public void shouldGetDefaultFutureMessagesLimitsFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getFutureMessagesLimit())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_LIMIT);
}

@Test
public void shouldGetFutureMessagesMaxDistanceFromConfig() {
final IbftConfigOptions config =
fromConfigOptions(singletonMap("FutureMessagesMaxDistance", 50));
assertThat(config.getFutureMessagesMaxDistance()).isEqualTo(50);
}

@Test
public void shouldFallbackToDefaultFutureMessagesMaxDistance() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getFutureMessagesMaxDistance())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}

@Test
public void shouldGetDefaultFutureMessagesMaxDistanceFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getFutureMessagesMaxDistance())
.isEqualTo(EXPECTED_DEFAULT_FUTURE_MESSAGES_MAX_DISTANCE);
}

private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.IbftHelpers;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.SynchronizerUpdater;
import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector;
import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.statemachine.FutureMessageBuffer;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManagerFactory;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftFinalState;
Expand Down Expand Up @@ -115,6 +117,8 @@ public EventMultiplexer getEventMultiplexer() {
public static final int MESSAGE_QUEUE_LIMIT = 1000;
public static final int GOSSIPED_HISTORY_LIMIT = 100;
public static final int DUPLICATE_MESSAGE_LIMIT = 100;
public static final int FUTURE_MESSAGES_MAX_DISTANCE = 10;
public static final int FUTURE_MESSAGES_LIMIT = 1000;

private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue(MESSAGE_QUEUE_LIMIT);
Expand Down Expand Up @@ -309,6 +313,13 @@ private static ControllerAndState createControllerAndFinalState(

final Subscribers<MinedBlockObserver> minedBlockObservers = new Subscribers<>();

final MessageTracker duplicateMessageTracker = new MessageTracker(DUPLICATE_MESSAGE_LIMIT);
final FutureMessageBuffer futureMessageBuffer =
new FutureMessageBuffer(
FUTURE_MESSAGES_MAX_DISTANCE,
FUTURE_MESSAGES_LIMIT,
blockChain.getChainHeadBlockNumber());

final IbftController ibftController =
new IbftController(
blockChain,
Expand All @@ -323,7 +334,8 @@ private static ControllerAndState createControllerAndFinalState(
messageValidatorFactory),
messageValidatorFactory),
gossiper,
DUPLICATE_MESSAGE_LIMIT,
duplicateMessageTracker,
futureMessageBuffer,
synchronizerUpdater);

final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import tech.pegasys.pantheon.ethereum.p2p.api.Message;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import com.google.common.annotations.VisibleForTesting;

/**
* Buffer which holds future IBFT messages.
*
* <p>This buffer only allows messages to be added which have a chain height greater than current
* height and up to chain futureMessagesMaxDistance from the current chain height.
*
* <p>When the total number of messages is greater futureMessagesLimit then messages are evicted.
*
* <p>If there is more than one height in the buffer then all messages for the highest chain height
* are removed. Otherwise if there is only one height the oldest inserted message is removed.
*/
public class FutureMessageBuffer {
private final NavigableMap<Long, List<Message>> buffer = new TreeMap<>();
private final long futureMessagesMaxDistance;
private final long futureMessagesLimit;
private long chainHeight;

public FutureMessageBuffer(
final long futureMessagesMaxDistance,
final long futureMessagesLimit,
final long chainHeight) {
this.futureMessagesMaxDistance = futureMessagesMaxDistance;
this.futureMessagesLimit = futureMessagesLimit;
this.chainHeight = chainHeight;
}

public void addMessage(final long msgChainHeight, final Message rawMsg) {
if (futureMessagesLimit == 0 || !validMessageHeight(msgChainHeight, chainHeight)) {
return;
}

addMessageToBuffer(msgChainHeight, rawMsg);

if (totalMessagesSize() > futureMessagesLimit) {
evictMessages();
}
}

private void addMessageToBuffer(final long msgChainHeight, final Message rawMsg) {
buffer.putIfAbsent(msgChainHeight, new ArrayList<>());
buffer.get(msgChainHeight).add(rawMsg);
}

private boolean validMessageHeight(final long msgChainHeight, final long currentHeight) {
final boolean isFutureMsg = msgChainHeight > currentHeight;
final boolean withinMaxChainHeight =
msgChainHeight <= currentHeight + futureMessagesMaxDistance;
return isFutureMsg && withinMaxChainHeight;
}

private void evictMessages() {
if (buffer.size() > 1) {
buffer.remove(buffer.lastKey());
} else if (buffer.size() == 1) {
List<Message> messages = buffer.firstEntry().getValue();
messages.remove(0);
}
}

public List<Message> retrieveMessagesForHeight(final long height) {
chainHeight = height;
final List<Message> messages = buffer.getOrDefault(height, Collections.emptyList());
discardPreviousHeightMessages();
return messages;
}

private void discardPreviousHeightMessages() {
if (!buffer.isEmpty()) {
for (long h = buffer.firstKey(); h <= chainHeight; h++) {
buffer.remove(h);
}
}
}

@VisibleForTesting
long totalMessagesSize() {
return buffer.values().stream().map(List::size).reduce(0, Integer::sum).longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
Expand All @@ -34,13 +32,8 @@
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -50,7 +43,7 @@ public class IbftController {
private final Blockchain blockchain;
private final IbftFinalState ibftFinalState;
private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory;
private final Map<Long, List<Message>> futureMessages;
private final FutureMessageBuffer futureMessageBuffer;
private BlockHeightManager currentHeightManager;
private final Gossiper gossiper;
private final MessageTracker duplicateMessageTracker;
Expand All @@ -61,31 +54,13 @@ public IbftController(
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final int duplicateMessageLimit,
final SynchronizerUpdater sychronizerUpdater) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
Maps.newHashMap(),
new MessageTracker(duplicateMessageLimit),
sychronizerUpdater);
}

@VisibleForTesting
public IbftController(
final Blockchain blockchain,
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final Map<Long, List<Message>> futureMessages,
final MessageTracker duplicateMessageTracker,
final FutureMessageBuffer futureMessageBuffer,
final SynchronizerUpdater sychronizerUpdater) {
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessages = futureMessages;
this.futureMessageBuffer = futureMessageBuffer;
this.gossiper = gossiper;
this.duplicateMessageTracker = duplicateMessageTracker;
this.sychronizerUpdater = sychronizerUpdater;
Expand Down Expand Up @@ -210,16 +185,15 @@ private void startNewHeightManager(final BlockHeader parentHeader) {
currentHeightManager = ibftBlockHeightManagerFactory.create(parentHeader);
currentHeightManager.start();
final long newChainHeight = currentHeightManager.getChainHeight();
futureMessages.getOrDefault(newChainHeight, emptyList()).forEach(this::handleMessage);
futureMessages.remove(newChainHeight);
futureMessageBuffer.retrieveMessagesForHeight(newChainHeight).forEach(this::handleMessage);
}

private boolean processMessage(final IbftMessage<?> msg, final Message rawMsg) {
final ConsensusRoundIdentifier msgRoundIdentifier = msg.getRoundIdentifier();
if (isMsgForCurrentHeight(msgRoundIdentifier)) {
return isMsgFromKnownValidator(msg) && ibftFinalState.isLocalNodeValidator();
} else if (isMsgForFutureChainHeight(msgRoundIdentifier)) {
addMessageToFutureMessageBuffer(msgRoundIdentifier.getSequenceNumber(), rawMsg);
futureMessageBuffer.addMessage(msgRoundIdentifier.getSequenceNumber(), rawMsg);
// Notify the synchronizer the transmitting peer must have the parent block to the received
// message's target height.
sychronizerUpdater.updatePeerChainState(
Expand All @@ -245,11 +219,4 @@ private boolean isMsgForCurrentHeight(final ConsensusRoundIdentifier roundIdenti
private boolean isMsgForFutureChainHeight(final ConsensusRoundIdentifier roundIdentifier) {
return roundIdentifier.getSequenceNumber() > currentHeightManager.getChainHeight();
}

private void addMessageToFutureMessageBuffer(final long chainHeight, final Message rawMsg) {
if (!futureMessages.containsKey(chainHeight)) {
futureMessages.put(chainHeight, Lists.newArrayList());
}
futureMessages.get(chainHeight).add(rawMsg);
}
}

0 comments on commit 7efd040

Please sign in to comment.