-
Notifications
You must be signed in to change notification settings - Fork 1
/
peer.go
129 lines (108 loc) · 2.11 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package protocol
import (
"bytes"
"context"
"errors"
"io"
"net"
"sync"
"sync/atomic"
"time"
)
type Peer struct {
closed atomic.Bool
wg sync.WaitGroup
conn net.Conn
inout FeedDrainCloser
}
func (p *Peer) keepRead(ctx context.Context) error {
var buf bytes.Buffer
for !p.closed.Load() {
select {
case <-ctx.Done():
break
default:
// continue
}
if buf.Available() < TYPICAL_MTU {
buf.Grow(TYPICAL_MTU)
}
idle := buf.AvailableBuffer()[:buf.Available()]
if n, err := p.conn.Read(idle); err != nil {
if errors.Is(err, io.EOF) {
time.Sleep(time.Millisecond)
continue
}
return err
} else {
buf.Write(idle[:n])
}
recs, err := Split(&buf)
if err != nil {
return err
}
if len(recs) == 0 {
time.Sleep(time.Millisecond)
continue
}
if err = p.inout.Drain(recs); err != nil {
return err
}
}
return nil
}
func (p *Peer) keepWrite(ctx context.Context) error {
for !p.closed.Load() {
select {
case <-ctx.Done():
break
default:
// continue
}
recs, err := p.inout.Feed()
if err != nil {
return err
}
b := net.Buffers(recs)
for len(b) > 0 && err == nil {
if _, err = b.WriteTo(p.conn); err != nil {
return err
}
}
}
return nil
}
func (p *Peer) Keep(ctx context.Context) (rerr, werr, cerr error) {
p.wg.Add(2) // read & write
defer p.wg.Add(-2)
if p.closed.Load() {
return nil, nil, nil
}
readErrCh, writeErrCh := make(chan error, 1), make(chan error, 1)
go func() { readErrCh <- p.keepRead(ctx) }()
go func() { writeErrCh <- p.keepWrite(ctx) }()
for i := 0; i < 2; i++ {
select {
case rerr = <-readErrCh:
if errors.Is(rerr, net.ErrClosed) {
// That's ok, we probably close it ourselves.
rerr = nil
}
case werr = <-writeErrCh:
// You can't close it before it's written, but you can close it before it's read.
// Close after the writing thread has finished, this will cancel all reading threads.
cerr = p.conn.Close()
}
p.closed.Store(true)
}
p.conn = nil
return
}
func (p *Peer) Close() {
p.closed.Store(true)
p.wg.Wait()
if p.conn != nil {
p.conn.Close()
p.conn = nil
}
}