-
Notifications
You must be signed in to change notification settings - Fork 888
/
constructors.go
116 lines (103 loc) · 3.31 KB
/
constructors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package share
import (
"context"
"errors"
"github.com/filecoin-project/dagstore"
"github.com/ipfs/boxo/blockservice"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)
const (
	// fullNodesTag is the tag used to identify full nodes in the discovery service.
	fullNodesTag = "full"
)
// newDiscovery returns a constructor for the peer discovery service.
// The produced discovery advertises/looks up peers under fullNodesTag and
// forwards every peer-set update to the peer manager's node pool.
func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
	return func(
		router routing.ContentRouting,
		hst host.Host,
		peerManager *peers.Manager,
	) (*disc.Discovery, error) {
		// Wrap the content router so it satisfies the libp2p discovery interface.
		routingDisc := routingdisc.NewRoutingDiscovery(router)
		return disc.NewDiscovery(
			cfg,
			hst,
			routingDisc,
			fullNodesTag,
			disc.WithOnPeersUpdate(peerManager.UpdateNodePool),
		)
	}
}
// newShareModule assembles the share Module from its getter and
// availability dependencies.
func newShareModule(getter share.Getter, avail share.Availability) Module {
	mod := &module{getter, avail}
	return mod
}
// ensureEmptyCARExists adds an empty EDS to the provided EDS store.
func ensureEmptyCARExists(ctx context.Context, store *eds.Store) error {
	empty := share.EmptyExtendedDataSquare()
	dah, err := da.NewDataAvailabilityHeader(empty)
	if err != nil {
		return err
	}
	// A pre-existing shard means the empty EDS was stored on a previous run,
	// which is exactly the state this function ensures — not an error.
	if err := store.Put(ctx, dah.Hash(), empty); !errors.Is(err, dagstore.ErrShardExists) {
		return err
	}
	return nil
}
// ensureEmptyEDSInBS checks if the given DAG contains an empty block data square.
// If it does not, it stores an empty block. This optimization exists to prevent
// redundant storing of empty block data so that it is only stored once and returned
// upon request for a block with an empty data square.
func ensureEmptyEDSInBS(ctx context.Context, bServ blockservice.BlockService) error {
	// Only the error is of interest here; the returned root is discarded.
	if _, err := ipld.AddShares(ctx, share.EmptyBlockShares(), bServ); err != nil {
		return err
	}
	return nil
}
// lightGetter builds the getter cascade for light nodes: shrex first when
// share exchange is enabled, with IPLD retrieval as the fallback.
func lightGetter(
	shrexGetter *getters.ShrexGetter,
	ipldGetter *getters.IPLDGetter,
	cfg Config,
) share.Getter {
	var chain []share.Getter
	if cfg.UseShareExchange {
		chain = append(chain, shrexGetter)
	}
	// IPLD always terminates the cascade as the last resort.
	return getters.NewCascadeGetter(append(chain, ipldGetter))
}
// ShrexGetter is added to bridge nodes for the case that a shard is removed
// after detected shard corruption. This ensures the block is fetched and stored
// by shrex the next time the data is retrieved (meaning shard recovery is
// manual after corruption is detected).
func bridgeGetter(
	storeGetter *getters.StoreGetter,
	shrexGetter *getters.ShrexGetter,
	cfg Config,
) share.Getter {
	// The local store is always consulted first.
	chain := []share.Getter{storeGetter}
	if cfg.UseShareExchange {
		chain = append(chain, shrexGetter)
	}
	return getters.NewCascadeGetter(chain)
}
// fullGetter builds the getter cascade for full nodes: the local store first,
// then (optionally) shrex peer-to-peer retrieval, and IPLD as the final
// fallback.
func fullGetter(
	storeGetter *getters.StoreGetter,
	shrexGetter *getters.ShrexGetter,
	ipldGetter *getters.IPLDGetter,
	cfg Config,
) share.Getter {
	// The cascade holds at most three getters; pre-size to avoid append
	// re-growth (prealloc).
	cascade := make([]share.Getter, 0, 3)
	cascade = append(cascade, storeGetter)
	if cfg.UseShareExchange {
		cascade = append(cascade, shrexGetter)
	}
	cascade = append(cascade, ipldGetter)
	return getters.NewCascadeGetter(cascade)
}