-
Notifications
You must be signed in to change notification settings - Fork 160
/
valve.go
106 lines (92 loc) · 1.92 KB
/
valve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright 2013 The Go Circuit Project
// Use of this source code is governed by the license for
// The Go Circuit Project, found in the LICENSE file.
//
// Authors:
// 2014 Petar Maymounkov <p@gocircuit.org>
package valve
import (
"encoding/json"
"io"
"sync"
"github.com/gocircuit/circuit/use/circuit"
)
// Valve is the interface of the value returned by MakeValve.
//
// NOTE(review): only the Cap, Stat and X implementations are visible alongside
// this declaration; the notes on the remaining methods are inferred from their
// signatures and should be confirmed against the implementation.
type Valve interface {
// Send returns an io.WriteCloser or an error; presumably the writer carries
// one outgoing message into the valve — TODO confirm.
Send() (io.WriteCloser, error)
// IsDone reports a boolean completion state — exact meaning to be confirmed.
IsDone() bool
// Scrub takes no arguments and returns nothing; presumably it aborts or
// discards the valve — TODO confirm.
Scrub()
// Close returns an error; presumably it closes the sending side — TODO confirm.
Close() error
// Recv returns an io.ReadCloser or an error; presumably the reader yields
// one incoming message from the valve — TODO confirm.
Recv() (io.ReadCloser, error)
// Cap returns the recorded capacity of the valve, or -1 if the valve is not
// marked as opened.
Cap() int
// Stat returns a copy of the valve's current statistics.
Stat() Stat
// X returns a circuit.X handle for the valve, obtained via circuit.Ref.
X() circuit.X
}
// valve is the implementation of Valve constructed by MakeValve. Its state is
// split into three groups: send, recv and ctrl. MakeValve wires send.tun and
// recv.tun to the same underlying channel, and send.abr, recv.abr and ctrl.abr
// to the same underlying abort channel, each group seeing only one direction.
type valve struct {
// send holds the sender's view: a receive-only abort channel, an embedded
// mutex, and the send-only end of the message channel.
send struct {
abr <-chan struct{} // abort when closed
sync.Mutex
tun chan<- interface{}
}
// recv holds the receiver's view: a receive-only abort channel and the
// receive-only end of the message channel.
recv struct {
abr <-chan struct{} // abort when closed
tun <-chan interface{}
}
// ctrl holds the control state. The embedded mutex is held by incSend,
// incRecv, Cap and Stat while they read or modify stat.
ctrl struct {
sync.Mutex
// abr is the send-only view of the abort channel.
abr chan<- struct{}
// stat is the bookkeeping record returned (by copy) from Stat.
stat Stat
}
}
// Stat is a snapshot of a valve's bookkeeping state. It serializes to JSON
// with the lower-case keys given in the field tags.
type Stat struct {
// Cap is the capacity recorded when the valve was made (the n passed to MakeValve).
Cap int `json:"cap"`
// Opened is set to true by MakeValve.
Opened bool `json:"opened"`
// Closed — NOTE(review): where this is set is not visible here; presumably
// true once the valve has been closed.
Closed bool `json:"closed"`
// Aborted — NOTE(review): where this is set is not visible here; presumably
// true once the valve has been aborted.
Aborted bool `json:"aborted"`
// NumSend is the counter incremented by incSend.
NumSend int `json:"numsend"`
// NumRecv is the counter incremented by incRecv.
NumRecv int `json:"numrecv"`
}
// MessageCap is the sender-receiver pipe capacity (once matched).
// It is an untyped constant written in floating-point notation; its value is
// exactly 32000 (decimal 32K, not 32768).
const MessageCap = 32e3 // 32K
// String renders the statistics as tab-indented JSON, using the keys from the
// struct's json tags. It panics if JSON encoding reports an error.
func (s *Stat) String() string {
	encoded, err := json.MarshalIndent(s, "", "\t")
	if err == nil {
		return string(encoded)
	}
	panic(err)
}
// MakeValve creates a valve whose message channel is buffered to hold n
// values. The returned valve is marked as opened and records n as its
// capacity.
func MakeValve(n int) Valve {
	var (
		tun = make(chan interface{}, n)
		abr = make(chan struct{})
	)
	v := new(valve)

	// Both ends of the message channel refer to the same channel; the sender
	// gets the send-only view and the receiver the receive-only view.
	v.send.tun = tun
	v.recv.tun = tun

	// A single abort channel is shared: the control group keeps the send-only
	// view, while the send and receive groups keep receive-only views.
	v.ctrl.abr = abr
	v.send.abr = abr
	v.recv.abr = abr

	v.ctrl.stat.Cap = n
	v.ctrl.stat.Opened = true
	return v
}
// X wraps the valve in an XValve and returns the result of passing that
// wrapper to circuit.Ref.
func (v *valve) X() circuit.X {
	wrapped := XValve{v}
	return circuit.Ref(wrapped)
}
// incSend adds one to the NumSend counter while holding the control mutex.
func (v *valve) incSend() {
	v.ctrl.Lock()
	v.ctrl.stat.NumSend += 1
	v.ctrl.Unlock()
}
// incRecv adds one to the NumRecv counter while holding the control mutex.
func (v *valve) incRecv() {
	v.ctrl.Lock()
	v.ctrl.stat.NumRecv += 1
	v.ctrl.Unlock()
}
// Cap returns the capacity recorded for the valve. If the valve is not marked
// as opened, it returns -1 instead. The control mutex is held for the
// duration of the read.
func (v *valve) Cap() int {
	v.ctrl.Lock()
	defer v.ctrl.Unlock()

	if !v.ctrl.stat.Opened {
		return -1
	}
	return v.ctrl.stat.Cap
}
// Stat returns a copy of the valve's statistics, taken while holding the
// control mutex.
func (v *valve) Stat() Stat {
	v.ctrl.Lock()
	snapshot := v.ctrl.stat
	v.ctrl.Unlock()
	return snapshot
}