-
Notifications
You must be signed in to change notification settings - Fork 37
/
client.go
128 lines (100 loc) · 2.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package http2
import (
"container/list"
"sync"
"time"
"github.com/valyala/fasthttp"
)
// DefaultPingInterval is the ping interval that ClientOpts.PingInterval
// documents as the fallback when it is left at 0.
const DefaultPingInterval = 5 * time.Second
// ClientOpts defines the client options for the HTTP/2 connection.
type ClientOpts struct {
// PingInterval defines the interval at which the client pings the server.
//
// Ping intervals cannot be disabled: an interval of 0 makes the library
// fall back to DefaultPingInterval.
PingInterval time.Duration
// OnRTT is assigned to every client after creation. It is meant to be
// called after every RTT measurement (that is, after receiving a PONG
// message).
//
// NOTE(review): nothing in this file invokes the handler yet; the Client
// struct only stores it (see its "TODO: impl rtt" marker).
OnRTT func(time.Duration)
}
// Ctx represents a context for a stream. Every stream is related to a context.
//
// Client.Do builds one Ctx per call and hands it to Conn.Write.
type Ctx struct {
// Request is the request to send; Client.Do stores its req argument here.
Request *fasthttp.Request
// Response is the response object passed along with the request;
// Client.Do stores its res argument here.
Response *fasthttp.Response
// Err carries the outcome of the exchange back to the caller. Client.Do
// creates it with a buffer of one, receives a single value from it and
// returns that value as its error.
Err chan error
}
// Client sends requests over a list of HTTP/2 connections that it dials on
// demand through a Dialer.
type Client struct {
// d is the dialer used to open new connections.
d *Dialer
// TODO: impl rtt
// onRTT is copied from ClientOpts.OnRTT by createClient; it is not called
// anywhere in this file yet.
onRTT func(time.Duration)
// lck is held by Do and onConnectionDropped while they read or modify conns.
lck sync.Mutex
// conns holds the client's connections as *Conn values; new connections
// are pushed to the front.
conns list.List
}
// createClient returns a new Client that dials through d and stores the OnRTT
// handler taken from opts. opts.PingInterval is not read here.
func createClient(d *Dialer, opts ClientOpts) *Client {
	client := new(Client)
	client.d = d
	client.onRTT = opts.OnRTT

	return client
}
// onConnectionDropped is installed as the OnDisconnect handler of every
// connection the client dials. It removes c from the connection list and
// dials a replacement; the outcome of that dial is discarded. If c is not
// in the list, nothing happens.
func (cl *Client) onConnectionDropped(c *Conn) {
	cl.lck.Lock()
	defer cl.lck.Unlock()

	// Advance to the list element holding c, if there is one.
	elem := cl.conns.Front()
	for elem != nil && elem.Value.(*Conn) != c {
		elem = elem.Next()
	}

	if elem == nil {
		return
	}

	cl.conns.Remove(elem)

	// Best effort: a failed dial is ignored here.
	_, _, _ = cl.createConn()
}
// createConn dials a new connection through the client's dialer, using the
// dialer's PingInterval and registering onConnectionDropped as the disconnect
// handler. On success the connection is pushed to the front of the connection
// list and returned together with its list element.
//
// The list is modified without locking here; the callers visible in this file
// hold cl.lck when calling.
func (cl *Client) createConn() (*Conn, *list.Element, error) {
	opts := ConnOpts{
		PingInterval: cl.d.PingInterval,
		OnDisconnect: cl.onConnectionDropped,
	}

	conn, err := cl.d.Dial(opts)
	if err != nil {
		return nil, nil, err
	}

	elem := cl.conns.PushFront(conn)

	return conn, elem, nil
}
// Do sends req over one of the client's connections and blocks until a result
// for it arrives on the stream context's error channel.
//
// The connection list is walked from the front while holding cl.lck:
// connections that cannot open another stream are skipped, connections that
// report Closed are removed from the list, and a new connection is dialed once
// the walk reaches the end of the list. A dial error is returned as is.
//
// res is handed to the connection together with req. The returned error is
// the value received from the Ctx.Err channel.
func (cl *Client) Do(req *fasthttp.Request, res *fasthttp.Response) (err error) {
	var c *Conn

	cl.lck.Lock()

	var next *list.Element

	for e := cl.conns.Front(); c == nil; e = next {
		if e != nil {
			c = e.Value.(*Conn)
		} else {
			c, e, err = cl.createConn()
			if err != nil {
				// Release the lock before bailing out; returning with it
				// held would block every later Do and onConnectionDropped.
				cl.lck.Unlock()

				return err
			}
		}

		switch {
		case !c.CanOpenStream():
			// if we can't open a stream, then move on to the next one.
			next = e.Next()
			c = nil
		case c.Closed():
			// if the connection has been closed, then just remove the connection.
			// Grab the successor first: Remove clears the element's links.
			next = e.Next()
			cl.conns.Remove(e)
			c = nil
		}
	}

	cl.lck.Unlock()

	// Buffered so that a single send on it does not block the sender.
	ch := make(chan error, 1)

	c.Write(&Ctx{
		Request:  req,
		Response: res,
		Err:      ch,
	})

	return <-ch
}