Skip to content

Commit

Permalink
Store each ticket in its own DB bucket.
Browse files Browse the repository at this point in the history
**NOTE: This contains a backwards incompatible database migration, so if you plan to test it, please make a copy of your database first.**

Moves tickets from a single database bucket containing JSON encoded strings, to a bucket for each ticket.

This change is to preemptively deal with scaling issues seen with databases containing tens of thousands of tickets.

The change increases the size of the database on disk by 2-5x, however, the removal of JSON encoding reduces the insertion and lookup times by ~75%.
  • Loading branch information
jholdstock committed May 6, 2021
1 parent 5f8ad65 commit 219aee0
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 97 deletions.
2 changes: 1 addition & 1 deletion background/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func checkWalletConsistency() {
}
}

func findOldestHeight(tickets []database.Ticket) int64 {
func findOldestHeight(tickets []*database.Ticket) int64 {
var oldestHeight int64
for _, ticket := range tickets {
// skip unconfirmed tickets
Expand Down
14 changes: 12 additions & 2 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func writeHotBackupFile(db *bolt.DB) error {
return err
}

func int64ToBytes(i int64) []byte {
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(i))
return bytes
}

func bytesToInt64(bytes []byte) int64 {
return int64(binary.LittleEndian.Uint64(bytes))
}

func uint32ToBytes(i uint32) []byte {
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, i)
Expand Down Expand Up @@ -203,11 +213,11 @@ func Open(ctx context.Context, shutdownWg *sync.WaitGroup, dbFile string, backup
return nil, fmt.Errorf("unable to get db version: %w", err)
}

log.Debugf("Opened database (version=%d, file=%s)", dbVersion, dbFile)
log.Infof("Opened database (version=%d, file=%s)", dbVersion, dbFile)

err = vdb.Upgrade(dbVersion)
if err != nil {
return nil, fmt.Errorf("database upgrade failed: %w", err)
return nil, fmt.Errorf("upgrade failed: %w", err)
}

// Start a ticker to update the backup file at the specified interval.
Expand Down
145 changes: 139 additions & 6 deletions database/database_upgrades.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
package database

import (
"encoding/json"
"fmt"

bolt "go.etcd.io/bbolt"
)

const (
// initialVersion is the version of a freshly created database which has had
// no upgrades applied.
initialVersion = 1

// latestVersion is the latest version of the bolt database that is
// understood by vspd. Databases with recorded versions higher than
// this will fail to open (meaning any upgrades prevent reverting to older
// software).
latestVersion = initialVersion
// ticketBucketVersion changes the way tickets are stored. Previously they
// were stored as JSON encoded strings in a single bucket. This upgrade
// moves each ticket into its own bucket and does away with JSON encoding.
ticketBucketVersion = 2

// latestVersion is the latest version of the database that is understood by
// vspd. Databases with recorded versions higher than this will fail to open
// (meaning any upgrades prevent reverting to older software).
latestVersion = ticketBucketVersion
)

// upgrades maps between old database versions and the upgrade function to
// upgrade the database to the next version.
var upgrades = []func(tx *bolt.DB) error{
initialVersion: ticketBucketUpgrade,
}

// Upgrade will update the database to the latest known version.
func (vdb *VspDatabase) Upgrade(currentVersion uint32) error {
if currentVersion == latestVersion {
Expand All @@ -25,8 +38,128 @@ func (vdb *VspDatabase) Upgrade(currentVersion uint32) error {

if currentVersion > latestVersion {
// Database is too new.
return fmt.Errorf("expected database version <= %d, got %d", latestVersion, currentVersion)
return fmt.Errorf("expected database version <= %d, got %d",
latestVersion, currentVersion)
}

// Execute all necessary upgrades in order.
for _, upgrade := range upgrades[currentVersion:] {
err := upgrade(vdb.db)
if err != nil {
return err
}
}

return nil
}

func ticketBucketUpgrade(db *bolt.DB) error {
log.Infof("Upgrading database to version %d", ticketBucketVersion)

// oldTicket has the json tags required to unmarshal tickets stored in the
// old database format.
type oldTicket struct {
Hash string `json:"hsh"`
PurchaseHeight int64 `json:"phgt"`
CommitmentAddress string `json:"cmtaddr"`
FeeAddressIndex uint32 `json:"faddridx"`
FeeAddress string `json:"faddr"`
FeeAmount int64 `json:"famt"`
FeeExpiration int64 `json:"fexp"`
Confirmed bool `json:"conf"`
VotingWIF string `json:"vwif"`
VoteChoices map[string]string `json:"vchces"`
FeeTxHex string `json:"fhex"`
FeeTxHash string `json:"fhsh"`
FeeTxStatus FeeStatus `json:"fsts"`
Outcome TicketOutcome `json:"otcme"`
}

// Run the upgrade in a single database transaction so it can be safely
// rolled back if an error is encountered.
err := db.Update(func(tx *bolt.Tx) error {
vspBkt := tx.Bucket(vspBktK)
ticketBkt := vspBkt.Bucket(ticketBktK)

// Count tickets so migration progress can be logged.
todo := 0
err := ticketBkt.ForEach(func(k, v []byte) error {
todo++
return nil
})
if err != nil {
return fmt.Errorf("could not count tickets: %w", err)
}

done := 0
err = ticketBkt.ForEach(func(k, v []byte) error {
// Deserialize the old ticket.
var ticket oldTicket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}

// Delete the old ticket.
err = ticketBkt.Delete(k)
if err != nil {
return fmt.Errorf("could not delete ticket: %w", err)
}

// Insert the new ticket.
newBkt, err := ticketBkt.CreateBucket(k)
if err != nil {
return fmt.Errorf("could not create new ticket bucket: %w", err)
}

err = putTicketInBucket(newBkt, &Ticket{
Hash: ticket.Hash,
PurchaseHeight: ticket.PurchaseHeight,
CommitmentAddress: ticket.CommitmentAddress,
FeeAddressIndex: ticket.FeeAddressIndex,
FeeAddress: ticket.FeeAddress,
FeeAmount: ticket.FeeAmount,
FeeExpiration: ticket.FeeExpiration,
Confirmed: ticket.Confirmed,
VotingWIF: ticket.VotingWIF,
VoteChoices: ticket.VoteChoices,
FeeTxHex: ticket.FeeTxHex,
FeeTxHash: ticket.FeeTxHash,
FeeTxStatus: ticket.FeeTxStatus,
Outcome: ticket.Outcome,
})
if err != nil {
return fmt.Errorf("could not put new ticket in bucket: %w", err)
}

done++

if done%2000 == 0 {
log.Infof("Migrated %d/%d tickets", done, todo)
}

return nil
})
if err != nil {
return err
}

if done > 0 {
log.Infof("Migrated %d/%d tickets", done, todo)
}

// Update database version.
err = vspBkt.Put(versionK, uint32ToBytes(ticketBucketVersion))
if err != nil {
return err
}

return nil
})
if err != nil {
return err
}

log.Info("Upgrade completed")
return nil
}
Loading

0 comments on commit 219aee0

Please sign in to comment.