-
Notifications
You must be signed in to change notification settings - Fork 14
/
subscriber.go
746 lines (649 loc) · 23.1 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
package legs
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/filecoin-project/go-legs/dtsync"
"github.com/filecoin-project/go-legs/gpubsub"
"github.com/filecoin-project/go-legs/httpsync"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
)
// log is the package-wide structured logger for go-legs.
var log = logging.Logger("go-legs")
// defaultAddrTTL is the default amount of time that addresses discovered from
// pubsub messages will remain in the peerstore. This is twice the default
// provider poll interval.
const (
	// defaultAddrTTL is applied to an address after a successful sync.
	defaultAddrTTL = 48 * time.Hour
	// tempAddrTTL is applied before attempting a dtsync; existing longer TTLs
	// are never decreased by it.
	tempAddrTTL = 24 * time.Hour // must be long enough for ad chain to sync
)
// errSourceNotAllowed is the error returned when a message source peer's
// messages are not allowed to be processed. This is only used internally, and
// pre-allocated here as it may occur frequently.
var errSourceNotAllowed = errors.New("message source not allowed")
// AllowPeerFunc is the signature of a function given to Subscriber that
// determines whether to allow or reject messages originating from a peer
// passed into the function. Returning true or false indicates that messages
// from that peer are allowed or rejected, respectively. Returning an error
// indicates that there was a problem evaluating the function, and results in
// the messages being rejected.
type AllowPeerFunc func(peer.ID) (bool, error)
// BlockHookFunc is the signature of a function that is called for each synced
// block, with the ID of the peer being synced from and the CID of the block.
type BlockHookFunc func(peer.ID, cid.Cid)
// Subscriber creates a single pubsub subscriber that receives messages from a
// gossip pubsub topic, and creates a stateful message handler for each message
// source peer. An optional externally-defined AllowPeerFunc determines
// whether to allow or deny messages from specific peers.
//
// Messages from separate peers are handled concurrently, and multiple messages
// from the same peer are handled serially. If a handler is busy handling a
// message, and more messages arrive from the same peer, then the last message
// replaces the previous unhandled message to avoid having to maintain queues
// of messages. Handlers do not have persistent goroutines, but start a new
// goroutine to handle a single message.
type Subscriber struct {
	// dss captures the default selector sequence passed to
	// ExploreRecursiveWithStopNode.
	dss ipld.Node
	// host is the libp2p host whose peerstore records dtsync peer addresses.
	host host.Host
	// addrTTL is how long discovered/successful peer addresses are retained.
	addrTTL time.Duration
	// psub is the pubsub subscription read by the watch goroutine.
	psub *pubsub.Subscription
	// topic is the pubsub topic; closed in doClose when non-nil.
	topic *pubsub.Topic
	// topicName is the topic's string form, passed to dtsync.NewSyncer.
	topicName string
	// allowPeer, when non-nil, filters which peers may create handlers.
	allowPeer AllowPeerFunc
	// handlers maps each message-source peer to its stateful handler.
	handlers map[peer.ID]*handler
	// handlersMutex guards handlers and allowPeer.
	handlersMutex sync.Mutex
	// A map of block hooks to call for a specific peer id, instead of the general
	// block hook func.
	scopedBlockHook map[peer.ID]BlockHookFunc
	scopedBlockHookMutex *sync.RWMutex
	// inEvents is used to send a SyncFinished from a peer handler to the
	// distributeEvents goroutine.
	inEvents chan SyncFinished
	// outEventsChans is a slice of channels, where each channel delivers a
	// copy of a SyncFinished to an OnSyncFinished reader.
	outEventsChans []chan SyncFinished
	outEventsMutex sync.Mutex
	// closing signals that the Subscriber is closing.
	closing chan struct{}
	// cancelps cancels pubsub.
	cancelps context.CancelFunc
	// closeOnce ensures that the Close only happens once.
	closeOnce sync.Once
	// watchDone signals that the pubsub watch function exited.
	watchDone chan struct{}
	// dtSync performs syncs over graphsync/data-transfer.
	dtSync *dtsync.Sync
	// httpSync performs syncs over HTTP.
	httpSync *httpsync.Sync
	// syncRecLimit bounds selector recursion in wrapped selectors.
	syncRecLimit selector.RecursionLimit
	// httpPeerstore holds addresses of peers known to serve over HTTP.
	httpPeerstore peerstore.Peerstore
	// latestSyncHander tracks the latest synced CID per peer.
	// NOTE(review): field name has a typo ("Hander"); renaming would touch
	// every use site, so it is left as-is here.
	latestSyncHander LatestSyncHandler
}
// SyncFinished notifies an OnSyncFinished reader that a specified peer
// completed a sync. The channel receives events from providers that are
// manually synced to the latest, as well as those auto-discovered.
type SyncFinished struct {
	// Cid is the CID identifying the link that finished and is now the latest
	// sync for a specific peer.
	Cid cid.Cid
	// PeerID identifies the peer this SyncFinished event pertains to.
	PeerID peer.ID
	// SyncedCids is a list of cids that this sync acquired, in order from
	// latest to oldest. The latest cid will always be at the beginning.
	SyncedCids []cid.Cid
}
// WrapBlockHookWithSyncedCidTracker returns a BlockHookFunc that records every
// CID it sees by appending to *cidsSeenSoFar, and then delegates to blockHook
// when one is provided. The caller retains ownership of the tracked slice.
func WrapBlockHookWithSyncedCidTracker(cidsSeenSoFar *[]cid.Cid, blockHook BlockHookFunc) BlockHookFunc {
	return func(peerID peer.ID, blockCid cid.Cid) {
		*cidsSeenSoFar = append(*cidsSeenSoFar, blockCid)
		if blockHook == nil {
			return
		}
		blockHook(peerID, blockCid)
	}
}
// handler holds state that is specific to a peer
type handler struct {
	// subscriber is the Subscriber that owns this handler.
	subscriber *Subscriber
	// syncMutex serializes the handling of individual syncs. This should only
	// guard the actual handling of a sync, nothing else.
	syncMutex sync.Mutex
	// If this sync will update the latestSync state (via latestSyncHandler) then
	// it should grab this lock to ensure no other process updates that state
	// concurrently.
	latestSyncMu sync.Mutex
	// msgChan holds at most one pending CID to sync; a newer pubsub message
	// replaces any unhandled one (see handleAsync).
	msgChan chan cid.Cid
	// peerID is the ID of the peer this handler is responsible for.
	peerID peer.ID
}
// wrapBlockHook builds the dispatching block hook used by the Subscriber. It
// returns the mutex guarding the scoped-hook map, the map itself, and a hook
// that first invokes any per-peer scoped hook (registered during a Sync call)
// and then the general hook, when one was supplied.
func wrapBlockHook(generalBlockHook BlockHookFunc) (*sync.RWMutex, map[peer.ID]BlockHookFunc, BlockHookFunc) {
	var mu sync.RWMutex
	scoped := make(map[peer.ID]BlockHookFunc)

	dispatch := func(peerID peer.ID, blockCid cid.Cid) {
		mu.RLock()
		scopedHook, haveScoped := scoped[peerID]
		mu.RUnlock()
		if haveScoped {
			scopedHook(peerID, blockCid)
		}
		if generalBlockHook != nil {
			generalBlockHook(peerID, blockCid)
		}
	}
	return &mu, scoped, dispatch
}
// NewSubscriber creates a new Subscriber that processes pubsub messages.
//
// If no topic is supplied via options, one is joined on the given host using
// the topic name. The datastore ds must be nil when the DtManager option is
// used. On any setup failure the pubsub context is canceled before returning.
func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, dss ipld.Node, options ...Option) (*Subscriber, error) {
	cfg := config{
		addrTTL: defaultAddrTTL,
	}
	err := cfg.apply(options)
	if err != nil {
		return nil, err
	}
	ctx, cancelPubsub := context.WithCancel(context.Background())
	var pubsubTopic *pubsub.Topic
	if cfg.topic == nil {
		// No externally-provided topic; join one so Subscriber owns it.
		pubsubTopic, err = gpubsub.MakePubsub(ctx, host, topic)
		if err != nil {
			cancelPubsub()
			return nil, err
		}
		cfg.topic = pubsubTopic
	}
	psub, err := cfg.topic.Subscribe()
	if err != nil {
		cancelPubsub()
		return nil, err
	}
	// blockHook dispatches to a per-peer scoped hook and/or the general hook.
	scopedBlockHookMutex, scopedBlockHook, blockHook := wrapBlockHook(cfg.blockHook)
	var dtSync *dtsync.Sync
	if cfg.dtManager != nil {
		if ds != nil {
			cancelPubsub()
			return nil, fmt.Errorf("datastore cannot be used with DtManager option")
		}
		dtSync, err = dtsync.NewSyncWithDT(host, cfg.dtManager, cfg.graphExchange, blockHook)
	} else {
		dtSync, err = dtsync.NewSync(host, ds, lsys, blockHook)
	}
	if err != nil {
		cancelPubsub()
		return nil, err
	}
	// Separate in-memory peerstore for peers reachable over HTTP.
	httpPeerstore, err := pstoremem.NewPeerstore()
	if err != nil {
		cancelPubsub()
		return nil, err
	}
	latestSyncHandler := cfg.latestSyncHandler
	if latestSyncHandler == nil {
		latestSyncHandler = &DefaultLatestSyncHandler{}
	}
	s := &Subscriber{
		dss:       dss,
		host:      host,
		addrTTL:   cfg.addrTTL,
		psub:      psub,
		topic:     cfg.topic,
		topicName: cfg.topic.String(),
		closing:   make(chan struct{}),
		cancelps:  cancelPubsub,
		watchDone: make(chan struct{}),
		allowPeer: cfg.allowPeer,
		handlers:  make(map[peer.ID]*handler),
		inEvents:  make(chan SyncFinished, 1),
		dtSync:       dtSync,
		httpSync:     httpsync.NewSync(lsys, cfg.httpClient, blockHook),
		syncRecLimit: cfg.syncRecLimit,
		httpPeerstore: httpPeerstore,
		scopedBlockHookMutex: scopedBlockHookMutex,
		scopedBlockHook:      scopedBlockHook,
		latestSyncHander:     latestSyncHandler,
	}
	// Start watcher to read pubsub messages.
	go s.watch(ctx)
	// Start distributor to send SyncFinished messages to interested parties.
	go s.distributeEvents()
	return s, nil
}
// GetLatestSync returns the latest synced CID for the specified peer. If there
// is no recorded latest sync for the peer, then nil is returned. This does not
// mean that no data is synced with that peer, it means that the Subscriber
// does not know about it. Calling Sync() first may be necessary.
func (s *Subscriber) GetLatestSync(peerID peer.ID) ipld.Link {
	latest, found := s.latestSyncHander.GetLatestSync(peerID)
	if found && latest != cid.Undef {
		return cidlink.Link{Cid: latest}
	}
	return nil
}
// SetLatestSync sets the latest synced CID for a specified peer. If there is
// no handler for the peer, then one is created without consulting any
// AllowPeerFunc. Setting cid.Undef is rejected with an error.
func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error {
	if latestSync == cid.Undef {
		return errors.New("cannot set latest sync to undefined value")
	}
	// force=true: bypass any allowPeer filtering for explicit state updates.
	hnd, err := s.getOrCreateHandler(peerID, true)
	if err != nil {
		return err
	}

	// Hold the per-peer latest-sync lock while updating the shared state.
	hnd.latestSyncMu.Lock()
	s.latestSyncHander.SetLatestSync(peerID, latestSync)
	hnd.latestSyncMu.Unlock()
	return nil
}
// SetAllowPeer configures Subscriber with a function to evaluate whether to
// allow or reject messages from a peer. Setting nil removes any filtering and
// allows messages from all peers. Calling SetAllowPeer replaces any
// previously configured AllowPeerFunc.
func (s *Subscriber) SetAllowPeer(allowPeer AllowPeerFunc) {
	// allowPeer is guarded by the same mutex as the handlers map.
	s.handlersMutex.Lock()
	s.allowPeer = allowPeer
	s.handlersMutex.Unlock()
}
// Close shuts down the Subscriber. It is safe to call multiple times; only
// the first call performs the shutdown, and its result is returned.
func (s *Subscriber) Close() error {
	var closeErr error
	s.closeOnce.Do(func() { closeErr = s.doClose() })
	return closeErr
}
// doClose performs the actual shutdown work for Close: it cancels the pubsub
// subscription, waits for the watch goroutine, closes the dtsync and topic,
// dismisses event readers, stops the distribution goroutine, and closes the
// HTTP peerstore. All close errors are accumulated and returned together.
func (s *Subscriber) doClose() error {
	// Cancel pubsub and wait for pubsub watcher to exit.
	s.psub.Cancel()
	<-s.watchDone

	var errs error
	if err := s.dtSync.Close(); err != nil {
		errs = multierror.Append(errs, err)
	}

	// If Subscriber owns the pubsub topic, then close it.
	if s.topic != nil {
		if err := s.topic.Close(); err != nil {
			log.Errorw("Failed to close pubsub topic", "err", err)
			errs = multierror.Append(errs, err)
		}
	}

	// Dismiss any event readers.
	s.outEventsMutex.Lock()
	for _, ch := range s.outEventsChans {
		close(ch)
	}
	s.outEventsChans = nil
	s.outEventsMutex.Unlock()

	// Shutdown pubsub services.
	s.cancelps()

	// Stop the distribution goroutine.
	close(s.inEvents)

	// Fix: this error was previously discarded; report it like the others.
	if err := s.httpPeerstore.Close(); err != nil {
		errs = multierror.Append(errs, err)
	}
	return errs
}
// OnSyncFinished creates a channel that receives change notifications, and
// adds that channel to the list of notification channels.
//
// Calling the returned cancel function removes the notification channel from
// the list of channels to be notified on changes, and it closes the channel to
// allow any reading goroutines to stop waiting on the channel.
func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc) {
	// Channel is buffered to prevent distribute() from blocking if a reader is
	// not reading the channel immediately.
	ch := make(chan SyncFinished, 1)

	s.outEventsMutex.Lock()
	s.outEventsChans = append(s.outEventsChans, ch)
	s.outEventsMutex.Unlock()

	cancel := func() {
		s.outEventsMutex.Lock()
		defer s.outEventsMutex.Unlock()
		for i, c := range s.outEventsChans {
			if c != ch {
				continue
			}
			// Swap-remove: replace with the last entry and shrink the slice.
			last := len(s.outEventsChans) - 1
			s.outEventsChans[i] = s.outEventsChans[last]
			s.outEventsChans[last] = nil
			s.outEventsChans = s.outEventsChans[:last]
			close(ch)
			return
		}
	}
	return ch, cancel
}
// Sync performs a one-off explicit sync with the given peer for a specific CID
// and updates the latest synced link to it. Completing sync may take a
// significant amount of time, so Sync should generally be run in its own
// goroutine.
//
// If given cid.Undef, the latest root CID is queried from the peer directly
// and used instead. Note that in an event where there is no latest root, i.e.
// querying the latest CID returns cid.Undef, this function returns cid.Undef
// with nil error.
//
// The latest synced CID is returned when this sync is complete. Any
// OnSyncFinished readers will also get a SyncFinished when the sync succeeds,
// but only if syncing to the latest, using `cid.Undef`, and using the default
// selector. This is because when specifying a CID, it is usually for an
// entries sync, not an advertisements sync.
//
// It is the responsibility of the caller to make sure the given CID appears
// after the latest sync in order to avoid re-syncing of content that may have
// previously been synced.
//
// The selector sequence, sel, can optionally be specified to customize the
// selection sequence during traversal. If unspecified, the default selector
// sequence is used.
//
// Note that the selector sequence is wrapped with a selector logic that will
// stop traversal when the latest synced link is reached. Therefore, it must
// only specify the selection sequence itself.
//
// See: ExploreRecursiveWithStopNode.
func (s *Subscriber) Sync(ctx context.Context, peerID peer.ID, nextCid cid.Cid, sel ipld.Node, peerAddr multiaddr.Multiaddr, opts ...SyncOption) (cid.Cid, error) {
	cfg := &syncCfg{}
	for _, opt := range opts {
		opt(cfg)
	}
	if peerID == "" {
		return cid.Undef, errors.New("empty peer id")
	}
	log := log.With("peer", peerID)
	var err error
	var syncer Syncer
	// Decide between httpSync and dtSync based on the peer address.
	isHttpPeerAddr := false
	if peerAddr != nil {
		for _, p := range peerAddr.Protocols() {
			if p.Code == multiaddr.P_HTTP || p.Code == multiaddr.P_HTTPS {
				isHttpPeerAddr = true
				break
			}
		}
	} else {
		// Check if we have an http url for this peer since we didn't get a peerAddr.
		// Note that this gives a preference to use httpSync over dtsync if we have
		// seen http address and we called sync with no explicit peerAddr.
		possibleAddrs := s.httpPeerstore.Addrs(peerID)
		if len(possibleAddrs) > 0 {
			peerAddr = possibleAddrs[0]
			isHttpPeerAddr = true
		}
	}
	if isHttpPeerAddr {
		syncer, err = s.httpSync.NewSyncer(peerID, peerAddr)
		if err != nil {
			return cid.Undef, fmt.Errorf("cannot create http sync handler: %w", err)
		}
	} else {
		// Not an httpPeerAddr, so use the dtSync. We'll add it with a small TTL
		// first, and extend it when we discover we can actually sync from it.
		// In case the peerstore already has this address and the existing TTL is
		// greater than this temp one, this is a no-op. In other words we never
		// decrease the TTL here.
		peerStore := s.host.Peerstore()
		if peerStore != nil && peerAddr != nil {
			peerStore.AddAddr(peerID, peerAddr, tempAddrTTL)
		}
		syncer = s.dtSync.NewSyncer(peerID, s.topicName)
	}
	updateLatest := cfg.alwaysUpdateLatest
	if nextCid == cid.Undef {
		// Query the peer for the latest CID
		nextCid, err = syncer.GetHead(ctx)
		if err != nil {
			return cid.Undef, fmt.Errorf("cannot query head for sync: %w", err)
		}
		// Check if there is a latest CID.
		if nextCid == cid.Undef {
			// There is no head; nothing to sync.
			log.Info("No head to sync")
			return cid.Undef, nil
		}
		log.Infow("Sync queried head CID", "cid", nextCid)
		if sel == nil {
			// Update the latestSync only if no CID and no selector given.
			updateLatest = true
		}
	}
	log = log.With("cid", nextCid)
	log.Info("Start sync")
	if ctx.Err() != nil {
		return cid.Undef, fmt.Errorf("sync canceled: %w", ctx.Err())
	}
	var wrapSel bool
	if sel == nil {
		// Fall back onto the default selector sequence if one is not
		// given. Note that if selector is specified it is used as is
		// without any wrapping.
		sel = s.dss
		wrapSel = true
	}
	// Check for existing handler. If none, create one if allowed.
	hnd, err := s.getOrCreateHandler(peerID, true)
	if err != nil {
		return cid.Undef, err
	}
	if updateLatest {
		// Grab the latestSyncMu lock so that an async handler doesn't update the
		// latestSync between when we call hnd.handle and when we actually updateLatest.
		hnd.latestSyncMu.Lock()
		defer hnd.latestSyncMu.Unlock()
	}
	syncedCids, err := hnd.handle(ctx, nextCid, sel, wrapSel, syncer, cfg.scopedBlockHook)
	if err != nil {
		return cid.Undef, fmt.Errorf("sync handler failed: %w", err)
	}
	if updateLatest {
		hnd.subscriber.latestSyncHander.SetLatestSync(hnd.peerID, nextCid)
		hnd.subscriber.inEvents <- SyncFinished{Cid: nextCid, PeerID: hnd.peerID, SyncedCids: syncedCids}
		log.Infow("Updating latest sync")
	}
	// The sync succeeded, so let's remember this address in the appropriate
	// peerstore. If the address was already in the peerstore, this will extend
	// its ttl.
	if peerAddr != nil {
		if isHttpPeerAddr {
			// Store this http address so that future calls to sync will work without a
			// peerAddr (given that it happens within the TTL)
			s.httpPeerstore.AddAddr(peerID, peerAddr, s.addrTTL)
		} else {
			// Not an http address, so add to the host's libp2p peerstore.
			peerStore := s.host.Peerstore()
			if peerStore != nil {
				peerStore.AddAddr(peerID, peerAddr, s.addrTTL)
			}
		}
	}
	return nextCid, nil
}
// distributeEvents reads each SyncFinished sent by a peer handler and copies
// the event to every channel in outEventsChans, delivering it to all
// OnSyncFinished channel readers. It runs until inEvents is closed.
func (s *Subscriber) distributeEvents() {
	for event := range s.inEvents {
		// An undefined CID here indicates a programming error upstream.
		if !event.Cid.Defined() {
			panic("SyncFinished event with undefined cid")
		}
		// Send update to all change notification channels.
		s.outEventsMutex.Lock()
		for _, out := range s.outEventsChans {
			out <- event
		}
		s.outEventsMutex.Unlock()
	}
}
// getOrCreateHandler returns the handler for peerID, creating one if needed.
// Unless force is true, a configured allowPeer function is consulted first,
// and disallowed peers get errSourceNotAllowed.
func (s *Subscriber) getOrCreateHandler(peerID peer.ID, force bool) (*handler, error) {
	s.handlersMutex.Lock()
	defer s.handlersMutex.Unlock()

	// Check callback, if needed, to see if peer ID allowed.
	if !force && s.allowPeer != nil {
		allowed, err := s.allowPeer(peerID)
		switch {
		case err != nil:
			return nil, fmt.Errorf("error checking if peer allowed: %w", err)
		case !allowed:
			return nil, errSourceNotAllowed
		}
	}

	// Return the existing handler when there is one.
	if existing, found := s.handlers[peerID]; found {
		return existing, nil
	}

	log.Infow("Creating new handler for publisher", "peer", peerID)
	newHnd := &handler{
		subscriber: s,
		msgChan:    make(chan cid.Cid, 1),
		peerID:     peerID,
	}
	s.handlers[peerID] = newHnd
	return newHnd, nil
}
// watch reads messages from a pubsub topic subscription and passes the message
// to the handler that is responsible for the peer that originally sent the
// message. If the handler does not yet exist, then the allowPeer callback is
// consulted to determine if the peer's messages are allowed. If allowed, a
// new handler is created. Otherwise, the message is ignored.
//
// watch exits when the subscription is canceled or the context is done, after
// waiting for any in-flight async handlers, and then closes watchDone.
func (s *Subscriber) watch(ctx context.Context) {
	watchWG := new(sync.WaitGroup)
	for {
		msg, err := s.psub.Next(ctx)
		if err != nil {
			if ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled {
				// This is a normal result of shutting down the Subscriber.
				log.Debug("Canceled watching pubsub subscription")
			} else {
				log.Errorw("Error reading from pubsub", "err", err)
				// TODO: restart subscription.
			}
			break
		}
		srcPeer, err := peer.IDFromBytes(msg.From)
		if err != nil {
			// Fix: previously this error was silently swallowed, hiding
			// malformed message sources from operators.
			log.Errorw("Could not decode message source peer ID", "err", err)
			continue
		}
		hnd, err := s.getOrCreateHandler(srcPeer, false)
		if err != nil {
			if err == errSourceNotAllowed {
				log.Infow("Ignored message", "reason", err, "peer", srcPeer)
			} else {
				log.Errorw("Cannot process message", "err", err)
			}
			continue
		}
		// Decode CID and originator addresses from message.
		m := dtsync.Message{}
		if err = m.UnmarshalCBOR(bytes.NewBuffer(msg.Data)); err != nil {
			log.Errorw("Could not decode pubsub message", "err", err)
			continue
		}
		// Add the message originator's address to the peerstore. This allows
		// a connection, back to that provider that sent the message, to
		// retrieve advertisements.
		if len(m.Addrs) != 0 {
			peerStore := s.host.Peerstore()
			if peerStore != nil {
				addrs, err := m.GetAddrs()
				if err != nil {
					// Fix: distinguish this failure from the CBOR decode above.
					log.Errorw("Could not decode addresses from pubsub message", "err", err)
					continue
				}
				peerStore.AddAddrs(srcPeer, addrs, s.addrTTL)
			}
		}
		log.Infow("Handling message", "peer", srcPeer)
		// Start new goroutine to handle this message instead of having
		// persistent goroutine for each peer.
		hnd.handleAsync(ctx, m.Cid, s.dss, watchWG)
	}
	watchWG.Wait()
	close(s.watchDone)
}
// handleAsync starts a goroutine to process the latest message received over
// pubsub.
//
// Only the most recent unhandled CID is kept: msgChan has capacity 1, and any
// pending CID is drained and replaced before the new one is enqueued. The
// spawned goroutine may therefore find msgChan empty (another goroutine took
// its message), in which case it does nothing.
func (h *handler) handleAsync(ctx context.Context, nextCid cid.Cid, ss ipld.Node, watchWG *sync.WaitGroup) {
	watchWG.Add(1)
	// Remove any previous message and replace it with the most recent. Only
	// process the most recent message regardless of how many arrive while
	// waiting for a previous sync.
	select {
	case prevCid := <-h.msgChan:
		log.Infow("Pending update replaced by new", "previous_cid", prevCid, "new_cid", nextCid)
	default:
	}
	h.msgChan <- nextCid
	go func() {
		defer watchWG.Done()
		// Grab the lock for the latest sync mutex so we only have one async handler
		// updating the latestSync state
		h.latestSyncMu.Lock()
		defer h.latestSyncMu.Unlock()
		select {
		case <-ctx.Done():
		case c := <-h.msgChan:
			// Wait for this handler to become available.
			// Note this only wraps the handler. This is to free up the handler in
			// case someone else needs it while we wait to send on the events chan.
			syncedCids, err := h.handle(ctx, c, ss, true, h.subscriber.dtSync.NewSyncer(h.peerID, h.subscriber.topicName), nil)
			if err != nil {
				// Log error for now.
				log.Errorw("Cannot process message", "err", err, "peer", h.peerID)
				return
			}
			// Update latest head seen.
			// NOTE(review): nextCid (the captured value) is recorded here rather
			// than c (the possibly-newer drained value) — confirm this is intended.
			log.Infow("Updating latest sync")
			h.subscriber.latestSyncHander.SetLatestSync(h.peerID, nextCid)
			h.subscriber.inEvents <- SyncFinished{Cid: nextCid, PeerID: h.peerID, SyncedCids: syncedCids}
		default:
			// A previous goroutine, that had its message replaced, read the
			// message. Or, the message was removed and will be replaced by
			// another message and goroutine. Either way, nothing to do in
			// this routine.
		}
	}()
}
// handle processes a message from the peer that the handler is responsible for.
//
// It serializes syncs for this peer, registers a per-peer scoped block hook
// (wrapped to record every synced CID) for the duration of the sync, and,
// when wrapSel is true, wraps sel so traversal stops at the latest synced
// link. Returns the CIDs acquired by the sync, or (nil, nil) when the target
// CID is already the stop node.
func (h *handler) handle(ctx context.Context, nextCid cid.Cid, sel ipld.Node, wrapSel bool, syncer Syncer, hook BlockHookFunc) ([]cid.Cid, error) {
	h.syncMutex.Lock()
	defer h.syncMutex.Unlock()
	log := log.With("cid", nextCid, "peer", h.peerID)

	// This is not set to nil so we can get a pointer.
	syncedCids := []cid.Cid{}
	hook = WrapBlockHookWithSyncedCidTracker(&syncedCids, hook)

	// Register the scoped hook for this peer so the Subscriber's dispatching
	// block hook routes block callbacks here during the sync, and remove it
	// again when the sync finishes.
	h.subscriber.scopedBlockHookMutex.Lock()
	h.subscriber.scopedBlockHook[h.peerID] = hook
	h.subscriber.scopedBlockHookMutex.Unlock()
	defer func() {
		h.subscriber.scopedBlockHookMutex.Lock()
		delete(h.subscriber.scopedBlockHook, h.peerID)
		h.subscriber.scopedBlockHookMutex.Unlock()
	}()

	if wrapSel {
		// Stop traversal at the latest synced link, when one is known.
		var latestSyncLink ipld.Link
		latestSync, ok := h.subscriber.latestSyncHander.GetLatestSync(h.peerID)
		if ok && latestSync != cid.Undef {
			latestSyncLink = cidlink.Link{Cid: latestSync}
		}
		sel = ExploreRecursiveWithStopNode(h.subscriber.syncRecLimit, sel, latestSyncLink)
	}

	// Nothing to do when we are asked to sync exactly the stop node.
	stopNode, ok := getStopNode(sel)
	if ok && stopNode.(cidlink.Link).Cid == nextCid {
		log.Infow("cid to sync to is the stop node. Nothing to do")
		return nil, nil
	}

	err := syncer.Sync(ctx, nextCid, sel)
	if err != nil {
		return nil, err
	}
	log.Infow("Sync completed")
	return syncedCids, nil
}