Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: optimize /transactions/pending/{txid} endpoint #5891

Merged
merged 6 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,15 @@
return l.txTail.checkDup(currentProto, current, firstValid, lastValid, txid, txl)
}

// CheckConfirmedTail checks if a transaction txid happens to have LastValid greater than the current round at the time of calling and has been already committed to the ledger.
// If both conditions are met it returns true.
// This function could be used as filter to check if a transaction is committed to the ledger, and no extra checks needed if it says true.
//
// Note, this cannot be used to check if transaction happened or not in past MaxTxnLife rounds.
func (l *Ledger) CheckConfirmedTail(txid transactions.Txid) (basics.Round, bool) {
return l.txTail.checkConfirmed(txid)

Check warning on line 666 in ledger/ledger.go

View check run for this annotation

Codecov / codecov/patch

ledger/ledger.go#L665-L666

Added lines #L665 - L666 were not covered by tests
}

// Latest returns the latest known block round added to the ledger.
func (l *Ledger) Latest() basics.Round {
return l.blockQ.latest()
Expand Down
48 changes: 36 additions & 12 deletions ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
// lastValid, recent, lowWaterMark, roundTailHashes, roundTailSerializedDeltas and blockHeaderData.
tailMu deadlock.RWMutex

lastValid map[basics.Round]map[transactions.Txid]struct{} // map tx.LastValid -> tx confirmed set
// lastValid allows looking up all of the transactions that expire in a given round.
// The map for an expiration round gives the round the transaction was originally confirmed, so it can be found for the /pending endpoint.
lastValid map[basics.Round]map[transactions.Txid]int16 // map tx.LastValid -> tx confirmed map: txid -> (last valid - confirmed) delta
jannotti marked this conversation as resolved.
Show resolved Hide resolved

// duplicate detection queries with LastValid before
// lowWaterMark are not guaranteed to succeed
Expand Down Expand Up @@ -115,14 +117,18 @@
}

t.lowWaterMark = l.Latest()
t.lastValid = make(map[basics.Round]map[transactions.Txid]struct{})
t.lastValid = make(map[basics.Round]map[transactions.Txid]int16)
jannotti marked this conversation as resolved.
Show resolved Hide resolved
t.recent = make(map[basics.Round]roundLeases)

// the lastValid is a temporary map used during the execution of
// loadFromDisk, allowing us to construct the lastValid maps in their
// optimal size. This would ensure that upon startup, we don't preallocate
// more memory than we truly need.
lastValid := make(map[basics.Round][]transactions.Txid)
type lastValidEntry struct {
rnd basics.Round
txid transactions.Txid
}
lastValid := make(map[basics.Round][]lastValidEntry)

// the roundTailHashes and blockHeaderData need a single element to start with
// in order to allow lookups on zero offsets when they are empty (new database)
Expand Down Expand Up @@ -153,16 +159,16 @@
list := lastValid[txTailRound.LastValid[i]]
// if the list reached capacity, resize.
if len(list) == cap(list) {
var newList []transactions.Txid
var newList []lastValidEntry
if cap(list) == 0 {
newList = make([]transactions.Txid, 0, initialLastValidArrayLen)
newList = make([]lastValidEntry, 0, initialLastValidArrayLen)
} else {
newList = make([]transactions.Txid, len(list), len(list)*2)
newList = make([]lastValidEntry, len(list), len(list)*2)

Check warning on line 166 in ledger/txtail.go

View check run for this annotation

Codecov / codecov/patch

ledger/txtail.go#L166

Added line #L166 was not covered by tests
}
copy(newList[:], list[:])
list = newList
}
list = append(list, txTailRound.TxnIDs[i])
list = append(list, lastValidEntry{txTailRound.Hdr.Round, txTailRound.TxnIDs[i]})
lastValid[txTailRound.LastValid[i]] = list
}
}
Expand All @@ -173,9 +179,13 @@

// add all the entries in roundsLastValids to their corresponding map entry in t.lastValid
for lastValid, list := range lastValid {
lastValueMap := make(map[transactions.Txid]struct{}, len(list))
for _, id := range list {
lastValueMap[id] = struct{}{}
lastValueMap := make(map[transactions.Txid]int16, len(list))
jannotti marked this conversation as resolved.
Show resolved Hide resolved
for _, entry := range list {
if lastValid < entry.rnd {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("txTail: invalid lastValid %d / rnd %d for txid %s", lastValid, entry.rnd, entry.txid)

Check warning on line 185 in ledger/txtail.go

View check run for this annotation

Codecov / codecov/patch

ledger/txtail.go#L185

Added line #L185 was not covered by tests
}
deltaR := int16(lastValid - entry.rnd)
lastValueMap[entry.txid] = deltaR
}
t.lastValid[lastValid] = lastValueMap
}
Expand Down Expand Up @@ -210,9 +220,10 @@

for txid, txnInc := range delta.Txids {
if _, ok := t.lastValid[txnInc.LastValid]; !ok {
t.lastValid[txnInc.LastValid] = make(map[transactions.Txid]struct{})
t.lastValid[txnInc.LastValid] = make(map[transactions.Txid]int16)
}
t.lastValid[txnInc.LastValid][txid] = struct{}{}
deltaR := int16(txnInc.LastValid - blk.BlockHeader.Round)
t.lastValid[txnInc.LastValid][txid] = deltaR

tail.TxnIDs[txnInc.Intra] = txid
tail.LastValid[txnInc.Intra] = txnInc.LastValid
Expand Down Expand Up @@ -381,6 +392,19 @@
return nil
}

// checkConfirmed test to see if the given transaction id already exists.
func (t *txTail) checkConfirmed(txid transactions.Txid) (basics.Round, bool) {
t.tailMu.RLock()
defer t.tailMu.RUnlock()

for lastValidRound, lastValid := range t.lastValid {
if deltaR, confirmed := lastValid[txid]; confirmed {
return lastValidRound - basics.Round(deltaR), true
}
}
return 0, false
}

func (t *txTail) recentTailHash(offset uint64, retainSize uint64) (crypto.Digest, error) {
// prepare a buffer to hash.
buffer := make([]byte, (retainSize)*crypto.DigestSize)
Expand Down
80 changes: 79 additions & 1 deletion ledger/txtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ func TestTxTailCheckdup(t *testing.T) {
type txTailTestLedger struct {
Ledger
protoVersion protocol.ConsensusVersion
blocks map[basics.Round]bookkeeping.Block
}

const testTxTailValidityRange = 200
const testTxTailTxnPerRound = 150
const testTxTailExtraRounds = 10

func (t *txTailTestLedger) Latest() basics.Round {
return basics.Round(config.Consensus[t.protoVersion].MaxTxnLife + 10)
return basics.Round(config.Consensus[t.protoVersion].MaxTxnLife + testTxTailExtraRounds)
}

func (t *txTailTestLedger) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, error) {
Expand All @@ -130,6 +132,10 @@ func (t *txTailTestLedger) BlockHdr(r basics.Round) (bookkeeping.BlockHeader, er
}

func (t *txTailTestLedger) Block(r basics.Round) (bookkeeping.Block, error) {
if bkl, found := t.blocks[r]; found {
return bkl, nil
}

blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
UpgradeState: bookkeeping.UpgradeState{
Expand All @@ -142,6 +148,10 @@ func (t *txTailTestLedger) Block(r basics.Round) (bookkeeping.Block, error) {
for i := range blk.Payset {
blk.Payset[i] = makeTxTailTestTransaction(r, i)
}
if t.blocks == nil {
t.blocks = make(map[basics.Round]bookkeeping.Block)
}
t.blocks[r] = blk

return blk, nil
}
Expand Down Expand Up @@ -330,6 +340,74 @@ func TestTxTailDeltaTracking(t *testing.T) {
}
}

func TestTxTailCheckConfirmed(t *testing.T) {
partitiontest.PartitionTest(t)

var ledger txTailTestLedger
txtail := txTail{}
protoVersion := protocol.ConsensusCurrentVersion
proto := config.Consensus[protoVersion]
require.NoError(t, ledger.initialize(t, protoVersion))
require.NoError(t, txtail.loadFromDisk(&ledger, ledger.Latest()))

// ensure block retrieval from txTailTestLedger works
startRound := ledger.Latest() - basics.Round(proto.MaxTxnLife) + 1
b1, err := ledger.Block(startRound)
require.NoError(t, err)
b2, err := ledger.Block(startRound)
require.NoError(t, err)
require.Equal(t, b1, b2)

// check all txids in blocks are in txTail as well
// note, txtail does not store txids for transactions with lastValid < ledger.Latest()
for i := ledger.Latest() - testTxTailValidityRange + 1; i < ledger.Latest(); i++ {
blk, err := ledger.Block(i)
require.NoError(t, err)
for _, txn := range blk.Payset {
confirmedAt, found := txtail.checkConfirmed(txn.Txn.ID())
require.True(t, found, "failed to find txn at round %d (startRound=%d, latest=%d)", i, startRound, ledger.Latest())
require.Equal(t, basics.Round(i), confirmedAt)
}
}

rnd := ledger.Latest() + 1
lv := basics.Round(rnd + 50)
blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: rnd,
TimeStamp: int64(rnd << 10),
UpgradeState: bookkeeping.UpgradeState{
CurrentProtocol: protoVersion,
},
},
Payset: make(transactions.Payset, 1),
}
sender := &basics.Address{}
sender[0] = byte(rnd)
sender[1] = byte(rnd >> 8)
sender[2] = byte(rnd >> 16)
blk.Payset[0].Txn.Sender = *sender
blk.Payset[0].Txn.FirstValid = rnd
blk.Payset[0].Txn.LastValid = lv
deltas := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, 0, 0)
deltas.Txids[blk.Payset[0].Txn.ID()] = ledgercore.IncludedTransactions{
LastValid: lv,
Intra: 0,
}
deltas.AddTxLease(ledgercore.Txlease{Sender: blk.Payset[0].Txn.Sender, Lease: blk.Payset[0].Txn.Lease}, basics.Round(rnd+50))

txtail.newBlock(blk, deltas)
txtail.committedUpTo(basics.Round(rnd))

confirmedAt, found := txtail.checkConfirmed(blk.Payset[0].Txn.ID())
require.True(t, found)
require.Equal(t, basics.Round(rnd), confirmedAt)

confirmedAt, found = txtail.checkConfirmed(transactions.Txid{})
require.False(t, found)
require.Equal(t, basics.Round(0), confirmedAt)
}

// BenchmarkTxTailBlockHeaderCache adds 2M random blocks by calling
// newBlock and postCommit on txTail tracker, and reports memory allocations
func BenchmarkTxTailBlockHeaderCache(b *testing.B) {
Expand Down
22 changes: 21 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,25 @@
// Keep looking in the ledger.
}

// quick check for confirmed transactions with LastValid in future
// this supposed to cover most of the cases where REST checks for the most recent txns
if r, confirmed := node.ledger.CheckConfirmedTail(txID); confirmed {
jannotti marked this conversation as resolved.
Show resolved Hide resolved
tx, foundBlk, err := node.ledger.LookupTxid(txID, r)
if err == nil && foundBlk {
return TxnWithStatus{
Txn: tx.SignedTxn,
ConfirmedRound: r,
ApplyData: tx.ApplyData,
}, true

Check warning on line 665 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L658-L665

Added lines #L658 - L665 were not covered by tests
}
}
// if found in the pool and not in the tail then return without looking into blocks
// because the check appears to be too early
if found {
return res, found

Check warning on line 671 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L670-L671

Added lines #L670 - L671 were not covered by tests
}

// fallback to blocks lookup
var maxLife basics.Round
latest := node.ledger.Latest()
proto, err := node.ledger.ConsensusParams(latest)
Expand Down Expand Up @@ -688,6 +707,7 @@
if err != nil || !found {
continue
}

return TxnWithStatus{
Txn: tx.SignedTxn,
ConfirmedRound: r,
Expand All @@ -696,7 +716,7 @@
}

// Return whatever we found in the pool (if anything).
return
return res, found

Check warning on line 719 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L719

Added line #L719 was not covered by tests
}

// Status returns a StatusReport structure reporting our status as Active and with our ledger's LastRound
Expand Down