diff --git a/alerts/alerts.go b/alerts/alerts.go index f98d53e..bb1db0f 100644 --- a/alerts/alerts.go +++ b/alerts/alerts.go @@ -8,6 +8,7 @@ import ( "time" "go.sia.tech/core/types" + "go.uber.org/zap" ) const ( @@ -30,6 +31,11 @@ type ( // Severity indicates the severity of an alert. Severity uint8 + // An EventReporter broadcasts events to subscribers. + EventReporter interface { + BroadcastEvent(event string, scope string, data any) error + } + // An Alert is a dismissible message that is displayed to the user. Alert struct { // ID is a unique identifier for the alert. @@ -46,6 +52,9 @@ type ( // A Manager manages the host's alerts. Manager struct { + log *zap.Logger + events EventReporter + mu sync.Mutex // alerts is a map of alert IDs to their current alert. alerts map[types.Hash256]Alert @@ -99,6 +108,10 @@ func (m *Manager) Register(a Alert) { panic("cannot register alert with zero timestamp") // developer error } + if err := m.events.BroadcastEvent("alert", "alerts."+a.Severity.String(), a); err != nil { + m.log.Error("failed to broadcast alert", zap.Error(err)) + } + m.mu.Lock() m.alerts[a.ID] = a m.mu.Unlock() @@ -129,8 +142,11 @@ func (m *Manager) Active() []Alert { } // NewManager initializes a new alerts manager. -func NewManager() *Manager { +func NewManager(er EventReporter, log *zap.Logger) *Manager { return &Manager{ + log: log, + events: er, + alerts: make(map[types.Hash256]Alert), } } diff --git a/api/api.go b/api/api.go index b3e9a33..13753d5 100644 --- a/api/api.go +++ b/api/api.go @@ -17,6 +17,7 @@ import ( "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/rhp" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.sia.tech/jape" "go.sia.tech/siad/modules" "go.uber.org/zap" @@ -113,6 +114,14 @@ type ( AcceptTransactionSet(txns []types.Transaction) error } + // WebHooks manages webhooks + WebHooks interface { + WebHooks() ([]webhooks.WebHook, error) + RegisterWebHook(callbackURL string, scopes []string) (webhooks.WebHook, error) + UpdateWebHook(id int64, callbackURL string, scopes []string) (webhooks.WebHook, error) + RemoveWebHook(id int64) error + } + // A RHPSessionReporter reports on RHP session lifecycle events RHPSessionReporter interface { Subscribe(rhp.SessionSubscriber) @@ -129,6 +138,7 @@ type ( log *zap.Logger alerts Alerts + webhooks WebHooks syncer Syncer chain ChainManager tpool TPool @@ -146,12 +156,13 @@ type ( ) // NewServer initializes the API -func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler { +func NewServer(name string, hostKey types.PublicKey, a Alerts, wh WebHooks, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler { api := &api{ hostKey: hostKey, name: name, alerts: a, + webhooks: wh, syncer: g, chain: chain, tpool: tp, @@ -226,5 +237,10 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C // system endpoints "GET /system/dir": api.handleGETSystemDir, "PUT /system/dir": api.handlePUTSystemDir, + // webhook endpoints + "GET /webhooks": api.handleGETWebhooks, + "POST /webhooks": api.handlePOSTWebhooks, + "PUT /webhooks/:id": api.handlePUTWebhooks, + "DELETE /webhooks/:id": api.handleDELETEWebhooks, }) } diff --git a/api/client.go b/api/client.go index 4146cfc..4dd7aea 100644 --- a/api/client.go +++ b/api/client.go @@ -12,6 +12,7 @@ import ( "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.sia.tech/jape" ) @@ -224,6 +225,37 @@ func (c *Client) MkDir(path string) error { return c.c.PUT("/system/dir", req) } +// RegisterWebHook registers a new WebHook. +func (c *Client) RegisterWebHook(callbackURL string, scopes []string) (hook webhooks.WebHook, err error) { + req := RegisterWebHookRequest{ + CallbackURL: callbackURL, + Scopes: scopes, + } + err = c.c.POST("/webhooks", req, &hook) + return +} + +// UpdateWebHook updates the WebHook with the specified ID. +func (c *Client) UpdateWebHook(id int64, callbackURL string, scopes []string) (hook webhooks.WebHook, err error) { + req := RegisterWebHookRequest{ + CallbackURL: callbackURL, + Scopes: scopes, + } + err = c.c.PUT(fmt.Sprintf("/webhooks/%d", id), req) + return +} + +// DeleteWebHook deletes the WebHook with the specified ID. +func (c *Client) DeleteWebHook(id int64) error { + return c.c.DELETE(fmt.Sprintf("/webhooks/%d", id)) +} + +// WebHooks returns all registered WebHooks. +func (c *Client) WebHooks() (hooks []webhooks.WebHook, err error) { + err = c.c.GET("/webhooks", &hooks) + return +} + // NewClient creates a new hostd API client. func NewClient(baseURL, password string) *Client { return &Client{ diff --git a/api/endpoints.go b/api/endpoints.go index e97118b..b756610 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -525,6 +525,59 @@ func (a *api) handleGETAccountFunding(c jape.Context) { c.Encode(funding) } +func (a *api) handleGETWebhooks(c jape.Context) { + hooks, err := a.webhooks.WebHooks() + if err != nil { + c.Error(err, http.StatusInternalServerError) + return + } + c.Encode(hooks) +} + +func (a *api) handlePOSTWebhooks(c jape.Context) { + var req RegisterWebHookRequest + if err := c.Decode(&req); err != nil { + return + } + + hook, err := a.webhooks.RegisterWebHook(req.CallbackURL, req.Scopes) + if err != nil { + c.Error(err, http.StatusInternalServerError) + return + } + c.Encode(hook) +} + +func (a *api) handlePUTWebhooks(c jape.Context) { + var id int64 + if err := c.DecodeParam("id", &id); err != nil { + return + } + var req RegisterWebHookRequest + if err := c.Decode(&req); err != nil { + return + } + + _, err := a.webhooks.UpdateWebHook(id, req.CallbackURL, req.Scopes) + if err != nil { + c.Error(err, http.StatusInternalServerError) + return + } +} + +func (a *api) handleDELETEWebhooks(c jape.Context) { + var id int64 + if err := c.DecodeParam("id", &id); err != nil { + return + } + + err := a.webhooks.RemoveWebHook(id) + if err != nil { + c.Error(err, http.StatusInternalServerError) + return + } +} + func parseLimitParams(c jape.Context, defaultLimit, maxLimit int) (limit, offset int) { if err := c.DecodeForm("limit", &limit); err != nil { return diff --git a/api/types.go b/api/types.go index f5c9bf9..97c7dc0 100644 --- a/api/types.go +++ b/api/types.go @@ -148,6 +148,12 @@ type ( storage.SectorReference Error string `json:"error,omitempty"` } + + // RegisterWebHookRequest is the request body for the [POST] /webhooks endpoint. + RegisterWebHookRequest struct { + CallbackURL string `json:"callbackURL"` + Scopes []string `json:"scopes"` + } ) // MarshalJSON implements json.Marshaler diff --git a/cmd/hostd/main.go b/cmd/hostd/main.go index 3f63d51..ff11276 100644 --- a/cmd/hostd/main.go +++ b/cmd/hostd/main.go @@ -361,7 +361,7 @@ func main() { auth := jape.BasicAuth(cfg.HTTP.Password) web := http.Server{ Handler: webRouter{ - api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))), + api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.wh, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))), ui: hostd.Handler(), }, ReadTimeout: 30 * time.Second, diff --git a/cmd/hostd/node.go b/cmd/hostd/node.go index 14c2145..9d533cb 100644 --- a/cmd/hostd/node.go +++ b/cmd/hostd/node.go @@ -21,6 +21,7 @@ import ( rhp2 "go.sia.tech/hostd/rhp/v2" rhp3 "go.sia.tech/hostd/rhp/v3" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.sia.tech/siad/modules" "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" @@ -31,6 +32,7 @@ import ( type node struct { g modules.Gateway a *alerts.Manager + wh *webhooks.Manager cm *chain.Manager tp *chain.TransactionPool w *wallet.SingleAddressWallet @@ -61,6 +63,7 @@ func (n *node) Close() error { n.tp.Close() n.cm.Close() n.g.Close() + n.wh.Close() n.store.Close() return nil } @@ -145,6 +148,11 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva return nil, types.PrivateKey{}, fmt.Errorf("failed to create wallet: %w", err) } + webhookReporter, err := webhooks.NewManager(db, logger.Named("webhooks")) + if err != nil { + return nil, types.PrivateKey{}, fmt.Errorf("failed to create webhook reporter: %w", err) + } + rhp2Listener, err := net.Listen("tcp", cfg.RHP2.Address) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to listen on rhp2 addr: %w", err) @@ -162,7 +170,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva discoveredAddr := net.JoinHostPort(g.Address().Host(), rhp2Port) logger.Debug("discovered address", zap.String("addr", discoveredAddr)) - am := alerts.NewManager() + am := alerts.NewManager(webhookReporter, logger.Named("alerts")) sr, err := settings.NewConfigManager(cfg.Directory, hostKey, discoveredAddr, db, cm, tp, w, am, logger.Named("settings")) if err != nil { return nil, types.PrivateKey{}, fmt.Errorf("failed to create settings manager: %w", err) @@ -198,6 +206,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva return &node{ g: g, a: am, + wh: webhookReporter, cm: cm, tp: tp, w: w, diff --git a/host/accounts/accounts_test.go b/host/accounts/accounts_test.go index 6ae5c11..e03e3bf 100644 --- a/host/accounts/accounts_test.go +++ b/host/accounts/accounts_test.go @@ -14,6 +14,7 @@ import ( "go.sia.tech/hostd/internal/chain" "go.sia.tech/hostd/persist/sqlite" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" "go.sia.tech/siad/modules/transactionpool" @@ -71,7 +72,12 @@ func TestCredit(t *testing.T) { } defer w.Close() - a := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + a := alerts.NewManager(webhookReporter, log.Named("alerts")) sm, err := storage.NewVolumeManager(db, a, cm, log.Named("storage"), 0) if err != nil { t.Fatal(err) diff --git a/host/accounts/budget_test.go b/host/accounts/budget_test.go index 19da169..4b44dc3 100644 --- a/host/accounts/budget_test.go +++ b/host/accounts/budget_test.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/hostd/internal/chain" "go.sia.tech/hostd/persist/sqlite" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" "go.sia.tech/siad/modules/transactionpool" @@ -108,7 +109,12 @@ func TestBudget(t *testing.T) { } defer w.Close() - a := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + a := alerts.NewManager(webhookReporter, log.Named("alerts")) sm, err := storage.NewVolumeManager(db, a, cm, log.Named("storage"), 0) if err != nil { t.Fatal(err) diff --git a/host/contracts/contracts_test.go b/host/contracts/contracts_test.go index 142d2ad..fa49ff4 100644 --- a/host/contracts/contracts_test.go +++ b/host/contracts/contracts_test.go @@ -13,6 +13,7 @@ import ( "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/internal/test" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" "go.uber.org/zap/zaptest" "lukechampine.com/frand" ) @@ -36,7 +37,12 @@ func TestContractUpdater(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) diff --git a/host/contracts/integrity_test.go b/host/contracts/integrity_test.go index 7a38dab..7efb016 100644 --- a/host/contracts/integrity_test.go +++ b/host/contracts/integrity_test.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/hostd/host/contracts" "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/internal/test" + "go.sia.tech/hostd/webhooks" stypes "go.sia.tech/siad/types" "go.uber.org/zap/zaptest" "lukechampine.com/frand" @@ -79,7 +80,12 @@ func TestCheckIntegrity(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), 0) if err != nil { t.Fatal(err) diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go index d37cfe1..f5d4085 100644 --- a/host/contracts/manager_test.go +++ b/host/contracts/manager_test.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/internal/test" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" stypes "go.sia.tech/siad/types" "go.uber.org/zap/zaptest" "lukechampine.com/frand" @@ -90,7 +91,12 @@ func TestContractLockUnlock(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -169,7 +175,12 @@ func TestContractLifecycle(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -387,7 +398,12 @@ func TestContractLifecycle(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -574,7 +590,12 @@ func TestContractLifecycle(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -735,7 +756,12 @@ func TestContractLifecycle(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -957,7 +983,12 @@ func TestSectorRoots(t *testing.T) { } defer node.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize) if err != nil { t.Fatal(err) diff --git a/host/settings/announce_test.go b/host/settings/announce_test.go index 036a216..3836bf7 100644 --- a/host/settings/announce_test.go +++ b/host/settings/announce_test.go @@ -10,6 +10,7 @@ import ( "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/internal/test" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" "go.uber.org/zap/zaptest" "lukechampine.com/frand" ) @@ -35,8 +36,13 @@ func TestAutoAnnounce(t *testing.T) { } defer db.Close() - a := alerts.NewManager() - manager, err := settings.NewConfigManager(dir, hostKey, "localhost:9882", db, node.ChainManager(), node.TPool(), node, a, log.Named("settings")) + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) + manager, err := settings.NewConfigManager(dir, hostKey, "localhost:9882", db, node.ChainManager(), node.TPool(), node, am, log.Named("settings")) if err != nil { t.Fatal(err) } diff --git a/host/settings/settings_test.go b/host/settings/settings_test.go index 0b2154e..edfa66b 100644 --- a/host/settings/settings_test.go +++ b/host/settings/settings_test.go @@ -10,6 +10,7 @@ import ( "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/internal/test" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" "go.uber.org/zap/zaptest" "lukechampine.com/frand" ) @@ -30,8 +31,13 @@ func TestSettings(t *testing.T) { } defer db.Close() - a := alerts.NewManager() - manager, err := settings.NewConfigManager(dir, hostKey, "localhost:9882", db, node.ChainManager(), node.TPool(), node, a, log.Named("settings")) + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) + manager, err := settings.NewConfigManager(dir, hostKey, "localhost:9882", db, node.ChainManager(), node.TPool(), node, am, log.Named("settings")) if err != nil { t.Fatal(err) } diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go index d8c2c44..874ec92 100644 --- a/host/storage/storage_test.go +++ b/host/storage/storage_test.go @@ -15,6 +15,7 @@ import ( "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/internal/chain" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" "go.uber.org/zap/zaptest" @@ -65,7 +66,12 @@ func TestVolumeLoad(t *testing.T) { defer cm.Close() defer cm.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -170,7 +176,12 @@ func TestAddVolume(t *testing.T) { defer cm.Close() defer cm.Close() - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -239,7 +250,12 @@ func TestRemoveVolume(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -327,7 +343,12 @@ func TestRemoveCorrupt(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -441,7 +462,12 @@ func TestRemoveMissing(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { t.Fatal(err) @@ -560,7 +586,12 @@ func TestVolumeDistribution(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -698,7 +729,12 @@ func TestVolumeConcurrency(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -849,7 +885,12 @@ func TestVolumeGrow(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -968,7 +1009,12 @@ func TestVolumeShrink(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -1137,7 +1183,12 @@ func TestVolumeManagerReadWrite(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0) if err != nil { t.Fatal(err) @@ -1242,7 +1293,12 @@ func TestSectorCache(t *testing.T) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectors/2) // cache half the sectors if err != nil { t.Fatal(err) @@ -1379,7 +1435,12 @@ func BenchmarkVolumeManagerWrite(b *testing.B) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + b.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { b.Fatal(err) @@ -1449,7 +1510,12 @@ func BenchmarkNewVolume(b *testing.B) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + b.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { b.Fatal(err) @@ -1502,7 +1568,12 @@ func BenchmarkVolumeManagerRead(b *testing.B) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + b.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { b.Fatal(err) @@ -1574,7 +1645,12 @@ func BenchmarkVolumeRemove(b *testing.B) { defer cm.Close() // initialize the storage manager - am := alerts.NewManager() + webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + b.Fatal(err) + } + + am := alerts.NewManager(webhookReporter, log.Named("alerts")) vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectorCacheSize) if err != nil { b.Fatal(err) diff --git a/internal/test/host.go b/internal/test/host.go index 06cdf21..1a91fea 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -22,6 +22,7 @@ import ( rhp2 "go.sia.tech/hostd/rhp/v2" rhp3 "go.sia.tech/hostd/rhp/v3" "go.sia.tech/hostd/wallet" + "go.sia.tech/hostd/webhooks" "go.uber.org/zap" ) @@ -166,102 +167,23 @@ func (h *Host) Store() *sqlite.Store { // NewHost initializes a new test host func NewHost(privKey types.PrivateKey, dir string, node *Node, log *zap.Logger) (*Host, error) { - db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite")) + host, err := NewEmptyHost(privKey, dir, node, log) if err != nil { - return nil, fmt.Errorf("failed to create sql store: %w", err) + return nil, err } - wallet, err := wallet.NewSingleAddressWallet(privKey, node.cm, node.tp, db, log.Named("wallet")) - if err != nil { - return nil, fmt.Errorf("failed to create wallet: %w", err) - } - - am := alerts.NewManager() - storage, err := storage.NewVolumeManager(db, am, node.cm, log.Named("storage"), DefaultSettings.SectorCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create storage manager: %w", err) - } result := make(chan error, 1) - if _, err := storage.AddVolume(context.Background(), filepath.Join(dir, "storage.dat"), 64, result); err != nil { + if _, err := host.Storage().AddVolume(context.Background(), filepath.Join(dir, "storage.dat"), 64, result); err != nil { return nil, fmt.Errorf("failed to add storage volume: %w", err) } else if err := <-result; err != nil { return nil, fmt.Errorf("failed to add storage volume: %w", err) } - - contracts, err := contracts.NewManager(db, am, storage, node.cm, node.tp, wallet, log.Named("contracts")) - if err != nil { - return nil, fmt.Errorf("failed to create contract manager: %w", err) - } - - rhp2Listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, fmt.Errorf("failed to create rhp2 listener: %w", err) - } - - rhp3Listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, fmt.Errorf("failed to create rhp2 listener: %w", err) - } - - settings, err := settings.NewConfigManager(dir, privKey, rhp2Listener.Addr().String(), db, node.cm, node.tp, wallet, am, log.Named("settings")) - if err != nil { - return nil, fmt.Errorf("failed to create settings manager: %w", err) - } s := DefaultSettings - s.NetAddress = rhp2Listener.Addr().String() - if err := settings.UpdateSettings(s); err != nil { + s.NetAddress = host.RHP2Addr() + if err := host.Settings().UpdateSettings(s); err != nil { return nil, fmt.Errorf("failed to update host settings: %w", err) } - - registry := registry.NewManager(privKey, db, log.Named("registry")) - accounts := accounts.NewManager(db, settings) - - sessions := rhp.NewSessionReporter() - - rhp2, err := rhp2.NewSessionHandler(rhp2Listener, privKey, rhp3Listener.Addr().String(), node.cm, node.tp, wallet, contracts, settings, storage, stubDataMonitor{}, sessions, log.Named("rhp2")) - if err != nil { - return nil, fmt.Errorf("failed to create rhp2 session handler: %w", err) - } - go rhp2.Serve() - - rhp3, err := rhp3.NewSessionHandler(rhp3Listener, privKey, node.cm, node.tp, wallet, accounts, contracts, registry, storage, settings, stubDataMonitor{}, sessions, log.Named("rhp3")) - if err != nil { - return nil, fmt.Errorf("failed to create rhp3 session handler: %w", err) - } - go rhp3.Serve() - - rhp3WSListener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, fmt.Errorf("failed to create rhp3 websocket listener: %w", err) - } - - go func() { - rhp3WS := http.Server{ - Handler: rhp3.WebSocketHandler(), - ReadTimeout: 30 * time.Second, - } - - if err := rhp3WS.Serve(rhp3WSListener); err != nil { - return - } - }() - - return &Host{ - Node: node, - privKey: privKey, - store: db, - log: log, - wallet: wallet, - settings: settings, - storage: storage, - registry: registry, - accounts: accounts, - contracts: contracts, - - rhp2: rhp2, - rhp3: rhp3, - rhp3WS: rhp3WSListener, - }, nil + return host, nil } // NewEmptyHost initializes a new test host @@ -276,7 +198,12 @@ func NewEmptyHost(privKey types.PrivateKey, dir string, node *Node, log *zap.Log return nil, fmt.Errorf("failed to create wallet: %w", err) } - am := alerts.NewManager() + wr, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + return nil, fmt.Errorf("failed to create webhook reporter: %w", err) + } + + am := alerts.NewManager(wr, log.Named("alerts")) storage, err := storage.NewVolumeManager(db, am, node.cm, log.Named("storage"), DefaultSettings.SectorCacheSize) if err != nil { return nil, fmt.Errorf("failed to create storage manager: %w", err) diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index 76b8496..95fc631 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -196,6 +196,13 @@ CREATE TABLE host_settings ( sector_cache_size INTEGER NOT NULL DEFAULT 0 ); +CREATE TABLE webhooks ( + id INTEGER PRIMARY KEY, + callback_url TEXT UNIQUE NOT NULL, + scopes TEXT NOT NULL, + secret_key TEXT UNIQUE NOT NULL +); + CREATE TABLE global_settings ( id INTEGER PRIMARY KEY NOT NULL DEFAULT 0 CHECK (id = 0), -- enforce a single row db_version INTEGER NOT NULL, -- used for migrations diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index e7e0488..310c50f 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,6 +10,19 @@ import ( "go.uber.org/zap" ) +// migrateVersion23 creates the webhooks table. +func migrateVersion23(tx txn, _ *zap.Logger) error { + const query = `CREATE TABLE webhooks ( + id INTEGER PRIMARY KEY, + callback_url TEXT UNIQUE NOT NULL, + scopes TEXT NOT NULL, + secret_key TEXT UNIQUE NOT NULL +);` + + _, err := tx.Exec(query) + return err +} + // migrateVersion22 recalculates the locked and risked collateral and the // potential and earned revenue metrics which will be bugged if the host rescans // the blockchain. @@ -592,4 +605,5 @@ var migrations = []func(tx txn, log *zap.Logger) error{ migrateVersion20, migrateVersion21, migrateVersion22, + migrateVersion23, } diff --git a/persist/sqlite/webhooks.go b/persist/sqlite/webhooks.go new file mode 100644 index 0000000..cca63b7 --- /dev/null +++ b/persist/sqlite/webhooks.go @@ -0,0 +1,45 @@ +package sqlite + +import ( + "strings" + + "go.sia.tech/hostd/webhooks" +) + +// RegisterWebHook registers a new webhook. +func (s *Store) RegisterWebHook(url, secret string, scopes []string) (id int64, err error) { + err = s.queryRow("INSERT INTO webhooks (callback_url, secret_key, scopes) VALUES (?, ?, ?) RETURNING id", url, secret, strings.Join(scopes, ",")).Scan(&id) + return +} + +// UpdateWebHook updates a webhook. +func (s *Store) UpdateWebHook(id int64, url string, scopes []string) error { + var dbID int64 + return s.queryRow(`UPDATE webhooks SET callback_url = ?, scopes = ? WHERE id = ? RETURNING id`, url, strings.Join(scopes, ","), id).Scan(&dbID) +} + +// RemoveWebHook removes a webhook. +func (s *Store) RemoveWebHook(id int64) error { + _, err := s.exec("DELETE FROM webhooks WHERE id = ?", id) + return err +} + +// WebHooks returns all webhooks. +func (s *Store) WebHooks() ([]webhooks.WebHook, error) { + rows, err := s.query("SELECT id, callback_url, secret_key, scopes FROM webhooks") + if err != nil { + return nil, err + } + defer rows.Close() + var hooks []webhooks.WebHook + for rows.Next() { + var hook webhooks.WebHook + var scopes string + if err := rows.Scan(&hook.ID, &hook.CallbackURL, &hook.SecretKey, &scopes); err != nil { + return nil, err + } + hook.Scopes = strings.Split(scopes, ",") + hooks = append(hooks, hook) + } + return hooks, nil +} diff --git a/webhooks/webhooks.go b/webhooks/webhooks.go new file mode 100644 index 0000000..1c93c76 --- /dev/null +++ b/webhooks/webhooks.go @@ -0,0 +1,319 @@ +package webhooks + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "go.sia.tech/hostd/internal/threadgroup" + "go.uber.org/zap" + "lukechampine.com/frand" +) + +// event scope constants +const ( + ScopeAll = "all" + + ScopeAlerts = "alerts" + ScopeAlertsInfo = "alerts/info" + ScopeAlertsWarning = "alerts/warning" + ScopeAlertsError = "alerts/error" + ScopeAlertsCritical = "alerts/critical" + + ScopeWallet = "wallet" +) + +type ( + scope struct { + children map[string]*scope + hooks map[int64]bool + } + + // A WebHook is a callback that is invoked when an event occurs. + WebHook struct { + ID int64 `json:"id"` + CallbackURL string `json:"callbackURL"` + SecretKey string `json:"secretKey"` + Scopes []string `json:"scopes"` + } + + // A UID is a unique identifier for an event. + UID [32]byte + + // An Event is a notification sent to a WebHook callback. + Event struct { + ID UID `json:"id"` + Event string `json:"event"` + Scope string `json:"scope"` + Data any `json:"data"` + } + + // A Store stores and retrieves WebHooks. + Store interface { + RegisterWebHook(url, secret string, scopes []string) (int64, error) + UpdateWebHook(id int64, url string, scopes []string) error + RemoveWebHook(id int64) error + WebHooks() ([]WebHook, error) + } + + // A Manager manages WebHook subscribers and broadcasts events + Manager struct { + store Store + log *zap.Logger + tg *threadgroup.ThreadGroup + + mu sync.Mutex + hooks map[int64]WebHook + scopes *scope + } +) + +// Close closes the Manager. +func (m *Manager) Close() error { + m.tg.Stop() + return nil +} + +func (m *Manager) findMatchingHooks(s string) (hooks []WebHook) { + // recursively match hooks + var match func(scopeParts []string, parent *scope) + match = func(scopeParts []string, parent *scope) { + for id := range parent.hooks { + hook, ok := m.hooks[id] + if !ok { + panic("hook not found") // developer error + } + hooks = append(hooks, hook) + } + if len(scopeParts) == 0 { + return + } + child, ok := parent.children[scopeParts[0]] + if !ok { + return + } + match(scopeParts[1:], child) + } + + match(strings.Split(s, "/"), m.scopes) + return +} + +func (m *Manager) addHookScopes(id int64, scopes []string) { + for _, s := range scopes { + if s == "all" { // special case to register for all current and future scopes + m.scopes.hooks[id] = true + continue + } + + parts := strings.Split(s, "/") + parent := m.scopes + for _, part := range parts { + child, ok := parent.children[part] + if !ok { + child = &scope{children: make(map[string]*scope), hooks: make(map[int64]bool)} + parent.children[part] = child + } + parent = child + } + parent.hooks[id] = true + } +} + +func (m *Manager) removeHookScopes(id int64) { + var remove func(parent *scope) + remove = func(parent *scope) { + for _, child := range parent.children { + remove(child) + } + delete(parent.hooks, id) + } + + remove(m.scopes) +} + +// WebHooks returns all registered WebHooks. +func (m *Manager) WebHooks() (hooks []WebHook, _ error) { + m.mu.Lock() + defer m.mu.Unlock() + for _, hook := range m.hooks { + hooks = append(hooks, hook) + } + return +} + +// RegisterWebHook registers a new WebHook. +func (m *Manager) RegisterWebHook(url string, scopes []string) (WebHook, error) { + done, err := m.tg.Add() + if err != nil { + return WebHook{}, err + } + defer done() + + secret := hex.EncodeToString(frand.Bytes(16)) + + // register the hook in the database + id, err := m.store.RegisterWebHook(url, secret, scopes) + if err != nil { + return WebHook{}, fmt.Errorf("failed to register WebHook: %w", err) + } + + m.mu.Lock() + defer m.mu.Unlock() + // add the hook to the in-memory map + hook := WebHook{ + ID: id, + CallbackURL: url, + SecretKey: secret, + Scopes: scopes, + } + m.hooks[id] = hook + // add the hook to the scope tree + m.addHookScopes(id, scopes) + return hook, nil +} + +// RemoveWebHook removes a registered WebHook. +func (m *Manager) RemoveWebHook(id int64) error { + done, err := m.tg.Add() + if err != nil { + return err + } + defer done() + + // remove the hook from the database + if err := m.store.RemoveWebHook(id); err != nil { + return err + } + + m.mu.Lock() + defer m.mu.Unlock() + // remove the hook from the in-memory map and the scope tree + delete(m.hooks, id) + m.removeHookScopes(id) + return nil +} + +// UpdateWebHook updates the URL and scopes of a registered WebHook. +func (m *Manager) UpdateWebHook(id int64, url string, scopes []string) (WebHook, error) { + done, err := m.tg.Add() + if err != nil { + return WebHook{}, err + } + defer done() + + // update the hook in the database + err = m.store.UpdateWebHook(id, url, scopes) + if err != nil { + return WebHook{}, err + } + + m.mu.Lock() + defer m.mu.Unlock() + // update the hook in the in-memory map + hook, ok := m.hooks[id] + if !ok { + panic("UpdateWebHook called on nonexistent WebHook") // developer error + } + hook.CallbackURL = url + hook.Scopes = scopes + m.hooks[id] = hook + // remove the hook from the scope tree + m.removeHookScopes(id) + // readd the new scopes to the scope tree + m.addHookScopes(id, scopes) + return hook, nil +} + +func sendEventData(ctx context.Context, hook WebHook, buf []byte) error { + req, err := http.NewRequestWithContext(ctx, "POST", hook.CallbackURL, bytes.NewReader(buf)) + if err != nil { + return fmt.Errorf("failed to create WebHook request: %w", err) + } + + // set the secret key and content type + req.SetBasicAuth("", hook.SecretKey) + req.Header.Set("Content-Type", "application/json") + + // send the request + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("unexpected response status code: %d", resp.StatusCode) + } + return nil +} + +// BroadcastEvent sends an event to all registered WebHooks that match the +// event's scope. +func (m *Manager) BroadcastEvent(event string, scope string, data any) error { + done, err := m.tg.Add() + if err != nil { + return err + } + defer done() + + uid := UID(frand.Bytes(32)) + e := Event{ + ID: uid, + Event: event, + Scope: scope, + Data: data, + } + + buf, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + m.mu.Lock() + defer m.mu.Unlock() + + // find matching hooks + hooks := m.findMatchingHooks(scope) + + for _, hook := range hooks { + go func(hook WebHook) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + log := m.log.With(zap.Int64("hook", hook.ID), zap.String("url", hook.CallbackURL), zap.String("scope", scope), zap.String("event", event)) + + start := time.Now() + if err := sendEventData(ctx, hook, buf); err != nil { + log.Error("failed to send webhook event", zap.Error(err)) + return + } + log.Debug("sent webhook event", zap.Duration("elapsed", time.Since(start))) + }(hook) + } + return nil +} + +// NewManager creates a new WebHook Manager +func NewManager(store Store, log *zap.Logger) (*Manager, error) { + m := &Manager{ + store: store, + log: log, + tg: threadgroup.New(), + + hooks: make(map[int64]WebHook), + scopes: &scope{children: make(map[string]*scope), hooks: make(map[int64]bool)}, + } + + _, err := store.WebHooks() + if err != nil { + return nil, fmt.Errorf("failed to load WebHooks: %w", err) + } + return m, nil +} diff --git a/webhooks/webhooks_test.go b/webhooks/webhooks_test.go new file mode 100644 index 0000000..f30ca37 --- /dev/null +++ b/webhooks/webhooks_test.go @@ -0,0 +1,149 @@ +package webhooks_test + +import ( + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "path/filepath" + "testing" + "time" + + "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/webhooks" + "go.uber.org/zap/zaptest" +) + +type jsonEvent struct { + ID webhooks.UID `json:"id"` + Event string `json:"event"` + Scope string `json:"scope"` + Data json.RawMessage `json:"data"` +} + +func TestWebHooks(t *testing.T) { + log := zaptest.NewLogger(t) + + db, err := sqlite.OpenDatabase(filepath.Join(t.TempDir(), "hostd.db"), log.Named("sqlite")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + wr, err := webhooks.NewManager(db, log.Named("webhooks")) + if err != nil { + t.Fatal(err) + } + + // create a listener for the webhook + l, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + // add a webhook + scopes := []string{"tld", "scope/subscope"} + hook, err := wr.RegisterWebHook("http://"+l.Addr().String(), scopes) + if err != nil { + t.Fatal(err) + } + + // create an http server to listen for the webhook + recv := make(chan jsonEvent, 1) + go func() { + http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, password, ok := r.BasicAuth() + if !ok || password != hook.SecretKey { + t.Error("bad auth") + } + + // handle the webhook + var event jsonEvent + if err := json.NewDecoder(r.Body).Decode(&event); err != nil { + t.Error(err) + } + + w.WriteHeader(http.StatusNoContent) + recv <- event + })) + }() + + checkEvent := func(event, scope, data string) error { + select { + case <-time.After(time.Second): + return errors.New("timed out") + case ev := <-recv: + switch { + case ev.Event != event: + return fmt.Errorf("expected event %q, got %q", event, ev.Event) + case ev.Scope != scope: + return fmt.Errorf("expected scope %q, got %q", scope, ev.Scope) + case string(ev.Data) != data: + return fmt.Errorf("expected data %q, got %q", data, ev.Data) + } + } + return nil + } + + tests := []struct { + event, scope string + receive bool + }{ + {"test", "tld", true}, // direct match + {"test", "scope/subscope", true}, // direct match + {"test", "tld/subscope", true}, // subscope match + {"test", "scope", false}, // no match + {"test", "all", false}, // no match + } + + for _, test := range tests { + if err := wr.BroadcastEvent(test.event, test.scope, "hello, world!"); err != nil { + t.Fatal(err) + } + + if err := checkEvent(test.event, test.scope, `"hello, world!"`); test.receive && err != nil { + t.Fatal(err) + } else if !test.receive && err == nil { + t.Fatal("expected no event") + } + } + + // update the webhook to have the "all scope" + hook, err = wr.UpdateWebHook(hook.ID, "http://"+l.Addr().String(), []string{"all"}) + if err != nil { + t.Fatal(err) + } else if hooks, err := wr.WebHooks(); err != nil { + t.Fatal(err) + } else if len(hooks) != 1 { + t.Fatal("expected 1 webhook") + } + + // ensure all events are received + for _, test := range tests { + if err := wr.BroadcastEvent(test.event, test.scope, "hello, world!"); err != nil { + t.Fatal(err) + } else if err := checkEvent(test.event, test.scope, `"hello, world!"`); err != nil { + t.Fatal(err) + } + } + + // unregister the webhook + if err := wr.RemoveWebHook(hook.ID); err != nil { + t.Fatal(err) + } else if hooks, err := wr.WebHooks(); err != nil { + t.Fatal(err) + } else if len(hooks) != 0 { + t.Fatal("expected no webhooks") + } + + // ensure no more events are received + for _, test := range tests { + if err := wr.BroadcastEvent(test.event, test.scope, "hello, world!"); err != nil { + t.Fatal(err) + } else if err := checkEvent(test.event, test.scope, `"hello, world!"`); err == nil { + t.Fatal("expected no event") + } + } +}