Skip to content

Commit

Permalink
rhp/v3: add websocket handler
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Apr 18, 2023
1 parent 7457e82 commit 6eae867
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
50 changes: 50 additions & 0 deletions rhp/v3/websocket_test.go
@@ -0,0 +1,50 @@
package rhp_test

import (
"context"
"encoding/json"
"testing"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/hostd/internal/test"
"go.uber.org/zap/zaptest"
"nhooyr.io/websocket"
)

func TestWebSockets(t *testing.T) {
log := zaptest.NewLogger(t)
renter, host, err := test.NewTestingPair(t.TempDir(), log)
if err != nil {
t.Fatal(err)
}
defer renter.Close()
defer host.Close()

c, _, err := websocket.Dial(context.Background(), "ws://"+host.RHPv3WSAddr()+"/ws", nil)
if err != nil {
t.Fatal(err)
}
defer c.Close(websocket.StatusNormalClosure, "")

conn := websocket.NetConn(context.Background(), c, websocket.MessageBinary)
transport, err := rhpv3.NewRenterTransport(conn, host.PublicKey())
if err != nil {
t.Fatal(err)
}
defer transport.Close()

stream := transport.DialStream()
defer stream.Close()

if err := stream.WriteRequest(rhpv3.RPCUpdatePriceTableID, nil); err != nil {
t.Fatal(err)
}
var resp rhpv3.RPCUpdatePriceTableResponse
if err := stream.ReadResponse(&resp, 4096); err != nil {
t.Fatal(err)
}
var pt rhpv3.HostPriceTable
if err := json.Unmarshal(resp.PriceTableJSON, &pt); err != nil {
t.Fatal(err)
}
}
48 changes: 48 additions & 0 deletions rhp/v3/websockets.go
@@ -0,0 +1,48 @@
package rhp

import (
"context"
"net/http"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/hostd/rhp"
"go.uber.org/zap"
"nhooyr.io/websocket"
)

// handleWebSockets handles websocket connections to the host.
func (sh *SessionHandler) handleWebSockets(w http.ResponseWriter, r *http.Request) {
log := sh.log.Named("websockets").With(zap.String("remoteAddr", r.RemoteAddr))
wsConn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
OriginPatterns: []string{"*"},
})
if err != nil {
log.Error("failed to accept websocket connection", zap.Error(err))
return
}
defer wsConn.Close(websocket.StatusNormalClosure, "")

conn := websocket.NetConn(context.Background(), wsConn, websocket.MessageBinary)
ingress, egress := sh.settings.BandwidthLimiters()
t, err := rhpv3.NewHostTransport(rhp.NewConn(conn, sh.monitor, ingress, egress), sh.privateKey)
if err != nil {
sh.log.Debug("failed to upgrade conn", zap.Error(err), zap.String("remoteAddress", conn.RemoteAddr().String()))
return
}
defer t.Close()

for {
stream, err := t.AcceptStream()
if err != nil {
log.Debug("failed to accept stream", zap.Error(err))
return
}
go sh.handleHostStream(conn.RemoteAddr().String(), stream)
}
}

// WebSocketHandler returns an http.Handler that upgrades the connection to a
// WebSocket and then passes the connection to the RHP3 host transport.
func (sh *SessionHandler) WebSocketHandler() http.HandlerFunc {
return sh.handleWebSockets
}

0 comments on commit 6eae867

Please sign in to comment.