-
Notifications
You must be signed in to change notification settings - Fork 2
/
io.go
124 lines (108 loc) · 2.75 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"github.com/ethsana/sana/pkg/swarm"
)
// simpleReadCloser exposes an in-memory byte slice through the io.ReadCloser
// interface. Once closed, it refuses further reads and further closes.
type simpleReadCloser struct {
	buffer io.Reader // source of the bytes handed out by Read
	closed bool      // set by the first successful Close
}

// NewSimpleReadCloser returns an io.ReadCloser that serves the contents of
// the given byte slice.
func NewSimpleReadCloser(buffer []byte) io.ReadCloser {
	rc := new(simpleReadCloser)
	rc.buffer = bytes.NewBuffer(buffer)
	return rc
}

// Read implements io.Reader. It delegates to the underlying buffer while the
// reader is open and returns an error once Close has been called.
func (rc *simpleReadCloser) Read(b []byte) (int, error) {
	if !rc.closed {
		return rc.buffer.Read(b)
	}
	return 0, errors.New("read on closed reader")
}

// Close implements io.Closer. The first call marks the reader as closed;
// every subsequent call returns an error.
func (rc *simpleReadCloser) Close() error {
	alreadyClosed := rc.closed
	rc.closed = true
	if alreadyClosed {
		return errors.New("close on already closed reader")
	}
	return nil
}
// JoinReadAll copies the complete output of the provided Joiner into outFile.
//
// The expected length is taken from j.Size(). Data is pulled through a single
// reusable buffer of swarm.ChunkSize bytes, and every batch is written to
// outFile before the next read is issued. Reads that return fewer bytes than
// the buffer holds are tolerated: reading continues until the expected size
// has been reached, the joiner reports io.EOF, or an error occurs.
//
// It returns the number of bytes read from the joiner. The error is non-nil
// when ctx is cancelled, when reading or writing fails, when the writer
// accepts fewer bytes than it was given, or when the number of bytes read
// differs from j.Size().
func JoinReadAll(ctx context.Context, j Joiner, outFile io.Writer) (int64, error) {
	l := j.Size()

	data := make([]byte, swarm.ChunkSize)
	var total int64
	for total < l {
		// Stop promptly if the caller has given up.
		if err := ctx.Err(); err != nil {
			return total, err
		}

		offset := total // position of this batch, reported on short writes
		cr, err := j.Read(data)
		total += int64(cr)

		// Process whatever was read before looking at the error: a reader
		// may return data together with io.EOF or another error.
		if cr > 0 {
			cw, werr := outFile.Write(data[:cr])
			if werr != nil {
				return total, werr
			}
			if cw != cr {
				return total, fmt.Errorf("short wrote %d of %d for chunk %d", cw, cr, offset)
			}
		}

		if err != nil {
			if errors.Is(err, io.EOF) {
				// End of input; the size check below decides whether
				// everything that was expected has arrived.
				break
			}
			return total, err
		}
	}

	if total != l {
		return total, fmt.Errorf("received only %d of %d total bytes", total, l)
	}
	return total, nil
}
// SplitWriteAll writes all input from the provided reader to the provided
// splitter and returns the address produced by the splitter.
//
// A background goroutine copies r into a chunk pipe while s.Split consumes
// that pipe on the calling goroutine. l is the number of bytes r is expected
// to yield; a different count is reported as "read count mismatch".
// toEncrypt is passed through to s.Split unchanged.
//
// On any failure (split error, copy error, length mismatch, pipe close error
// or context cancellation) swarm.ZeroAddress is returned with the error.
func SplitWriteAll(ctx context.Context, s Splitter, r io.Reader, l int64, toEncrypt bool) (swarm.Address, error) {
	chunkPipe := NewChunkPipe()

	// Capacity one: the feeder reports at most one error, so its send never
	// blocks, even if this function has already returned because Split failed.
	errC := make(chan error, 1)
	go func() {
		defer close(errC)

		buf := make([]byte, swarm.ChunkSize)
		c, err := io.CopyBuffer(chunkPipe, r, buf)
		if err == nil && c != l {
			err = errors.New("read count mismatch")
		}

		// Close the pipe on every path, and before reporting, so a splitter
		// still waiting for input is not left hanging on a failed copy.
		// NOTE(review): assumes ChunkPipe.Close ends the splitter's input —
		// ChunkPipe is defined elsewhere; confirm.
		if cerr := chunkPipe.Close(); err == nil {
			err = cerr
		}

		if err != nil {
			errC <- err
		}
	}()

	addr, err := s.Split(ctx, chunkPipe, l, toEncrypt)
	if err != nil {
		return swarm.ZeroAddress, err
	}

	// Wait for the feeder's verdict; a closed channel yields a nil error.
	select {
	case err := <-errC:
		if err != nil {
			return swarm.ZeroAddress, err
		}
	case <-ctx.Done():
		return swarm.ZeroAddress, ctx.Err()
	}
	return addr, nil
}
// Loader retrieves the content stored under a reference.
type Loader interface {
	// Load takes a reference in its byte slice representation and returns
	// all content associated with that reference.
	Load(ctx context.Context, ref []byte) ([]byte, error)
}
// Saver stores content and hands back a reference to it.
type Saver interface {
	// Save stores an arbitrary byte slice and returns the reference to it
	// in its byte slice representation.
	Save(ctx context.Context, data []byte) ([]byte, error)
}
// LoadSaver combines the Loader and Saver interfaces: an implementation can
// both load the content behind a reference and save content to obtain one.
type LoadSaver interface {
Loader
Saver
}