From 20cb546e7444f4176eb1726e469e434a58ecf9e4 Mon Sep 17 00:00:00 2001 From: Jamie Holdstock Date: Mon, 24 May 2021 17:45:47 +0800 Subject: [PATCH] Store each ticket in its own DB bucket. **NOTE: This contains a backwards incompatible database migration, so if you plan to test it, please make a copy of your database first.** Moves tickets from a single database bucket containing JSON encoded strings, to a bucket for each ticket. This change is to preemptively deal with scaling issues seen with databases containing tens of thousands of tickets. --- database/database.go | 10 + database/ticket.go | 205 +++++++++++++----- database/ticket_test.go | 7 +- database/{v2_upgrade.go => upgrade_v2.go} | 4 +- database/upgrade_v3.go | 86 ++++++++ .../{database_upgrades.go => upgrades.go} | 29 ++- webapi/getfeeaddress.go | 7 +- 7 files changed, 282 insertions(+), 66 deletions(-) rename database/{v2_upgrade.go => upgrade_v2.go} (94%) create mode 100644 database/upgrade_v3.go rename database/{database_upgrades.go => upgrades.go} (52%) diff --git a/database/database.go b/database/database.go index 476e82b4..77779a3d 100644 --- a/database/database.go +++ b/database/database.go @@ -85,6 +85,16 @@ func writeHotBackupFile(db *bolt.DB) error { return err } +func int64ToBytes(i int64) []byte { + bytes := make([]byte, 8) + binary.LittleEndian.PutUint64(bytes, uint64(i)) + return bytes +} + +func bytesToInt64(bytes []byte) int64 { + return int64(binary.LittleEndian.Uint64(bytes)) +} + func uint32ToBytes(i uint32) []byte { bytes := make([]byte, 4) binary.LittleEndian.PutUint32(bytes, i) diff --git a/database/ticket.go b/database/ticket.go index ba329813..0354455e 100644 --- a/database/ticket.go +++ b/database/ticket.go @@ -39,37 +39,53 @@ const ( Voted TicketOutcome = "voted" ) -// Ticket is serialized to json and stored in bbolt db. The json keys are -// deliberately kept short because they are duplicated many times in the db. +// The keys used to store ticket values in the database. +var ( + hashK = []byte("Hash") + purchaseHeightK = []byte("PurchaseHeight") + commitmentAddressK = []byte("CommitmentAddress") + feeAddressIndexK = []byte("FeeAddressIndex") + feeAddressK = []byte("FeeAddress") + feeAmountK = []byte("FeeAmount") + feeExpirationK = []byte("FeeExpiration") + confirmedK = []byte("Confirmed") + votingWIFK = []byte("VotingWIF") + voteChoicesK = []byte("VoteChoices") + feeTxHexK = []byte("FeeTxHex") + feeTxHashK = []byte("FeeTxHash") + feeTxStatusK = []byte("FeeTxStatus") + outcomeK = []byte("Outcome") +) + type Ticket struct { - Hash string `json:"hsh"` - PurchaseHeight int64 `json:"phgt"` - CommitmentAddress string `json:"cmtaddr"` - FeeAddressIndex uint32 `json:"faddridx"` - FeeAddress string `json:"faddr"` - FeeAmount int64 `json:"famt"` - FeeExpiration int64 `json:"fexp"` + Hash string + PurchaseHeight int64 + CommitmentAddress string + FeeAddressIndex uint32 + FeeAddress string + FeeAmount int64 + FeeExpiration int64 // Confirmed will be set when the ticket has 6+ confirmations. - Confirmed bool `json:"conf"` + Confirmed bool // VotingWIF is set in /payfee. - VotingWIF string `json:"vwif"` + VotingWIF string // VoteChoices is initially set in /payfee, but can be updated in // /setvotechoices. - VoteChoices map[string]string `json:"vchces"` + VoteChoices map[string]string // FeeTxHex and FeeTxHash will be set when the fee tx has been received. - FeeTxHex string `json:"fhex"` - FeeTxHash string `json:"fhsh"` + FeeTxHex string + FeeTxHash string // FeeTxStatus indicates the current state of the fee transaction. - FeeTxStatus FeeStatus `json:"fsts"` + FeeTxStatus FeeStatus // Outcome is set once a ticket is either voted or revoked. An empty outcome // indicates that a ticket is still votable. - Outcome TicketOutcome `json:"otcme"` + Outcome TicketOutcome } func (t *Ticket) FeeExpired() bool { @@ -86,22 +102,19 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error { return vdb.db.Update(func(tx *bolt.Tx) error { ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) - hashBytes := []byte(ticket.Hash) - - if ticketBkt.Get(hashBytes) != nil { - return fmt.Errorf("ticket already exists with hash %s", ticket.Hash) + // Create a bucket for the new ticket. Returns an error if bucket + // already exists. + newTicketBkt, err := ticketBkt.CreateBucket([]byte(ticket.Hash)) + if err != nil { + return fmt.Errorf("could not create bucket for ticket: %w", err) } // Error if a ticket already exists with the same fee address. - err := ticketBkt.ForEach(func(k, v []byte) error { - var t Ticket - err := json.Unmarshal(v, &t) - if err != nil { - return fmt.Errorf("could not unmarshal ticket: %w", err) - } + err = ticketBkt.ForEach(func(k, v []byte) error { + tbkt := ticketBkt.Bucket(k) - if t.FeeAddress == ticket.FeeAddress { - return fmt.Errorf("ticket with fee address %s already exists", t.FeeAddress) + if string(tbkt.Get(feeAddressK)) == ticket.FeeAddress { + return fmt.Errorf("ticket with fee address %s already exists", ticket.FeeAddress) } return nil @@ -110,15 +123,104 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error { return err } - ticketBytes, err := json.Marshal(ticket) + err = putTicketInBucket(newTicketBkt, ticket) if err != nil { - return fmt.Errorf("could not marshal ticket: %w", err) + return fmt.Errorf("putting ticket in bucket failed: %w", err) } - return ticketBkt.Put(hashBytes, ticketBytes) + return nil }) } +// putTicketInBucket encodes each of the fields of the provided ticket as a byte +// array, and stores them as values within the provided db bucket. +func putTicketInBucket(bkt *bolt.Bucket, ticket Ticket) error { + var err error + if err = bkt.Put(hashK, []byte(ticket.Hash)); err != nil { + return err + } + if err = bkt.Put(commitmentAddressK, []byte(ticket.CommitmentAddress)); err != nil { + return err + } + if err = bkt.Put(feeAddressK, []byte(ticket.FeeAddress)); err != nil { + return err + } + if err = bkt.Put(votingWIFK, []byte(ticket.VotingWIF)); err != nil { + return err + } + if err = bkt.Put(feeTxHexK, []byte(ticket.FeeTxHex)); err != nil { + return err + } + if err = bkt.Put(feeTxHashK, []byte(ticket.FeeTxHash)); err != nil { + return err + } + if err = bkt.Put(feeTxStatusK, []byte(ticket.FeeTxStatus)); err != nil { + return err + } + if err = bkt.Put(outcomeK, []byte(ticket.Outcome)); err != nil { + return err + } + if err = bkt.Put(purchaseHeightK, int64ToBytes(ticket.PurchaseHeight)); err != nil { + return err + } + if err = bkt.Put(feeAddressIndexK, uint32ToBytes(ticket.FeeAddressIndex)); err != nil { + return err + } + if err = bkt.Put(feeAmountK, int64ToBytes(ticket.FeeAmount)); err != nil { + return err + } + if err = bkt.Put(feeExpirationK, int64ToBytes(ticket.FeeExpiration)); err != nil { + return err + } + + confirmed := []byte{0} + if ticket.Confirmed { + confirmed = []byte{1} + } + if err = bkt.Put(confirmedK, confirmed); err != nil { + return err + } + + jsonVoteChoices, err := json.Marshal(ticket.VoteChoices) + if err != nil { + return err + } + return bkt.Put(voteChoicesK, jsonVoteChoices) +} + +func getTicketFromBkt(bkt *bolt.Bucket) (Ticket, error) { + var ticket Ticket + + ticket.Hash = string(bkt.Get(hashK)) + ticket.CommitmentAddress = string(bkt.Get(commitmentAddressK)) + ticket.FeeAddress = string(bkt.Get(feeAddressK)) + ticket.VotingWIF = string(bkt.Get(votingWIFK)) + ticket.FeeTxHex = string(bkt.Get(feeTxHexK)) + ticket.FeeTxHash = string(bkt.Get(feeTxHashK)) + ticket.FeeTxStatus = FeeStatus(bkt.Get(feeTxStatusK)) + ticket.Outcome = TicketOutcome(bkt.Get(outcomeK)) + + ticket.PurchaseHeight = bytesToInt64(bkt.Get(purchaseHeightK)) + ticket.FeeAddressIndex = bytesToUint32(bkt.Get(feeAddressIndexK)) + ticket.FeeAmount = bytesToInt64(bkt.Get(feeAmountK)) + ticket.FeeExpiration = bytesToInt64(bkt.Get(feeExpirationK)) + + // TODO is this dodgy? + if bkt.Get(confirmedK)[0] == byte(1) { + ticket.Confirmed = true + } + + voteChoices := make(map[string]string) + err := json.Unmarshal(bkt.Get(voteChoicesK), &voteChoices) + if err != nil { + return ticket, err + } + + ticket.VoteChoices = voteChoices + + return ticket, nil +} + func (vdb *VspDatabase) DeleteTicket(ticket Ticket) error { vdb.ticketsMtx.Lock() defer vdb.ticketsMtx.Unlock() @@ -126,7 +228,7 @@ func (vdb *VspDatabase) DeleteTicket(ticket Ticket) error { return vdb.db.Update(func(tx *bolt.Tx) error { ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) - err := ticketBkt.Delete([]byte(ticket.Hash)) + err := ticketBkt.DeleteBucket([]byte(ticket.Hash)) if err != nil { return fmt.Errorf("could not delete ticket: %w", err) } @@ -142,18 +244,13 @@ func (vdb *VspDatabase) UpdateTicket(ticket Ticket) error { return vdb.db.Update(func(tx *bolt.Tx) error { ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) - hashBytes := []byte(ticket.Hash) + bkt := ticketBkt.Bucket([]byte(ticket.Hash)) - if ticketBkt.Get(hashBytes) == nil { + if bkt == nil { return fmt.Errorf("ticket does not exist with hash %s", ticket.Hash) } - ticketBytes, err := json.Marshal(ticket) - if err != nil { - return fmt.Errorf("could not marshal ticket: %w", err) - } - - return ticketBkt.Put(hashBytes, ticketBytes) + return putTicketInBucket(bkt, ticket) }) } @@ -164,16 +261,16 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error) var ticket Ticket var found bool err := vdb.db.View(func(tx *bolt.Tx) error { - ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) + ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK).Bucket([]byte(ticketHash)) - ticketBytes := ticketBkt.Get([]byte(ticketHash)) - if ticketBytes == nil { + if ticketBkt == nil { return nil } - err := json.Unmarshal(ticketBytes, &ticket) + var err error + ticket, err = getTicketFromBkt(ticketBkt) if err != nil { - return fmt.Errorf("could not unmarshal ticket: %w", err) + return fmt.Errorf("could not get ticket: %w", err) } found = true @@ -185,8 +282,7 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error) } // CountTickets returns the total number of voted, revoked, and currently voting -// tickets. Requires deserializing every ticket in the db so should be used -// sparingly. +// tickets. This func iterates over every ticket so should be used sparingly. func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) { vdb.ticketsMtx.RLock() defer vdb.ticketsMtx.RUnlock() @@ -196,14 +292,10 @@ func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) { ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) return ticketBkt.ForEach(func(k, v []byte) error { - var ticket Ticket - err := json.Unmarshal(v, &ticket) - if err != nil { - return fmt.Errorf("could not unmarshal ticket: %w", err) - } + tBkt := ticketBkt.Bucket(k) - if ticket.FeeTxStatus == FeeConfirmed { - switch ticket.Outcome { + if FeeStatus(tBkt.Get(feeTxStatusK)) == FeeConfirmed { + switch TicketOutcome(tBkt.Get(outcomeK)) { case Voted: voted++ case Revoked: @@ -270,10 +362,9 @@ func (vdb *VspDatabase) filterTickets(filter func(Ticket) bool) ([]Ticket, error ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK) return ticketBkt.ForEach(func(k, v []byte) error { - var ticket Ticket - err := json.Unmarshal(v, &ticket) + ticket, err := getTicketFromBkt(ticketBkt.Bucket(k)) if err != nil { - return fmt.Errorf("could not unmarshal ticket: %w", err) + return fmt.Errorf("could not get ticket: %w", err) } if filter(ticket) { diff --git a/database/ticket_test.go b/database/ticket_test.go index 0214072f..4119d2aa 100644 --- a/database/ticket_test.go +++ b/database/ticket_test.go @@ -159,12 +159,14 @@ func testUpdateTicket(t *testing.T) { // Update ticket with new values. ticket.FeeAmount = ticket.FeeAmount + 1 ticket.FeeExpiration = ticket.FeeExpiration + 1 + ticket.VoteChoices = map[string]string{"New agenda": "New value"} + err = db.UpdateTicket(ticket) if err != nil { t.Fatalf("error updating ticket: %v", err) } - // Retrieve ticket from database. + // Retrieve updated ticket from database. retrieved, found, err := db.GetTicketByHash(ticket.Hash) if err != nil { t.Fatalf("error retrieving ticket by ticket hash: %v", err) @@ -174,7 +176,8 @@ func testUpdateTicket(t *testing.T) { } if ticket.FeeAmount != retrieved.FeeAmount || - ticket.FeeExpiration != retrieved.FeeExpiration { + ticket.FeeExpiration != retrieved.FeeExpiration || + !reflect.DeepEqual(retrieved.VoteChoices, ticket.VoteChoices) { t.Fatal("retrieved ticket value didnt match expected") } diff --git a/database/v2_upgrade.go b/database/upgrade_v2.go similarity index 94% rename from database/v2_upgrade.go rename to database/upgrade_v2.go index 9999ea17..2b3d7818 100644 --- a/database/v2_upgrade.go +++ b/database/upgrade_v2.go @@ -19,7 +19,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error { count := 0 err := ticketBkt.ForEach(func(k, v []byte) error { // Deserialize the old ticket. - var ticket Ticket + var ticket v1Ticket err := json.Unmarshal(v, &ticket) if err != nil { return fmt.Errorf("could not unmarshal ticket: %w", err) @@ -51,7 +51,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error { // Update database version. err = vspBkt.Put(versionK, uint32ToBytes(removeOldFeeTxVersion)) if err != nil { - return err + return fmt.Errorf("failed to update db version: %w", err) } return nil diff --git a/database/upgrade_v3.go b/database/upgrade_v3.go new file mode 100644 index 00000000..8f69c28b --- /dev/null +++ b/database/upgrade_v3.go @@ -0,0 +1,86 @@ +package database + +import ( + "encoding/json" + "fmt" + + bolt "go.etcd.io/bbolt" +) + +func ticketBucketUpgrade(db *bolt.DB) error { + log.Infof("Upgrading database to version %d", ticketBucketVersion) + + // Run the upgrade in a single database transaction so it can be safely + // rolled back if an error is encountered. + err := db.Update(func(tx *bolt.Tx) error { + vspBkt := tx.Bucket(vspBktK) + ticketBkt := vspBkt.Bucket(ticketBktK) + + // Count tickets so migration progress can be logged. + todo := 0 + err := ticketBkt.ForEach(func(k, v []byte) error { + todo++ + return nil + }) + if err != nil { + return fmt.Errorf("could not count tickets: %w", err) + } + + done := 0 + const batchSize = 2000 + err = ticketBkt.ForEach(func(k, v []byte) error { + // Deserialize the old ticket. + var ticket v1Ticket + err := json.Unmarshal(v, &ticket) + if err != nil { + return fmt.Errorf("could not unmarshal ticket: %w", err) + } + + // Delete the old ticket. + err = ticketBkt.Delete(k) + if err != nil { + return fmt.Errorf("could not delete ticket: %w", err) + } + + // Insert the new ticket. + newBkt, err := ticketBkt.CreateBucket(k) + if err != nil { + return fmt.Errorf("could not create new ticket bucket: %w", err) + } + + err = putTicketInBucket(newBkt, Ticket(ticket)) + if err != nil { + return fmt.Errorf("could not put new ticket in bucket: %w", err) + } + + done++ + + if done%batchSize == 0 { + log.Infof("Migrated %d/%d tickets", done, todo) + } + + return nil + }) + if err != nil { + return err + } + + if done > 0 && done%batchSize != 0 { + log.Infof("Migrated %d/%d tickets", done, todo) + } + + // Update database version. + err = vspBkt.Put(versionK, uint32ToBytes(ticketBucketVersion)) + if err != nil { + return fmt.Errorf("failed to update db version: %w", err) + } + + return nil + }) + if err != nil { + return err + } + + log.Info("Upgrade completed") + return nil +} diff --git a/database/database_upgrades.go b/database/upgrades.go similarity index 52% rename from database/database_upgrades.go rename to database/upgrades.go index e3f0b843..97853bb4 100644 --- a/database/database_upgrades.go +++ b/database/upgrades.go @@ -16,16 +16,41 @@ const ( // need to keep these, and they take up a lot of space. removeOldFeeTxVersion = 2 + // ticketBucketVersion changes the way tickets are stored. Previously they + // were stored as JSON encoded strings in a single bucket. This upgrade + // moves each ticket into its own bucket and does away with JSON encoding. + ticketBucketVersion = 3 + // latestVersion is the latest version of the database that is understood by // vspd. Databases with recorded versions higher than this will fail to open // (meaning any upgrades prevent reverting to older software). - latestVersion = removeOldFeeTxVersion + latestVersion = ticketBucketVersion ) // upgrades maps between old database versions and the upgrade function to // upgrade the database to the next version. var upgrades = []func(tx *bolt.DB) error{ - initialVersion: removeOldFeeTxUpgrade, + initialVersion: removeOldFeeTxUpgrade, + removeOldFeeTxVersion: ticketBucketUpgrade, +} + +// v1Ticket has the json tags required to unmarshal tickets stored in the +// v1 database format. +type v1Ticket struct { + Hash string `json:"hsh"` + PurchaseHeight int64 `json:"phgt"` + CommitmentAddress string `json:"cmtaddr"` + FeeAddressIndex uint32 `json:"faddridx"` + FeeAddress string `json:"faddr"` + FeeAmount int64 `json:"famt"` + FeeExpiration int64 `json:"fexp"` + Confirmed bool `json:"conf"` + VotingWIF string `json:"vwif"` + VoteChoices map[string]string `json:"vchces"` + FeeTxHex string `json:"fhex"` + FeeTxHash string `json:"fhsh"` + FeeTxStatus FeeStatus `json:"fsts"` + Outcome TicketOutcome `json:"otcme"` } // Upgrade will update the database to the latest known version. diff --git a/webapi/getfeeaddress.go b/webapi/getfeeaddress.go index 5e906e63..ceefff0a 100644 --- a/webapi/getfeeaddress.go +++ b/webapi/getfeeaddress.go @@ -89,9 +89,10 @@ func feeAddress(c *gin.Context) { ticketHash := request.TicketHash // Respond early if we already have the fee tx for this ticket. - if ticket.FeeTxStatus == database.FeeReceieved || - ticket.FeeTxStatus == database.FeeBroadcast || - ticket.FeeTxStatus == database.FeeConfirmed { + if knownTicket && + (ticket.FeeTxStatus == database.FeeReceieved || + ticket.FeeTxStatus == database.FeeBroadcast || + ticket.FeeTxStatus == database.FeeConfirmed) { log.Warnf("%s: Fee tx already received (clientIP=%s, ticketHash=%s)", funcName, c.ClientIP(), ticket.Hash) sendError(errFeeAlreadyReceived, c)