-
Notifications
You must be signed in to change notification settings - Fork 62
/
reprovide.go
148 lines (124 loc) · 3.35 KB
/
reprovide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package reprovide
import (
"context"
"fmt"
"time"
"github.com/ipfs/go-ipfs/thirdparty/verifcid"
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
)
// log is the package-level logger, created with the name "reprovider".
var log = logging.Logger("reprovider")
// KeyChanFunc is a function that takes a context and returns a receive-only
// channel streaming the CIDs to pass to content routing, or an error if the
// channel could not be produced.
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
// doneFunc is a completion callback sent over Reprovider.trigger. Run calls
// it with the result of Reprovide; muteTrigger calls it with a "reprovider is
// already running" error while a run is in progress.
type doneFunc func(error)
// Reprovider announces the keys produced by keyProvider to a content routing
// system, either periodically or on request (see Run and Trigger).
type Reprovider struct {
// ctx is checked by Run to decide when to stop, and is passed to
// keyProvider and to rsys.Provide.
ctx context.Context
// trigger carries completion callbacks from Trigger to the Run loop.
trigger chan doneFunc
// The routing system to provide values through
rsys routing.ContentRouting
// keyProvider supplies the channel of keys that Reprovide iterates over.
keyProvider KeyChanFunc
}
// NewReprovider builds a Reprovider that announces the keys streamed by
// keyProvider through rsys. The given ctx is stored and used by Run,
// Reprovide and Trigger; the trigger channel is created unbuffered.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
	rp := new(Reprovider)
	rp.ctx = ctx
	rp.trigger = make(chan doneFunc)
	rp.rsys = rsys
	rp.keyProvider = keyProvider
	return rp
}
// Run re-provides keys every 'tick' interval, or whenever a callback arrives
// on rp.trigger. It loops until rp.ctx is done; the context is only checked
// between runs, not while a reprovide is in progress.
//
// A tick of zero disables timer-driven runs entirely (including the initial
// delayed run), so only explicit triggers start a reprovide.
func (rp *Reprovider) Run(tick time.Duration) {
	// dont reprovide immediately.
	// may have just started the daemon and shutting it down immediately.
	// probability( up another minute | uptime ) increases with uptime.
	after := time.After(time.Minute)

	for {
		if tick == 0 {
			// Receiving from a nil channel blocks forever, so the timer case
			// below can never be selected.
			after = nil
		}

		// Scoped to a single pass: a callback received for one triggered run
		// must not be invoked again by a later timer-driven run.
		var done doneFunc

		select {
		case <-rp.ctx.Done():
			return
		case done = <-rp.trigger:
		case <-after:
		}

		//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
		//a 'reprovider is already running' error is returned
		unmute := rp.muteTrigger()

		err := rp.Reprovide()
		if err != nil {
			log.Debug(err)
		}

		// Only triggered runs carry a callback; timer-driven runs leave it nil.
		if done != nil {
			done(err)
		}

		unmute()

		after = time.After(tick)
	}
}
// Reprovide announces every key streamed by rp.keyProvider to the content
// routing system. Keys that fail CID validation are logged and skipped. Each
// announcement is retried with exponential backoff; the first key whose
// retries are exhausted aborts the run and its error is returned.
func (rp *Reprovider) Reprovide() error {
	keys, kerr := rp.keyProvider(rp.ctx)
	if kerr != nil {
		return fmt.Errorf("failed to get key chan: %s", kerr)
	}

	for key := range keys {
		// hash security
		if verr := verifcid.ValidateCid(key); verr != nil {
			log.Errorf("insecure hash in reprovider, %s (%s)", key, verr)
			continue
		}

		// A single announcement attempt; invoked repeatedly by the retry loop.
		announce := func() error {
			perr := rp.rsys.Provide(rp.ctx, key, true)
			if perr != nil {
				log.Debugf("Failed to provide key: %s", perr)
			}
			return perr
		}

		// TODO: this backoff library does not respect our context, we should
		// eventually work contexts into it. low priority.
		if rerr := backoff.Retry(announce, backoff.NewExponentialBackOff()); rerr != nil {
			log.Debugf("Providing failed after number of retries: %s", rerr)
			return rerr
		}
	}

	return nil
}
// Trigger asks the Run loop to start a reprovide and waits for its result.
//
// It returns context.Canceled if rp.ctx or ctx is done before the request is
// accepted, or if ctx is done while waiting for the result. Otherwise it
// returns whatever error the receiver of the request passes to the callback
// (nil on success).
func (rp *Reprovider) Trigger(ctx context.Context) error {
	// Buffered, with a non-blocking send in the callback, so the side that
	// invokes the callback is never blocked, even if this call has already
	// returned or the callback is invoked more than once.
	result := make(chan error, 1)
	df := func(e error) {
		select {
		case result <- e:
		default:
		}
	}

	select {
	case <-rp.ctx.Done():
		return context.Canceled
	case <-ctx.Done():
		return context.Canceled
	case rp.trigger <- df:
	}

	// The request was accepted; wait for its outcome, but let the caller
	// abandon the wait by cancelling ctx.
	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return context.Canceled
	}
}
// muteTrigger starts a goroutine that drains rp.trigger and answers every
// received callback with a "reprovider is already running" error. The
// goroutine stops when the returned cancel function is called or when rp.ctx
// is done.
func (rp *Reprovider) muteTrigger() context.CancelFunc {
	muteCtx, unmute := context.WithCancel(rp.ctx)

	reject := func() {
		defer unmute()
		for {
			select {
			case fn := <-rp.trigger:
				fn(fmt.Errorf("reprovider is already running"))
			case <-muteCtx.Done():
				return
			}
		}
	}
	go reject()

	return unmute
}