[vm] Block Pruning (Default to Only Keeping Last 768 Blocks) (#436)
* remove all block storage expectations

* controller compiles

* remove unnecessary cache config

* limit block history to 256

* integration passing

* default to no tx store

* fix vm tests

* fix lint

* tokenvm integration fixed

* handle genesis block fetch

* only store txs during test (tokenvm)

* update README

* add debugging code

* update invariant

* add caches to store last X blocks

* e2e tests passing

* restore version

* remove unnecessary config

* fix VM test

* add check to prevent runaway block production

* add more TODOs

* add quick restart test

* refactor tx backfill logic

* load blocks from disk on init

* fix off-by-one issue

* var rename

* nits

* remove unused var

* fix test

* fix lint

* fix TODO on vm

* ensure block is marked as accepted

* change log level

* add deleted blocks metrics

* add more pebble metrics

* compact disk blocks periodically

* ensure all deletion does not happen at the same time

* fix lint

* update allocation amount

* update tokenvm to allow unlimited usage

* use new genesis alloc

* fix startAmount
patrick-ogrady committed Sep 5, 2023
1 parent 0662c62 commit fa48f6e
Showing 22 changed files with 626 additions and 324 deletions.
28 changes: 25 additions & 3 deletions README.md
@@ -74,15 +74,36 @@ to that team for all the work they put into researching this approach.
Instead of requiring nodes to execute all previous transactions when joining
any `hyperchain` (which may not be possible if there is very high throughput on a Subnet),
the `hypersdk` just syncs the most recent state from the network. To avoid falling
behind the network while syncing this state, the `hypersdk` acts as an Avalanche Light
behind the network while syncing this state, the `hypersdk` acts as an Avalanche Lite
Client and performs consensus on newly processed blocks without verifying them (updating its
state sync target whenever a new block is accepted).

The `hypersdk` relies on [`x/sync`](https://github.com/ava-labs/avalanchego/tree/master/x/sync),
a bandwidth-aware dynamic sync implementation provided by `avalanchego`, to
sync to the tip of any `hyperchain`.

#### Pebble as Default
#### Block Pruning
By default, the `hypersdk` only stores what is necessary to build/verify the next block
and to help new nodes sync the current state (not execute all historical state transitions).
If the `hypersdk` did not limit block storage growth, the storage requirements for validators
would grow at an alarming rate each day (making running any `hypervm` impractical).
Consider the simple example where we process 25k transactions per second (assume each
transaction is ~400 bytes). This would require the `hypersdk` to store 10MB per
second (not including any overhead in the database for doing so). **This works out to
864GB per day or ~315TB per year.**

In practice, this means the `hypersdk` only stores the last 768 accepted blocks, the genesis block,
and the last 256 revisions of state (the [ProposerVM](https://github.com/ava-labs/avalanchego/blob/master/vms/proposervm/README.md)
also stores the last 768 blocks). With a 100ms `MinimumBlockGap`, the `hypersdk` must
store at least ~600 blocks to allow for the entire `ValidityWindow` to be backfilled (otherwise
a fully-synced, restarting `hypervm` will not become "ready" until it accepts a block at
least `ValidityWindow` after the last accepted block).
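The backfill arithmetic can be sketched as follows; note the 60s `ValidityWindow` is inferred here from the ~600-block figure at a 100ms gap, not a value stated in this section:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumption: a 60s validity window, inferred from ~600 blocks at a
	// 100ms MinimumBlockGap (600 * 100ms = 60s).
	validityWindow := 60 * time.Second
	minimumBlockGap := 100 * time.Millisecond

	// Worst case, a block is accepted every MinimumBlockGap, so this many
	// accepted blocks must be retained on disk to rebuild the full validity
	// window (the replay-protection "seen" index) after a restart.
	minBlocks := int(validityWindow / minimumBlockGap)
	fmt.Println("minimum blocks to retain:", minBlocks) // 600, under the 768 default
}
```

The default `AcceptedBlockWindow` of 768 leaves headroom above this minimum.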

_The number of blocks and/or state revisions that the `hypersdk` stores, the `AcceptedBlockWindow`, can
be tuned by any `hypervm`. It is not possible, however, to configure the `hypersdk` to store
all historical blocks (the `AcceptedBlockWindow` is pinned to memory)._

#### PebbleDB
Instead of employing [`goleveldb`](https://github.com/syndtr/goleveldb), the
`hypersdk` uses CockroachDB's [`pebble`](https://github.com/cockroachdb/pebble) database for
on-disk storage. This database is inspired by LevelDB/RocksDB but offers [a few
@@ -129,7 +150,8 @@ All `hypersdk` blocks include a state root to support dynamic state sync. In dyn
state sync, the state target is updated to the root of the last accepted block while
the sync is ongoing instead of staying pinned to the last accepted root when the sync
started. Root block inclusion means consensus can be used to select the next state
target to sync to instead of using some less secure, out-of-consensus mechanism.
target to sync to instead of using some less secure, out-of-consensus mechanism (i.e.
Avalanche Lite Client).

Dynamic state sync is required for high-throughput blockchains because it relieves
the nodes that serve state sync queries from storing all historical state revisions
28 changes: 17 additions & 11 deletions chain/block.go
@@ -370,12 +370,22 @@ func (b *StatelessBlock) verify(ctx context.Context, stateReady bool) error {
// context. Otherwise, the parent block will be used as the execution context.
vctx, err := b.vm.GetVerifyContext(ctx, b.Hght, b.Prnt)
if err != nil {
b.vm.Logger().Warn("unable to get verify context",
zap.Uint64("height", b.Hght),
zap.Stringer("blkID", b.ID()),
zap.Error(err),
)
return fmt.Errorf("%w: unable to load verify context", err)
}

// Parent block may not be processed when we verify this block, so [innerVerify] may
// recursively verify ancestry.
if err := b.innerVerify(ctx, vctx); err != nil {
b.vm.Logger().Warn("verification failed",
zap.Uint64("height", b.Hght),
zap.Stringer("blkID", b.ID()),
zap.Error(err),
)
return err
}
}
@@ -721,25 +731,21 @@ func (b *StatelessBlock) Accept(ctx context.Context) error {
return fmt.Errorf("%w: unable to commit block", err)
}

// Set last accepted block
return b.SetLastAccepted(ctx)
// Mark block as accepted and update last accepted in storage
b.MarkAccepted(ctx)
return nil
}

// SetLastAccepted is called during [Accept] and at the start and end of state
// sync.
func (b *StatelessBlock) SetLastAccepted(ctx context.Context) error {
if err := b.vm.SetLastAccepted(b); err != nil {
return err
}
func (b *StatelessBlock) MarkAccepted(ctx context.Context) {
// Accept block and free unnecessary memory
b.st = choices.Accepted
b.txsSet = nil // only used for replay protection when processing

// [Accepted] will set in-memory variables needed to ensure we don't resync
// all blocks when state sync finishes
// [Accepted] will persist the block to disk and set in-memory variables
// needed to ensure we don't resync all blocks when state sync finishes.
//
// Note: We will not call [b.vm.Verified] before accepting during state sync
b.vm.Accepted(ctx, b)
return nil
}

// implements "snowman.Block.choices.Decidable"
1 change: 0 additions & 1 deletion chain/dependencies.go
@@ -45,7 +45,6 @@ type VM interface {

IsBootstrapped() bool
LastAcceptedBlock() *StatelessBlock
SetLastAccepted(*StatelessBlock) error
GetStatelessBlock(context.Context, ids.ID) (*StatelessBlock, error)

GetVerifyContext(ctx context.Context, blockHeight uint64, parent ids.ID) (VerifyContext, error)
16 changes: 8 additions & 8 deletions config/config.go
@@ -12,13 +12,10 @@ import (
"github.com/ava-labs/avalanchego/utils/profiler"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/hypersdk/trace"
"github.com/ava-labs/hypersdk/vm"
)

const avalancheGoMinCPU = 4

var _ vm.Config = (*Config)(nil)

type Config struct{}

func (c *Config) GetLogLevel() logging.Level { return logging.Info }
@@ -33,21 +30,24 @@ func (c *Config) GetMempoolSize() int { return 2_048 }
func (c *Config) GetMempoolPayerSize() int { return 32 }
func (c *Config) GetMempoolExemptPayers() [][]byte { return nil }
func (c *Config) GetStreamingBacklogSize() int { return 1024 }
func (c *Config) GetStateHistoryLength() int { return 256 }
func (c *Config) GetStateEvictionBatchSize() int { return 4 * units.MiB }
func (c *Config) GetIntermediateNodeCacheSize() int { return 2 * units.GiB }
func (c *Config) GetValueNodeCacheSize() int { return 2 * units.GiB }
func (c *Config) GetAcceptorSize() int { return 1024 }
func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{Enabled: false} }
func (c *Config) GetStateSyncParallelism() int { return 4 }
func (c *Config) GetStateSyncMinBlocks() uint64 { return 256 }
func (c *Config) GetStateSyncServerDelay() time.Duration { return 0 } // used for testing
func (c *Config) GetParsedBlockCacheSize() int { return 128 }
func (c *Config) GetAcceptedBlockCacheSize() int { return 128 }

func (c *Config) GetParsedBlockCacheSize() int { return 128 }
func (c *Config) GetStateHistoryLength() int { return 256 }
func (c *Config) GetAcceptedBlockWindow() int { return 768 }
func (c *Config) GetStateSyncMinBlocks() uint64 { return 768 }
func (c *Config) GetAcceptorSize() int { return 1024 }

func (c *Config) GetContinuousProfilerConfig() *profiler.Config {
return &profiler.Config{Enabled: false}
}
func (c *Config) GetVerifySignatures() bool { return true }
func (c *Config) GetTargetBuildDuration() time.Duration { return 100 * time.Millisecond }
func (c *Config) GetProcessingBuildSkip() int { return 16 }
func (c *Config) GetTargetGossipDuration() time.Duration { return 20 * time.Millisecond }
func (c *Config) GetBlockCompactionFrequency() int { return 32 } // 64 MB of deletion if 2 MB blocks
3 changes: 3 additions & 0 deletions examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go
@@ -41,6 +41,9 @@ var generatePrometheusCmd = &cobra.Command{
panels = append(panels, fmt.Sprintf("increase(avalanche_%s_blks_rejected_count[5s])/5", chainID))
utils.Outf("{{yellow}}blocks rejected per second:{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_vm_deleted_blocks[5s])/5", chainID))
utils.Outf("{{yellow}}blocks deleted per second:{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("avalanche_%s_vm_hypersdk_chain_bandwidth_price", chainID))
utils.Outf("{{yellow}}bandwidth unit price:{{/}} %s\n", panels[len(panels)-1])

29 changes: 22 additions & 7 deletions examples/morpheusvm/scripts/run.sh
@@ -18,16 +18,27 @@ if ! [[ "$0" =~ scripts/run.sh ]]; then
fi

VERSION=2eabd228952b6b7c9075bc45653f70643d9a5a7c
MAX_UINT64=18446744073709551615
MODE=${MODE:-run}
LOGLEVEL=${LOGLEVEL:-info}
STATESYNC_DELAY=${STATESYNC_DELAY:-0}
MIN_BLOCK_GAP=${MIN_BLOCK_GAP:-100}
CREATE_TARGET=${CREATE_TARGET:-75000}
STORE_TXS=${STORE_TXS:-false}
UNLIMITED_USAGE=${UNLIMITED_USAGE:-false}
if [[ ${MODE} != "run" ]]; then
LOGLEVEL=debug
STATESYNC_DELAY=100000000 # 100ms
MIN_BLOCK_GAP=250 #ms
CREATE_TARGET=100000000 # 4M accounts (we send to random addresses)
STORE_TXS=true
UNLIMITED_USAGE=true
fi

WINDOW_TARGET_UNITS="40000000,450000,450000,450000,450000"
MAX_BLOCK_UNITS="1800000,15000,15000,2500,15000"
if ${UNLIMITED_USAGE}; then
WINDOW_TARGET_UNITS="${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}"
# If we don't limit the block size, AvalancheGo will reject the block.
MAX_BLOCK_UNITS="1800000,${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}"
fi

echo "Running with:"
@@ -36,6 +47,9 @@ echo MODE: ${MODE}
echo LOG LEVEL: ${LOGLEVEL}
echo STATESYNC_DELAY \(ns\): ${STATESYNC_DELAY}
echo MIN_BLOCK_GAP \(ms\): ${MIN_BLOCK_GAP}
echo STORE_TXS: ${STORE_TXS}
echo WINDOW_TARGET_UNITS: ${WINDOW_TARGET_UNITS}
echo MAX_BLOCK_UNITS: ${MAX_BLOCK_UNITS}

############################
# build avalanchego
@@ -100,16 +114,16 @@ find ${TMPDIR}/avalanchego-${VERSION}
# Always create allocations (linter doesn't like tab)
echo "creating allocations file"
cat <<EOF > ${TMPDIR}/allocations.json
[{"address":"morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp", "balance":1000000000000}]
[{"address":"morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp", "balance":10000000000000000000}]
EOF

GENESIS_PATH=$2
if [[ -z "${GENESIS_PATH}" ]]; then
echo "creating VM genesis file with allocations"
rm -f ${TMPDIR}/morpheusvm.genesis
${TMPDIR}/morpheus-cli genesis generate ${TMPDIR}/allocations.json \
--window-target-units "40000000,450000,450000,${CREATE_TARGET},450000" \
--max-block-units "1800000,15000,15000,2500,15000" \
--window-target-units ${WINDOW_TARGET_UNITS} \
--max-block-units ${MAX_BLOCK_UNITS} \
--min-block-gap ${MIN_BLOCK_GAP} \
--genesis-file ${TMPDIR}/morpheusvm.genesis
else
@@ -132,7 +146,7 @@ cat <<EOF > ${TMPDIR}/morpheusvm.config
"mempoolExemptPayers":["morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp"],
"parallelism": 5,
"verifySignatures":true,
"storeTransactions":true,
"storeTransactions": ${STORE_TXS},
"streamingBacklogSize": 10000000,
"logLevel": "${LOGLEVEL}",
"stateSyncServerDelay": ${STATESYNC_DELAY}
@@ -150,7 +164,8 @@ echo "creating subnet config"
rm -f ${TMPDIR}/morpheusvm.subnet
cat <<EOF > ${TMPDIR}/morpheusvm.subnet
{
"proposerMinBlockDelay": 0
"proposerMinBlockDelay": 0,
"proposerNumHistoricalBlocks": 768
}
EOF

38 changes: 34 additions & 4 deletions examples/morpheusvm/tests/e2e/e2e_test.go
@@ -30,10 +30,10 @@ import (
)

const (
startAmount = uint64(1000000000000)
startAmount = uint64(10000000000000000000)
sendAmount = uint64(5000)

healthPollInterval = 10 * time.Second
healthPollInterval = 3 * time.Second
)

func TestE2e(t *testing.T) {
@@ -397,7 +397,7 @@ var _ = ginkgo.Describe("[Test]", func() {
ginkgo.It("transfer in a single node (raw)", func() {
nativeBalance, err := instances[0].lcli.Balance(context.TODO(), sender)
gomega.Ω(err).Should(gomega.BeNil())
gomega.Ω(nativeBalance).Should(gomega.Equal(uint64(1000000000000)))
gomega.Ω(nativeBalance).Should(gomega.Equal(startAmount))

other, err := ed25519.GeneratePrivateKey()
gomega.Ω(err).Should(gomega.BeNil())
@@ -516,7 +516,37 @@
acceptTransaction(syncClient, lsyncClient)
})

// Create blocks before state sync starts (state sync requires at least 256
ginkgo.It("becomes ready quickly after restart", func() {
cluster, err := anrCli.RestartNode(context.Background(), "bootstrap")
gomega.Expect(err).To(gomega.BeNil())

// Upon restart, the node should be able to read blocks from disk
// to initialize its [seen] index and become ready in less than
// [ValidityWindow].
start := time.Now()
awaitHealthy(anrCli)
gomega.Expect(time.Since(start) < 20*time.Second).To(gomega.BeTrue())

// Update bootstrap info to latest in case it was assigned a new port
nodeURI := cluster.ClusterInfo.NodeInfos["bootstrap"].Uri
uri := nodeURI + fmt.Sprintf("/ext/bc/%s", blockchainID)
bid, err := ids.FromString(blockchainID)
gomega.Expect(err).To(gomega.BeNil())
hutils.Outf("{{blue}}bootstrap node uri: %s{{/}}\n", uri)
c := rpc.NewJSONRPCClient(uri)
syncClient = c
networkID, _, _, err := syncClient.Network(context.TODO())
gomega.Expect(err).Should(gomega.BeNil())
tc := lrpc.NewJSONRPCClient(uri, networkID, bid)
lsyncClient = tc
instances[len(instances)-1] = instance{
uri: uri,
cli: c,
lcli: tc,
}
})

// Create blocks before state sync starts (state sync requires at least 1024
// blocks)
//
// We do 1024 so that there are a number of ranges of data to fetch.
