forked from gocircuit/circuit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dialer.go
115 lines (104 loc) · 2.52 KB
/
dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright 2013 The Go Circuit Project
// Use of this source code is governed by the license for
// The Go Circuit Project, found in the LICENSE file.
//
// Authors:
// 2013 Petar Maymounkov <p@gocircuit.org>
package tele
import (
// "log"
"sync"
"time"
"github.com/hoijui/circuit/pkg/kit/tele/blend"
"github.com/hoijui/circuit/pkg/use/errors"
"github.com/hoijui/circuit/pkg/use/n"
)
// Dialer opens connections to remote workers. It keeps at most one blend dial
// session per worker ID in the open table and reuses it across Dial calls.
type Dialer struct {
dialback n.Addr // Sent as SourceAddr in the HelloMsg that authenticates a new session
sub *blend.Transport // Encloses *blend.Dialer; used by Dial to create new dial sessions
sync.Mutex // Held by Dial, expire and scrub while they access open
open map[n.WorkerID]*blend.DialSession // Open dial sessions, keyed by the remote worker's ID
}
// newDialer returns a Dialer that creates its sessions through sub and
// identifies itself to remote workers with the dialback address. The table of
// open sessions starts out empty.
func newDialer(dialback n.Addr, sub *blend.Transport) *Dialer {
	d := new(Dialer)
	d.dialback = dialback
	d.sub = sub
	d.open = make(map[n.WorkerID]*blend.DialSession)
	return d
}
// Dial returns a connection to the worker identified by addr.
//
// If a dial session to that worker is already registered, a new connection is
// opened over it. Otherwise a session is created, authenticated with a
// hello/welcome exchange on a first connection (see auth), registered in the
// open table and handed to a background watcher that closes it once idle.
//
// The Dialer lock is held for the whole call, including session creation and
// authentication. addr must be an *Addr; the type assertion panics otherwise.
func (d *Dialer) Dial(addr n.Addr) (conn n.Conn, err error) {
	d.Lock()
	defer d.Unlock()

	id := addr.WorkerID()
	session, known := d.open[id]
	if known {
		// Reuse the session that is already open to this worker.
		return NewConn(session.Dial(), addr.(*Addr)), nil
	}

	// No session yet: create one. The callback unregisters the worker's entry;
	// when it is invoked is up to the blend package (presumably when the
	// session terminates — confirm against blend).
	session, err = d.sub.DialSession(addr.(*Addr).TCP, func() {
		d.scrub(addr.WorkerID())
	})
	if err != nil {
		return nil, err
	}

	// Authenticate on a dedicated connection; discard the session on failure.
	if authErr := d.auth(addr, session.Dial()); authErr != nil {
		session.Close()
		return nil, authErr
	}

	d.open[id] = session
	go d.watch(id, session) // Watch for idleness and close

	return NewConn(session.Dial(), addr.(*Addr)), nil
}
// IdleDuration is both the interval at which watch polls a dial session and
// the minimum time since last use before expire treats the session as idle.
// Idleness duration should be greater than the locus heartbeats over permanent cross-references.
const IdleDuration = time.Second * 10
// watch polls the session s every IdleDuration and returns once expire
// reports that the session has been closed. Dial starts it in its own
// goroutine for every newly registered session.
func (d *Dialer) watch(workerID n.WorkerID, s *blend.DialSession) {
	// seenIdle carries state between polls: expire sets it on the first idle
	// observation and closes the session on a later one.
	seenIdle := false
	for done := false; !done; {
		time.Sleep(IdleDuration)
		done = d.expire(workerID, s, &seenIdle)
	}
}
// expire performs one idleness check on the session s and reports whether it
// closed the session.
//
// A session counts as idle when it has no connections and its last use is more
// than IdleDuration ago. The first idle observation only sets *ready; a later
// idle observation closes the session and returns true. *ready is never
// cleared once set.
//
// The open-table entry for workerID is removed only if it still refers to s.
// If that entry was removed earlier (see scrub) and a subsequent Dial
// registered a new session for the same worker, this watcher must not drop
// the newer session's entry.
func (d *Dialer) expire(workerID n.WorkerID, s *blend.DialSession, ready *bool) (closed bool) {
	d.Lock()
	defer d.Unlock()

	numconn, lastuse := s.NumConn()
	if numconn != 0 || time.Since(lastuse) <= IdleDuration {
		// Still in use, or used too recently.
		return false
	}
	if !*ready {
		// First idle observation: arm, and give the session one more period.
		*ready = true
		return false
	}
	// Unregister only our own entry; a different session under the same
	// worker ID belongs to another watcher.
	if d.open[workerID] == s {
		delete(d.open, workerID)
	}
	// log.Printf("idle session %s expiring", s)
	s.Close()
	return true
}
// scrub removes the open-table entry for workerID, whichever session it
// currently refers to. Dial passes a closure that calls scrub to DialSession;
// when that closure runs is decided by the blend package (presumably when the
// session terminates — confirm against blend).
func (d *Dialer) scrub(workerID n.WorkerID) {
d.Lock()
defer d.Unlock()
delete(d.open, workerID)
}
// auth runs the hello/welcome exchange with the remote worker over conn and
// always closes conn before returning.
//
// It writes a HelloMsg carrying the dialer's dialback address and the target
// address, then reads one reply. A *WelcomeMsg yields nil, a *RejectMsg yields
// an error wrapping the remote's reason, and any other reply is reported as an
// unknown response. Write and read errors are returned unchanged.
func (d *Dialer) auth(addr n.Addr, conn *blend.Conn) error {
	defer conn.Close()

	hello := &HelloMsg{
		SourceAddr: d.dialback,
		TargetAddr: addr,
	}
	if err := conn.Write(hello); err != nil {
		return err
	}

	reply, err := conn.Read()
	if err != nil {
		return err
	}

	switch r := reply.(type) {
	case *WelcomeMsg:
		return nil
	case *RejectMsg:
		return errors.NewError("dial rejected by remote (%s)", errors.Unpack(r.Err))
	default:
		return errors.NewError("unknown welcome response")
	}
}