forked from ipfs/go-graphsync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reconciledloader.go
125 lines (106 loc) · 3.85 KB
/
reconciledloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
Package reconciledloader implements a block loader that can load from two different sources:
- a local store
- a series of remote responses for a given graphsync selector query
It verifies the sequence of remote responses matches the sequence
of loads called from a local selector traversal.
The reconciled loader also tracks whether or not there is a remote request in progress.
When there is no request in progress, it loads from the local store only.
When there is a request in progress, it waits for remote responses before loading, and only calls
upon the local store for duplicate blocks and when traversing paths the remote was missing.
The reconciled loader assumes:
1. A single thread is calling AsyncLoad to load blocks
2. When a request is online, a separate thread may call IngestResponse
3. Either thread may call SetRemoteOnline or Cleanup
4. The remote sends metadata for all blocks it traverses in the query (per GraphSync protocol spec) - whether or not
the actual block is sent.
*/
package reconciledloader
import (
"context"
"errors"
"sync"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
graphsync "github.com/filecoin-project/boost-graphsync"
"github.com/filecoin-project/boost-graphsync/requestmanager/reconciledloader/traversalrecord"
"github.com/filecoin-project/boost-graphsync/requestmanager/types"
)
// log is the package-scope logger obtained from the go-log logging package.
// NOTE(review): the logger name is spelled "reconciledlaoder" (letters transposed). It is left
// unchanged here because log-level configuration may match on this exact string — confirm
// with operators before correcting it.
var log = logging.Logger("gs-reconciledlaoder")
// settableWriter is satisfied by any value exposing SetBytes, which accepts a
// byte slice and reports an error. Its call sites are outside this view.
type settableWriter interface {
SetBytes([]byte) error
}
// byteReader is satisfied by any value exposing Bytes, which returns a byte
// slice. Its call sites are outside this view.
type byteReader interface {
Bytes() []byte
}
// loadAttempt records one block load so that it can be replayed later by
// RetryLastLoad. The zero value (nil link) means "no attempt recorded".
type loadAttempt struct {
// link is the link that was requested; nil marks an empty attempt (see empty).
link datamodel.Link
// linkContext is handed back to BlockReadOpener together with link on retry.
linkContext linking.LinkContext
// successful is set outside this view — presumably whether the load succeeded; TODO confirm.
successful bool
// usedRemote, when true, makes RetryLastLoad call remoteQueue.retryLast before reloading.
usedRemote bool
}
// empty reports whether the attempt carries no link, which is the state of a
// zero-value loadAttempt.
func (attempt loadAttempt) empty() bool {
	return attempt.link == nil
}
// ReconciledLoader is an instance of the reconciled loader.
// Instances are built with NewReconciledLoader, which fills in requestID, lsys,
// lock, signal and traversalRecord; the remaining fields start at their zero values.
type ReconciledLoader struct {
// requestID is the graphsync request this loader was created for.
requestID graphsync.RequestID
// lsys is the local store passed to NewReconciledLoader.
lsys *linking.LinkSystem
// mostRecentLoadAttempt is read and then reset by RetryLastLoad.
mostRecentLoadAttempt loadAttempt
// traversalRecord is the record a new verifier is built from when the remote comes online.
traversalRecord *traversalrecord.TraversalRecord
pathTracker pathTracker
// lock is held by the methods in this file while they touch open, verifier and remoteQueue.
lock *sync.Mutex
// signal is a condition variable created on lock; it is signalled when the remote goes offline.
signal *sync.Cond
// open is the last value passed to SetRemoteOnline.
open bool
// verifier is replaced each time the remote transitions from offline to online.
verifier *traversalrecord.Verifier
remoteQueue remoteQueue
}
// NewReconciledLoader builds a ReconciledLoader bound to requestID that uses
// localStore as its local link system. The loader gets a fresh traversal
// record and a mutex, plus a condition variable that shares that mutex.
func NewReconciledLoader(requestID graphsync.RequestID, localStore *linking.LinkSystem) *ReconciledLoader {
	mu := new(sync.Mutex)
	loader := &ReconciledLoader{
		requestID:       requestID,
		lsys:            localStore,
		lock:            mu,
		traversalRecord: traversalrecord.NewTraversalRecord(),
	}
	// The condition variable must wrap the very same mutex stored in lock.
	loader.signal = sync.NewCond(mu)
	return loader
}
// SetRemoteOnline records whether or not the request is online.
//
// Only transitions have side effects: going from online to offline signals
// the condition variable, and going from offline to online installs a new
// verifier built from the traversal record. Repeating the current state
// changes nothing beyond storing the flag.
func (rl *ReconciledLoader) SetRemoteOnline(online bool) {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	previouslyOpen := rl.open
	rl.open = online

	switch {
	case previouslyOpen && !online:
		// the queue is closing: wake whoever is waiting for new items
		rl.signal.Signal()
	case online && !previouslyOpen:
		// a remote request is opening: re-verify against everything loaded so far
		rl.verifier = traversalrecord.NewVerifier(rl.traversalRecord)
	}
}
// Cleanup frees up some memory resources for this loader prior to throwing it away.
// It clears the remote queue while holding the loader's lock. ctx is not used
// by the current implementation.
func (rl *ReconciledLoader) Cleanup(ctx context.Context) {
	rl.lock.Lock()
	// Deferred so the mutex is released even if clear panics; this also matches
	// the locking style used by SetRemoteOnline.
	defer rl.lock.Unlock()
	rl.remoteQueue.clear()
}
// RetryLastLoad replays the most recently recorded load attempt.
//
// When no attempt is recorded (its link is nil) the result carries an error
// and nothing else happens. Otherwise the recorded attempt is cleared, the
// remote queue is asked to retry its last item if the attempt used the remote
// (done under the loader's lock), and the load is reissued through
// BlockReadOpener with the original link context and link.
//
// Note that mostRecentLoadAttempt is read and reset here without holding lock.
func (rl *ReconciledLoader) RetryLastLoad() types.AsyncLoadResult {
	last := rl.mostRecentLoadAttempt
	if last.link == nil {
		return types.AsyncLoadResult{Err: errors.New("cannot retry offline load when none is present")}
	}

	// Consume the record so the same attempt cannot be retried twice.
	rl.mostRecentLoadAttempt = loadAttempt{}

	if last.usedRemote {
		rl.lock.Lock()
		rl.remoteQueue.retryLast()
		rl.lock.Unlock()
	}

	return rl.BlockReadOpener(last.linkContext, last.link)
}