-
Notifications
You must be signed in to change notification settings - Fork 922
/
client.go
177 lines (158 loc) · 5.21 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package shrexeds
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/celestiaorg/go-libp2p-messenger/serde"
"github.com/celestiaorg/rsmt2d"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p"
pb "github.com/celestiaorg/celestia-node/share/p2p/shrexeds/pb"
)
// Client is responsible for requesting EDSs for blocksync over the ShrEx/EDS protocol.
type Client struct {
params *Parameters
protocolID protocol.ID
host host.Host
metrics *p2p.Metrics
}
// NewClient creates a new ShrEx/EDS client.
func NewClient(params *Parameters, host host.Host) (*Client, error) {
if err := params.Validate(); err != nil {
return nil, fmt.Errorf("shrex-eds: client creation failed: %w", err)
}
return &Client{
params: params,
host: host,
protocolID: p2p.ProtocolID(params.NetworkID(), protocolString),
}, nil
}
// RequestEDS requests the ODS from the given peers and returns the EDS upon success.
func (c *Client) RequestEDS(
ctx context.Context,
dataHash share.DataHash,
peer peer.ID,
) (*rsmt2d.ExtendedDataSquare, error) {
eds, err := c.doRequest(ctx, dataHash, peer)
if err == nil {
return eds, nil
}
log.Debugw("client: eds request to peer failed", "peer", peer.String(), "hash", dataHash.String(), "error", err)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, err
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, context.DeadlineExceeded
}
}
if !errors.Is(err, p2p.ErrNotFound) {
log.Warnw("client: eds request to peer failed",
"peer", peer.String(),
"hash", dataHash.String(),
"err", err)
}
return nil, err
}
func (c *Client) doRequest(
ctx context.Context,
dataHash share.DataHash,
to peer.ID,
) (*rsmt2d.ExtendedDataSquare, error) {
streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ServerReadTimeout)
defer cancel()
stream, err := c.host.NewStream(streamOpenCtx, to, c.protocolID)
if err != nil {
return nil, fmt.Errorf("failed to open stream: %w", err)
}
defer stream.Close()
c.setStreamDeadlines(ctx, stream)
req := &pb.EDSRequest{Hash: dataHash}
// request ODS
log.Debugw("client: requesting ods", "hash", dataHash.String(), "peer", to.String())
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("failed to write request to stream: %w", err)
}
err = stream.CloseWrite()
if err != nil {
log.Debugw("client: error closing write", "err", err)
}
// read and parse status from peer
resp := new(pb.EDSResponse)
err = stream.SetReadDeadline(time.Now().Add(c.params.ServerReadTimeout))
if err != nil {
log.Debugw("client: failed to set read deadline for reading status", "err", err)
}
_, err = serde.Read(stream, resp)
if err != nil {
// server closes the stream here if we are rate limited
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("failed to read status from stream: %w", err)
}
switch resp.Status {
case pb.Status_OK:
// reset stream deadlines to original values, since read deadline was changed during status read
c.setStreamDeadlines(ctx, stream)
// use header and ODS bytes to construct EDS and verify it against dataHash
eds, err := eds.ReadEDS(ctx, stream, dataHash)
if err != nil {
return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err)
}
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return eds, nil
case pb.Status_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return nil, p2p.ErrNotFound
case pb.Status_INVALID:
log.Debug("client: invalid request")
fallthrough
case pb.Status_INTERNAL:
fallthrough
default:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr)
return nil, p2p.ErrInvalidResponse
}
}
func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) {
// set read/write deadline to use context deadline if it exists
if dl, ok := ctx.Deadline(); ok {
err := stream.SetDeadline(dl)
if err == nil {
return
}
log.Debugw("client: setting deadline: %s", "err", err)
}
// if deadline not set, client read deadline defaults to server write deadline
if c.params.ServerWriteTimeout != 0 {
err := stream.SetReadDeadline(time.Now().Add(c.params.ServerWriteTimeout))
if err != nil {
log.Debugw("client: setting read deadline", "err", err)
}
}
// if deadline not set, client write deadline defaults to server read deadline
if c.params.ServerReadTimeout != 0 {
err := stream.SetWriteDeadline(time.Now().Add(c.params.ServerReadTimeout))
if err != nil {
log.Debugw("client: setting write deadline", "err", err)
}
}
}