Skip to content

Commit

Permalink
Store each ticket in its own DB bucket.
Browse files Browse the repository at this point in the history
**NOTE: This contains a backwards incompatible database migration, so if you plan to test it, please make a copy of your database first.**

Moves tickets from a single database bucket containing JSON encoded strings, to a bucket for each ticket.

This change is to preemptively deal with scaling issues seen with databases containing tens of thousands of tickets.
  • Loading branch information
jholdstock committed May 24, 2021
1 parent 136e389 commit 20cb546
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 66 deletions.
10 changes: 10 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func writeHotBackupFile(db *bolt.DB) error {
return err
}

func int64ToBytes(i int64) []byte {
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(i))
return bytes
}

func bytesToInt64(bytes []byte) int64 {
return int64(binary.LittleEndian.Uint64(bytes))
}

func uint32ToBytes(i uint32) []byte {
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, i)
Expand Down
205 changes: 148 additions & 57 deletions database/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,53 @@ const (
Voted TicketOutcome = "voted"
)

// Ticket is serialized to json and stored in bbolt db. The json keys are
// deliberately kept short because they are duplicated many times in the db.
// The keys used to store ticket values in the database.
var (
hashK = []byte("Hash")
purchaseHeightK = []byte("PurchaseHeight")
commitmentAddressK = []byte("CommitmentAddress")
feeAddressIndexK = []byte("FeeAddressIndex")
feeAddressK = []byte("FeeAddress")
feeAmountK = []byte("FeeAmount")
feeExpirationK = []byte("FeeExpiration")
confirmedK = []byte("Confirmed")
votingWIFK = []byte("VotingWIF")
voteChoicesK = []byte("VoteChoices")
feeTxHexK = []byte("FeeTxHex")
feeTxHashK = []byte("FeeTxHash")
feeTxStatusK = []byte("FeeTxStatus")
outcomeK = []byte("Outcome")
)

type Ticket struct {
Hash string `json:"hsh"`
PurchaseHeight int64 `json:"phgt"`
CommitmentAddress string `json:"cmtaddr"`
FeeAddressIndex uint32 `json:"faddridx"`
FeeAddress string `json:"faddr"`
FeeAmount int64 `json:"famt"`
FeeExpiration int64 `json:"fexp"`
Hash string
PurchaseHeight int64
CommitmentAddress string
FeeAddressIndex uint32
FeeAddress string
FeeAmount int64
FeeExpiration int64

// Confirmed will be set when the ticket has 6+ confirmations.
Confirmed bool `json:"conf"`
Confirmed bool

// VotingWIF is set in /payfee.
VotingWIF string `json:"vwif"`
VotingWIF string

// VoteChoices is initially set in /payfee, but can be updated in
// /setvotechoices.
VoteChoices map[string]string `json:"vchces"`
VoteChoices map[string]string

// FeeTxHex and FeeTxHash will be set when the fee tx has been received.
FeeTxHex string `json:"fhex"`
FeeTxHash string `json:"fhsh"`
FeeTxHex string
FeeTxHash string

// FeeTxStatus indicates the current state of the fee transaction.
FeeTxStatus FeeStatus `json:"fsts"`
FeeTxStatus FeeStatus

// Outcome is set once a ticket is either voted or revoked. An empty outcome
// indicates that a ticket is still votable.
Outcome TicketOutcome `json:"otcme"`
Outcome TicketOutcome
}

func (t *Ticket) FeeExpired() bool {
Expand All @@ -86,22 +102,19 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error {
return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)

hashBytes := []byte(ticket.Hash)

if ticketBkt.Get(hashBytes) != nil {
return fmt.Errorf("ticket already exists with hash %s", ticket.Hash)
// Create a bucket for the new ticket. Returns an error if bucket
// already exists.
newTicketBkt, err := ticketBkt.CreateBucket([]byte(ticket.Hash))
if err != nil {
return fmt.Errorf("could not create bucket for ticket: %w", err)
}

// Error if a ticket already exists with the same fee address.
err := ticketBkt.ForEach(func(k, v []byte) error {
var t Ticket
err := json.Unmarshal(v, &t)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}
err = ticketBkt.ForEach(func(k, v []byte) error {
tbkt := ticketBkt.Bucket(k)

if t.FeeAddress == ticket.FeeAddress {
return fmt.Errorf("ticket with fee address %s already exists", t.FeeAddress)
if string(tbkt.Get(feeAddressK)) == ticket.FeeAddress {
return fmt.Errorf("ticket with fee address %s already exists", ticket.FeeAddress)
}

return nil
Expand All @@ -110,23 +123,112 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error {
return err
}

ticketBytes, err := json.Marshal(ticket)
err = putTicketInBucket(newTicketBkt, ticket)
if err != nil {
return fmt.Errorf("could not marshal ticket: %w", err)
return fmt.Errorf("putting ticket in bucket failed: %w", err)
}

return ticketBkt.Put(hashBytes, ticketBytes)
return nil
})
}

// putTicketInBucket encodes each of the fields of the provided ticket as a byte
// array, and stores them as values within the provided db bucket.
func putTicketInBucket(bkt *bolt.Bucket, ticket Ticket) error {
var err error
if err = bkt.Put(hashK, []byte(ticket.Hash)); err != nil {
return err
}
if err = bkt.Put(commitmentAddressK, []byte(ticket.CommitmentAddress)); err != nil {
return err
}
if err = bkt.Put(feeAddressK, []byte(ticket.FeeAddress)); err != nil {
return err
}
if err = bkt.Put(votingWIFK, []byte(ticket.VotingWIF)); err != nil {
return err
}
if err = bkt.Put(feeTxHexK, []byte(ticket.FeeTxHex)); err != nil {
return err
}
if err = bkt.Put(feeTxHashK, []byte(ticket.FeeTxHash)); err != nil {
return err
}
if err = bkt.Put(feeTxStatusK, []byte(ticket.FeeTxStatus)); err != nil {
return err
}
if err = bkt.Put(outcomeK, []byte(ticket.Outcome)); err != nil {
return err
}
if err = bkt.Put(purchaseHeightK, int64ToBytes(ticket.PurchaseHeight)); err != nil {
return err
}
if err = bkt.Put(feeAddressIndexK, uint32ToBytes(ticket.FeeAddressIndex)); err != nil {
return err
}
if err = bkt.Put(feeAmountK, int64ToBytes(ticket.FeeAmount)); err != nil {
return err
}
if err = bkt.Put(feeExpirationK, int64ToBytes(ticket.FeeExpiration)); err != nil {
return err
}

confirmed := []byte{0}
if ticket.Confirmed {
confirmed = []byte{1}
}
if err = bkt.Put(confirmedK, confirmed); err != nil {
return err
}

jsonVoteChoices, err := json.Marshal(ticket.VoteChoices)
if err != nil {
return err
}
return bkt.Put(voteChoicesK, jsonVoteChoices)
}

func getTicketFromBkt(bkt *bolt.Bucket) (Ticket, error) {
var ticket Ticket

ticket.Hash = string(bkt.Get(hashK))
ticket.CommitmentAddress = string(bkt.Get(commitmentAddressK))
ticket.FeeAddress = string(bkt.Get(feeAddressK))
ticket.VotingWIF = string(bkt.Get(votingWIFK))
ticket.FeeTxHex = string(bkt.Get(feeTxHexK))
ticket.FeeTxHash = string(bkt.Get(feeTxHashK))
ticket.FeeTxStatus = FeeStatus(bkt.Get(feeTxStatusK))
ticket.Outcome = TicketOutcome(bkt.Get(outcomeK))

ticket.PurchaseHeight = bytesToInt64(bkt.Get(purchaseHeightK))
ticket.FeeAddressIndex = bytesToUint32(bkt.Get(feeAddressIndexK))
ticket.FeeAmount = bytesToInt64(bkt.Get(feeAmountK))
ticket.FeeExpiration = bytesToInt64(bkt.Get(feeExpirationK))

// TODO is this dodgy?
if bkt.Get(confirmedK)[0] == byte(1) {
ticket.Confirmed = true
}

voteChoices := make(map[string]string)
err := json.Unmarshal(bkt.Get(voteChoicesK), &voteChoices)
if err != nil {
return ticket, err
}

ticket.VoteChoices = voteChoices

return ticket, nil
}

func (vdb *VspDatabase) DeleteTicket(ticket Ticket) error {
vdb.ticketsMtx.Lock()
defer vdb.ticketsMtx.Unlock()

return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)

err := ticketBkt.Delete([]byte(ticket.Hash))
err := ticketBkt.DeleteBucket([]byte(ticket.Hash))
if err != nil {
return fmt.Errorf("could not delete ticket: %w", err)
}
Expand All @@ -142,18 +244,13 @@ func (vdb *VspDatabase) UpdateTicket(ticket Ticket) error {
return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)

hashBytes := []byte(ticket.Hash)
bkt := ticketBkt.Bucket([]byte(ticket.Hash))

if ticketBkt.Get(hashBytes) == nil {
if bkt == nil {
return fmt.Errorf("ticket does not exist with hash %s", ticket.Hash)
}

ticketBytes, err := json.Marshal(ticket)
if err != nil {
return fmt.Errorf("could not marshal ticket: %w", err)
}

return ticketBkt.Put(hashBytes, ticketBytes)
return putTicketInBucket(bkt, ticket)
})
}

Expand All @@ -164,16 +261,16 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error)
var ticket Ticket
var found bool
err := vdb.db.View(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK).Bucket([]byte(ticketHash))

ticketBytes := ticketBkt.Get([]byte(ticketHash))
if ticketBytes == nil {
if ticketBkt == nil {
return nil
}

err := json.Unmarshal(ticketBytes, &ticket)
var err error
ticket, err = getTicketFromBkt(ticketBkt)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
return fmt.Errorf("could not get ticket: %w", err)
}

found = true
Expand All @@ -185,8 +282,7 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error)
}

// CountTickets returns the total number of voted, revoked, and currently voting
// tickets. Requires deserializing every ticket in the db so should be used
// sparingly.
// tickets. This func iterates over every ticket so should be used sparingly.
func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) {
vdb.ticketsMtx.RLock()
defer vdb.ticketsMtx.RUnlock()
Expand All @@ -196,14 +292,10 @@ func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)

return ticketBkt.ForEach(func(k, v []byte) error {
var ticket Ticket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}
tBkt := ticketBkt.Bucket(k)

if ticket.FeeTxStatus == FeeConfirmed {
switch ticket.Outcome {
if FeeStatus(tBkt.Get(feeTxStatusK)) == FeeConfirmed {
switch TicketOutcome(tBkt.Get(outcomeK)) {
case Voted:
voted++
case Revoked:
Expand Down Expand Up @@ -270,10 +362,9 @@ func (vdb *VspDatabase) filterTickets(filter func(Ticket) bool) ([]Ticket, error
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)

return ticketBkt.ForEach(func(k, v []byte) error {
var ticket Ticket
err := json.Unmarshal(v, &ticket)
ticket, err := getTicketFromBkt(ticketBkt.Bucket(k))
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
return fmt.Errorf("could not get ticket: %w", err)
}

if filter(ticket) {
Expand Down
7 changes: 5 additions & 2 deletions database/ticket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,14 @@ func testUpdateTicket(t *testing.T) {
// Update ticket with new values.
ticket.FeeAmount = ticket.FeeAmount + 1
ticket.FeeExpiration = ticket.FeeExpiration + 1
ticket.VoteChoices = map[string]string{"New agenda": "New value"}

err = db.UpdateTicket(ticket)
if err != nil {
t.Fatalf("error updating ticket: %v", err)
}

// Retrieve ticket from database.
// Retrieve updated ticket from database.
retrieved, found, err := db.GetTicketByHash(ticket.Hash)
if err != nil {
t.Fatalf("error retrieving ticket by ticket hash: %v", err)
Expand All @@ -174,7 +176,8 @@ func testUpdateTicket(t *testing.T) {
}

if ticket.FeeAmount != retrieved.FeeAmount ||
ticket.FeeExpiration != retrieved.FeeExpiration {
ticket.FeeExpiration != retrieved.FeeExpiration ||
!reflect.DeepEqual(retrieved.VoteChoices, ticket.VoteChoices) {
t.Fatal("retrieved ticket value didnt match expected")
}

Expand Down
4 changes: 2 additions & 2 deletions database/v2_upgrade.go → database/upgrade_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error {
count := 0
err := ticketBkt.ForEach(func(k, v []byte) error {
// Deserialize the old ticket.
var ticket Ticket
var ticket v1Ticket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
Expand Down Expand Up @@ -51,7 +51,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error {
// Update database version.
err = vspBkt.Put(versionK, uint32ToBytes(removeOldFeeTxVersion))
if err != nil {
return err
return fmt.Errorf("failed to update db version: %w", err)
}

return nil
Expand Down
Loading

0 comments on commit 20cb546

Please sign in to comment.