-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
rpc.go
107 lines (83 loc) · 3.17 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package kit
import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/node"
)
type Closer func()
func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr, Closer) {
testServ := &httptest.Server{
Listener: listener,
Config: &http.Server{
Handler: handler,
ReadHeaderTimeout: 30 * time.Second,
},
}
testServ.Start()
addr := testServ.Listener.Addr()
maddr, err := manet.FromNetAddr(addr)
require.NoError(t, err)
closer := func() {
testServ.CloseClientConnections()
testServ.Close()
}
return testServ, maddr, closer
}
func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) {
handler, err := node.FullNodeHandler(f.FullNode, false)
require.NoError(t, err)
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, maddr, rpcCloser := CreateRPCServer(t, handler, l)
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
rpcOpts := []jsonrpc.Option{
jsonrpc.WithClientHandler("Filecoin", f.EthSubRouter),
jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"),
}
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, rpcOpts...)
require.NoError(t, err)
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl
return f, func() { stop(); rpcCloser() }
}
func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
handler, err := node.MinerHandler(m.StorageMiner, false)
require.NoError(t, err)
srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String())
fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("MINER_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)
require.NoError(t, err)
t.Cleanup(stop)
m.ListenAddr, m.StorageMiner = maddr, cl
return m
}
func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
handler := sealworker.WorkerHandler(m.MinerNode.AuthVerify, m.FetchHandler, m.Worker, false)
srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Println("creating RPC server for a worker at: ", srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewWorkerRPCV0(context.Background(), url, nil)
require.NoError(t, err)
t.Cleanup(stop)
m.Stop = func(ctx context.Context) error {
srv.Close()
srv.CloseClientConnections()
return nil
}
m.ListenAddr, m.Worker = maddr, cl
return m
}