-
Notifications
You must be signed in to change notification settings - Fork 20.1k
/
dpa.go
241 lines (208 loc) · 7.01 KB
/
dpa.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
/*
DPA provides the client API entrypoints Store and Retrieve to store and retrieve data.
It can store anything that has a byte slice representation, so files or serialised objects etc.
Storage: DPA calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client.
Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.
As the chunker produces chunks, DPA dispatches them to its own chunk store
implementation for storage or retrieval.
*/
// Sizing parameters for the DPA's channels, worker pools and the stores
// created by NewLocalDPA.
const (
// storeChanCapacity is the buffer size of the store channel created in Start.
storeChanCapacity = 100
// retrieveChanCapacity is the buffer size of the retrieve channel created in Start.
retrieveChanCapacity = 100
// singletonSwarmDbCapacity is the capacity NewLocalDPA passes to NewDbStore.
singletonSwarmDbCapacity = 50000
// singletonSwarmCacheCapacity is the capacity NewLocalDPA passes to NewMemStore.
singletonSwarmCacheCapacity = 500
// maxStoreProcesses is the number of storeWorker goroutines spawned by storeLoop.
maxStoreProcesses = 8
// maxRetrieveProcesses is the number of retrieveWorker goroutines spawned by retrieveLoop.
maxRetrieveProcesses = 8
)
var (
// notFound is the sentinel error for a chunk that could not be obtained.
// dpaChunkStore.Get returns it when a request times out, and retrieveWorker
// compares errors against it by identity.
notFound = errors.New("not found")
)
// DPA couples a Chunker with a ChunkStore. Store and Retrieve pass the store
// and retrieve channels to the Chunker; the worker goroutines launched by
// Start serve those channels through the embedded ChunkStore.
type DPA struct {
// embedded backing store; the workers call its Get and Put
ChunkStore
// chunks waiting to be stored, consumed by storeWorker; created in Start
storeC chan *Chunk
// chunk requests waiting to be served, consumed by retrieveWorker; created in Start
retrieveC chan *Chunk
// used by Store (Split) and Retrieve (Join)
Chunker Chunker
// held by Start and Stop while they inspect and change the fields below
lock sync.Mutex
// set by Start, cleared by Stop; makes repeated Start/Stop calls no-ops
running bool
// closed by Stop; the workers poll it to decide when to exit
quitC chan bool
}
// NewLocalDPA builds a DPA backed only by local storage; it is meant for
// testing locally.
//
// It opens a DbStore in datadir using the SHA256 hash function, creates a
// MemStore over that DbStore, combines both into a LocalStore and hands it to
// NewDPA together with the default chunker parameters. The error from
// NewDbStore, if any, is returned unchanged and no DPA is created.
func NewLocalDPA(datadir string) (*DPA, error) {
	dbStore, err := NewDbStore(datadir, MakeHashFunc("SHA256"), singletonSwarmDbCapacity, 0)
	if err != nil {
		return nil, err
	}

	memStore := NewMemStore(dbStore, singletonSwarmCacheCapacity)
	localStore := &LocalStore{memStore, dbStore}

	return NewDPA(localStore, NewChunkerParams()), nil
}
// NewDPA returns a DPA that uses store as its ChunkStore and a tree chunker
// built from params. The channels and workers are not set up here; that
// happens in Start.
func NewDPA(store ChunkStore, params *ChunkerParams) *DPA {
	dpa := new(DPA)
	dpa.Chunker = NewTreeChunker(params)
	dpa.ChunkStore = store
	return dpa
}
// Retrieve is the public entry point for retrieving a document directly. It is
// used by the FS-aware API and httpaccess.
//
// It asks the Chunker to join the document rooted at key, passing it the
// retrieve channel, and returns the resulting lazy reader. Chunk retrieval
// blocks on netStore requests with a timeout, so the reader reports an error
// if chunks within the requested range time out.
//
// NOTE(review): the retrieve channel is only allocated by Start; calling
// Retrieve before Start passes a nil channel to Join. Confirm callers start
// the DPA first.
func (self *DPA) Retrieve(key Key) LazySectionReader {
	var reader LazySectionReader = self.Chunker.Join(key, self.retrieveC)
	return reader
}
// Store is the public entry point for storing a document directly. It is used
// by the FS-aware API and httpaccess.
//
// It asks the Chunker to split size bytes read from data, passing it the store
// channel and both wait groups, and returns the key and error that Split
// reports.
//
// NOTE(review): the store channel is only allocated by Start; calling Store
// before Start passes a nil channel to Split. Confirm callers start the DPA
// first.
func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) {
	key, err = self.Chunker.Split(data, size, self.storeC, swg, wwg)
	return key, err
}
// Start brings the DPA into the running state. It allocates fresh retrieve,
// store and quit channels and then spawns the store and retrieve workers.
// Calling Start on a DPA that is already running does nothing.
func (self *DPA) Start() {
	self.lock.Lock()
	defer self.lock.Unlock()

	if !self.running {
		self.running = true

		// The channels must exist before the workers are spawned.
		self.retrieveC = make(chan *Chunk, retrieveChanCapacity)
		self.storeC = make(chan *Chunk, storeChanCapacity)
		self.quitC = make(chan bool)

		self.storeLoop()
		self.retrieveLoop()
	}
}
// Stop takes the DPA out of the running state and closes the quit channel to
// tell the workers to exit. Calling Stop on a DPA that is not running does
// nothing, so the quit channel is never closed twice.
func (self *DPA) Stop() {
	self.lock.Lock()
	defer self.lock.Unlock()

	if self.running {
		close(self.quitC)
		self.running = false
	}
}
// retrieveLoop starts maxRetrieveProcesses retrieveWorker goroutines, which
// dispatch the chunk retrieval requests received on the retrieve channel to
// the ChunkStore (NetStore or LocalStore). It returns as soon as the workers
// have been spawned.
func (self *DPA) retrieveLoop() {
	for remaining := maxRetrieveProcesses; remaining > 0; remaining-- {
		go self.retrieveWorker()
	}
	log.Trace(fmt.Sprintf("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses))
}
// retrieveWorker serves retrieval requests from the retrieve channel until the
// DPA is stopped.
//
// For every request chunk it looks the key up in the embedded ChunkStore. On
// success the stored data and size are copied into the request chunk; on
// failure the chunk is left without data and the failure is only traced. In
// both cases chunk.C is closed afterwards.
//
// The worker waits on the quit channel and the retrieve channel together, so
// Stop also terminates workers that are idle. Ranging over the retrieve
// channel alone would leave idle workers blocked forever, because that channel
// is never closed.
func (self *DPA) retrieveWorker() {
	// Start replaces both channels on every (re)start; pin the pair this
	// worker was spawned for so that a later Start cannot make it watch a
	// quit channel that belongs to a newer generation of workers.
	quitC, retrieveC := self.quitC, self.retrieveC

	for {
		// Give shutdown priority over requests that are still queued.
		select {
		case <-quitC:
			return
		default:
		}

		select {
		case <-quitC:
			return
		case chunk, ok := <-retrieveC:
			if !ok {
				return
			}
			log.Trace(fmt.Sprintf("dpa: retrieve loop : chunk %v", chunk.Key.Log()))
			storedChunk, err := self.Get(chunk.Key)
			switch {
			case err == notFound:
				log.Trace(fmt.Sprintf("chunk %v not found", chunk.Key.Log()))
			case err != nil:
				log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err))
			default:
				chunk.SData = storedChunk.SData
				chunk.Size = storedChunk.Size
			}
			// Closed on success and on failure alike.
			close(chunk.C)
		}
	}
}
// storeLoop starts maxStoreProcesses storeWorker goroutines, which dispatch
// the chunk store requests received on the store channel to the ChunkStore
// (NetStore or LocalStore). It returns as soon as the workers have been
// spawned.
func (self *DPA) storeLoop() {
	for remaining := maxStoreProcesses; remaining > 0; remaining-- {
		go self.storeWorker()
	}
	log.Trace(fmt.Sprintf("dpa: store spawning %v workers", maxStoreProcesses))
}
// storeWorker serves store requests from the store channel until the DPA is
// stopped.
//
// Every chunk received is handed to the embedded ChunkStore's Put. If the
// chunk carries a wait group, Done is called on it once Put has returned.
//
// The worker waits on the quit channel and the store channel together, so Stop
// also terminates workers that are idle. Ranging over the store channel alone
// would leave idle workers blocked forever, because that channel is never
// closed.
func (self *DPA) storeWorker() {
	// Start replaces both channels on every (re)start; pin the pair this
	// worker was spawned for so that a later Start cannot make it watch a
	// quit channel that belongs to a newer generation of workers.
	quitC, storeC := self.quitC, self.storeC

	for {
		// Give shutdown priority over chunks that are still queued.
		select {
		case <-quitC:
			return
		default:
		}

		select {
		case <-quitC:
			return
		case chunk, ok := <-storeC:
			if !ok {
				return
			}
			self.Put(chunk)
			if chunk.wg != nil {
				log.Trace(fmt.Sprintf("dpa: store processor %v", chunk.Key.Log()))
				chunk.wg.Done()
			}
		}
	}
}
// dpaChunkStore implements the ChunkStore interface.
// This chunk access layer assumes two chunk stores: local storage
// (e.g. LocalStore) and network storage (e.g. NetStore).
// Access through the network store is blocking with a timeout.
type dpaChunkStore struct {
// counter incremented by Put for every chunk it forwards to netStore;
// only used in trace output
n int
// consulted by Put to find out whether a chunk is already known
localStore ChunkStore
// queried by Get and handed chunks by Put
netStore ChunkStore
}
// NewDpaChunkStore returns a dpaChunkStore that uses localStore for local
// lookups and netStore for network access. Its chunk counter starts at zero.
func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore {
	return &dpaChunkStore{
		n:          0,
		localStore: localStore,
		netStore:   netStore,
	}
}
// Get is the entrypoint for local retrieve requests. It waits for a response
// or times out.
//
// The chunk is requested from netStore. If the returned chunk already carries
// data it is returned immediately, together with whatever error netStore
// reported. Otherwise Get blocks until the chunk's pending request signals on
// chunk.Req.C or until searchTimeout has elapsed; on timeout err is notFound.
//
// If netStore returns no chunk at all, Get returns a nil chunk with netStore's
// error (or notFound if it reported none) instead of dereferencing nil.
func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
	chunk, err = self.netStore.Get(key)
	if chunk == nil {
		// Nothing to inspect or wait on.
		if err == nil {
			err = notFound
		}
		return nil, err
	}
	if chunk.SData != nil {
		log.Trace(fmt.Sprintf("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData)))
		return chunk, err
	}

	// Use an explicit timer so it is released as soon as the chunk arrives,
	// rather than lingering until searchTimeout fires.
	timer := time.NewTimer(searchTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		log.Trace(fmt.Sprintf("DPA.Get: %v request time out ", key.Log()))
		err = notFound
	case <-chunk.Req.C:
		log.Trace(fmt.Sprintf("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk))
	}
	return chunk, err
}
// Put is the entrypoint for local store requests coming from storeLoop.
//
// It looks entry's key up in localStore and then does one of three things:
//   - lookup failed: entry is treated as a new chunk and forwarded to netStore;
//   - lookup returned a chunk without data: entry's data and size are copied
//     into that chunk, which is then forwarded to netStore;
//   - lookup returned a chunk with data: the chunk is already known and
//     nothing is forwarded.
//
// NOTE(review): n is incremented without synchronisation. If Put is invoked
// from several storeWorker goroutines at once this is a data race on the
// counter. Confirm and guard it if so.
func (self *dpaChunkStore) Put(entry *Chunk) {
	chunk, err := self.localStore.Get(entry.Key)

	switch {
	case err != nil:
		log.Trace(fmt.Sprintf("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log()))
		chunk = entry
	case chunk.SData == nil:
		log.Trace(fmt.Sprintf("DPA.Put: %v request entry found", entry.Key.Log()))
		chunk.SData = entry.SData
		chunk.Size = entry.Size
	default:
		log.Trace(fmt.Sprintf("DPA.Put: %v chunk already known", entry.Key.Log()))
		return
	}

	// From this point on the storage logic is the same as for network
	// storage requests.
	log.Trace(fmt.Sprintf("DPA.Put %v: %v", self.n, chunk.Key.Log()))
	self.n++
	self.netStore.Put(chunk)
}
// Close is a no-op: dpaChunkStore does not close localStore or netStore.
func (self *dpaChunkStore) Close() {}