Skip to content

Commit

Permalink
LocalLedger Refactoring + Catchpoint Service (#1049)
Browse files Browse the repository at this point in the history
Part 1

    cleanup genesis file access.
    put node catchup into a function that can be swapped out with the catchup service.
    pass the indexer logger into the block processor.
    move open ledger into a util function, and move the initial state util function into a new ledger util file.
    add initial catchupservice implementation.
    move ledger init from daemon.go to constructor.
    Merge multiple read genesis functions.

Part 2

    Merge local_ledger migration package into blockprocessor.
    Rename Migration to Initialize
    Use logger in catchup service catchup

Part 3

    Update submodule and use NewWrappedLogger.
    Make util.CreateInitState private
  • Loading branch information
winder committed Jun 23, 2022
1 parent 5f5c7fe commit 5584c6d
Show file tree
Hide file tree
Showing 19 changed files with 437 additions and 273 deletions.
21 changes: 12 additions & 9 deletions api/handlers_e2e_test.go
Expand Up @@ -12,26 +12,27 @@ import (
"testing"
"time"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres"
pgtest "github.com/algorand/indexer/idb/postgres/testing"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -74,7 +75,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, f
err = db.LoadGenesis(genesis)
require.NoError(t, err)

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
return db, newShutdownFunc, proc, l
Expand Down
58 changes: 10 additions & 48 deletions cmd/algorand-indexer/daemon.go
Expand Up @@ -3,8 +3,6 @@ package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
Expand All @@ -14,24 +12,22 @@ import (
"syscall"
"time"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/util"

"github.com/algorand/indexer/api"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/config"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
localledger "github.com/algorand/indexer/migrations/local_ledger"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
iutil "github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/metrics"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
Expand Down Expand Up @@ -233,39 +229,21 @@ var daemonCmd = &cobra.Command{

// Initial import if needed.
genesisReader := importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
_, err := importer.EnsureInitialImport(db, genesisReader, logger)
genesis, err := iutil.ReadGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

_, err = importer.EnsureInitialImport(db, genesis, logger)
maybeFail(err, "importer.EnsureInitialImport() error")

// sync local ledger
nextDBRound, err := db.GetNextRoundToAccount()
maybeFail(err, "Error getting DB round")
if nextDBRound > 0 {
if catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
maybeFail(err, "catchpoint error")
}
if uint64(round) >= nextDBRound {
logger.Warnf("round for given catchpoint is ahead of db round. skip fast catchup")
} else {
err = localledger.RunMigrationFastCatchup(logging.NewLogger(), catchpoint, &opts)
maybeFail(err, "Error running ledger migration in fast catchup mode")
}

}
err = localledger.RunMigrationSimple(nextDBRound-1, &opts)
maybeFail(err, "Error running ledger migration")
}

logger.Info("Initializing block import handler.")
imp := importer.NewImporter(db)

logger.Info("Initializing local ledger.")
genesisReader = importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
genesis, err := readGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

proc, err := blockprocessor.MakeProcessor(&genesis, nextDBRound, indexerDataDir, imp.ImportBlock)
proc, err := blockprocessor.MakeProcessorWithLedgerInit(logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
}
Expand Down Expand Up @@ -450,19 +428,3 @@ func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error {

return nil
}

func readGenesis(reader io.Reader) (bookkeeping.Genesis, error) {
var genesis bookkeeping.Genesis
if reader == nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: reader is nil")
}
gbytes, err := ioutil.ReadAll(reader)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
err = protocol.DecodeJSON(gbytes, &genesis)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
return genesis, nil
}
43 changes: 8 additions & 35 deletions cmd/algorand-indexer/daemon_test.go
Expand Up @@ -3,20 +3,20 @@ package main
import (
"context"
"errors"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/processor/blockprocessor"
itest "github.com/algorand/indexer/util/test"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

type mockImporter struct {
Expand All @@ -39,7 +39,9 @@ func TestImportRetryAndCancel(t *testing.T) {

// create handler with mock importer and start, it should generate errors until cancelled.
imp := &mockImporter{}
l := itest.MakeTestLedger("ledger")
ledgerLogger, _ := test.NewNullLogger()
l, err := itest.MakeTestLedger(ledgerLogger)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, nil)
assert.Nil(t, err)
Expand Down Expand Up @@ -73,32 +75,3 @@ func TestImportRetryAndCancel(t *testing.T) {
cancel()
wg.Wait()
}

func TestReadGenesis(t *testing.T) {
var reader io.Reader
// nil reader
_, err := readGenesis(reader)
assert.Contains(t, err.Error(), "readGenesis() err: reader is nil")
// no match struct field
genesisStr := "{\"version\": 2}"
reader = strings.NewReader(genesisStr)
_, err = readGenesis(reader)
assert.Contains(t, err.Error(), "json decode error")

genesis := bookkeeping.Genesis{
SchemaID: "1",
Network: "test",
Proto: "test",
RewardsPool: "AAAA",
FeeSink: "AAAA",
}

// read and decode genesis
reader = strings.NewReader(string(json.Encode(genesis)))
_, err = readGenesis(reader)
assert.Nil(t, err)
// read from empty reader
_, err = readGenesis(reader)
assert.Contains(t, err.Error(), "readGenesis() err: EOF")

}
16 changes: 7 additions & 9 deletions cmd/import-validator/core/service.go
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
"time"

"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand/agreement"
Expand Down Expand Up @@ -52,7 +52,7 @@ func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) {
return res, nil
}

func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *logrus.Logger) (*postgres.IndexerDb, error) {
func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *log.Logger) (*postgres.IndexerDb, error) {
db, availableCh, err :=
postgres.OpenPostgres(postgresConnStr, idb.IndexerDbOptions{}, logger)
if err != nil {
Expand Down Expand Up @@ -92,9 +92,7 @@ func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger
return db, nil
}

func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) {
logger := logging.NewLogger()

func openLedger(logger *log.Logger, ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) {
accounts := make(map[basics.Address]basics.AccountData)
for _, alloc := range genesis.Allocation {
address, err := basics.UnmarshalChecksumAddress(alloc.Address)
Expand All @@ -115,7 +113,7 @@ func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger
}

l, err := ledger.OpenLedger(
logger, path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal())
logging.NewWrappedLogger(logger), path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal())
if err != nil {
return nil, fmt.Errorf("openLedger() open err: %w", err)
}
Expand Down Expand Up @@ -309,7 +307,7 @@ func checkModifiedState(db *postgres.IndexerDb, l *ledger.Ledger, block *bookkee
return nil
}

func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *logrus.Logger) error {
func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *log.Logger) error {
nextRoundIndexer, err := db.GetNextRoundToAccount()
if err != nil {
return fmt.Errorf("catchup err: %w", err)
Expand Down Expand Up @@ -391,7 +389,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg

// Run is a blocking call that runs the import validator service.
func Run(args ImportValidatorArgs) {
logger := logrus.New()
logger := log.New()

bot, err := fetcher.ForNetAndToken(args.AlgodAddr, args.AlgodToken, logger)
if err != nil {
Expand All @@ -408,7 +406,7 @@ func Run(args ImportValidatorArgs) {
fmt.Printf("error opening indexer database err: %v", err)
os.Exit(1)
}
l, err := openLedger(args.AlgodLedger, &genesis)
l, err := openLedger(logger, args.AlgodLedger, &genesis)
if err != nil {
fmt.Printf("error opening algod database err: %v", err)
os.Exit(1)
Expand Down
6 changes: 5 additions & 1 deletion idb/postgres/postgres_integration_common_test.go
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

test2 "github.com/sirupsen/logrus/hooks/test"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/indexer/processor"
Expand Down Expand Up @@ -35,7 +37,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*IndexerDb, func(), pr
shutdownFunc()
}

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")

Expand Down
35 changes: 23 additions & 12 deletions idb/postgres/postgres_integration_test.go
Expand Up @@ -10,26 +10,29 @@ import (
"testing"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-codec/codec"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres/internal/encoding"
"github.com/algorand/indexer/idb/postgres/internal/schema"
pgtest "github.com/algorand/indexer/idb/postgres/internal/testing"
pgutil "github.com/algorand/indexer/idb/postgres/internal/util"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/indexer/idb"
pgtest "github.com/algorand/indexer/idb/postgres/internal/testing"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -1411,7 +1414,9 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, uint64(0), round)

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
Expand Down Expand Up @@ -1707,7 +1712,9 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) {
rootTxid := appCall.Txn.ID()

err = pgutil.TxWithRetry(pdb, serializable, func(tx pgx.Tx) error {
l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
Expand Down Expand Up @@ -2141,7 +2148,9 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
require.ErrorIs(t, err, idb.ErrorNotInitialized)
logger := logrus.New()
genesisReader := bytes.NewReader(protocol.EncodeJSON(genesis))
imported, err := importer.EnsureInitialImport(db, genesisReader, logger)
gen, err := util.ReadGenesis(genesisReader)
require.NoError(t, err)
imported, err := importer.EnsureInitialImport(db, gen, logger)
require.NoError(t, err)
require.True(t, true, imported)
// network state should be set
Expand All @@ -2152,8 +2161,10 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
// change genesis value
genesis.Network = "testnest"
genesisReader = bytes.NewReader(protocol.EncodeJSON(genesis))
gen, err = util.ReadGenesis(genesisReader)
require.NoError(t, err)
// different genesisHash, should fail
_, err = importer.EnsureInitialImport(db, genesisReader, logger)
_, err = importer.EnsureInitialImport(db, gen, logger)
require.Error(t, err)
require.Contains(t, err.Error(), "genesis hash not matching")

Expand Down

0 comments on commit 5584c6d

Please sign in to comment.