Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: block-generator locked table retry and additional metrics #5653

Merged
merged 12 commits into from
Aug 15, 2023
64 changes: 20 additions & 44 deletions tools/block-generator/generator/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package generator

import (
_ "embed"
"encoding/json"
"errors"
"fmt"
Expand All @@ -32,49 +31,19 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
txn "github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/tools/block-generator/util"
)

// ---- templates ----
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to generate_apps.go


//go:embed teal/poap_boxes.teal
var approvalBoxes string
var approvalBoxesBytes interface{}

//go:embed teal/poap_clear.teal
var clearBoxes string
var clearBoxesBytes interface{}

//go:embed teal/swap_amm.teal
var approvalSwap string
var approvalSwapBytes interface{}

//go:embed teal/swap_clear.teal
var clearSwap string
var clearSwapBytes interface{}

func init() {
prog, err := logic.AssembleString(approvalBoxes)
util.MaybeFail(err, "failed to assemble approval program")
approvalBoxesBytes = prog.Program

prog, err = logic.AssembleString(clearBoxes)
util.MaybeFail(err, "failed to assemble clear program")
clearBoxesBytes = prog.Program

prog, err = logic.AssembleString(approvalSwap)
util.MaybeFail(err, "failed to assemble approvalSwap program")
approvalSwapBytes = prog.Program

prog, err = logic.AssembleString(clearSwap)
util.MaybeFail(err, "failed to assemble clearSwap program")
clearSwapBytes = prog.Program
}
const (
BlockTotalSizeBytes = "blocks_total_size_bytes"
CommitWaitTimeMS = "commit_wait_time_ms"
BlockgenGenerateTimeMS = "blockgen_generate_time_ms"
LedgerEvalTimeMS = "ledger_eval_time_ms"
LedgerValidateTimeMS = "ledger_validate_time_ms"
)

// ---- constructors ----

Expand Down Expand Up @@ -105,10 +74,11 @@ func MakeGenerator(log logging.Logger, dbround uint64, bkGenesis bookkeeping.Gen
rewardsResidue: 0,
rewardsRate: 0,
rewardsRecalculationRound: 0,
reportData: make(map[TxTypeID]TxData),
latestData: make(map[TxTypeID]uint64),
roundOffset: dbround,
}
gen.reportData.Transactions = make(map[TxTypeID]TxData)
gen.reportData.Counters = make(map[string]uint64)

gen.feeSink[31] = 1
gen.rewardsPool[31] = 2
Expand Down Expand Up @@ -357,7 +327,7 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error {
g.setBlockHeader(&cert)

intra := uint64(0)
txGroupsAD := [][]txn.SignedTxnWithAD{}
var txGroupsAD [][]txn.SignedTxnWithAD
for intra < minTxnsForBlock {
txGroupAD, numTxns, err := g.generateTxGroup(g.round, intra)
if err != nil {
Expand All @@ -371,21 +341,25 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error {
intra += numTxns
}
generated = time.Now()
g.reportData.Counters[BlockgenGenerateTimeMS] += uint64(generated.Sub(start).Milliseconds())

vBlock, ledgerTxnCount, err := g.evaluateBlock(cert.Block.BlockHeader, txGroupsAD, int(intra))
vBlock, ledgerTxnCount, commitWaitTime, err := g.evaluateBlock(cert.Block.BlockHeader, txGroupsAD, int(intra))
if err != nil {
return fmt.Errorf("failed to evaluate block: %w", err)
}
if ledgerTxnCount != g.txnCounter+intra {
return fmt.Errorf("evaluateBlock() txn count mismatches theoretical intra: %d != %d", ledgerTxnCount, g.txnCounter+intra)
}
evaluated = time.Now()
g.reportData.Counters[LedgerEvalTimeMS] += uint64(evaluated.Sub(generated).Milliseconds())
winder marked this conversation as resolved.
Show resolved Hide resolved

err = g.ledger.AddValidatedBlock(*vBlock, cert.Certificate)
if err != nil {
return fmt.Errorf("failed to add validated block: %w", err)
}
validated = time.Now()
g.reportData.Counters[CommitWaitTimeMS] += uint64(commitWaitTime.Milliseconds())
g.reportData.Counters[LedgerValidateTimeMS] += uint64((validated.Sub(evaluated) - commitWaitTime).Milliseconds())
Comment on lines +361 to +362
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collecting some additional metrics in the reportData object.


cert.Block.Payset = vBlock.Block().Payset

Expand All @@ -400,6 +374,8 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error {

// write the msgpack bytes for a block
g.latestBlockMsgp = protocol.EncodeMsgp(&cert)
g.reportData.Counters[BlockTotalSizeBytes] += uint64(len(g.latestBlockMsgp))
winder marked this conversation as resolved.
Show resolved Hide resolved

_, err = output.Write(g.latestBlockMsgp)
if err != nil {
return err
Expand Down Expand Up @@ -812,7 +788,7 @@ func (g *generator) generateAssetTxnInternalHint(txType TxTypeID, round uint64,
}

if g.balances[senderIndex] < txn.Fee.ToUint64() {
fmt.Printf("\n\nthe sender account does not have enough algos for the transfer. idx %d, asset transaction type %v, num %d\n\n", senderIndex, actual, g.reportData[actual].GenerationCount)
fmt.Printf("\n\nthe sender account does not have enough algos for the transfer. idx %d, asset transaction type %v, num %d\n\n", senderIndex, actual, g.reportData.Transactions[actual].GenerationCount)
os.Exit(1)
}

Expand All @@ -835,10 +811,10 @@ func track(id TxTypeID) (TxTypeID, time.Time) {

func (g *generator) recordData(id TxTypeID, start time.Time) {
g.latestData[id]++
data := g.reportData[id]
data := g.reportData.Transactions[id]
data.GenerationCount += 1
data.GenerationTime += time.Since(start)
g.reportData[id] = data
g.reportData.Transactions[id] = data
}

// ---- sign transactions ----
Expand Down
42 changes: 41 additions & 1 deletion tools/block-generator/generator/generate_apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,53 @@
package generator

import (
_ "embed"
"fmt"
"math/rand"
"time"

txn "github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/tools/block-generator/util"
)

// ---- templates ----

//go:embed teal/poap_boxes.teal
var approvalBoxes string
var approvalBoxesBytes interface{}

//go:embed teal/poap_clear.teal
var clearBoxes string
var clearBoxesBytes interface{}

//go:embed teal/swap_amm.teal
var approvalSwap string
var approvalSwapBytes interface{}

//go:embed teal/swap_clear.teal
var clearSwap string
var clearSwapBytes interface{}

// Precompile teal programs
func init() {
prog, err := logic.AssembleString(approvalBoxes)
util.MaybeFail(err, "failed to assemble approval program")
approvalBoxesBytes = prog.Program

prog, err = logic.AssembleString(clearBoxes)
util.MaybeFail(err, "failed to assemble clear program")
clearBoxesBytes = prog.Program

prog, err = logic.AssembleString(approvalSwap)
util.MaybeFail(err, "failed to assemble approvalSwap program")
approvalSwapBytes = prog.Program

prog, err = logic.AssembleString(clearSwap)
util.MaybeFail(err, "failed to assemble clearSwap program")
clearSwapBytes = prog.Program
}

// ---- generator app state ----

func (g *generator) resetPendingApps() {
Expand Down Expand Up @@ -71,7 +111,7 @@ func countEffects(actual TxTypeID) uint64 {

func CumulativeEffects(report Report) EffectsReport {
effsReport := make(EffectsReport)
for txType, data := range report {
for txType, data := range report.Transactions {
rootCount := data.GenerationCount
effsReport[string(txType)] += rootCount
for _, effect := range effects[txType] {
Expand Down
18 changes: 10 additions & 8 deletions tools/block-generator/generator/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,16 +674,16 @@ func TestRecordData(t *testing.T) {
gen := makePrivateGenerator(t, 0, bookkeeping.Genesis{})

id := TxTypeID("test")
data, ok := gen.reportData[id]
data, ok := gen.reportData.Transactions[id]
require.False(t, ok)

gen.recordData(id, time.Now())
data, ok = gen.reportData[id]
data, ok = gen.reportData.Transactions[id]
require.True(t, ok)
require.Equal(t, uint64(1), data.GenerationCount)

gen.recordData(id, time.Now())
data, ok = gen.reportData[id]
data, ok = gen.reportData.Transactions[id]
require.True(t, ok)
require.Equal(t, uint64(2), data.GenerationCount)
}
Expand Down Expand Up @@ -725,11 +725,13 @@ func TestCumulativeEffects(t *testing.T) {
partitiontest.PartitionTest(t)

report := Report{
TxTypeID("app_boxes_optin"): {GenerationCount: uint64(42)},
TxTypeID("app_boxes_create"): {GenerationCount: uint64(1337)},
TxTypeID("pay_pay"): {GenerationCount: uint64(999)},
TxTypeID("asset_optin_total"): {GenerationCount: uint64(13)},
TxTypeID("app_boxes_call"): {GenerationCount: uint64(413)},
Transactions: map[TxTypeID]TxData{
TxTypeID("app_boxes_optin"): {GenerationCount: uint64(42)},
TxTypeID("app_boxes_create"): {GenerationCount: uint64(1337)},
TxTypeID("pay_pay"): {GenerationCount: uint64(999)},
TxTypeID("asset_optin_total"): {GenerationCount: uint64(13)},
TxTypeID("app_boxes_call"): {GenerationCount: uint64(413)},
},
}

expectedEffectsReport := EffectsReport{
Expand Down
25 changes: 19 additions & 6 deletions tools/block-generator/generator/generator_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"encoding/binary"
"fmt"
"os"
"strings"
"time"

"github.com/algorand/avm-abi/apps"
cconfig "github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -167,19 +169,30 @@ func (g *generator) startEvaluator(hdr bookkeeping.BlockHeader, paysetHint int)
})
}

func (g *generator) evaluateBlock(hdr bookkeeping.BlockHeader, txGroups [][]txn.SignedTxnWithAD, paysetHint int) (*ledgercore.ValidatedBlock, uint64 /* txnCount */, error) {
func (g *generator) evaluateBlock(hdr bookkeeping.BlockHeader, txGroups [][]txn.SignedTxnWithAD, paysetHint int) (*ledgercore.ValidatedBlock, uint64 /* txnCount */, time.Duration /* commit wait time */, error) {
commitWaitTime := time.Duration(0)
waitDelay := 10 * time.Millisecond
eval, err := g.startEvaluator(hdr, paysetHint)
if err != nil {
return nil, 0, fmt.Errorf("could not start evaluator: %w", err)
return nil, 0, 0, fmt.Errorf("could not start evaluator: %w", err)
}
for i, txGroup := range txGroups {
err := eval.TransactionGroup(txGroup)
if err != nil {
return nil, 0, fmt.Errorf("could not evaluate transaction group %d: %w", i, err)
for {
err := eval.TransactionGroup(txGroup)
if err != nil {
if strings.Contains(err.Error(), "database table is locked") {
time.Sleep(waitDelay)
commitWaitTime += waitDelay
// sometimes the database is locked, so we retry
continue
}
return nil, 0, 0, fmt.Errorf("could not evaluate transaction group %d: %w", i, err)
}
break
Comment on lines +180 to +191
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "bug fix". The error always seems to happen on the first transaction group. Simply retrying until it works seems to allow the tests to complete.

Copy link
Contributor

@tzaffi tzaffi Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider reporting a retry count as well (though I guess this is roughly commitWaitTime / 10 * time.Millisecond)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe more informative would be a retriedBlocks counter. So a contribution of 1 for blocks which had to retry, and 0 for those without any retries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mainly looking for magnitude with this. In my first test the importer was waiting for ~5 seconds over the course of a 1h test, so I don't think we need to worry about the wait time too much.

}
}
lvb, err := eval.GenerateBlock()
return lvb, eval.TestingTxnCounter(), err
return lvb, eval.TestingTxnCounter(), commitWaitTime, err
}

func countInners(ad txn.ApplyData) int {
Expand Down
5 changes: 4 additions & 1 deletion tools/block-generator/generator/generator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ type assetHolding struct {
}

// Report is the generation report.
type Report map[TxTypeID]TxData
type Report struct {
Counters map[string]uint64 `json:"counters"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More data for the report.

Transactions map[TxTypeID]TxData `json:"transactions"`
}

// EffectsReport collates transaction counts caused by a root transaction.
type EffectsReport map[string]uint64
Expand Down
94 changes: 94 additions & 0 deletions tools/block-generator/runner/reporting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package runner

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/tools/block-generator/generator"
)

func makeDummyData() (time.Time, time.Duration, generator.Report, *MetricsCollector) {
start := time.Now().Add(-10 * time.Minute)
duration := time.Hour
generatorReport := generator.Report{
Counters: make(map[string]uint64),
Transactions: make(map[generator.TxTypeID]generator.TxData),
}
collector := &MetricsCollector{Data: make([]Entry, 10)}
return start, duration, generatorReport, collector
}

// makeMetrics creates a set of metrics for testing.
func makeMetrics(start time.Time) *MetricsCollector {
collector := &MetricsCollector{}
for i := 0; i <= 10; i++ {
var data []string

// should be converted to an average.
data = append(data, fmt.Sprintf("import_time_sec_sum %d", i*100))
data = append(data, fmt.Sprintf("import_time_sec_count %d", i))
// should be converted to an average.
data = append(data, fmt.Sprintf("imported_tx_per_block_sum %d", i*100))
data = append(data, fmt.Sprintf("imported_tx_per_block_count %d", i))

data = append(data, fmt.Sprintf("imported_round %d", i))
collector.Data = append(collector.Data, Entry{
Timestamp: start.Add(time.Duration(i) * time.Minute),
Data: data,
})
}
return collector
}

func TestWriteReport_MissingMetrics(t *testing.T) {
start, duration, generatorReport, collector := makeDummyData()
var builder strings.Builder
err := writeReport(&builder, t.Name(), start, duration, generatorReport, collector)
require.ErrorContains(t, err, "metric incomplete or not found")
}

func TestWriterReport_Good(t *testing.T) {
start, duration, generatorReport, _ := makeDummyData()
collector := makeMetrics(start)

generatorReport.Counters[generator.BlockTotalSizeBytes] = 1024
generatorReport.Counters[generator.BlockgenGenerateTimeMS] = 0
generatorReport.Counters[generator.CommitWaitTimeMS] = 1000
generatorReport.Counters[generator.LedgerEvalTimeMS] = 2000
generatorReport.Counters[generator.LedgerValidateTimeMS] = 3000

var builder strings.Builder
err := writeReport(&builder, t.Name(), start, duration, generatorReport, collector)
require.NoError(t, err)

report := builder.String()

// both rounds of metrics are reported.
require.Contains(t, report, "final_imported_round:10")
require.Contains(t, report, "early_imported_round:2")

// counters are reported.
for k, v := range generatorReport.Counters {
winder marked this conversation as resolved.
Show resolved Hide resolved
require.Contains(t, report, fmt.Sprintf("%s:%d", k, v))
}
}