Skip to content

Commit

Permalink
Merge pull request stellar#2422 from MonsieurNicolas/banImprove
Browse files Browse the repository at this point in the history
Ban tx improvement

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita committed Feb 7, 2020
2 parents 33ca622 + d62fb64 commit b6f8767
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 52 deletions.
4 changes: 3 additions & 1 deletion src/herder/HerderImpl.cpp
Expand Up @@ -45,6 +45,7 @@ namespace stellar

constexpr auto const TRANSACTION_QUEUE_SIZE = 4;
constexpr auto const TRANSACTION_QUEUE_BAN_SIZE = 10;
constexpr auto const TRANSACTION_QUEUE_MULTIPLIER = 4;

std::unique_ptr<Herder>
Herder::create(Application& app)
Expand All @@ -68,7 +69,8 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
}

HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_SIZE, TRANSACTION_QUEUE_BAN_SIZE)
: mTransactionQueue(app, TRANSACTION_QUEUE_SIZE, TRANSACTION_QUEUE_BAN_SIZE,
TRANSACTION_QUEUE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
Expand Down
68 changes: 52 additions & 16 deletions src/herder/TransactionQueue.cpp
Expand Up @@ -4,6 +4,7 @@

#include "herder/TransactionQueue.h"
#include "crypto/SecretKey.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "main/Application.h"
#include "transactions/TransactionUtils.h"
Expand All @@ -20,8 +21,11 @@ namespace stellar
{

TransactionQueue::TransactionQueue(Application& app, int pendingDepth,
int banDepth)
: mApp(app), mPendingDepth(pendingDepth), mBannedTransactions(banDepth)
int banDepth, int poolLedgerMultiplier)
: mApp(app)
, mPendingDepth(pendingDepth)
, mBannedTransactions(banDepth)
, mPoolLedgerMultiplier(poolLedgerMultiplier)
{
for (auto i = 0; i < pendingDepth; i++)
{
Expand All @@ -43,6 +47,12 @@ TransactionQueue::tryAdd(TransactionFramePtr tx)
return TransactionQueue::AddResult::ADD_STATUS_DUPLICATE;
}

if (tx->getNumOperations() + mQueueSizeOps > maxQueueSizeOps())
{
ban({tx});
return TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER;
}

auto info = getAccountTransactionQueueInfo(tx->getSourceID());
LedgerTxn ltx(mApp.getLedgerTxnRoot());
if (!tx->checkValid(ltx, info.mMaxSeq))
Expand All @@ -62,6 +72,9 @@ TransactionQueue::tryAdd(TransactionFramePtr tx)
mSizeByAge[pendingForAccount.mAge]->inc();
pendingForAccount.mTotalFees += tx->getFeeBid();
pendingForAccount.mTransactions.emplace_back(tx);
auto nbOps = tx->getNumOperations();
pendingForAccount.mQueueSizeOps += nbOps;
mQueueSizeOps += nbOps;
return TransactionQueue::AddResult::ADD_STATUS_PENDING;
}

Expand All @@ -85,9 +98,20 @@ TransactionQueue::ban(std::vector<TransactionFramePtr> const& dropTxs)
auto& bannedFront = mBannedTransactions.front();
for (auto const& tx : dropTxs)
{
for (auto const& extracted : extract(tx, false).second)
auto extractResult = extract(tx, false);
if (extractResult.second.empty())
{
bannedFront.insert(extracted->getFullHash());
// tx was not in the queue
bannedFront.insert(tx->getFullHash());
}
else
{
// tx was in the queue, and may have caused other transactions to
// get dropped as well
for (auto const& extracted : extractResult.second)
{
bannedFront.insert(extracted->getFullHash());
}
}
}
}
Expand Down Expand Up @@ -145,15 +169,17 @@ TransactionQueue::extract(TransactionFramePtr const& tx, bool keepBacklog)
// remove everything passed tx
txRemoveEnd = std::end(txs);
}
auto removedFeeBid =
std::accumulate(txIt, txRemoveEnd, int64_t{0},
[](int64_t fee, TransactionFramePtr const& tx) {
return fee + tx->getFeeBid();
});
accIt->second.mTotalFees -= removedFeeBid;

auto movedTxs = std::vector<TransactionFramePtr>{};
std::move(txIt, txRemoveEnd, std::back_inserter(movedTxs));

auto removedTxs = std::vector<TransactionFramePtr>{};
for (auto delit = txIt; delit != txRemoveEnd; delit++)
{
auto& remTx = *delit;
accIt->second.mTotalFees -= remTx->getFeeBid();
auto nbOps = remTx->getNumOperations();
accIt->second.mQueueSizeOps -= nbOps;
mQueueSizeOps -= nbOps;
removedTxs.emplace_back(remTx);
}
txs.erase(txIt, txRemoveEnd);

if (accIt->second.mTransactions.empty())
Expand All @@ -162,7 +188,7 @@ TransactionQueue::extract(TransactionFramePtr const& tx, bool keepBacklog)
accIt = std::end(mPendingTransactions);
}

return {accIt, std::move(movedTxs)};
return {accIt, std::move(removedTxs)};
}

TransactionQueue::AccountTxQueueInfo
Expand All @@ -176,7 +202,7 @@ TransactionQueue::getAccountTransactionQueueInfo(
}

return {i->second.mTransactions.back()->getSeqNum(), i->second.mTotalFees,
i->second.mAge};
i->second.mQueueSizeOps, i->second.mAge};
}

void
Expand All @@ -199,6 +225,7 @@ TransactionQueue::shift()
{
bannedFront.insert(toBan->getFullHash());
}
mQueueSizeOps -= it->second.mQueueSizeOps;

it = mPendingTransactions.erase(it);
}
Expand Down Expand Up @@ -252,6 +279,15 @@ bool
operator==(TransactionQueue::AccountTxQueueInfo const& x,
TransactionQueue::AccountTxQueueInfo const& y)
{
return x.mMaxSeq == y.mMaxSeq && x.mTotalFees == y.mTotalFees;
return x.mMaxSeq == y.mMaxSeq && x.mTotalFees == y.mTotalFees &&
x.mQueueSizeOps == y.mQueueSizeOps;
}

size_t
TransactionQueue::maxQueueSizeOps() const
{
size_t maxOpsLedger = mApp.getLedgerManager().getLastMaxTxSetSizeOps();
maxOpsLedger *= mPoolLedgerMultiplier;
return maxOpsLedger;
}
}
27 changes: 23 additions & 4 deletions src/herder/TransactionQueue.h
Expand Up @@ -41,8 +41,8 @@ class Application;
* * removeAndReset() should be called after transaction is successully
* included into some leger. It preserves the other pending transactions for
* accounts and resets the TTL for banning
* * ban() should be called after transaction became invalid for some reason
* (i.e. its source account cannot afford it anymore)
* * ban() bans a transactions (which may or may not be in the queue) and its
* descendants, if any, for the next few ledgers.
* * shift() should be called after each ledger close, it bans transactions
* that have associated age greater or equal to pendingDepth and removes
* transactions that were banned for more than banDepth ledgers
Expand Down Expand Up @@ -71,6 +71,7 @@ class TransactionQueue
{
SequenceNumber mMaxSeq{0};
int64_t mTotalFees{0};
size_t mQueueSizeOps{0};
int mAge{0};

friend bool operator==(AccountTxQueueInfo const& x,
Expand All @@ -87,11 +88,13 @@ class TransactionQueue
using Transactions = std::vector<TransactionFramePtr>;

int64_t mTotalFees{0};
size_t mQueueSizeOps{0};
int mAge{0};
Transactions mTransactions;
};

explicit TransactionQueue(Application& app, int pendingDepth, int banDepth);
explicit TransactionQueue(Application& app, int pendingDepth, int banDepth,
int poolLedgerMultiplier);

AddResult tryAdd(TransactionFramePtr tx);
void removeAndReset(std::vector<TransactionFramePtr> const& txs);
Expand Down Expand Up @@ -129,7 +132,7 @@ class TransactionQueue
using BannedTransactions = std::deque<std::unordered_set<Hash>>;

Application& mApp;
int mPendingDepth;
int const mPendingDepth;
std::vector<medida::Counter*> mSizeByAge;
PendingTransactions mPendingTransactions;
BannedTransactions mBannedTransactions;
Expand All @@ -143,6 +146,22 @@ class TransactionQueue
std::vector<TransactionFramePtr>>;
// keepBacklog: keeps transactions succeding tx in the account's backlog
ExtractResult extract(TransactionFramePtr const& tx, bool keepBacklog);

// size of the transaction queue, in operations
size_t mQueueSizeOps{0};
// number of ledgers we can pool in memory
int const mPoolLedgerMultiplier;

size_t maxQueueSizeOps() const;

#ifdef BUILD_TESTS
public:
size_t
getQueueSizeOps() const
{
return mQueueSizeOps;
}
#endif
};

static const char* TX_STATUS_STRING[static_cast<int>(
Expand Down
8 changes: 2 additions & 6 deletions src/herder/TxSetFrame.cpp
Expand Up @@ -221,11 +221,7 @@ TxSetFrame::surgePricingFilter(Application& app)

bool maxIsOps = header.current().ledgerVersion >= 11;

size_t opsLeft;
{
size_t maxTxSetSize = header.current().maxTxSetSize;
opsLeft = maxIsOps ? maxTxSetSize : (maxTxSetSize * MAX_OPS_PER_TX);
}
size_t opsLeft = app.getLedgerManager().getLastMaxTxSetSizeOps();

auto curSizeOps = maxIsOps ? sizeOp() : (sizeTx() * MAX_OPS_PER_TX);
if (curSizeOps > opsLeft)
Expand Down Expand Up @@ -255,7 +251,7 @@ TxSetFrame::surgePricingFilter(Application& app)
// inspect the top candidate queue
auto& curTopTx = cur->front();
size_t opsCount =
maxIsOps ? curTopTx->getOperations().size() : MAX_OPS_PER_TX;
maxIsOps ? curTopTx->getNumOperations() : MAX_OPS_PER_TX;
if (opsCount <= opsLeft)
{
// pop from this one
Expand Down
2 changes: 1 addition & 1 deletion src/herder/Upgrades.cpp
Expand Up @@ -300,7 +300,7 @@ Upgrades::isValidForApply(UpgradeType const& opaqueUpgrade,
res = res && (upgrade.newBaseFee() != 0);
break;
case LEDGER_UPGRADE_MAX_TX_SET_SIZE:
res = res && (upgrade.newMaxTxSetSize() != 0);
// any size is allowed
break;
case LEDGER_UPGRADE_BASE_RESERVE:
res = res && (upgrade.newBaseReserve() != 0);
Expand Down
37 changes: 37 additions & 0 deletions src/herder/test/HerderTests.cpp
Expand Up @@ -786,6 +786,43 @@ TEST_CASE("surge pricing", "[herder][txset]")
// (1+..+4) + (1+2) = 10+3 = 13
surgeTest(Config::CURRENT_LEDGER_PROTOCOL_VERSION, 5, 15, 13);
}
SECTION("max 0 ops per ledger")
{
Config cfg(getTestConfig());
cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 0;

VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg);

app->start();

auto root = TestAccount::createRoot(*app);

auto destAccount = root.create("destAccount", 500000000);
auto accountB = root.create("accountB", 5000000000);
auto accountC = root.create("accountC", 5000000000);

TxSetFramePtr txSet = std::make_shared<TxSetFrame>(
app->getLedgerManager().getLastClosedLedgerHeader().hash);

auto tx = makeMultiPayment(destAccount, root, 1, 100, 0, 1);
txSet->add(tx);
txSet->sortForHash();

// txSet contains a valid transaction
auto inv = txSet->trimInvalid(*app);
REQUIRE(inv.empty());

REQUIRE(txSet->sizeOp() == 1);
// txSet is itself invalid as it's over the limit
REQUIRE(!txSet->checkValid(*app));
txSet->surgePricingFilter(*app);

REQUIRE(txSet->sizeOp() == 0);
txSet->surgePricingFilter(*app);
REQUIRE(txSet->sizeOp() == 0);
REQUIRE(txSet->checkValid(*app));
}
}

static void
Expand Down

0 comments on commit b6f8767

Please sign in to comment.