/
pool.go
144 lines (121 loc) · 3.02 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package client
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// Pool manages a pool of connections to a remote server.
type Pool struct {
// client is the owning Client; connector reads its Config (PoolIdleSize, PoolMaxSize).
client *Client
// target is the remote server passed to NewPool; it is reported in connection-failure logs.
target string
// connections lists every connection currently tracked by the pool.
connections []*Connection
// lock guards connections; connector and its failure handlers hold it while mutating the slice.
lock sync.RWMutex
// done is closed by Shutdown to stop the reconnect loop started by Start.
done chan struct{}
}
// NewPool builds an empty Pool bound to client and target.
//
// The returned pool holds no connections yet and does nothing until Start is
// called. Its connection list starts as an empty, non-nil slice and its done
// channel is open.
func NewPool(client *Client, target string) (pool *Pool) {
	return &Pool{
		client:      client,
		target:      target,
		connections: []*Connection{},
		done:        make(chan struct{}),
	}
}
// Start runs the connector once immediately, then launches a background
// goroutine that re-runs it every second to keep the pool topped up.
//
// The goroutine exits when Shutdown closes pool.done or when ctx is
// cancelled. Watching ctx keeps the loop from ticking forever and handing an
// already-cancelled context to the connector when the caller cancels without
// calling Shutdown.
func (pool *Pool) Start(ctx context.Context) {
	pool.connector(ctx)

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-pool.done:
				return
			case <-ctx.Done():
				// The caller gave up: stop reconnecting instead of
				// retrying once per second with a dead context.
				return
			case <-ticker.C:
				pool.connector(ctx)
			}
		}
	}()
}
// connector opens new connections so the pool approaches its configured idle
// size, never exceeding the configured maximum.
//
// It holds pool.lock for the whole sizing and registration phase. Each new
// connection is added to the pool first and then connected in its own
// goroutine; a connection whose Connect fails is logged and removed again.
func (pool *Pool) connector(ctx context.Context) {
	pool.lock.Lock()
	defer pool.lock.Unlock()

	current := pool.Size()

	// Aim for the configured number of idle connections...
	missing := pool.client.Config.PoolIdleSize - current.idle
	// ...but bootstrap an empty pool with a single connection only.
	if current.total == 0 {
		missing = 1
	}
	// Never grow past PoolMaxSize connections in total.
	if current.total+missing > pool.client.Config.PoolMaxSize {
		missing = pool.client.Config.PoolMaxSize - current.total
	}

	for ; missing > 0; missing-- {
		conn := NewConnection(pool)
		pool.add(conn)

		go func(c *Connection) {
			err := c.Connect(ctx)
			if err == nil {
				return
			}
			log.Printf("Unable to connect to %s : %s", pool.target, err)

			// Drop the failed connection so a later pass can replace it.
			pool.lock.Lock()
			defer pool.lock.Unlock()
			pool.remove(c)
		}(conn)
	}
}
// add appends conn to the pool's connection list.
// It takes no lock itself; connector calls it while already holding pool.lock.
func (pool *Pool) add(conn *Connection) {
pool.connections = append(pool.connections, conn)
}
// remove drops every occurrence of conn from the pool's connection list.
//
// The survivors are copied into a freshly allocated slice, so the previous
// backing array is left untouched and anything still ranging over the old
// slice sees it unchanged. If nothing survives, the list becomes nil.
// It takes no lock itself; the caller is expected to hold pool.lock.
func (pool *Pool) remove(conn *Connection) {
	current := pool.connections

	var kept []*Connection
	for i := range current {
		if candidate := current[i]; candidate != conn {
			kept = append(kept, candidate)
		}
	}

	pool.connections = kept
}
// Shutdown stops the reconnect loop started by Start and closes every
// connection tracked by the pool.
//
// The done channel is closed and the connection list is snapshotted while
// holding pool.lock, so Shutdown does not race with connector or its failure
// handlers, which mutate the list under the same lock. Calling Shutdown again
// on a pool that is already shut down is a no-op instead of a panic caused by
// closing the done channel twice.
//
// Close is invoked on the snapshot after the lock has been released, so it
// stays safe even if Close needs pool.lock itself.
// NOTE(review): confirm against Connection.Close.
func (pool *Pool) Shutdown() {
	pool.lock.Lock()
	select {
	case <-pool.done:
		// Already shut down.
		pool.lock.Unlock()
		return
	default:
	}
	close(pool.done)

	conns := make([]*Connection, len(pool.connections))
	copy(conns, pool.connections)
	pool.lock.Unlock()

	for _, conn := range conns {
		conn.Close()
	}
}
// PoolSize is a snapshot of how many connections a pool tracks, broken down
// by connection status.
type PoolSize struct {
	connecting int // connections whose status is CONNECTING
	idle       int // connections whose status is IDLE
	running    int // connections whose status is RUNNING
	total      int // every tracked connection, whatever its status
}

// String renders the four counters as a single human-readable line.
func (poolSize *PoolSize) String() string {
	const layout = "Connecting %d, idle %d, running %d, total %d"
	return fmt.Sprintf(
		layout,
		poolSize.connecting,
		poolSize.idle,
		poolSize.running,
		poolSize.total,
	)
}
// Size reports the current state of the pool: the total number of tracked
// connections and how many of them are CONNECTING, IDLE or RUNNING.
// Connections in any other status count towards the total only.
//
// Size takes no lock itself; connector calls it while already holding
// pool.lock.
func (pool *Pool) Size() (poolSize *PoolSize) {
	tracked := pool.connections

	var connecting, idle, running int
	for _, c := range tracked {
		switch c.status {
		case CONNECTING:
			connecting++
		case IDLE:
			idle++
		case RUNNING:
			running++
		}
	}

	return &PoolSize{
		connecting: connecting,
		idle:       idle,
		running:    running,
		total:      len(tracked),
	}
}