From 2a4b4e568d7c6761566eac4b2ec93998ecf96172 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Oct 2023 14:36:02 +0200 Subject: [PATCH 1/4] stores: add announcement max age --- bus/client/client_test.go | 7 ++++--- cmd/renterd/main.go | 2 ++ config/config.go | 1 + internal/node/node.go | 3 ++- internal/testing/cluster.go | 9 +++++---- stores/autopilot_test.go | 21 --------------------- stores/hostdb.go | 22 +++++++++++----------- stores/hostdb_test.go | 2 +- stores/metadata_test.go | 2 +- stores/sql.go | 12 +++++++++++- stores/sql_test.go | 2 +- 11 files changed, 39 insertions(+), 44 deletions(-) diff --git a/bus/client/client_test.go b/bus/client/client_test.go index 37f5c88c3..db3d7fe3a 100644 --- a/bus/client/client_test.go +++ b/bus/client/client_test.go @@ -72,9 +72,10 @@ func newTestClient(dir string) (*client.Client, func() error, func(context.Conte client := client.New("http://"+l.Addr().String(), "test") b, cleanup, err := node.NewBus(node.BusConfig{ Bus: config.Bus{ - Bootstrap: false, - GatewayAddr: "127.0.0.1:0", - UsedUTXOExpiry: time.Minute, + AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year + Bootstrap: false, + GatewayAddr: "127.0.0.1:0", + UsedUTXOExpiry: time.Minute, }, Miner: node.NewMiner(client), }, filepath.Join(dir, "bus"), types.GeneratePrivateKey(), zap.New(zapcore.NewNopCore())) diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 434314099..8685e0cc1 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -72,6 +72,7 @@ var ( Level: "warn", }, Bus: config.Bus{ + AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year Bootstrap: true, GatewayAddr: build.DefaultGatewayAddress, PersistInterval: time.Minute, @@ -276,6 +277,7 @@ func main() { flag.DurationVar(&cfg.Database.Log.SlowThreshold, "db.logger.slowThreshold", cfg.Database.Log.SlowThreshold, "slow threshold for logger - can be overwritten using RENTERD_DB_LOGGER_SLOW_THRESHOLD environment variable") // bus + flag.Uint64Var(&cfg.Bus.AnnouncementMaxAgeHours, "bus.announcementMaxAgeHours", cfg.Bus.AnnouncementMaxAgeHours, "announcements older than this are ignored") flag.BoolVar(&cfg.Bus.Bootstrap, "bus.bootstrap", cfg.Bus.Bootstrap, "bootstrap the gateway and consensus modules") flag.StringVar(&cfg.Bus.GatewayAddr, "bus.gatewayAddr", cfg.Bus.GatewayAddr, "address to listen on for Sia peer connections - can be overwritten using RENTERD_BUS_GATEWAY_ADDR environment variable") flag.DurationVar(&cfg.Bus.PersistInterval, "bus.persistInterval", cfg.Bus.PersistInterval, "interval at which to persist the consensus updates") diff --git a/config/config.go b/config/config.go index d8a5b52fe..83f3586be 100644 --- a/config/config.go +++ b/config/config.go @@ -49,6 +49,7 @@ type ( // Bus contains the configuration for a bus. Bus struct { + AnnouncementMaxAgeHours uint64 `yaml:"announcementMaxAgeHours"` Bootstrap bool `yaml:"bootstrap"` GatewayAddr string `yaml:"gatewayAddr"` RemoteAddr string `yaml:"remoteAddr"` diff --git a/internal/node/node.go b/internal/node/node.go index 0cdf8d116..2e9d29369 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -232,7 +232,8 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht sqlLogger := stores.NewSQLLogger(l.Named("db"), cfg.DBLoggerConfig) walletAddr := wallet.StandardAddress(seed.PublicKey()) sqlStoreDir := filepath.Join(dir, "partial_slabs") - sqlStore, ccid, err := stores.NewSQLStore(dbConn, alerts.WithOrigin(alertsMgr, "bus"), sqlStoreDir, true, cfg.PersistInterval, walletAddr, cfg.SlabBufferCompletionThreshold, l.Sugar(), sqlLogger) + announcementMaxAge := time.Duration(cfg.AnnouncementMaxAgeHours) * time.Hour + sqlStore, ccid, err := stores.NewSQLStore(dbConn, alerts.WithOrigin(alertsMgr, "bus"), sqlStoreDir, true, announcementMaxAge, cfg.PersistInterval, walletAddr, cfg.SlabBufferCompletionThreshold, l.Sugar(), sqlLogger) if err != nil { return nil, nil, err } diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index a856cf3cc..9ad07f0c5 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -918,10 +918,11 @@ func testNetwork() *consensus.Network { func testBusCfg() node.BusConfig { return node.BusConfig{ Bus: config.Bus{ - Bootstrap: false, - GatewayAddr: "127.0.0.1:0", - PersistInterval: testPersistInterval, - UsedUTXOExpiry: time.Minute, + AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year + Bootstrap: false, + GatewayAddr: "127.0.0.1:0", + PersistInterval: testPersistInterval, + UsedUTXOExpiry: time.Minute, }, Network: testNetwork(), } diff --git a/stores/autopilot_test.go b/stores/autopilot_test.go index da75b357e..5d07374a0 100644 --- a/stores/autopilot_test.go +++ b/stores/autopilot_test.go @@ -90,24 +90,3 @@ func TestAutopilotStore(t *testing.T) { t.Fatal("expected amount to be 99") } } - -// testAutopilotConfig is the autopilot used for testing unless a different -// one is explicitly set. -var testAutopilotConfig = api.AutopilotConfig{ - Contracts: api.ContractsConfig{ - Allowance: types.Siacoins(1).Mul64(1e3), - Amount: 3, - Period: 144, - RenewWindow: 72, - - Download: rhpv2.SectorSize * 500, - Upload: rhpv2.SectorSize * 500, - Storage: rhpv2.SectorSize * 5e3, - - Set: testContractSet, - }, - Hosts: api.HostsConfig{ - MaxDowntimeHours: 10, - AllowRedundantIPs: true, // allow for integration tests by default - }, -} diff --git a/stores/hostdb.go b/stores/hostdb.go index c0227e108..04b1005f6 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -33,10 +33,6 @@ const ( // database per batch. Empirically tested to verify that this is a value // that performs reasonably well. hostRetrievalBatchSize = 10000 - - // interactionInsertionBatchSize is the number of interactions we insert at - // once. - interactionInsertionBatchSize = 100 ) var ( @@ -919,16 +915,20 @@ func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) { var newAnnouncements []announcement for _, sb := range cc.AppliedBlocks { - // Fetch announcements and add them to the queue. var b types.Block convertToCore(sb, &b) - hostdb.ForEachAnnouncement(b, height, func(hostKey types.PublicKey, ha hostdb.Announcement) { - newAnnouncements = append(newAnnouncements, announcement{ - hostKey: publicKey(hostKey), - announcement: ha, + + // Process announcements, but only if they are not too old. + if b.Timestamp.After(time.Now().UTC().Add(-ss.announcementMaxAge)) { + hostdb.ForEachAnnouncement(b, height, func(hostKey types.PublicKey, ha hostdb.Announcement) { + newAnnouncements = append(newAnnouncements, announcement{ + hostKey: publicKey(hostKey), + announcement: ha, + }) + ss.unappliedHostKeys[hostKey] = struct{}{} }) - ss.unappliedHostKeys[hostKey] = struct{}{} - }) + } + // Update RevisionHeight and RevisionNumber for our contracts. for _, txn := range sb.Transactions { for _, rev := range txn.FileContractRevisions { diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index e781d22de..0dcf53ab9 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -149,7 +149,7 @@ func TestSQLHostDB(t *testing.T) { // Connect to the same DB again. conn2 := NewEphemeralSQLiteConnection(dbName) am := alerts.WithOrigin(alerts.NewManager(), "test") - hdb2, ccid, err := NewSQLStore(conn2, am, dir, false, time.Second, types.Address{}, 0, zap.NewNop().Sugar(), nil) + hdb2, ccid, err := NewSQLStore(conn2, am, dir, false, time.Hour, time.Second, types.Address{}, 0, zap.NewNop().Sugar(), nil) if err != nil { t.Fatal(err) } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 63971101c..f9c0d8eaa 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2979,7 +2979,7 @@ func TestPartialSlab(t *testing.T) { // Restart it. The buffer should still be there. conn := NewEphemeralSQLiteConnection(dbName) - db2, _, err := NewSQLStore(conn, alerts.NewManager(), dir, false, time.Hour, types.Address{}, 0, zap.NewNop().Sugar(), newTestLogger()) + db2, _, err := NewSQLStore(conn, alerts.NewManager(), dir, false, time.Hour, time.Hour, types.Address{}, 0, zap.NewNop().Sugar(), newTestLogger()) if err != nil { t.Fatal(err) } diff --git a/stores/sql.go b/stores/sql.go index fa4e797ce..fdc854104 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -56,6 +56,9 @@ type ( unappliedOutputChanges []outputChange unappliedTxnChanges []txnChange + // HostDB related fields + announcementMaxAge time.Duration + // SettingsDB related fields. settingsMu sync.Mutex settings map[string]string @@ -127,7 +130,12 @@ func DBConfigFromEnv() (uri, user, password, dbName string) { // NewSQLStore uses a given Dialector to connect to a SQL database. NOTE: Only // pass migrate=true for the first instance of SQLHostDB if you connect via the // same Dialector multiple times. -func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir string, migrate bool, persistInterval time.Duration, walletAddress types.Address, slabBufferCompletionThreshold int64, logger *zap.SugaredLogger, gormLogger glogger.Interface) (*SQLStore, modules.ConsensusChangeID, error) { +func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir string, migrate bool, announcementMaxAge, persistInterval time.Duration, walletAddress types.Address, slabBufferCompletionThreshold int64, logger *zap.SugaredLogger, gormLogger glogger.Interface) (*SQLStore, modules.ConsensusChangeID, error) { + // Sanity check announcement max age. + if announcementMaxAge == 0 { + return nil, modules.ConsensusChangeID{}, errors.New("announcementMaxAge must be non-zero") + } + if err := os.MkdirAll(partialSlabDir, 0700); err != nil { return nil, modules.ConsensusChangeID{}, fmt.Errorf("failed to create partial slab dir: %v", err) } @@ -202,6 +210,8 @@ func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir stri unappliedRevisions: make(map[types.FileContractID]revisionUpdate), unappliedProofs: make(map[types.FileContractID]uint64), + announcementMaxAge: announcementMaxAge, + walletAddress: walletAddress, chainIndex: types.ChainIndex{ Height: ci.Height, diff --git a/stores/sql_test.go b/stores/sql_test.go index 205454eee..799eb211f 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -32,7 +32,7 @@ func newTestSQLStore(dir string) (*SQLStore, string, modules.ConsensusChangeID, conn := NewEphemeralSQLiteConnection(dbName) walletAddrs := types.Address(frand.Entropy256()) alerts := alerts.WithOrigin(alerts.NewManager(), "test") - sqlStore, ccid, err := NewSQLStore(conn, alerts, dir, true, time.Second, walletAddrs, 0, zap.NewNop().Sugar(), newTestLogger()) + sqlStore, ccid, err := NewSQLStore(conn, alerts, dir, true, time.Hour, time.Second, walletAddrs, 0, zap.NewNop().Sugar(), newTestLogger()) if err != nil { return nil, "", modules.ConsensusChangeID{}, err } From 6f7f900067e03d508cf5bd74ef1f4a644c878150 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Oct 2023 21:11:51 +0200 Subject: [PATCH 2/4] testing: add TestAnnouncementMaxAge --- stores/hostdb.go | 2 +- stores/hostdb_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/stores/hostdb.go b/stores/hostdb.go index 04b1005f6..bfaa13613 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -919,7 +919,7 @@ func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) { convertToCore(sb, &b) // Process announcements, but only if they are not too old. - if b.Timestamp.After(time.Now().UTC().Add(-ss.announcementMaxAge)) { + if b.Timestamp.After(time.Now().Add(-ss.announcementMaxAge)) { hostdb.ForEachAnnouncement(b, height, func(hostKey types.PublicKey, ha hostdb.Announcement) { newAnnouncements = append(newAnnouncements, announcement{ hostKey: publicKey(hostKey), diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index 0dcf53ab9..e9afad780 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -1,6 +1,7 @@ package stores import ( + "bytes" "context" "errors" "fmt" @@ -9,11 +10,13 @@ import ( "time" "github.com/google/go-cmp/cmp" + "gitlab.com/NebulousLabs/encoding" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" + "go.sia.tech/siad/crypto" "go.sia.tech/siad/modules" stypes "go.sia.tech/siad/types" "go.uber.org/zap" @@ -1068,6 +1071,41 @@ func TestSQLHostBlocklistBasic(t *testing.T) { } } +// TestAnnouncementMaxAge verifies old announcements are ignored. +func TestAnnouncementMaxAge(t *testing.T) { + db, _, _, err := newTestSQLStore(t.TempDir()) + if err != nil { + t.Fatal(err) + } + + if len(db.unappliedAnnouncements) != 0 { + t.Fatal("expected 0 announcements") + } + + db.processConsensusChangeHostDB( + modules.ConsensusChange{ + ID: modules.ConsensusChangeID{1}, + BlockHeight: 1, + AppliedBlocks: []stypes.Block{ + { + Timestamp: stypes.Timestamp(time.Now().Add(-time.Hour).Add(-time.Minute).Unix()), + Transactions: []stypes.Transaction{newTestTransaction(newTestHostAnnouncement("foo.com:1000"))}, + }, + { + Timestamp: stypes.Timestamp(time.Now().Add(-time.Hour).Add(time.Minute).Unix()), + Transactions: []stypes.Transaction{newTestTransaction(newTestHostAnnouncement("foo.com:1001"))}, + }, + }, + }, + ) + + if len(db.unappliedAnnouncements) != 1 { + t.Fatal("expected 1 announcement") + } else if db.unappliedAnnouncements[0].announcement.NetAddress != "foo.com:1001" { + t.Fatal("unexpected announcement") + } +} + // addTestHosts adds 'n' hosts to the db and returns their keys. func (s *SQLStore) addTestHosts(n int) (keys []types.PublicKey, err error) { cnt, err := s.contractsCount() @@ -1127,3 +1165,28 @@ func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSett Settings: settings, } } + +func newTestPK() (stypes.SiaPublicKey, types.PrivateKey) { + sk := types.GeneratePrivateKey() + pk := sk.PublicKey() + return stypes.SiaPublicKey{ + Algorithm: stypes.SignatureEd25519, + Key: pk[:], + }, sk +} + +func newTestHostAnnouncement(na modules.NetAddress) (modules.HostAnnouncement, types.PrivateKey) { + spk, sk := newTestPK() + return modules.HostAnnouncement{ + Specifier: modules.PrefixHostAnnouncement, + NetAddress: na, + PublicKey: spk, + }, sk +} + +func newTestTransaction(ha modules.HostAnnouncement, sk types.PrivateKey) stypes.Transaction { + var buf bytes.Buffer + buf.Write(encoding.Marshal(ha)) + buf.Write(encoding.Marshal(sk.SignHash(types.Hash256(crypto.HashObject(ha))))) + return stypes.Transaction{ArbitraryData: [][]byte{buf.Bytes()}} +} From 9775a4556cd671d3aa7a8a054c470ff85788ca89 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Oct 2023 21:15:47 +0200 Subject: [PATCH 3/4] fix: lint --- internal/node/node.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/node/node.go b/internal/node/node.go index 5c50d0646..ba0f0aa9a 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "time" "go.sia.tech/core/consensus" "go.sia.tech/core/types" From 4d0ea10bb0ee3faaa38bf5a0f4b90d7124c15303 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Oct 2023 09:39:59 +0200 Subject: [PATCH 4/4] hostdb: fix copy paste --- stores/hostdb.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stores/hostdb.go b/stores/hostdb.go index 03caf2d44..6f695d143 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -925,6 +925,7 @@ func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) { hostKey: publicKey(hostKey), announcement: ha, }) + ss.unappliedHostKeys[hostKey] = struct{}{} }) }