-
Notifications
You must be signed in to change notification settings - Fork 338
/
feeder.go
121 lines (104 loc) · 3.07 KB
/
feeder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package feeder
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/swarm"
)
const span = swarm.SpanSize
// chunkFeeder buffers incoming writes and forwards them to the next
// pipeline writer in fixed-size chunks, each prefixed with an 8-byte
// little-endian span header recording the payload length.
type chunkFeeder struct {
	size      int                  // chunk payload size in bytes; flushes happen at this boundary
	next      pipeline.ChainWriter // downstream writer that receives each completed chunk
	buffer    []byte               // holds a partial chunk between Write calls (len == size)
	bufferIdx int                  // number of valid bytes currently held in buffer
	wrote     int64                // total flushed; Sum uses wrote == 0 to detect an empty file
}
// NewChunkFeederWriter creates a new chunkFeeder that allows for partial
// writes into the pipeline. Any pending data in the buffer is flushed to
// subsequent writers when Sum() is called.
//
// size is the chunk payload size in bytes; next receives each completed
// chunk (span header plus payload).
func NewChunkFeederWriter(size int, next pipeline.ChainWriter) pipeline.Interface {
	return &chunkFeeder{
		size:   size,
		next:   next,
		buffer: make([]byte, size),
	}
}
// Write writes data to the chunk feeder. It returns the number of bytes written
// to the feeder. The number of bytes written does not necessarily reflect how many
// bytes were actually flushed to subsequent writers, since the feeder is buffered
// and works in chunk-size quantiles.
func (f *chunkFeeder) Write(b []byte) (int, error) {
	l := len(b) // data length
	w := 0      // written

	// fast path: the incoming data plus what is already buffered does not
	// complete a full chunk, so just stash it and report it as written.
	if l+f.bufferIdx < f.size {
		// write the data into the buffer and return
		n := copy(f.buffer[f.bufferIdx:], b)
		f.bufferIdx += n
		return n, nil
	}

	// if we are here it means we have to do at least one write
	// d holds one outgoing chunk: an 8-byte span header followed by up to
	// f.size bytes of payload. NOTE(review): d is reused for every chunk in
	// the loop below — assumes f.next does not retain args.Data/args.Span
	// across ChainWrite calls; confirm against downstream writers.
	d := make([]byte, f.size+span)
	sp := 0 // span of current write

	//copy from existing buffer to this one
	// buffer always has room (bufferIdx < size), so sp == f.bufferIdx here.
	sp = copy(d[span:], f.buffer[:f.bufferIdx])

	// don't account what was already in the buffer when returning
	// number of written bytes
	if sp > 0 {
		w -= sp
	}

	var n int
	for i := 0; i < len(b); {
		// if we can't fill a whole write, buffer the rest and return
		if sp+len(b[i:]) < f.size {
			n = copy(f.buffer, b[i:])
			f.bufferIdx = n
			return w + n, nil
		}

		// fill stuff up from the incoming write
		// on the first iteration f.bufferIdx == sp (see the copy above);
		// on later iterations both are zero — so this writes at d[span+sp:].
		n = copy(d[span+f.bufferIdx:], b[i:])
		i += n
		sp += n

		// record the payload length in the span header and push the
		// completed chunk downstream.
		binary.LittleEndian.PutUint64(d[:span], uint64(sp))
		args := &pipeline.PipeWriteArgs{Data: d[:span+sp], Span: d[:span]}
		err := f.next.ChainWrite(args)
		if err != nil {
			return 0, err
		}
		f.bufferIdx = 0
		w += sp
		sp = 0
	}
	// w counts only bytes from b that were flushed (prebuffered bytes were
	// subtracted above); Sum only checks wrote == 0, so this is sufficient.
	f.wrote += int64(w)
	return w, nil
}
// Sum flushes any data still pending in the buffer to the subsequent
// writers and returns the cryptographic root-hash representing the data
// written to the feeder. An empty file is represented by a single write
// carrying only a zeroed span header.
func (f *chunkFeeder) Sum() ([]byte, error) {
	// flush whatever partial chunk is still buffered
	if f.bufferIdx > 0 {
		chunk := make([]byte, f.bufferIdx+span)
		binary.LittleEndian.PutUint64(chunk[:span], uint64(f.bufferIdx))
		copy(chunk[span:], f.buffer[:f.bufferIdx])
		if err := f.next.ChainWrite(&pipeline.PipeWriteArgs{Data: chunk, Span: chunk[:span]}); err != nil {
			return nil, err
		}
		f.wrote += int64(len(chunk))
	}

	// nothing was ever flushed: emit the zero span of an empty file
	if f.wrote == 0 {
		empty := make([]byte, span)
		if err := f.next.ChainWrite(&pipeline.PipeWriteArgs{Data: empty, Span: empty}); err != nil {
			return nil, err
		}
		f.wrote += int64(len(empty))
	}

	return f.next.Sum()
}