Skip to content

Commit

Permalink
Merge pull request #229 from SiaFoundation/nate/webhooks
Browse files Browse the repository at this point in the history
Add webhooks
  • Loading branch information
n8maninger committed Dec 10, 2023
2 parents 696e1cd + 9c94d30 commit 87a51dd
Show file tree
Hide file tree
Showing 21 changed files with 855 additions and 119 deletions.
18 changes: 17 additions & 1 deletion alerts/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.uber.org/zap"
)

const (
Expand All @@ -30,6 +31,11 @@ type (
// Severity indicates the severity of an alert.
Severity uint8

// An EventReporter broadcasts events to subscribers.
EventReporter interface {
BroadcastEvent(event string, scope string, data any) error
}

// An Alert is a dismissible message that is displayed to the user.
Alert struct {
// ID is a unique identifier for the alert.
Expand All @@ -46,6 +52,9 @@ type (

// A Manager manages the host's alerts.
Manager struct {
log *zap.Logger
events EventReporter

mu sync.Mutex
// alerts is a map of alert IDs to their current alert.
alerts map[types.Hash256]Alert
Expand Down Expand Up @@ -99,6 +108,10 @@ func (m *Manager) Register(a Alert) {
panic("cannot register alert with zero timestamp") // developer error
}

if err := m.events.BroadcastEvent("alert", "alerts."+a.Severity.String(), a); err != nil {
m.log.Error("failed to broadcast alert", zap.Error(err))
}

m.mu.Lock()
m.alerts[a.ID] = a
m.mu.Unlock()
Expand Down Expand Up @@ -129,8 +142,11 @@ func (m *Manager) Active() []Alert {
}

// NewManager initializes a new alerts manager.
func NewManager() *Manager {
func NewManager(er EventReporter, log *zap.Logger) *Manager {
return &Manager{
log: log,
events: er,

alerts: make(map[types.Hash256]Alert),
}
}
18 changes: 17 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/wallet"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/jape"
"go.sia.tech/siad/modules"
"go.uber.org/zap"
Expand Down Expand Up @@ -113,6 +114,14 @@ type (
AcceptTransactionSet(txns []types.Transaction) error
}

// WebHooks manages webhooks
WebHooks interface {
WebHooks() ([]webhooks.WebHook, error)
RegisterWebHook(callbackURL string, scopes []string) (webhooks.WebHook, error)
UpdateWebHook(id int64, callbackURL string, scopes []string) (webhooks.WebHook, error)
RemoveWebHook(id int64) error
}

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(rhp.SessionSubscriber)
Expand All @@ -129,6 +138,7 @@ type (
log *zap.Logger

alerts Alerts
webhooks WebHooks
syncer Syncer
chain ChainManager
tpool TPool
Expand All @@ -146,12 +156,13 @@ type (
)

// NewServer initializes the API
func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler {
func NewServer(name string, hostKey types.PublicKey, a Alerts, wh WebHooks, g Syncer, chain ChainManager, tp TPool, cm ContractManager, am AccountManager, vm VolumeManager, rsr RHPSessionReporter, m Metrics, s Settings, w Wallet, log *zap.Logger) http.Handler {
api := &api{
hostKey: hostKey,
name: name,

alerts: a,
webhooks: wh,
syncer: g,
chain: chain,
tpool: tp,
Expand Down Expand Up @@ -226,5 +237,10 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C
// system endpoints
"GET /system/dir": api.handleGETSystemDir,
"PUT /system/dir": api.handlePUTSystemDir,
// webhook endpoints
"GET /webhooks": api.handleGETWebhooks,
"POST /webhooks": api.handlePOSTWebhooks,
"PUT /webhooks/:id": api.handlePUTWebhooks,
"DELETE /webhooks/:id": api.handleDELETEWebhooks,
})
}
32 changes: 32 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/wallet"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/jape"
)

Expand Down Expand Up @@ -224,6 +225,37 @@ func (c *Client) MkDir(path string) error {
return c.c.PUT("/system/dir", req)
}

// RegisterWebHook registers a new WebHook.
func (c *Client) RegisterWebHook(callbackURL string, scopes []string) (hook webhooks.WebHook, err error) {
req := RegisterWebHookRequest{
CallbackURL: callbackURL,
Scopes: scopes,
}
err = c.c.POST("/webhooks", req, &hook)
return
}

// UpdateWebHook updates the WebHook with the specified ID.
func (c *Client) UpdateWebHook(id int64, callbackURL string, scopes []string) (hook webhooks.WebHook, err error) {
req := RegisterWebHookRequest{
CallbackURL: callbackURL,
Scopes: scopes,
}
err = c.c.PUT(fmt.Sprintf("/webhooks/%d", id), req)
return
}

// DeleteWebHook deletes the WebHook with the specified ID.
func (c *Client) DeleteWebHook(id int64) error {
return c.c.DELETE(fmt.Sprintf("/webhooks/%d", id))
}

// WebHooks returns all registered WebHooks.
func (c *Client) WebHooks() (hooks []webhooks.WebHook, err error) {
err = c.c.GET("/webhooks", &hooks)
return
}

// NewClient creates a new hostd API client.
func NewClient(baseURL, password string) *Client {
return &Client{
Expand Down
53 changes: 53 additions & 0 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,59 @@ func (a *api) handleGETAccountFunding(c jape.Context) {
c.Encode(funding)
}

func (a *api) handleGETWebhooks(c jape.Context) {
hooks, err := a.webhooks.WebHooks()
if err != nil {
c.Error(err, http.StatusInternalServerError)
return
}
c.Encode(hooks)
}

func (a *api) handlePOSTWebhooks(c jape.Context) {
var req RegisterWebHookRequest
if err := c.Decode(&req); err != nil {
return
}

hook, err := a.webhooks.RegisterWebHook(req.CallbackURL, req.Scopes)
if err != nil {
c.Error(err, http.StatusInternalServerError)
return
}
c.Encode(hook)
}

func (a *api) handlePUTWebhooks(c jape.Context) {
var id int64
if err := c.DecodeParam("id", &id); err != nil {
return
}
var req RegisterWebHookRequest
if err := c.Decode(&req); err != nil {
return
}

_, err := a.webhooks.UpdateWebHook(id, req.CallbackURL, req.Scopes)
if err != nil {
c.Error(err, http.StatusInternalServerError)
return
}
}

func (a *api) handleDELETEWebhooks(c jape.Context) {
var id int64
if err := c.DecodeParam("id", &id); err != nil {
return
}

err := a.webhooks.RemoveWebHook(id)
if err != nil {
c.Error(err, http.StatusInternalServerError)
return
}
}

func parseLimitParams(c jape.Context, defaultLimit, maxLimit int) (limit, offset int) {
if err := c.DecodeForm("limit", &limit); err != nil {
return
Expand Down
6 changes: 6 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ type (
storage.SectorReference
Error string `json:"error,omitempty"`
}

// RegisterWebHookRequest is the request body for the [POST] /webhooks endpoint.
RegisterWebHookRequest struct {
CallbackURL string `json:"callbackURL"`
Scopes []string `json:"scopes"`
}
)

// MarshalJSON implements json.Marshaler
Expand Down
2 changes: 1 addition & 1 deletion cmd/hostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func main() {
auth := jape.BasicAuth(cfg.HTTP.Password)
web := http.Server{
Handler: webRouter{
api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))),
api: auth(api.NewServer(cfg.Name, hostKey.PublicKey(), node.a, node.wh, node.g, node.cm, node.tp, node.contracts, node.accounts, node.storage, node.sessions, node.metrics, node.settings, node.w, log.Named("api"))),
ui: hostd.Handler(),
},
ReadTimeout: 30 * time.Second,
Expand Down
11 changes: 10 additions & 1 deletion cmd/hostd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
rhp2 "go.sia.tech/hostd/rhp/v2"
rhp3 "go.sia.tech/hostd/rhp/v3"
"go.sia.tech/hostd/wallet"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/siad/modules"
"go.sia.tech/siad/modules/consensus"
"go.sia.tech/siad/modules/gateway"
Expand All @@ -31,6 +32,7 @@ import (
type node struct {
g modules.Gateway
a *alerts.Manager
wh *webhooks.Manager
cm *chain.Manager
tp *chain.TransactionPool
w *wallet.SingleAddressWallet
Expand Down Expand Up @@ -61,6 +63,7 @@ func (n *node) Close() error {
n.tp.Close()
n.cm.Close()
n.g.Close()
n.wh.Close()
n.store.Close()
return nil
}
Expand Down Expand Up @@ -145,6 +148,11 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva
return nil, types.PrivateKey{}, fmt.Errorf("failed to create wallet: %w", err)
}

webhookReporter, err := webhooks.NewManager(db, logger.Named("webhooks"))
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to create webhook reporter: %w", err)
}

rhp2Listener, err := net.Listen("tcp", cfg.RHP2.Address)
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to listen on rhp2 addr: %w", err)
Expand All @@ -162,7 +170,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva
discoveredAddr := net.JoinHostPort(g.Address().Host(), rhp2Port)
logger.Debug("discovered address", zap.String("addr", discoveredAddr))

am := alerts.NewManager()
am := alerts.NewManager(webhookReporter, logger.Named("alerts"))
sr, err := settings.NewConfigManager(cfg.Directory, hostKey, discoveredAddr, db, cm, tp, w, am, logger.Named("settings"))
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to create settings manager: %w", err)
Expand Down Expand Up @@ -198,6 +206,7 @@ func newNode(walletKey types.PrivateKey, logger *zap.Logger) (*node, types.Priva
return &node{
g: g,
a: am,
wh: webhookReporter,
cm: cm,
tp: tp,
w: w,
Expand Down
8 changes: 7 additions & 1 deletion host/accounts/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.sia.tech/hostd/internal/chain"
"go.sia.tech/hostd/persist/sqlite"
"go.sia.tech/hostd/wallet"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/siad/modules/consensus"
"go.sia.tech/siad/modules/gateway"
"go.sia.tech/siad/modules/transactionpool"
Expand Down Expand Up @@ -71,7 +72,12 @@ func TestCredit(t *testing.T) {
}
defer w.Close()

a := alerts.NewManager()
webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
if err != nil {
t.Fatal(err)
}

a := alerts.NewManager(webhookReporter, log.Named("alerts"))
sm, err := storage.NewVolumeManager(db, a, cm, log.Named("storage"), 0)
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 7 additions & 1 deletion host/accounts/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/hostd/internal/chain"
"go.sia.tech/hostd/persist/sqlite"
"go.sia.tech/hostd/wallet"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/siad/modules/consensus"
"go.sia.tech/siad/modules/gateway"
"go.sia.tech/siad/modules/transactionpool"
Expand Down Expand Up @@ -108,7 +109,12 @@ func TestBudget(t *testing.T) {
}
defer w.Close()

a := alerts.NewManager()
webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
if err != nil {
t.Fatal(err)
}

a := alerts.NewManager(webhookReporter, log.Named("alerts"))
sm, err := storage.NewVolumeManager(db, a, cm, log.Named("storage"), 0)
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 7 additions & 1 deletion host/contracts/contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/internal/test"
"go.sia.tech/hostd/persist/sqlite"
"go.sia.tech/hostd/webhooks"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)
Expand All @@ -36,7 +37,12 @@ func TestContractUpdater(t *testing.T) {
}
defer node.Close()

am := alerts.NewManager()
webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
if err != nil {
t.Fatal(err)
}

am := alerts.NewManager(webhookReporter, log.Named("alerts"))
s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize)
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 7 additions & 1 deletion host/contracts/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/hostd/host/contracts"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/internal/test"
"go.sia.tech/hostd/webhooks"
stypes "go.sia.tech/siad/types"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
Expand Down Expand Up @@ -79,7 +80,12 @@ func TestCheckIntegrity(t *testing.T) {
}
defer node.Close()

am := alerts.NewManager()
webhookReporter, err := webhooks.NewManager(node.Store(), log.Named("webhooks"))
if err != nil {
t.Fatal(err)
}

am := alerts.NewManager(webhookReporter, log.Named("alerts"))
s, err := storage.NewVolumeManager(node.Store(), am, node.ChainManager(), log.Named("storage"), 0)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 87a51dd

Please sign in to comment.