/
provider.go
151 lines (129 loc) · 4.51 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package node
import (
"context"
"fmt"
"time"
"github.com/bittorrent/go-btfs/core/node/helpers"
"github.com/bittorrent/go-btfs/repo"
irouting "github.com/bittorrent/go-btfs/routing"
"github.com/ipfs/go-fetcher"
pin "github.com/ipfs/go-ipfs-pinner"
provider "github.com/ipfs/go-ipfs-provider"
"github.com/ipfs/go-ipfs-provider/batched"
q "github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
"go.uber.org/fx"
)
const kReprovideFrequency = time.Hour * 12
// SIMPLE
// ProviderQueue creates new datastore backed provider queue
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) {
return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
}
// SimpleProvider creates new record provider
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.ProvideManyRouter) provider.Provider {
return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
}
// SimpleReprovider creates new reprovider
func SimpleReprovider(reproviderInterval time.Duration) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.ProvideManyRouter, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
}
// SimpleProviderSys creates new provider system
func SimpleProviderSys(isOnline bool) interface{} {
return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System {
sys := provider.NewSystem(p, r)
if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}
return sys
}
}
// BatchedProviderSys creates new provider system
func BatchedProviderSys(isOnline bool, reprovideInterval time.Duration) interface{} {
return func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
sys, err := batched.New(cr, q,
batched.ReproviderInterval(reprovideInterval),
batched.Datastore(repo.Datastore()),
batched.KeyProvider(keyProvider))
if err != nil {
return nil, err
}
if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}
return sys, nil
}
}
// ONLINE/OFFLINE
// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}
return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
maybeProvide(SimpleProviderSys(true), !useBatchedProviding),
maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding),
)
}
// OfflineProviders groups units managing provider routing records offline
func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}
return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
maybeProvide(SimpleProviderSys(false), true),
//maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding),
)
}
// SimpleProviders creates the simple provider/reprovider dependencies
func SimpleProviders(reprovideStrategy string, reproviderInterval time.Duration) fx.Option {
var keyProvider fx.Option
switch reprovideStrategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(simple.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(pinnedProviderStrategy(true))
case "pinned":
keyProvider = fx.Provide(pinnedProviderStrategy(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy))
}
return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(SimpleProvider),
keyProvider,
fx.Provide(SimpleReprovider(reproviderInterval)),
)
}
func pinnedProviderStrategy(onlyRoots bool) interface{} {
type input struct {
fx.In
Pinner pin.Pinner
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
}
return func(in input) simple.KeyChanFunc {
return simple.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher)
}
}