forked from v2fly/v2ray-core
-
Notifications
You must be signed in to change notification settings - Fork 1
/
copy.go
128 lines (110 loc) · 3.02 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package buf
import (
"io"
"time"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/signal"
)
type errorHandler func(error) error
type dataHandler func(MultiBuffer)
type copyHandler struct {
onReadError []errorHandler
onData []dataHandler
onWriteError []errorHandler
}
func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
mb, err := reader.ReadMultiBuffer()
if err != nil {
for _, handler := range h.onReadError {
err = handler(err)
}
}
return mb, err
}
func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
err := writer.WriteMultiBuffer(mb)
if err != nil {
for _, handler := range h.onWriteError {
err = handler(err)
}
}
return err
}
// SizeCounter is for counting bytes copied by Copy().
type SizeCounter struct {
Size int64
}
// CopyOption is an option for copying data.
type CopyOption func(*copyHandler)
// IgnoreReaderError is a CopyOption that ignores errors from reader. Copy will continue in such case.
func IgnoreReaderError() CopyOption {
return func(handler *copyHandler) {
handler.onReadError = append(handler.onReadError, func(err error) error {
return nil
})
}
}
// IgnoreWriterError is a CopyOption that ignores errors from writer. Copy will continue in such case.
func IgnoreWriterError() CopyOption {
return func(handler *copyHandler) {
handler.onWriteError = append(handler.onWriteError, func(err error) error {
return nil
})
}
}
// UpdateActivity is a CopyOption to update activity on each data copy operation.
func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
return func(handler *copyHandler) {
handler.onData = append(handler.onData, func(MultiBuffer) {
timer.Update()
})
}
}
// CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
func CountSize(sc *SizeCounter) CopyOption {
return func(handler *copyHandler) {
handler.onData = append(handler.onData, func(b MultiBuffer) {
sc.Size += int64(b.Len())
})
}
}
func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
for {
buffer, err := handler.readFrom(reader)
if !buffer.IsEmpty() {
for _, handler := range handler.onData {
handler(buffer)
}
if werr := handler.writeTo(writer, buffer); werr != nil {
return werr
}
}
if err != nil {
return err
}
}
}
// Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
var handler copyHandler
for _, option := range options {
option(&handler)
}
err := copyInternal(reader, writer, &handler)
if err != nil && errors.Cause(err) != io.EOF {
return err
}
return nil
}
var ErrNotTimeoutReader = newError("not a TimeoutReader")
func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error {
timeoutReader, ok := reader.(TimeoutReader)
if !ok {
return ErrNotTimeoutReader
}
mb, err := timeoutReader.ReadMultiBufferTimeout(timeout)
if err != nil {
return err
}
return writer.WriteMultiBuffer(mb)
}