From 5584c6dca2f2f3a492f7650aeb905ab24ffac682 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Thu, 23 Jun 2022 12:59:48 -0400 Subject: [PATCH] LocalLedger Refactoring + Catchpoint Service (#1049) Part 1 cleanup genesis file access. put node catchup into a function that can be swapped out with the catchup service. pass the indexer logger into the block processor. move open ledger into a util function, and move the initial state util function into a new ledger util file. add initial catchupservice implementation. move ledger init from daemon.go to constructor. Merge multiple read genesis functions. Part 2 Merge local_ledger migration package into blockprocessor. Rename Migration to Initialize Use logger in catchup service catchup Part 3 Update submodule and use NewWrappedLogger. Make util.CreateInitState private --- api/handlers_e2e_test.go | 21 ++-- cmd/algorand-indexer/daemon.go | 58 ++------- cmd/algorand-indexer/daemon_test.go | 43 ++----- cmd/import-validator/core/service.go | 16 ++- .../postgres_integration_common_test.go | 6 +- idb/postgres/postgres_integration_test.go | 35 ++++-- importer/helper.go | 45 ++----- processor/blockprocessor/block_processor.go | 42 +++++-- .../blockprocessor/block_processor_test.go | 11 +- .../blockprocessor/initialize.go | 82 +++++++------ .../blockprocessor/initialize_test.go | 25 ++-- .../blockprocessor/internal/catchupservice.go | 114 ++++++++++++++++++ processor/eval/ledger_for_evaluator_test.go | 43 ++++--- test/common.sh | 7 +- third_party/go-algorand | 2 +- util/ledger_util.go | 73 +++++++++++ util/ledger_util_test.go | 40 ++++++ util/test/testutil.go | 17 +-- util/util.go | 30 ----- 19 files changed, 437 insertions(+), 273 deletions(-) rename migrations/local_ledger/migration.go => processor/blockprocessor/initialize.go (72%) rename migrations/local_ledger/migration_test.go => processor/blockprocessor/initialize_test.go (78%) create mode 100644 processor/blockprocessor/internal/catchupservice.go create mode 100644 util/ledger_util.go create mode 100644 util/ledger_util_test.go diff --git a/api/handlers_e2e_test.go b/api/handlers_e2e_test.go index 6cabbc454..29000ac66 100644 --- a/api/handlers_e2e_test.go +++ b/api/handlers_e2e_test.go @@ -12,26 +12,27 @@ import ( "testing" "time" - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/crypto/merklesignature" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger" - "github.com/algorand/go-algorand/rpcs" - "github.com/algorand/indexer/processor" - "github.com/algorand/indexer/processor/blockprocessor" "github.com/labstack/echo/v4" "github.com/sirupsen/logrus" + test2 "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/algorand/go-algorand-sdk/encoding/json" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/crypto/merklesignature" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor" "github.com/algorand/indexer/api/generated/v2" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/idb/postgres" pgtest "github.com/algorand/indexer/idb/postgres/testing" + "github.com/algorand/indexer/processor/blockprocessor" "github.com/algorand/indexer/util/test" ) @@ -74,7 +75,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, f err = db.LoadGenesis(genesis) require.NoError(t, err) - l := test.MakeTestLedger("ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) require.NoError(t, err, "failed to open ledger") return db, newShutdownFunc, proc, l diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index 0c42b137c..13cee1a31 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -3,8 +3,6 @@ package main import ( "context" "fmt" - "io" - "io/ioutil" "os" "os/signal" "path/filepath" @@ -14,24 +12,22 @@ import ( "syscall" "time" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger/ledgercore" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/protocol" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/algorand/go-algorand/rpcs" "github.com/algorand/go-algorand/util" + "github.com/algorand/indexer/api" "github.com/algorand/indexer/api/generated/v2" "github.com/algorand/indexer/config" "github.com/algorand/indexer/fetcher" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/importer" - localledger "github.com/algorand/indexer/migrations/local_ledger" "github.com/algorand/indexer/processor" "github.com/algorand/indexer/processor/blockprocessor" + iutil "github.com/algorand/indexer/util" "github.com/algorand/indexer/util/metrics" - "github.com/spf13/cobra" - "github.com/spf13/viper" ) var ( @@ -233,39 +229,21 @@ var daemonCmd = &cobra.Command{ // Initial import if needed. genesisReader := importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger) - _, err := importer.EnsureInitialImport(db, genesisReader, logger) + genesis, err := iutil.ReadGenesis(genesisReader) + maybeFail(err, "Error reading genesis file") + + _, err = importer.EnsureInitialImport(db, genesis, logger) maybeFail(err, "importer.EnsureInitialImport() error") // sync local ledger nextDBRound, err := db.GetNextRoundToAccount() maybeFail(err, "Error getting DB round") - if nextDBRound > 0 { - if catchpoint != "" { - round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) - if err != nil { - maybeFail(err, "catchpoint error") - } - if uint64(round) >= nextDBRound { - logger.Warnf("round for given catchpoint is ahead of db round. skip fast catchup") - } else { - err = localledger.RunMigrationFastCatchup(logging.NewLogger(), catchpoint, &opts) - maybeFail(err, "Error running ledger migration in fast catchup mode") - } - - } - err = localledger.RunMigrationSimple(nextDBRound-1, &opts) - maybeFail(err, "Error running ledger migration") - } logger.Info("Initializing block import handler.") imp := importer.NewImporter(db) logger.Info("Initializing local ledger.") - genesisReader = importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger) - genesis, err := readGenesis(genesisReader) - maybeFail(err, "Error reading genesis file") - - proc, err := blockprocessor.MakeProcessor(&genesis, nextDBRound, indexerDataDir, imp.ImportBlock) + proc, err := blockprocessor.MakeProcessorWithLedgerInit(logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock) if err != nil { maybeFail(err, "blockprocessor.MakeProcessor() err %v", err) } @@ -450,19 +428,3 @@ func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error { return nil } - -func readGenesis(reader io.Reader) (bookkeeping.Genesis, error) { - var genesis bookkeeping.Genesis - if reader == nil { - return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: reader is nil") - } - gbytes, err := ioutil.ReadAll(reader) - if err != nil { - return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err) - } - err = protocol.DecodeJSON(gbytes, &genesis) - if err != nil { - return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err) - } - return genesis, nil -} diff --git a/cmd/algorand-indexer/daemon_test.go b/cmd/algorand-indexer/daemon_test.go index 4669e04d4..251b179d3 100644 --- a/cmd/algorand-indexer/daemon_test.go +++ b/cmd/algorand-indexer/daemon_test.go @@ -3,20 +3,20 @@ package main import ( "context" "errors" - "io" - "strings" "sync" "testing" "time" - "github.com/algorand/go-algorand-sdk/encoding/json" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor/blockprocessor" itest "github.com/algorand/indexer/util/test" - "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/assert" ) type mockImporter struct { @@ -39,7 +39,9 @@ func TestImportRetryAndCancel(t *testing.T) { // create handler with mock importer and start, it should generate errors until cancelled. imp := &mockImporter{} - l := itest.MakeTestLedger("ledger") + ledgerLogger, _ := test.NewNullLogger() + l, err := itest.MakeTestLedger(ledgerLogger) + require.NoError(t, err) defer l.Close() proc, err := blockprocessor.MakeProcessorWithLedger(l, nil) assert.Nil(t, err) @@ -73,32 +75,3 @@ func TestImportRetryAndCancel(t *testing.T) { cancel() wg.Wait() } - -func TestReadGenesis(t *testing.T) { - var reader io.Reader - // nil reader - _, err := readGenesis(reader) - assert.Contains(t, err.Error(), "readGenesis() err: reader is nil") - // no match struct field - genesisStr := "{\"version\": 2}" - reader = strings.NewReader(genesisStr) - _, err = readGenesis(reader) - assert.Contains(t, err.Error(), "json decode error") - - genesis := bookkeeping.Genesis{ - SchemaID: "1", - Network: "test", - Proto: "test", - RewardsPool: "AAAA", - FeeSink: "AAAA", - } - - // read and decode genesis - reader = strings.NewReader(string(json.Encode(genesis))) - _, err = readGenesis(reader) - assert.Nil(t, err) - // read from empty reader - _, err = readGenesis(reader) - assert.Contains(t, err.Error(), "readGenesis() err: EOF") - -} diff --git a/cmd/import-validator/core/service.go b/cmd/import-validator/core/service.go index 9dcb0d143..4474a66ab 100644 --- a/cmd/import-validator/core/service.go +++ b/cmd/import-validator/core/service.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "github.com/algorand/go-algorand-sdk/client/v2/algod" "github.com/algorand/go-algorand/agreement" @@ -52,7 +52,7 @@ func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) { return res, nil } -func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *logrus.Logger) (*postgres.IndexerDb, error) { +func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *log.Logger) (*postgres.IndexerDb, error) { db, availableCh, err := postgres.OpenPostgres(postgresConnStr, idb.IndexerDbOptions{}, logger) if err != nil { @@ -92,9 +92,7 @@ func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger return db, nil } -func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) { - logger := logging.NewLogger() - +func openLedger(logger *log.Logger, ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) { accounts := make(map[basics.Address]basics.AccountData) for _, alloc := range genesis.Allocation { address, err := basics.UnmarshalChecksumAddress(alloc.Address) @@ -115,7 +113,7 @@ func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger } l, err := ledger.OpenLedger( - logger, path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal()) + logging.NewWrappedLogger(logger), path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal()) if err != nil { return nil, fmt.Errorf("openLedger() open err: %w", err) } @@ -309,7 +307,7 @@ func checkModifiedState(db *postgres.IndexerDb, l *ledger.Ledger, block *bookkee return nil } -func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *logrus.Logger) error { +func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *log.Logger) error { nextRoundIndexer, err := db.GetNextRoundToAccount() if err != nil { return fmt.Errorf("catchup err: %w", err) @@ -391,7 +389,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg // Run is a blocking call that runs the import validator service. func Run(args ImportValidatorArgs) { - logger := logrus.New() + logger := log.New() bot, err := fetcher.ForNetAndToken(args.AlgodAddr, args.AlgodToken, logger) if err != nil { @@ -408,7 +406,7 @@ func Run(args ImportValidatorArgs) { fmt.Printf("error opening indexer database err: %v", err) os.Exit(1) } - l, err := openLedger(args.AlgodLedger, &genesis) + l, err := openLedger(logger, args.AlgodLedger, &genesis) if err != nil { fmt.Printf("error opening algod database err: %v", err) os.Exit(1) diff --git a/idb/postgres/postgres_integration_common_test.go b/idb/postgres/postgres_integration_common_test.go index fc8f6d57e..1f6966542 100644 --- a/idb/postgres/postgres_integration_common_test.go +++ b/idb/postgres/postgres_integration_common_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + test2 "github.com/sirupsen/logrus/hooks/test" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger" "github.com/algorand/indexer/processor" @@ -35,7 +37,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*IndexerDb, func(), pr shutdownFunc() } - l := test.MakeTestLedger("ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) require.NoError(t, err, "failed to open ledger") diff --git a/idb/postgres/postgres_integration_test.go b/idb/postgres/postgres_integration_test.go index cbac35320..55fadfe3d 100644 --- a/idb/postgres/postgres_integration_test.go +++ b/idb/postgres/postgres_integration_test.go @@ -10,26 +10,29 @@ import ( "testing" "time" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/sirupsen/logrus" + test2 "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" "github.com/algorand/go-codec/codec" + "github.com/algorand/indexer/api/generated/v2" + "github.com/algorand/indexer/idb" "github.com/algorand/indexer/idb/postgres/internal/encoding" "github.com/algorand/indexer/idb/postgres/internal/schema" + pgtest "github.com/algorand/indexer/idb/postgres/internal/testing" pgutil "github.com/algorand/indexer/idb/postgres/internal/util" "github.com/algorand/indexer/importer" "github.com/algorand/indexer/processor/blockprocessor" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/algorand/indexer/idb" - pgtest "github.com/algorand/indexer/idb/postgres/internal/testing" + "github.com/algorand/indexer/util" "github.com/algorand/indexer/util/test" ) @@ -1411,7 +1414,9 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { require.NoError(t, err) assert.Equal(t, uint64(0), round) - l := test.MakeTestLedger("ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) defer l.Close() proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) require.NoError(t, err, "failed to open ledger") @@ -1707,7 +1712,9 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) { rootTxid := appCall.Txn.ID() err = pgutil.TxWithRetry(pdb, serializable, func(tx pgx.Tx) error { - l := test.MakeTestLedger("ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) defer l.Close() proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) require.NoError(t, err, "failed to open ledger") @@ -2141,7 +2148,9 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) { require.ErrorIs(t, err, idb.ErrorNotInitialized) logger := logrus.New() genesisReader := bytes.NewReader(protocol.EncodeJSON(genesis)) - imported, err := importer.EnsureInitialImport(db, genesisReader, logger) + gen, err := util.ReadGenesis(genesisReader) + require.NoError(t, err) + imported, err := importer.EnsureInitialImport(db, gen, logger) require.NoError(t, err) require.True(t, true, imported) // network state should be set @@ -2152,8 +2161,10 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) { // change genesis value genesis.Network = "testnest" genesisReader = bytes.NewReader(protocol.EncodeJSON(genesis)) + gen, err = util.ReadGenesis(genesisReader) + require.NoError(t, err) // different genesisHash, should fail - _, err = importer.EnsureInitialImport(db, genesisReader, logger) + _, err = importer.EnsureInitialImport(db, gen, logger) require.Error(t, err) require.Contains(t, err.Error(), "genesis hash not matching") diff --git a/importer/helper.go b/importer/helper.go index 6e9c6a966..d1faf5a92 100644 --- a/importer/helper.go +++ b/importer/helper.go @@ -16,11 +16,8 @@ import ( "time" "github.com/algorand/go-algorand-sdk/client/v2/algod" - "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" "github.com/algorand/indexer/processor/blockprocessor" @@ -54,8 +51,12 @@ type ImportHelper struct { func (h *ImportHelper) Import(db idb.IndexerDb, args []string) { // Initial import if needed. genesisReader := GetGenesisFile(h.GenesisJSONPath, nil, h.Log) - _, err := EnsureInitialImport(db, genesisReader, h.Log) + genesis, err := util.ReadGenesis(genesisReader) + maybeFail(err, h.Log, "readGenesis() error") + + _, err = EnsureInitialImport(db, genesis, h.Log) maybeFail(err, h.Log, "EnsureInitialImport() error") + imp := NewImporter(db) blocks := 0 txCount := 0 @@ -140,10 +141,8 @@ func importTar(imp Importer, tarfile io.Reader, l *log.Logger, genesisReader io. if err != nil { maybeFail(err, l, "error decoding genesis, %v", err) } - initState, err := util.CreateInitState(&genesis) - maybeFail(err, l, "Error getting genesis block") - ld, err := ledger.OpenLedger(logging.NewLogger(), "ledger", true, initState, config.GetDefaultLocal()) + ld, err := util.MakeLedger(l, false, &genesis, "") maybeFail(err, l, "Cannot open ledger") proc, err := blockprocessor.MakeProcessorWithLedger(ld, imp.ImportBlock) @@ -188,29 +187,15 @@ func importFile(fname string, imp Importer, l *log.Logger, genesisPath string) ( return } -func loadGenesis(db idb.IndexerDb, in io.Reader) (err error) { - var genesis bookkeeping.Genesis - gbytes, err := ioutil.ReadAll(in) - if err != nil { - return fmt.Errorf("error reading genesis, %v", err) - } - err = protocol.DecodeJSON(gbytes, &genesis) - if err != nil { - return fmt.Errorf("error decoding genesis, %v", err) - } - - return db.LoadGenesis(genesis) -} - // EnsureInitialImport imports the genesis block if needed. Returns true if the initial import occurred. -func EnsureInitialImport(db idb.IndexerDb, genesisReader io.Reader, l *log.Logger) (bool, error) { +func EnsureInitialImport(db idb.IndexerDb, genesis bookkeeping.Genesis, l *log.Logger) (bool, error) { _, err := db.GetNextRoundToAccount() // Exit immediately or crash if we don't see ErrorNotInitialized. if err != idb.ErrorNotInitialized { if err != nil { return false, fmt.Errorf("getting import state, %v", err) } - err = checkGenesisHash(db, genesisReader) + err = checkGenesisHash(db, genesis) if err != nil { return false, err } @@ -218,7 +203,7 @@ func EnsureInitialImport(db idb.IndexerDb, genesisReader io.Reader, l *log.Logge } // Import genesis file from file or algod. - err = loadGenesis(db, genesisReader) + err = db.LoadGenesis(genesis) if err != nil { return false, fmt.Errorf("could not load genesis json, %v", err) } @@ -280,19 +265,11 @@ func GetGenesisFile(genesisJSONPath string, client *algod.Client, l *log.Logger) } else { l.Fatal("Neither genesis file path or algod client provided for initial import.") } + return genesisReader } -func checkGenesisHash(db idb.IndexerDb, genesisReader io.Reader) error { - var genesis bookkeeping.Genesis - gbytes, err := ioutil.ReadAll(genesisReader) - if err != nil { - return fmt.Errorf("error reading genesis, %w", err) - } - err = protocol.DecodeJSON(gbytes, &genesis) - if err != nil { - return fmt.Errorf("error decoding genesis, %w", err) - } +func checkGenesisHash(db idb.IndexerDb, genesis bookkeeping.Genesis) error { network, err := db.GetNetworkState() if errors.Is(err, idb.ErrorNotInitialized) { err = db.SetNetworkState(genesis) diff --git a/processor/blockprocessor/block_processor.go b/processor/blockprocessor/block_processor.go index 4fda13b10..5cacda195 100644 --- a/processor/blockprocessor/block_processor.go +++ b/processor/blockprocessor/block_processor.go @@ -2,25 +2,23 @@ package blockprocessor import ( "fmt" - "path/filepath" + log "github.com/sirupsen/logrus" "github.com/algorand/go-algorand/config" - algodConfig "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/accounting" + "github.com/algorand/indexer/idb" "github.com/algorand/indexer/processor" indexerledger "github.com/algorand/indexer/processor/eval" "github.com/algorand/indexer/util" ) -const prefix = "ledger" - type blockProcessor struct { handler func(block *ledgercore.ValidatedBlock) error ledger *ledger.Ledger @@ -38,13 +36,35 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va return &blockProcessor{ledger: l, handler: handler}, nil } -// MakeProcessor creates a block processor -func MakeProcessor(genesis *bookkeeping.Genesis, dbRound uint64, datadir string, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { - initState, err := util.CreateInitState(genesis) - if err != nil { - return nil, fmt.Errorf("MakeProcessor() err: %w", err) +// MakeProcessorWithLedgerInit creates a block processor and initializes the ledger. +func MakeProcessorWithLedgerInit(logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { + if nextDBRound > 0 { + if catchpoint != "" { + round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) + if err != nil { + return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() label err: %w", err) + } + if uint64(round) >= nextDBRound { + logger.Warnf("round for given catchpoint is ahead of db round. skip fast catchup") + } else { + err = InitializeLedgerFastCatchup(logger, catchpoint, opts.IndexerDatadir, *genesis) + if err != nil { + return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() fast catchup err: %w", err) + } + } + + } + err := InitializeLedgerSimple(logger, nextDBRound-1, &opts) + if err != nil { + return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() slow catchup err: %w", err) + } } - l, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(datadir, prefix), false, initState, algodConfig.GetDefaultLocal()) + return MakeProcessor(logger, genesis, nextDBRound, opts.AlgodDataDir, handler) +} + +// MakeProcessor creates a block processor +func MakeProcessor(logger *log.Logger, genesis *bookkeeping.Genesis, dbRound uint64, datadir string, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { + l, err := util.MakeLedger(logger, false, genesis, datadir) if err != nil { return nil, fmt.Errorf("MakeProcessor() err: %w", err) } diff --git a/processor/blockprocessor/block_processor_test.go b/processor/blockprocessor/block_processor_test.go index 2ff7f9471..00a9aaa36 100644 --- a/processor/blockprocessor/block_processor_test.go +++ b/processor/blockprocessor/block_processor_test.go @@ -4,6 +4,9 @@ import ( "fmt" "testing" + test2 "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger/ledgercore" @@ -14,7 +17,9 @@ import ( ) func TestProcess(t *testing.T) { - l := test.MakeTestLedger("local_ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) defer l.Close() genesisBlock, err := l.Block(basics.Round(0)) assert.Nil(t, err) @@ -46,7 +51,9 @@ func TestProcess(t *testing.T) { } func TestFailedProcess(t *testing.T) { - l := test.MakeTestLedger("local_ledger") + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) defer l.Close() // invalid processor pr, err := block_processor.MakeProcessorWithLedger(nil, nil) diff --git a/migrations/local_ledger/migration.go b/processor/blockprocessor/initialize.go similarity index 72% rename from migrations/local_ledger/migration.go rename to processor/blockprocessor/initialize.go index 2a2b21f23..73e327eda 100644 --- a/migrations/local_ledger/migration.go +++ b/processor/blockprocessor/initialize.go @@ -1,4 +1,4 @@ -package localledger +package blockprocessor import ( "context" @@ -11,22 +11,22 @@ import ( "github.com/algorand/go-algorand-sdk/client/v2/algod" algodConfig "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/node" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" + log "github.com/sirupsen/logrus" + "github.com/algorand/indexer/fetcher" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/processor" - "github.com/algorand/indexer/processor/blockprocessor" - log "github.com/sirupsen/logrus" ) -// RunMigrationSimple executes the migration core functionality. -func RunMigrationSimple(round uint64, opts *idb.IndexerDbOptions) error { - logger := log.New() +// InitializeLedgerSimple executes the migration core functionality. +func InitializeLedgerSimple(logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { ctx, cf := context.WithCancel(context.Background()) defer cf() { @@ -44,20 +44,20 @@ func RunMigrationSimple(round uint64, opts *idb.IndexerDbOptions) error { var bot fetcher.Fetcher var err error if opts.IndexerDatadir == "" { - return fmt.Errorf("RunMigrationSimple() err: indexer data directory missing") + return fmt.Errorf("InitializeLedgerSimple() err: indexer data directory missing") } // create algod client bot, err = getFetcher(opts) if err != nil { - return fmt.Errorf("RunMigrationSimple() err: %w", err) + return fmt.Errorf("InitializeLedgerSimple() err: %w", err) } logger.Info("initializing ledger") genesis, err := getGenesis(bot.Algod()) if err != nil { - return fmt.Errorf("RunMigrationSimple() err: %w", err) + return fmt.Errorf("InitializeLedgerSimple() err: %w", err) } - proc, err := blockprocessor.MakeProcessor(&genesis, round, opts.IndexerDatadir, nil) + proc, err := MakeProcessor(logger, &genesis, round, opts.IndexerDatadir, nil) if err != nil { return fmt.Errorf("RunMigration() err: %w", err) } @@ -81,33 +81,20 @@ func RunMigrationSimple(round uint64, opts *idb.IndexerDbOptions) error { return nil } -// RunMigrationFastCatchup executes the migration core functionality. -func RunMigrationFastCatchup(logger logging.Logger, catchpoint string, opts *idb.IndexerDbOptions) error { - if opts.IndexerDatadir == "" { - return fmt.Errorf("RunMigrationFastCatchup() err: indexer data directory missing") - } - // catchpoint round - round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) - if err != nil { - return fmt.Errorf("RunMigrationFastCatchup() err: %w", err) - } - // create algod client - bot, err := getFetcher(opts) - if err != nil { - return fmt.Errorf("RunMigrationFastCatchup() err: %w", err) - } - genesis, err := getGenesis(bot.Algod()) - if err != nil { - return fmt.Errorf("RunMigrationFastCatchup() err: %w", err) - } +func fullNodeCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { + wrappedLogger := logging.NewWrappedLogger(logger) + node, err := node.MakeFull( - logging.NewLogger(), - opts.IndexerDatadir, + wrappedLogger, + dataDir, algodConfig.AutogenLocal, nil, genesis) + if err != nil { + return err + } // remove node directory after when exiting fast catchup mode - defer os.RemoveAll(filepath.Join(opts.IndexerDatadir, genesis.ID())) + defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) node.Start() time.Sleep(5 * time.Second) logger.Info("algod node running") @@ -125,6 +112,27 @@ func RunMigrationFastCatchup(logger logging.Logger, catchpoint string, opts *idb logger.Info("fast catchup completed") node.Stop() logger.Info("algod node stopped") + return nil +} + +// InitializeLedgerFastCatchup executes the migration core functionality. +func InitializeLedgerFastCatchup(logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { + if dataDir == "" { + return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing") + } + // catchpoint round + round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) + if err != nil { + return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err) + } + + // TODO: switch to catchup service catchup. + //err = internal.CatchupServiceCatchup(logger, round, catchpoint, dataDir, genesis) + err = fullNodeCatchup(logger, round, catchpoint, dataDir, genesis) + if err != nil { + return fmt.Errorf("fullNodeCatchup() err: %w", err) + } + // move ledger to indexer directory ledgerFiles := []string{ "ledger.block.sqlite", @@ -135,9 +143,9 @@ func RunMigrationFastCatchup(logger logging.Logger, catchpoint string, opts *idb "ledger.tracker.sqlite-wal", } for _, f := range ledgerFiles { - err = os.Rename(filepath.Join(opts.IndexerDatadir, genesis.ID(), f), filepath.Join(opts.IndexerDatadir, f)) + err = os.Rename(filepath.Join(dataDir, genesis.ID(), f), filepath.Join(dataDir, f)) if err != nil { - return fmt.Errorf("RunMigrationFastCatchup() err: %w", err) + return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err) } } return nil @@ -200,15 +208,15 @@ func getFetcher(opts *idb.IndexerDbOptions) (fetcher.Fetcher, error) { if opts.AlgodDataDir != "" { bot, err = fetcher.ForDataDir(opts.AlgodDataDir, logger) if err != nil { - return nil, fmt.Errorf("RunMigrationFastCatchup() err: %w", err) + return nil, fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err) } } else if opts.AlgodAddr != "" && opts.AlgodToken != "" { bot, err = fetcher.ForNetAndToken(opts.AlgodAddr, opts.AlgodToken, logger) if err != nil { - return nil, fmt.Errorf("RunMigrationFastCatchup() err: %w", err) + return nil, fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err) } } else { - return nil, fmt.Errorf("RunMigrationFastCatchup() err: unable to create algod client") + return nil, fmt.Errorf("InitializeLedgerFastCatchup() err: unable to create algod client") } return bot, nil } diff --git a/migrations/local_ledger/migration_test.go b/processor/blockprocessor/initialize_test.go similarity index 78% rename from migrations/local_ledger/migration_test.go rename to processor/blockprocessor/initialize_test.go index cbeb0e619..25a10aef3 100644 --- a/migrations/local_ledger/migration_test.go +++ b/processor/blockprocessor/initialize_test.go @@ -1,24 +1,24 @@ -package localledger +package blockprocessor import ( "fmt" "os" - "path/filepath" "testing" + "github.com/jarcoal/httpmock" + "github.com/sirupsen/logrus" + test2 "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/algorand/go-algorand-sdk/client/v2/algod" "github.com/algorand/go-algorand-sdk/encoding/json" "github.com/algorand/go-algorand-sdk/encoding/msgpack" - algodConfig "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/ledger" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/idb" "github.com/algorand/indexer/util" "github.com/algorand/indexer/util/test" - "github.com/jarcoal/httpmock" - "github.com/stretchr/testify/assert" ) func TestRunMigration(t *testing.T) { @@ -64,21 +64,20 @@ func TestRunMigration(t *testing.T) { } // migrate 3 rounds - err = RunMigrationSimple(3, &opts) - assert.NoError(t, err) - initState, err := util.CreateInitState(&genesis) + err = InitializeLedgerSimple(logrus.New(), 3, &opts) assert.NoError(t, err) - l, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(opts.IndexerDatadir, "ledger"), false, initState, algodConfig.GetDefaultLocal()) + log, _ := test2.NewNullLogger() + l, err := util.MakeLedger(log, false, &genesis, opts.IndexerDatadir) assert.NoError(t, err) // check 3 rounds written to ledger assert.Equal(t, uint64(3), uint64(l.Latest())) l.Close() // migration continues from last round - err = RunMigrationSimple(6, &opts) + err = InitializeLedgerSimple(logrus.New(), 6, &opts) assert.NoError(t, err) - l, err = ledger.OpenLedger(logging.NewLogger(), filepath.Join(opts.IndexerDatadir, "ledger"), false, initState, algodConfig.GetDefaultLocal()) + l, err = util.MakeLedger(log, false, &genesis, opts.IndexerDatadir) assert.NoError(t, err) assert.Equal(t, uint64(6), uint64(l.Latest())) l.Close() diff --git a/processor/blockprocessor/internal/catchupservice.go b/processor/blockprocessor/internal/catchupservice.go new file mode 100644 index 000000000..59f6d5995 --- /dev/null +++ b/processor/blockprocessor/internal/catchupservice.go @@ -0,0 +1,114 @@ +package internal + +import ( + "context" + "fmt" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/algorand/go-algorand/catchup" + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/network" + + "github.com/algorand/indexer/util" +) + +// makeNodeProvider initializes the node provider. +func makeNodeProvider(ctx context.Context) nodeProvider { + return nodeProvider{ + ctx: ctx, + } +} + +// nodeProvider implements two services required to start the catchpoint catchup service. +type nodeProvider struct { + ctx context.Context +} + +// IsParticipating is from the NodeInfo interface used by the WebsocketNetwork +// in order to determine which gossip messages to subscribe to. +func (n nodeProvider) IsParticipating() bool { + return false +} + +// SetCatchpointCatchupMode is a callback provided by the catchpoint catchup +// service which notifies listeners that the catchup mode is changing. The +// context channel is used to to stop the catchpoint service, or if the channel +// is closed, indicate that the listener is stopping. +func (n nodeProvider) SetCatchpointCatchupMode(enabled bool) (newContextCh <-chan context.Context) { + ch := make(chan context.Context) + go func() { + if enabled { + ch <- n.ctx + } + }() + return ch +} + +// CatchupServiceCatchup initializes a ledger using the catchup service. +func CatchupServiceCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { + logger.Infof("Starting catchup service with catchpoint: %s", catchpoint) + wrappedLogger := logging.NewWrappedLogger(logger) + + start := time.Now() + ctx := context.Background() + cfg := config.AutogenLocal + + node := makeNodeProvider(ctx) + l, err := util.MakeLedger(logger, false, &genesis, dataDir) + if err != nil { + return fmt.Errorf("CatchupServiceCatchup() MakeLedger err: %w", err) + } + + p2pNode, err := network.NewWebsocketNetwork(wrappedLogger, cfg, nil, genesis.ID(), genesis.Network, node) + if err != nil { + return fmt.Errorf("CatchupServiceCatchup() NewWebsocketNetwork err: %w", err) + } + // TODO: Do we need to implement the peer prioritization interface? + //p2pNode.SetPrioScheme(node) + p2pNode.Start() + + // TODO: if the ledger already has a catchpoint, use MakeResumedCatchpointCatchupService + service, err := catchup.MakeNewCatchpointCatchupService( + catchpoint, + node, + wrappedLogger, + p2pNode, + l, + cfg, + ) + if err != nil { + return fmt.Errorf("CatchupServiceCatchup() MakeNewCatchpointCatchupService err: %w", err) + } + + time.Sleep(5 * time.Second) + service.Start(ctx) + + running := true + for running { + time.Sleep(5 * time.Second) + stats := service.GetStatistics() + running = stats.TotalBlocks == 0 || stats.TotalBlocks != stats.VerifiedBlocks + + switch { + case !running: + break + case stats.VerifiedBlocks > 0: + logger.Infof("catchup phase 4 of 4 (Verified Blocks): %d / %d", stats.VerifiedBlocks, stats.TotalBlocks) + case stats.AcquiredBlocks > 0: + logger.Infof("catchup phase 3 of 4 (Aquired Blocks): %d / %d", stats.AcquiredBlocks, stats.TotalBlocks) + case stats.VerifiedAccounts > 0: + logger.Infof("catchup phase 2 of 4 (Verified Accounts): %d / %d", stats.VerifiedAccounts, stats.TotalAccounts) + case stats.ProcessedAccounts > 0: + logger.Infof("catchup phase 1 of 4 (Processed Accounts): %d / %d", stats.ProcessedAccounts, stats.TotalAccounts) + } + } + + logger.Infof("Catchup finished in %s", time.Since(start)) + l.WaitForCommit(l.Latest()) + return nil +} diff --git a/processor/eval/ledger_for_evaluator_test.go b/processor/eval/ledger_for_evaluator_test.go index bc95cd2b4..441073dfb 100644 --- a/processor/eval/ledger_for_evaluator_test.go +++ b/processor/eval/ledger_for_evaluator_test.go @@ -4,21 +4,30 @@ import ( "crypto/rand" "testing" + test2 "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/rpcs" + block_processor "github.com/algorand/indexer/processor/blockprocessor" indxLeder "github.com/algorand/indexer/processor/eval" "github.com/algorand/indexer/util/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -func TestLedgerForEvaluatorLatestBlockHdr(t *testing.T) { +func makeTestLedger(t *testing.T) *ledger.Ledger { + log, _ := test2.NewNullLogger() + l, err := test.MakeTestLedger(log) + require.NoError(t, err) + return l +} - l := test.MakeTestLedger("ledger") +func TestLedgerForEvaluatorLatestBlockHdr(t *testing.T) { + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) txn := test.MakePaymentTxn(0, 100, 0, 1, 1, @@ -40,7 +49,7 @@ func TestLedgerForEvaluatorLatestBlockHdr(t *testing.T) { } func TestLedgerForEvaluatorAccountDataBasic(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() block_processor.MakeProcessorWithLedger(l, nil) accountData, _, err := l.LookupWithoutRewards(0, test.AccountB) @@ -60,7 +69,7 @@ func TestLedgerForEvaluatorAccountDataBasic(t *testing.T) { } func TestLedgerForEvaluatorAccountDataMissingAccount(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) ld := indxLeder.MakeLedgerForEvaluator(l) defer l.Close() defer ld.Close() @@ -76,7 +85,7 @@ func TestLedgerForEvaluatorAccountDataMissingAccount(t *testing.T) { } func TestLedgerForEvaluatorAsset(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -141,7 +150,7 @@ func TestLedgerForEvaluatorAsset(t *testing.T) { } func TestLedgerForEvaluatorApp(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -223,7 +232,7 @@ func TestLedgerForEvaluatorApp(t *testing.T) { } func TestLedgerForEvaluatorFetchAllResourceTypes(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -273,7 +282,7 @@ func TestLedgerForEvaluatorFetchAllResourceTypes(t *testing.T) { } func TestLedgerForEvaluatorLookupMultipleAccounts(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() block_processor.MakeProcessorWithLedger(l, nil) @@ -301,7 +310,7 @@ func TestLedgerForEvaluatorLookupMultipleAccounts(t *testing.T) { } func TestLedgerForEvaluatorAssetCreatorBasic(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -332,7 +341,7 @@ func TestLedgerForEvaluatorAssetCreatorBasic(t *testing.T) { } func TestLedgerForEvaluatorAssetCreatorDeleted(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -361,7 +370,7 @@ func TestLedgerForEvaluatorAssetCreatorDeleted(t *testing.T) { func TestLedgerForEvaluatorAssetCreatorMultiple(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -415,7 +424,7 @@ func TestLedgerForEvaluatorAssetCreatorMultiple(t *testing.T) { } func TestLedgerForEvaluatorAppCreatorBasic(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -447,7 +456,7 @@ func TestLedgerForEvaluatorAppCreatorBasic(t *testing.T) { } func TestLedgerForEvaluatorAppCreatorDeleted(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -476,7 +485,7 @@ func TestLedgerForEvaluatorAppCreatorDeleted(t *testing.T) { func TestLedgerForEvaluatorAppCreatorMultiple(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() pr, _ := block_processor.MakeProcessorWithLedger(l, nil) @@ -531,7 +540,7 @@ func TestLedgerForEvaluatorAppCreatorMultiple(t *testing.T) { } func TestLedgerForEvaluatorAccountTotals(t *testing.T) { - l := test.MakeTestLedger("ledger") + l := makeTestLedger(t) defer l.Close() ld := indxLeder.MakeLedgerForEvaluator(l) diff --git a/test/common.sh b/test/common.sh index 4da52ac0c..f70484329 100755 --- a/test/common.sh +++ b/test/common.sh @@ -2,6 +2,7 @@ # The cleanup hook ensures these containers are removed when the script exits. POSTGRES_CONTAINER=test-container +export INDEXER_DATA=/tmp/e2e_test/ NET=localhost:8981 CURL_TEMPFILE=curl_out.txt @@ -197,10 +198,10 @@ function start_indexer_with_connection_string() { # we may start up from canned data, but need to update for the current running binary. RO="--allow-migration" fi + mkdir -p $INDEXER_DATA ALGORAND_DATA= ../cmd/algorand-indexer/algorand-indexer daemon \ -S $NET "$RO" \ -P "$1" \ - -i /tmp \ --enable-all-parameters \ "$RO" \ --pidfile $PIDFILE 2>&1 > /dev/null & @@ -289,6 +290,10 @@ function kill_indexer() { if test -f "$PIDFILE"; then kill -9 $(cat "$PIDFILE") > /dev/null 2>&1 || true rm $PIDFILE + rm -rf $INDEXER_DATA + pwd + ls -l + rm ledger*sqlite* || true fi } diff --git a/third_party/go-algorand b/third_party/go-algorand index c6a4ddc4d..688b3d715 160000 --- a/third_party/go-algorand +++ b/third_party/go-algorand @@ -1 +1 @@ -Subproject commit c6a4ddc4d057c95905ded86e6dbfc2a381d6f5d8 +Subproject commit 688b3d7159b4644e1d6d6993dcef221c3eea7c96 diff --git a/util/ledger_util.go b/util/ledger_util.go new file mode 100644 index 000000000..e4f184e7c --- /dev/null +++ b/util/ledger_util.go @@ -0,0 +1,73 @@ +package util + +import ( + "fmt" + "io" + "io/ioutil" + "path/filepath" + + log "github.com/sirupsen/logrus" + + algodConfig "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" +) + +// ReadGenesis converts a reader into a Genesis file. +func ReadGenesis(in io.Reader) (bookkeeping.Genesis, error) { + var genesis bookkeeping.Genesis + if in == nil { + return bookkeeping.Genesis{}, fmt.Errorf("ReadGenesis() err: reader is nil") + } + gbytes, err := ioutil.ReadAll(in) + if err != nil { + return bookkeeping.Genesis{}, fmt.Errorf("ReadGenesis() err: %w", err) + } + err = protocol.DecodeJSON(gbytes, &genesis) + if err != nil { + return bookkeeping.Genesis{}, fmt.Errorf("ReadGenesis() decode err: %w", err) + } + return genesis, nil +} + +// CreateInitState makes an initState +func createInitState(genesis *bookkeeping.Genesis) (ledgercore.InitState, error) { + balances, err := genesis.Balances() + if err != nil { + return ledgercore.InitState{}, fmt.Errorf("MakeProcessor() err: %w", err) + } + genesisBlock, err := bookkeeping.MakeGenesisBlock(genesis.Proto, balances, genesis.ID(), genesis.Hash()) + if err != nil { + return ledgercore.InitState{}, fmt.Errorf("MakeProcessor() err: %w", err) + } + + accounts := make(map[basics.Address]basics.AccountData) + for _, alloc := range genesis.Allocation { + address, err := basics.UnmarshalChecksumAddress(alloc.Address) + if err != nil { + return ledgercore.InitState{}, fmt.Errorf("openLedger() decode address err: %w", err) + } + accounts[address] = alloc.State + } + initState := ledgercore.InitState{ + Block: genesisBlock, + Accounts: accounts, + GenesisHash: genesisBlock.GenesisHash(), + } + return initState, nil +} + +// MakeLedger opens a ledger, initializing if necessary. +func MakeLedger(logger *log.Logger, inMemory bool, genesis *bookkeeping.Genesis, dataDir string) (*ledger.Ledger, error) { + const prefix = "ledger" + dbPrefix := filepath.Join(dataDir, prefix) + initState, err := createInitState(genesis) + if err != nil { + return nil, fmt.Errorf("MakeProcessor() err: %w", err) + } + return ledger.OpenLedger(logging.NewWrappedLogger(logger), dbPrefix, inMemory, initState, algodConfig.GetDefaultLocal()) +} diff --git a/util/ledger_util_test.go b/util/ledger_util_test.go new file mode 100644 index 000000000..1bdb390e9 --- /dev/null +++ b/util/ledger_util_test.go @@ -0,0 +1,40 @@ +package util + +import ( + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/algorand/go-algorand-sdk/encoding/json" + "github.com/algorand/go-algorand/data/bookkeeping" +) + +func TestReadGenesis(t *testing.T) { + var reader io.Reader + // nil reader + _, err := ReadGenesis(reader) + assert.Contains(t, err.Error(), "reader is nil") + // no match struct field + genesisStr := "{\"version\": 2}" + reader = strings.NewReader(genesisStr) + _, err = ReadGenesis(reader) + assert.Contains(t, err.Error(), "json decode error") + + genesis := bookkeeping.Genesis{ + SchemaID: "1", + Network: "test", + Proto: "test", + RewardsPool: "AAAA", + FeeSink: "AAAA", + } + + // read and decode genesis + reader = strings.NewReader(string(json.Encode(genesis))) + _, err = ReadGenesis(reader) + assert.Nil(t, err) + // read from empty reader + _, err = ReadGenesis(reader) + assert.Contains(t, err.Error(), "EOF") +} diff --git a/util/test/testutil.go b/util/test/testutil.go index 9b97c19cd..d3479d2d2 100644 --- a/util/test/testutil.go +++ b/util/test/testutil.go @@ -7,13 +7,12 @@ import ( "os" "runtime" - "github.com/algorand/go-algorand/config" + log "github.com/sirupsen/logrus" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/ledger" - "github.com/algorand/go-algorand/logging" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/util" - "github.com/sirupsen/logrus" ) var quiet = false @@ -119,15 +118,7 @@ func PrintTxnQuery(db idb.IndexerDb, q idb.TransactionFilter) { } // MakeTestLedger creates an in-memory local ledger -func MakeTestLedger(prefix string) *ledger.Ledger { +func MakeTestLedger(logger *log.Logger) (*ledger.Ledger, error) { genesis := MakeGenesis() - initState, err := util.CreateInitState(&genesis) - if err != nil { - logrus.Panicf("test init err: %v", err) - } - l, err := ledger.OpenLedger(logging.NewLogger(), prefix, true, initState, config.GetDefaultLocal()) - if err != nil { - logrus.Panicf("test init err: %v", err) - } - return l + return util.MakeLedger(logger, true, &genesis, "ledger") } diff --git a/util/util.go b/util/util.go index d43ee484e..52a366ea7 100644 --- a/util/util.go +++ b/util/util.go @@ -7,9 +7,6 @@ import ( "unicode" "unicode/utf8" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-codec/codec" ) @@ -56,33 +53,6 @@ func JSONOneLine(obj interface{}) string { return string(b) } -// CreateInitState makes an initState -func CreateInitState(genesis *bookkeeping.Genesis) (ledgercore.InitState, error) { - balances, err := genesis.Balances() - if err != nil { - return ledgercore.InitState{}, fmt.Errorf("MakeProcessor() err: %w", err) - } - genesisBlock, err := bookkeeping.MakeGenesisBlock(genesis.Proto, balances, genesis.ID(), genesis.Hash()) - if err != nil { - return ledgercore.InitState{}, fmt.Errorf("MakeProcessor() err: %w", err) - } - - accounts := make(map[basics.Address]basics.AccountData) - for _, alloc := range genesis.Allocation { - address, err := basics.UnmarshalChecksumAddress(alloc.Address) - if err != nil { - return ledgercore.InitState{}, fmt.Errorf("openLedger() decode address err: %w", err) - } - accounts[address] = alloc.State - } - initState := ledgercore.InitState{ - Block: genesisBlock, - Accounts: accounts, - GenesisHash: genesisBlock.GenesisHash(), - } - return initState, nil -} - func init() { oneLineJSONCodecHandle = new(codec.JsonHandle) oneLineJSONCodecHandle.ErrorIfNoField = true