-
Notifications
You must be signed in to change notification settings - Fork 885
/
bitswap.go
101 lines (88 loc) · 2.61 KB
/
bitswap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package p2p
import (
"context"
"errors"
"fmt"
"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/go-datastore"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
hst "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/fx"
"github.com/celestiaorg/celestia-node/share/eds"
)
// Default tuning values for the caches wrapped around the blockstores in this file.
const (
	// defaultBloomFilterSize is the bloom filter size handed to the cached
	// blockstore as HasBloomFilterSize.
	defaultBloomFilterSize = 512 * 1024
	// defaultBloomFilterHashes is the number of hash functions used by the
	// bloom filter (HasBloomFilterHashes).
	defaultBloomFilterHashes = 7
	// defaultARCCacheSize is the cache size handed to the cached blockstore.
	// Despite the "ARC" in its name, it is passed as HasTwoQueueCacheSize.
	defaultARCCacheSize = 64 * 1024
)
// dataExchange builds the BitSwap-backed exchange.Interface used to trade IPFS blocks.
//
// It creates one BitSwap network on top of params.Host, attaches a server and a
// client to it, starts the network right away and registers an fx OnStop hook
// that closes the client, closes the server and then stops the network.
// The client is what gets returned as the exchange.
func dataExchange(params bitSwapParams) exchange.Interface {
	bsNet := network.NewFromIpfsHost(
		params.Host,
		&routinghelpers.Null{},
		network.Prefix(protocolID(params.Net)),
	)

	bsServer := server.New(
		params.Ctx,
		bsNet,
		params.Bs,
		// We don't provide blocks over DHT.
		server.ProvideEnabled(false),
		// NOTE: These below are required for our protocol to work reliably.
		// See https://github.com/celestiaorg/celestia-node/issues/732
		server.SetSendDontHaves(false),
	)

	bsClient := client.New(
		params.Ctx,
		bsNet,
		params.Bs,
		// The server is registered as the client's block-received notifier.
		client.WithBlockReceivedNotifier(bsServer),
		client.SetSimulateDontHavesOnTimeout(false),
		client.WithoutDuplicatedBlockStats(),
	)

	// Started eagerly here: starting from a lifecycle hook does not work.
	bsNet.Start(bsServer, bsClient)

	// shutdown tears everything down in order: client, server, then network.
	// Close errors from the client and server are accumulated and returned together.
	shutdown := func(context.Context) error {
		var stopErr error
		stopErr = errors.Join(stopErr, bsClient.Close())
		stopErr = errors.Join(stopErr, bsServer.Close())
		bsNet.Stop()
		return stopErr
	}
	params.Lifecycle.Append(fx.Hook{OnStop: shutdown})

	return bsClient
}
// blockstoreFromDatastore wraps ds in a blockstore and puts a caching layer in
// front of it, configured with the package's default bloom filter size, bloom
// filter hash count and cache size. The result and error come straight from
// blockstore.CachedBlockstore.
func blockstoreFromDatastore(ctx context.Context, ds datastore.Batching) (blockstore.Blockstore, error) {
	base := blockstore.NewBlockstore(ds)
	cacheOpts := blockstore.CacheOpts{
		HasBloomFilterSize:   defaultBloomFilterSize,
		HasBloomFilterHashes: defaultBloomFilterHashes,
		HasTwoQueueCacheSize: defaultARCCacheSize,
	}
	return blockstore.CachedBlockstore(ctx, base, cacheOpts)
}
// blockstoreFromEDSStore puts a caching layer in front of the blockstore exposed
// by the EDS store. Only the cache size is configured here; the bloom filter
// fields of the cache options are left at their zero values. The result and
// error come straight from blockstore.CachedBlockstore.
func blockstoreFromEDSStore(ctx context.Context, store *eds.Store) (blockstore.Blockstore, error) {
	base := store.Blockstore()
	cacheOpts := blockstore.CacheOpts{
		HasTwoQueueCacheSize: defaultARCCacheSize,
	}
	return blockstore.CachedBlockstore(ctx, base, cacheOpts)
}
// bitSwapParams is the fx parameter object consumed by dataExchange.
// Embedding fx.In marks the struct as a set of injected dependencies.
type bitSwapParams struct {
	fx.In

	// Lifecycle receives the OnStop hook that shuts BitSwap down.
	Lifecycle fx.Lifecycle
	// Ctx is passed to both the BitSwap server and client constructors.
	Ctx context.Context
	// Net identifies the network and determines the protocol prefix (see protocolID).
	Net Network
	// Host is the libp2p host the BitSwap network is built on.
	Host hst.Host
	// Bs is the blockstore shared by the BitSwap server and client.
	Bs blockstore.Blockstore
}
// protocolID returns the protocol ID for the given network: the network value
// formatted with %s and placed under the "/celestia/" namespace.
func protocolID(network Network) protocol.ID {
	id := fmt.Sprintf("/celestia/%s", network)
	return protocol.ID(id)
}