-
Notifications
You must be signed in to change notification settings - Fork 64
/
transports.go
120 lines (95 loc) · 3.65 KB
/
transports.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package lp2pimpl
import (
"context"
"fmt"
"time"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/filecoin-project/go-fil-markets/shared"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
var clog = logging.Logger("boost:lp2p:tspt:client")
var slog = logging.Logger("boost:lp2p:tspt")
// TransportsProtocolID is the protocol for querying which retrieval transports
// the Storage Provider supports (http, libp2p, etc)
const TransportsProtocolID = protocol.ID("/fil/retrieval/transports/1.0.0")
// TransportsListener listens for incoming queries over libp2p
type TransportsListener struct {
host host.Host
protocols []types.Protocol
}
const streamReadDeadline = 30 * time.Second
const streamWriteDeadline = 30 * time.Second
// QueryClientOption is an option for configuring the libp2p storage deal client
type QueryClientOption func(*TransportsClient)
// RetryParameters changes the default parameters around connection reopening
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) QueryClientOption {
return func(c *TransportsClient) {
c.retryStream.SetOptions(shared.RetryParameters(minDuration, maxDuration, attempts, backoffFactor))
}
}
// TransportsClient sends retrieval queries over libp2p
type TransportsClient struct {
retryStream *shared.RetryStream
}
func NewTransportsClient(h host.Host, options ...QueryClientOption) *TransportsClient {
c := &TransportsClient{
retryStream: shared.NewRetryStream(h),
}
for _, option := range options {
option(c)
}
return c
}
// SendQuery sends a retrieval query over a libp2p stream to the peer
func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.QueryResponse, error) {
clog.Debugw("query", "peer", id)
// Create a libp2p stream to the provider
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{TransportsProtocolID})
if err != nil {
return nil, err
}
defer s.Close() // nolint
// Set a deadline on reading from the stream so it doesn't hang
_ = s.SetReadDeadline(time.Now().Add(streamReadDeadline))
defer s.SetReadDeadline(time.Time{}) // nolint
// Read the response from the stream
queryResponsei, err := types.BindnodeRegistry.TypeFromReader(s, (*types.QueryResponse)(nil), dagcbor.Decode)
if err != nil {
return nil, fmt.Errorf("reading query response: %w", err)
}
queryResponse := queryResponsei.(*types.QueryResponse)
clog.Debugw("response", "peer", id)
return queryResponse, nil
}
func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsListener {
return &TransportsListener{
host: h,
protocols: protos,
}
}
func (p *TransportsListener) Start() {
p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream)
}
func (p *TransportsListener) Stop() {
p.host.RemoveStreamHandler(TransportsProtocolID)
}
// Called when the client opens a libp2p stream
func (l *TransportsListener) handleNewQueryStream(s network.Stream) {
defer s.Close()
slog.Debugw("query", "peer", s.Conn().RemotePeer())
response := types.QueryResponse{Protocols: l.protocols}
// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(streamWriteDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint
// Write the response to the client
err := types.BindnodeRegistry.TypeToWriter(&response, s, dagcbor.Encode)
if err != nil {
slog.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err)
return
}
}