Skip to content

Commit

Permalink
Add new index to make finding spent outputs to update faster.
Browse files Browse the repository at this point in the history
  • Loading branch information
tiger5226 committed Oct 25, 2020
1 parent eb0f738 commit b924f99
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 34 deletions.
87 changes: 53 additions & 34 deletions daemon/jobs/claimtriesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jobs
import (
"database/sql"
"encoding/json"
"fmt"
"runtime"
"strconv"
"sync"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/lbryio/chainquery/model"

"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/query"

"github.com/sirupsen/logrus"
"github.com/volatiletech/null"
Expand Down Expand Up @@ -349,40 +351,42 @@ func getUpdatedClaims(jobStatus *model.JobStatus) (model.ClaimSlice, error) {
}

func getSpentClaimsToUpdate(hasUpdate bool) (model.ClaimSlice, error) {

claim := model.TableNames.Claim

claimID := claim + "." + model.ClaimColumns.ID
claimTxByHashUpdate := claim + "." + model.ClaimColumns.TransactionHashUpdate
claimVoutUpdate := claim + "." + model.ClaimColumns.VoutUpdate
claimClaimID := claim + "." + model.ClaimColumns.ClaimID
claimTxByHash := claim + "." + model.ClaimColumns.TransactionHashID
claimVout := claim + "." + model.ClaimColumns.Vout
claimBidState := claim + "." + model.ClaimColumns.BidState

output := model.TableNames.Output
outputTxHash := output + "." + model.OutputColumns.TransactionHash
outputVout := output + "." + model.OutputColumns.Vout
outputIsSpent := output + "." + model.OutputColumns.IsSpent
outputModifiedAt := output + "." + model.OutputColumns.ModifiedAt

claimJoin := `INNER JOIN ` + claim + ` ON ` + claimTxByHash + ` = ` + outputTxHash + `
AND ` + claimVout + ` = ` + outputVout

if hasUpdate {
claimJoin = `INNER JOIN ` + claim + ` ON ` + claimTxByHashUpdate + ` = ` + outputTxHash + `
AND ` + claimVoutUpdate + ` = ` + outputVout
w := model.OutputWhere
o := model.OutputColumns
outputMods := []qm.QueryMod{
qm.Select(o.ID, o.IsSpent, o.TransactionHash),
w.ModifiedAt.GTE(lastSync.PreviousSyncTime),
w.IsSpent.EQ(true),
qm.Limit(5000),
}
var outputs model.OutputSlice
var claims model.ClaimSlice
var claimsToAdd model.ClaimSlice
var err error
outputs, err = model.Outputs(outputMods...).AllG()
for len(outputs) > 0 {
var txHashList []interface{}
for _, o := range outputs {
txHashList = append(txHashList, o.TransactionHash)
}
txHashCol := model.ClaimColumns.TransactionHashID
if hasUpdate {
txHashCol = model.ClaimColumns.TransactionHashUpdate
}
c := model.ClaimColumns
claimsToAdd, err = model.Claims(qm.Select(c.ID, c.ClaimID, c.TransactionHashUpdate), qm.WhereIn(txHashCol+" IN ?", txHashList...)).AllG()
if err != nil {
return nil, errors.Err(err)
}
claims = append(claims, claimsToAdd...)
logrus.Debug("outputs found: ", len(outputs), " claims found up to: ", len(claims))
nextOutputMods := append(outputMods, w.ID.GT(outputs[len(outputs)-1].ID))
outputs, err = model.Outputs(nextOutputMods...).AllG()
if err != nil {
return nil, errors.Err(err)
}
}

query := `
SELECT ` + claimClaimID + `,` + claimID + `,` + claimTxByHashUpdate + `
FROM ` + output + `
` + claimJoin + `
WHERE ` + outputModifiedAt + ` >= ?
AND ` + outputIsSpent + ` = ?
AND ` + claimBidState + ` != ?`
printDebug(query)
return model.Claims(qm.SQL(query, lastSync.PreviousSyncTime, 1, "Spent")).AllG()
return claims, nil
}

func updateSpentClaims() error {
Expand Down Expand Up @@ -410,9 +414,24 @@ func updateSpentClaims() error {
for _, claim := range claims {
claim.BidState = "Spent"
claim.ModifiedAt = time.Now()
if err := claim.UpdateG(boil.Whitelist(model.ClaimColumns.BidState, model.ClaimColumns.ModifiedAt)); err != nil {
}
upTo := 1000
logrus.Debugf("%d claims left to update", len(claims))
for len(claims) > 0 {
if len(claims) < upTo {
upTo = len(claims)
}
toUpdate := claims[:upTo]
args := []interface{}{time.Now()}
for _, c := range toUpdate {
args = append(args, c.ID)
}
query := fmt.Sprintf(`UPDATE claim SET bid_state="Spent", modified_at = ? WHERE id IN (%s)`, query.Qs(len(toUpdate)))
if _, err := boil.GetDB().Exec(query, args...); err != nil {
return err
}
logrus.Debugf("%d claims left to update", len(claims))
claims = claims[upTo:]
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions migration/025_outputindices.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- +migrate Up

-- +migrate StatementBegin
ALTER TABLE output
ADD INDEX Idx_ModifedSpentTxHash (modified_at ASC, is_spent ASC, transaction_hash ASC);
-- +migrate StatementEnd

0 comments on commit b924f99

Please sign in to comment.