Skip to content
Permalink
Browse files

Block/Tracker DB reset on switching to archival mode

* Add blockCount to ledger/blockdb

* Reset local blocks DB if the node switches from non-archival to archival

* Archival flag is now part of LoadLedger/OpenLedger signature.
SetArchival method removed. The reason blocks DB clean up and its initialization
needs to be done simultaneously, so that archival to non-archival detection on later
stage actually requires to re-init blocks DB and blocks queue
* Introduced ledger.InitState struct to reduce amount of parameters in OpenLedger
* Most changes in tests except archical_test are mechanical
* Fixed blockQ closing on exit

* CR fixes: Log warning messages on DB reset

* CR fixes: Switch from COUNT to MIN

* CR fixes: accounts DB reset

* CR Fixes: Do not repeat 'init' in InitState

* CR fixes: Self-review
  • Loading branch information
pzbitskiy authored and tsachiherman committed Sep 10, 2019
1 parent ffa4fe0 commit 0962014781f74296b6fd8a21f5be48c8dde2551c
@@ -48,7 +48,9 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
net := &mocks.MockNetwork{}

for i := 0; i < b.N; i++ {
local, err := data.LoadLedger(logging.Base(), b.Name()+"empty"+strconv.Itoa(i), true, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil)
archival := true
inMem := true
local, err := data.LoadLedger(logging.Base(), b.Name()+"empty"+strconv.Itoa(i), inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, archival)
require.NoError(b, err)

// Make Service
@@ -126,7 +128,9 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da

var err error
genesisBalances = data.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
emptyLedger, err = data.LoadLedger(logging.Base(), t.Name()+"empty", true, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil)
const inMem = true
const archival = true
emptyLedger, err = data.LoadLedger(logging.Base(), t.Name()+"empty", inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, archival)
require.NoError(t, err)

ledger, err = datatest.FabricateLedger(logging.Base(), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
@@ -105,7 +105,9 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L
bootstrap := MakeGenesisBalances(genesis, poolAddr, sinkAddr)

// generate test transactions
ledger, err := LoadLedger(logging.Base(), t.Name(), true, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil)
const inMem = true
const archival = true
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, archival)
if err != nil {
panic(err)
}
@@ -33,7 +33,9 @@ var roundDeadline = 0 * time.Second

// FabricateLedger is a test-only helper to create a new in-memory Ledger and run the protocol through the specified Round with the given accounts
func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.Participation, genesis data.GenesisBalances, lastRound basics.Round) (*data.Ledger, error) {
ledger, err := data.LoadLedger(log, ledgerName, true, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil)
const inMem = true
const archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, archival)
if err != nil {
return nil, err
}
@@ -93,7 +93,11 @@ func makeGenesisBlocks(proto protocol.ConsensusVersion, genesisBal GenesisBalanc

// LoadLedger creates a Ledger object to represent the ledger with the
// specified database file prefix, initializing it if necessary.
func LoadLedger(log logging.Logger, dbFilenamePrefix string, memory bool, genesisProto protocol.ConsensusVersion, genesisBal GenesisBalances, genesisID string, genesisHash crypto.Digest, blockListeners []ledger.BlockListener) (*Ledger, error) {
func LoadLedger(
log logging.Logger, dbFilenamePrefix string, memory bool,
genesisProto protocol.ConsensusVersion, genesisBal GenesisBalances, genesisID string, genesisHash crypto.Digest,
blockListeners []ledger.BlockListener, isArchival bool,
) (*Ledger, error) {
if genesisBal.balances == nil {
genesisBal.balances = make(map[basics.Address]basics.AccountData)
}
@@ -113,9 +117,14 @@ func LoadLedger(log logging.Logger, dbFilenamePrefix string, memory bool, genesi
l := &Ledger{
log: log,
}
genesisInitState := ledger.InitState{
Blocks: genBlocks,
Accounts: genesisBal.balances,
GenesisHash: genesisHash,
}
l.log.Debugf("Initializing Ledger(%s)", dbFilenamePrefix)

ll, err := ledger.OpenLedger(log, dbFilenamePrefix, memory, genBlocks, genesisBal.balances, genesisHash)
ll, err := ledger.OpenLedger(log, dbFilenamePrefix, memory, genesisInitState, isArchival)
if err != nil {
return nil, err
}
@@ -93,7 +93,9 @@ func BenchmarkAssemblePayset(b *testing.B) {
require.Equal(b, len(genesis), numUsers+1)
genBal := MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", b.Name(), b.N)
ledger, err := LoadLedger(log, ledgerName, true, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil)
const inMem = true
const archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, archival)
require.NoError(b, err)

l := ledger
@@ -83,7 +83,10 @@ func makeMockLedger(t TestingT, initAccounts map[basics.Address]basics.AccountDa
initBlocks := []bookkeeping.Block{initBlock}

fn := fmt.Sprintf("/tmp/%s.%d.sqlite3", t.Name(), crypto.RandUint64())
l, err := ledger.OpenLedger(logging.Base(), fn, true, initBlocks, initAccounts, hash)
const inMem = true
const archival = true
genesisInitState := ledger.InitState{Blocks: initBlocks, Accounts: initAccounts, GenesisHash: hash}
l, err := ledger.OpenLedger(logging.Base(), fn, true, genesisInitState, archival)
require.NoError(t, err)
return l
}
@@ -18,12 +18,12 @@ package data

import (
"fmt"
"github.com/algorand/go-algorand/components/mocks"
"math/rand"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
@@ -63,7 +63,9 @@ func BenchmarkTxHandlerProcessDecoded(b *testing.B) {
require.Equal(b, len(genesis), numUsers+1)
genBal := MakeGenesisBalances(genesis, sinkAddr, poolAddr)
ledgerName := fmt.Sprintf("%s-mem-%d", b.Name(), b.N)
ledger, err := LoadLedger(log, ledgerName, true, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil)
const inMem = true
const archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, archival)
require.NoError(b, err)

l := ledger
@@ -52,6 +52,12 @@ var accountsSchema = []string{
data blob)`,
}

var accountsResetExprs = []string{
`DROP TABLE IF EXISTS acctrounds`,
`DROP TABLE IF EXISTS accounttotals`,
`DROP TABLE IF EXISTS accountbase`,
}

type accountDelta struct {
old basics.AccountData
new basics.AccountData
@@ -105,6 +111,16 @@ func accountsInit(tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData
return nil
}

func accountsReset(tx *sql.Tx) error {
for _, stmt := range accountsResetExprs {
_, err := tx.Exec(stmt)
if err != nil {
return err
}
}
return nil
}

func accountsRound(tx *sql.Tx) (rnd basics.Round, err error) {
err = tx.QueryRow("SELECT rnd FROM acctrounds WHERE id='acctbase'").Scan(&rnd)
return
@@ -90,16 +90,24 @@ func (au *accountUpdates) loadFromDisk(l ledgerForTracker) error {
return fmt.Errorf("accountUpdates.loadFromDisk: initAccounts not set")
}

latest := l.Latest()
err := au.dbs.wdb.Atomic(func(tx *sql.Tx) error {
var err0 error
err0 = accountsInit(tx, au.initAccounts, au.initProto)
au.dbRound, err0 = au.accountsInitialize(tx)
if err0 != nil {
return err0
}

au.dbRound, err0 = accountsRound(tx)
if err0 != nil {
return err0
// Check for blocks DB and tracker DB un-sync
if au.dbRound > latest {
au.log.Warnf("resetting accounts DB (on round %v, but blocks DB's latest is %v)", au.dbRound, latest)
err0 = accountsReset(tx)
if err0 != nil {
return err0
}
au.dbRound, err0 = au.accountsInitialize(tx)
if err0 != nil {
return err0
}
}

totals, err0 := accountsTotals(tx)
@@ -125,7 +133,6 @@ func (au *accountUpdates) loadFromDisk(l ledgerForTracker) error {
}
au.protos = []config.ConsensusParams{config.Consensus[hdr.CurrentProtocol]}

latest := l.Latest()
au.deltas = nil
au.accounts = make(map[basics.Address]modifiedAccount)
loaded := au.dbRound
@@ -149,6 +156,20 @@ func (au *accountUpdates) loadFromDisk(l ledgerForTracker) error {
return nil
}

// Initialize accounts DB if needed and return account round
func (au *accountUpdates) accountsInitialize(tx *sql.Tx) (basics.Round, error) {
err := accountsInit(tx, au.initAccounts, au.initProto)
if err != nil {
return 0, err
}

rnd, err := accountsRound(tx)
if err != nil {
return 0, err
}
return rnd, nil
}

func (au *accountUpdates) close() {
}

@@ -17,7 +17,11 @@
package ledger

import (
"database/sql"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"

@@ -73,15 +77,7 @@ func (wl *wrappedLedger) trackerLog() logging.Logger {
return wl.l.trackerLog()
}

func TestArchival(t *testing.T) {
// This test ensures that trackers return the correct value from
// committedUpTo() -- that is, if they return round rnd, then they
// do not ask for any round before rnd on a subsequent call to
// loadFromDisk().
//
// We generate mostly empty blocks, with the exception of timestamps,
// which affect participationTracker.committedUpTo()'s return value.

func getInitState() (genesisInitState InitState) {
blk := bookkeeping.Block{}
blk.CurrentProtocol = protocol.ConsensusCurrentVersion
blk.RewardsPool = testPoolAddr
@@ -91,14 +87,33 @@ func TestArchival(t *testing.T) {
accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 1234567890})
accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 1234567890})

genesisInitState.Accounts = accts
genesisInitState.Blocks = []bookkeeping.Block{blk}
genesisInitState.GenesisHash = crypto.Digest{}
return genesisInitState
}

func TestArchival(t *testing.T) {
// This test ensures that trackers return the correct value from
// committedUpTo() -- that is, if they return round rnd, then they
// do not ask for any round before rnd on a subsequent call to
// loadFromDisk().
//
// We generate mostly empty blocks, with the exception of timestamps,
// which affect participationTracker.committedUpTo()'s return value.

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
l, err := OpenLedger(logging.Base(), dbName, true, []bookkeeping.Block{blk}, accts, crypto.Digest{})
genesisInitState := getInitState()
const inMem = true
const archival = true
l, err := OpenLedger(logging.Base(), dbName, inMem, genesisInitState, archival)
require.NoError(t, err)
wl := &wrappedLedger{
l: l,
}

nonZeroMinSaves := 0
blk := genesisInitState.Blocks[0]

for i := 0; i < 2000; i++ {
blk.BlockHeader.Round++
@@ -132,6 +147,121 @@ func TestArchival(t *testing.T) {
t.Error("Did not observe every tracker GCing the ledger")
}

func TestArchivalRestart(t *testing.T) {
// Start in archival mode, add 2K blocks, restart, ensure all blocks are there

dbTempDir, err := ioutil.TempDir("", "testdir"+t.Name())
require.NoError(t, err)
dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(dbTempDir, dbName)
defer os.RemoveAll(dbTempDir)

genesisInitState := getInitState()
const inMem = false // use persistent storage
const archival = true

l, err := OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival)
require.NoError(t, err)
blk := genesisInitState.Blocks[0]

const maxBlocks = 2000
for i := 0; i < maxBlocks; i++ {
blk.BlockHeader.Round++
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
l.AddBlock(blk, agreement.Certificate{})
}
l.WaitForCommit(blk.Round())

var latest, earliest basics.Round
err = l.blockDBs.rdb.Atomic(func(tx *sql.Tx) error {
latest, err = blockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
require.NoError(t, err)
return err
})
require.NoError(t, err)
require.Equal(t, basics.Round(maxBlocks), latest)
require.Equal(t, basics.Round(0), earliest)

// close and reopen the same DB, ensure latest/earliest are not changed
l.Close()

l, err = OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival)
require.NoError(t, err)

err = l.blockDBs.rdb.Atomic(func(tx *sql.Tx) error {
latest, err = blockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
require.NoError(t, err)
return err
})
require.NoError(t, err)
require.Equal(t, basics.Round(maxBlocks), latest)
require.Equal(t, basics.Round(0), earliest)
}

func TestArchivalFromNonArchival(t *testing.T) {
// Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there

dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir")
require.NoError(t, err)
dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(dbTempDir, dbName)
defer os.RemoveAll(dbTempDir)

genesisInitState := getInitState()
const inMem = false // use persistent storage
archival := false

l, err := OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival)
require.NoError(t, err)
blk := genesisInitState.Blocks[0]

const maxBlocks = 2000
for i := 0; i < maxBlocks; i++ {
blk.BlockHeader.Round++
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
l.AddBlock(blk, agreement.Certificate{})
}
l.WaitForCommit(blk.Round())

var latest, earliest basics.Round
err = l.blockDBs.rdb.Atomic(func(tx *sql.Tx) error {
latest, err = blockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
require.NoError(t, err)
return err
})
require.NoError(t, err)
require.Equal(t, basics.Round(maxBlocks), latest)
require.True(t, basics.Round(0) < earliest, fmt.Sprintf("%d < %d", basics.Round(0), earliest))

// close and reopen the same DB, ensure the DB truncated
l.Close()

archival = true
l, err = OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival)
require.NoError(t, err)

err = l.blockDBs.rdb.Atomic(func(tx *sql.Tx) error {
latest, err = blockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
require.NoError(t, err)
return err
})
require.NoError(t, err)
require.Equal(t, basics.Round(0), earliest)
require.Equal(t, basics.Round(0), latest)
}

func checkTrackers(t *testing.T, wl *wrappedLedger, rnd basics.Round) (basics.Round, error) {
minMinSave := rnd

0 comments on commit 0962014

Please sign in to comment.
You can’t perform that action at this time.