Skip to content

Commit

Permalink
Add atxSync flag to disable atx syncing and http bulk sync api now sy…
Browse files Browse the repository at this point in the history
…ncs in batches
  • Loading branch information
kacpersaw committed Apr 17, 2024
1 parent d48c6c3 commit 2108597
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 17 deletions.
11 changes: 10 additions & 1 deletion cmd/collector/main.go
Expand Up @@ -36,6 +36,7 @@ var (
apiHostFlag string
apiPortFlag int
recalculateEpochStatsBoolFlag bool
atxSyncFlag bool
)

var flags = []cli.Flag{
Expand Down Expand Up @@ -134,6 +135,14 @@ var flags = []cli.Flag{
Destination: &apiPortFlag,
EnvVars: []string{"SPACEMESH_API_PORT"},
},
&cli.BoolFlag{
Name: "atxSync",
Usage: ``,
Required: false,
Value: true,
Destination: &atxSyncFlag,
EnvVars: []string{"SPACEMESH_ATX_SYNC"},
},
}

func main() {
Expand Down Expand Up @@ -165,7 +174,7 @@ func main() {
dbClient := &sql.Client{}

c := collector.NewCollector(nodePublicAddressStringFlag, nodePrivateAddressStringFlag,
syncMissingLayersBoolFlag, syncFromLayerFlag, recalculateEpochStatsBoolFlag, mongoStorage, db, dbClient)
syncMissingLayersBoolFlag, syncFromLayerFlag, recalculateEpochStatsBoolFlag, mongoStorage, db, dbClient, atxSyncFlag)
mongoStorage.AccountUpdater = c

sigs := make(chan os.Signal, 1)
Expand Down
14 changes: 10 additions & 4 deletions collector/collector.go
Expand Up @@ -57,6 +57,7 @@ type Collector struct {
syncMissingLayersFlag bool
recalculateEpochStatsFlag bool
syncFromLayerFlag uint32
atxSyncFlag bool

listener Listener
db *sql2.Database
Expand All @@ -79,7 +80,9 @@ type Collector struct {
notify chan int
}

func NewCollector(nodePublicAddress string, nodePrivateAddress string, syncMissingLayersFlag bool, syncFromLayerFlag int, recalculateEpochStatsFlag bool, listener Listener, db *sql2.Database, dbClient sql.DatabaseClient) *Collector {
func NewCollector(nodePublicAddress string, nodePrivateAddress string, syncMissingLayersFlag bool,
syncFromLayerFlag int, recalculateEpochStatsFlag bool,
listener Listener, db *sql2.Database, dbClient sql.DatabaseClient, atxSyncFlag bool) *Collector {
return &Collector{
apiPublicUrl: nodePublicAddress,
apiPrivateUrl: nodePrivateAddress,
Expand All @@ -90,6 +93,7 @@ func NewCollector(nodePublicAddress string, nodePrivateAddress string, syncMissi
notify: make(chan int),
db: db,
dbClient: dbClient,
atxSyncFlag: atxSyncFlag,
}
}

Expand Down Expand Up @@ -132,9 +136,11 @@ func (c *Collector) Run() error {
return errors.Join(errors.New("cannot get network info"), err)
}

err = c.syncActivations()
if err != nil {
return errors.Join(errors.New("cannot sync activations"), err)
if c.atxSyncFlag {
err = c.syncActivations()
if err != nil {
return errors.Join(errors.New("cannot sync activations"), err)
}
}

if c.syncMissingLayersFlag {
Expand Down
2 changes: 1 addition & 1 deletion collector/collector_test.go
Expand Up @@ -88,7 +88,7 @@ func TestMain(m *testing.M) {

collectorApp = collector.NewCollector(fmt.Sprintf("localhost:%d", node.NodePort),
fmt.Sprintf("localhost:%d", privateNode.NodePort), false,
0, false, storageDB, sqlDb, dbClient)
0, false, storageDB, sqlDb, dbClient, true)
storageDB.AccountUpdater = collectorApp
defer storageDB.Close()
go collectorApp.Run()
Expand Down
23 changes: 17 additions & 6 deletions collector/http.go
Expand Up @@ -94,16 +94,27 @@ func (c *Collector) StartHttpServer(apiHost string, apiPort int) {

log.Info("http syncing atxs for epoch %s", epoch)
go func() {
var atxs []*model.Activation
err = c.dbClient.GetAtxsByEpoch(c.db, epochId, func(atx *types.VerifiedActivationTx) bool {
atxs = append(atxs, model.NewActivation(atx))
return true
})
count, err := c.dbClient.CountAtxsByEpoch(c.db, epochId)
if err != nil {
log.Warning("syncing atxs for %s failed with error %d", epoch, err)
return
}
c.listener.OnActivations(atxs)
batchSize := 100000
totalPages := (count + batchSize - 1) / batchSize
for page := 0; page < totalPages; page++ {
offset := page * batchSize
var atxs []*model.Activation
err = c.dbClient.GetAtxsByEpochPaginated(c.db, epochId, int64(batchSize), int64(offset), func(atx *types.VerifiedActivationTx) bool {
atxs = append(atxs, model.NewActivation(atx))
return true
})
if err != nil {
log.Warning("syncing atxs for %s failed with error %d", epoch, err)
return
}
c.listener.OnActivations(atxs)
atxs = nil
}
c.listener.RecalculateEpochStats()
}()

Expand Down
6 changes: 5 additions & 1 deletion collector/mesh.go
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"context"
"fmt"
"github.com/spacemeshos/explorer-backend/model"
"github.com/spacemeshos/explorer-backend/utils"
"github.com/spacemeshos/go-spacemesh/common/types"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -238,14 +239,17 @@ func (c *Collector) syncActivations() error {
received := c.listener.GetLastActivationReceived()
log.Info("Syncing activations from %d", received)

var atxs []*model.Activation
err := c.dbClient.GetAtxsReceivedAfter(c.db, received, func(atx *types.VerifiedActivationTx) bool {
c.listener.OnActivation(atx)
atxs = append(atxs, model.NewActivation(atx))
return true
})
if err != nil {
return err
}

c.listener.OnActivations(atxs)

return nil
}

Expand Down
8 changes: 5 additions & 3 deletions collector/node.go
Expand Up @@ -54,9 +54,11 @@ func (c *Collector) syncStatusPump() error {
fmt.Errorf("syncNotProcessedTxs error: %v", err)
}

err = c.syncActivations()
if err != nil {
fmt.Errorf("syncActivations error: %v", err)
if c.atxSyncFlag {
err = c.syncActivations()
if err != nil {
fmt.Errorf("syncActivations error: %v", err)

Check failure on line 60 in collector/node.go

View workflow job for this annotation

GitHub Actions / lint

unusedresult: result of fmt.Errorf call not used (govet)
}
}

err = c.createFutureEpoch()
Expand Down
41 changes: 40 additions & 1 deletion collector/sql/atxs.go
Expand Up @@ -53,7 +53,7 @@ func decoder(fn decoderCallback) sql.Decoder {
func (c *Client) GetAtxsReceivedAfter(db *sql.Database, ts int64, fn func(tx *types.VerifiedActivationTx) bool) error {
var derr error
_, err := db.Exec(
fullQuery+` WHERE received > ?1 ORDER BY epoch asc, id asc`,
fullQuery+` WHERE received > ?1`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, ts)
},
Expand Down Expand Up @@ -91,3 +91,42 @@ func (c *Client) GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types
}
return derr
}

func (c *Client) CountAtxsByEpoch(db *sql.Database, epoch int64) (int, error) {
var totalCount int
_, err := db.Exec(
`SELECT COUNT(*) FROM atxs WHERE epoch = ?1`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, epoch)
}, func(stmt *sql.Statement) bool {
totalCount = stmt.ColumnInt(0)
return true
})
if err != nil {
return 0, err
}
return totalCount, nil
}

func (c *Client) GetAtxsByEpochPaginated(db *sql.Database, epoch, limit, offset int64, fn func(tx *types.VerifiedActivationTx) bool) error {
var derr error
_, err := db.Exec(
fullQuery+` WHERE epoch = ?1 ORDER BY epoch asc, id asc LIMIT ?2 OFFSET ?3`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, epoch)
stmt.BindInt64(2, limit)
stmt.BindInt64(3, offset)
},
decoder(func(atx *types.VerifiedActivationTx, err error) bool {
if atx != nil {
return fn(atx)
}
derr = err
return derr == nil
}),
)
if err != nil {
return err
}
return derr
}
2 changes: 2 additions & 0 deletions collector/sql/sql.go
Expand Up @@ -14,6 +14,8 @@ type DatabaseClient interface {
AccountsSnapshot(db *sql.Database, lid types.LayerID) (rst []*types.Account, err error)
GetAtxsReceivedAfter(db *sql.Database, ts int64, fn func(tx *types.VerifiedActivationTx) bool) error
GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types.VerifiedActivationTx) bool) error
CountAtxsByEpoch(db *sql.Database, epoch int64) (int, error)
GetAtxsByEpochPaginated(db *sql.Database, epoch, limit, offset int64, fn func(tx *types.VerifiedActivationTx) bool) error
}

type Client struct{}
Expand Down
8 changes: 8 additions & 0 deletions test/testseed/db.go
Expand Up @@ -206,6 +206,14 @@ func (c *Client) GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types
return nil
}

func (c *Client) CountAtxsByEpoch(db *sql.Database, epoch int64) (int, error) {
return 0, nil
}

func (c *Client) GetAtxsByEpochPaginated(db *sql.Database, epoch, limit, offset int64, fn func(tx *types.VerifiedActivationTx) bool) error {
return nil
}

func mustParse(str string) []byte {
res, err := utils.StringToBytes(str)
if err != nil {
Expand Down

0 comments on commit 2108597

Please sign in to comment.