-
Notifications
You must be signed in to change notification settings - Fork 892
/
ipld.go
165 lines (143 loc) · 4.61 KB
/
ipld.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package getters
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/ipfs/boxo/blockservice"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/celestiaorg/rsmt2d"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/ipld"
)
// Compile-time check that *IPLDGetter implements share.Getter.
var _ share.Getter = (*IPLDGetter)(nil)

// IPLDGetter is a share.Getter that retrieves shares from the bitswap network. Result caching is
// handled by the provided blockservice. A blockservice session will be created for retrieval if the
// passed context is wrapped with WithSession.
type IPLDGetter struct {
	// rtrv retrieves whole extended data squares; it is used by GetEDS.
	rtrv *eds.Retriever
	// bServ is the blockservice used by GetShare and GetSharesByNamespace, either directly or
	// through a session obtained from getGetter.
	bServ blockservice.BlockService
}
// NewIPLDGetter builds an IPLDGetter on top of the given blockservice. The same blockservice backs
// both the EDS retriever and the single-share / namespace lookups.
func NewIPLDGetter(bServ blockservice.BlockService) *IPLDGetter {
	getter := new(IPLDGetter)
	getter.rtrv = eds.NewRetriever(bServ)
	getter.bServ = bServ
	return getter
}
// GetShare gets a single share at the given EDS coordinates from the bitswap network.
//
// row and col must both lie in [0, len(header.DAH.RowRoots)); otherwise share.ErrOutOfBounds is
// returned. When ipld.GetShare reports ipld.ErrNodeNotFound the error is converted to
// share.ErrNotFound. Every retrieval failure is returned wrapped with a "getter/ipld" prefix.
//
// If ctx was produced by WithSession, the lookup goes through a blockservice session instead of
// the plain blockservice.
func (ig *IPLDGetter) GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (share.Share, error) {
	var err error
	ctx, span := tracer.Start(ctx, "ipld/get-share", trace.WithAttributes(
		attribute.Int("row", row),
		attribute.Int("col", col),
	))
	// The closure reads the function-level err, so every failure path below must assign to it
	// (never shadow it with :=) for the span to reflect the outcome.
	defer func() {
		utils.SetStatusAndEnd(span, err)
	}()

	dah := header.DAH
	upperBound := len(dah.RowRoots)
	// Negative coordinates are rejected together with too-large ones so that they never reach
	// ipld.Translate.
	if row < 0 || col < 0 || row >= upperBound || col >= upperBound {
		// Plain assignment: the deferred utils.SetStatusAndEnd call receives this error, the same
		// way it does for the retrieval failures further down.
		err = share.ErrOutOfBounds
		return nil, err
	}

	root, leaf := ipld.Translate(dah, row, col)

	// wrap the blockservice in a session if it has been signaled in the context.
	blockGetter := getGetter(ctx, ig.bServ)
	s, err := ipld.GetShare(ctx, blockGetter, root, leaf, upperBound)
	if errors.Is(err, ipld.ErrNodeNotFound) {
		// convert error to satisfy getter interface contract
		err = share.ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("getter/ipld: failed to retrieve share: %w", err)
	}

	return s, nil
}
// GetEDS retrieves the extended data square described by header.DAH through the getter's
// eds.Retriever.
//
// Error handling:
//   - ipld.ErrNodeNotFound is replaced by share.ErrNotFound (and then wrapped),
//   - an error carrying *byzantine.ErrByzantine is handed back as-is, without wrapping,
//   - any other error is wrapped with a "getter/ipld" prefix.
func (ig *IPLDGetter) GetEDS(
	ctx context.Context,
	header *header.ExtendedHeader,
) (eds *rsmt2d.ExtendedDataSquare, err error) {
	ctx, span := tracer.Start(ctx, "ipld/get-eds")
	// err is the named result, so the span sees whatever error is finally returned.
	defer func() { utils.SetStatusAndEnd(span, err) }()

	eds, err = ig.rtrv.Retrieve(ctx, header.DAH)
	if err == nil {
		return eds, nil
	}

	// Translate the ipld-level "not found" into the error the getter interface promises.
	if errors.Is(err, ipld.ErrNodeNotFound) {
		err = share.ErrNotFound
	}

	// Byzantine errors are returned untouched.
	var byzErr *byzantine.ErrByzantine
	if errors.As(err, &byzErr) {
		return nil, err
	}

	return nil, fmt.Errorf("getter/ipld: failed to retrieve eds: %w", err)
}
// GetSharesByNamespace collects the shares of the given namespace for the square described by
// header.DAH.
//
// The namespace is validated with ValidateForData first; a validation error is returned unwrapped.
// ipld.ErrNodeNotFound from the collection step is converted to share.ErrNotFound, and every
// collection failure is wrapped with a "getter/ipld" prefix. If ctx was produced by WithSession,
// a blockservice session is used for the lookups.
func (ig *IPLDGetter) GetSharesByNamespace(
	ctx context.Context,
	header *header.ExtendedHeader,
	namespace share.Namespace,
) (shares share.NamespacedShares, err error) {
	spanAttrs := trace.WithAttributes(attribute.String("namespace", namespace.String()))
	ctx, span := tracer.Start(ctx, "ipld/get-shares-by-namespace", spanAttrs)
	// err is the named result, so the span sees whatever error is finally returned.
	defer func() { utils.SetStatusAndEnd(span, err) }()

	err = namespace.ValidateForData()
	if err != nil {
		return nil, err
	}

	// Use a session-backed getter when the context asks for one, the plain blockservice otherwise.
	getter := getGetter(ctx, ig.bServ)
	shares, err = eds.CollectSharesByNamespace(ctx, getter, header.DAH, namespace)
	switch {
	case err == nil:
		return shares, nil
	case errors.Is(err, ipld.ErrNodeNotFound):
		// convert to the error the getter interface promises
		err = share.ErrNotFound
	}
	return nil, fmt.Errorf("getter/ipld: failed to retrieve shares by namespace: %w", err)
}
// sessionKey is the context key under which WithSession stores a *session and getGetter looks it
// up. It is a pointer, so the key is compared by identity.
var sessionKey = &session{}

// session is a struct that can optionally be passed by context to the share.Getter methods using
// WithSession to indicate that a blockservice session should be created.
type session struct {
	// Mutex is held by getGetter while it creates and stores the blockservice session.
	sync.Mutex
	// Pointer holds the blockservice session; it is nil until getGetter creates one.
	atomic.Pointer[blockservice.Session]
	// ctx is the context that was passed to WithSession; getGetter hands it to
	// blockservice.NewSession.
	ctx context.Context
}
// WithSession stores an empty session in the context, indicating that a blockservice session should
// be created.
func WithSession(ctx context.Context) context.Context {
return context.WithValue(ctx, sessionKey, &session{ctx: ctx})
}
// getGetter picks the block getter to use for a request. Without a session marker in ctx (see
// WithSession) the plain blockservice is returned. With a marker, the blockservice session stored
// in it is returned, creating and storing that session first if none exists yet.
func getGetter(ctx context.Context, service blockservice.BlockService) blockservice.BlockGetter {
	s, ok := ctx.Value(sessionKey).(*session)
	if !ok {
		return service
	}

	// Fast path: a session has already been stored, no locking needed.
	if existing := s.Load(); existing != nil {
		return existing
	}

	s.Lock()
	defer s.Unlock()

	// Check again under the lock: another caller may have stored a session in the meantime.
	if existing := s.Load(); existing != nil {
		return existing
	}

	created := blockservice.NewSession(s.ctx, service)
	s.Store(created)
	return created
}