-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrent_client.go
149 lines (137 loc) · 3.4 KB
/
concurrent_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package tcpless
import (
	"runtime"
	"sync"

	"github.com/dimonrus/gocli"
)
// concurrentClient is a fixed-size pool of clients used to issue requests in
// parallel. The methods in this file lock m before touching clients and free.
type concurrentClient struct {
// m guards clients and free
m sync.RWMutex
// clients holds the pooled clients; dialClients appends one per dialed connection
clients []IClient
// free[i] reports whether clients[i] is currently available to GetFreeClient
free []bool
// concurrent is the pool size (max parallel requests); concurrentCount keeps it at 1 or more
concurrent int
// buffer is the buffer pool that dialClients pulls from and Close releases to
buffer *buffer
// options holds client options; Call logs Ask errors through options.logger
options options
}
// Close releases the stream of every pooled client and hands its buffer slot
// back to the buffer pool. Every client is processed even when an earlier one
// fails to release. It returns the first release error encountered, or nil if
// all streams were released cleanly.
func (c *concurrentClient) Close() error {
	c.m.Lock()
	defer c.m.Unlock()
	var firstErr error
	for i := range c.clients {
		// Remember the earliest failure; a later successful release must not
		// overwrite it with nil.
		if err := c.clients[i].Stream().Release(); err != nil && firstErr == nil {
			firstErr = err
		}
		c.buffer.Release(c.clients[i].Stream().Index())
	}
	return firstErr
}
// concurrentCount stores the pool size. Any n that is not positive falls back
// to a single connection.
func (c *concurrentClient) concurrentCount(n int) {
	size := 1
	if n > 0 {
		size = n
	}
	c.concurrent = size
}
// initBuffers creates the buffer pool with one entry per concurrent
// connection. It reads c.concurrent, so the pool size must be set first.
// A bufferSize of 0 selects MinimumSharedBufferSize.
func (c *concurrentClient) initBuffers(bufferSize int) {
	size := bufferSize
	if size == 0 {
		size = MinimumSharedBufferSize
	}
	c.buffer = CreateBuffer(c.concurrent, size)
}
// dialClients fills the pool up to c.concurrent connections.
// constructor - builds a new client from config and logger
// config, logger - passed through to constructor unchanged
//
// Each new client is dialed, given a stream backed by a buffer from the
// buffer pool, appended to the pool and marked free. Clients that are already
// in the pool are kept, so the method can be called again after a failure.
// It returns the first dial error and stops dialing at that point.
func (c *concurrentClient) dialClients(constructor ClientConstructor, config *Config, logger gocli.Logger) error {
	c.m.Lock()
	defer c.m.Unlock()
	if c.clients == nil {
		c.clients = make([]IClient, 0, c.concurrent)
	}
	if c.free == nil {
		c.free = make([]bool, 0, c.concurrent)
	}
	for i := len(c.clients); i < c.concurrent; i++ {
		client := constructor(config, logger)
		if _, err := client.Dial(); err != nil {
			return err
		}
		// Take a buffer slot only after the dial succeeded: pulling it first
		// would leave the slot occupied forever when the dial fails.
		buf, index := c.buffer.Pull()
		client.SetStream(NewConnection(client.Stream().Connection(), buf, index))
		c.clients = append(c.clients, client)
		c.free = append(c.free, true)
	}
	return nil
}
// RegisterType registers every value in v on the encryptor of each pooled
// client's signature, client by client and in argument order.
func (c *concurrentClient) RegisterType(v ...any) {
	c.m.RLock()
	defer c.m.RUnlock()
	for _, client := range c.clients {
		for idx := range v {
			client.Signature().Encryptor().RegisterType(v[idx])
		}
	}
}
// GetConcurrent reports the configured number of concurrent connections.
func (c *concurrentClient) GetConcurrent() (n int) {
	n = c.concurrent
	return n
}
// GetFreeClient reserves a free client from the pool and returns it together
// with its pool index; pass that index to ReleaseClient when done.
//
// If every client is busy the call waits until one is released. The lock is
// dropped between scans so that ReleaseClient can make progress, and the
// goroutine yields instead of holding the lock while spinning.
// If the pool holds no clients at all it returns (nil, -1), because waiting
// could never succeed.
func (c *concurrentClient) GetFreeClient() (client IClient, i int) {
	for {
		c.m.Lock()
		if len(c.free) == 0 {
			c.m.Unlock()
			return nil, -1
		}
		// Scan only the slots that exist; the pool may hold fewer clients
		// than c.concurrent when dialing stopped early.
		for i = range c.free {
			if c.free[i] {
				c.free[i] = false
				client = c.clients[i]
				c.m.Unlock()
				return client, i
			}
		}
		c.m.Unlock()
		// Nothing free right now: let other goroutines run (one of them has
		// to release a client) before scanning again.
		runtime.Gosched()
	}
}
// ReleaseClient marks the client at the given pool index as free again so
// that GetFreeClient can hand it out. An index outside the pool is ignored
// instead of panicking.
func (c *concurrentClient) ReleaseClient(index int) {
	c.m.Lock()
	defer c.m.Unlock()
	if index < 0 || index >= len(c.free) {
		return
	}
	c.free[index] = true
}
// Call sends requests over the pooled connections in parallel.
// route - URI of the server handler, for example: api.v1.hello
// request - channel that supplies the requests
// processor - optional handler, invoked with the client after each Ask that returns no error
//
// It starts c.concurrent goroutines and returns without waiting for them.
// Each goroutine reserves one client, sends requests received from the channel
// until the channel is closed, and then releases the client. A goroutine also
// stops, after logging the error, as soon as an Ask fails.
func (c *concurrentClient) Call(route string, request chan any, processor Handler) {
	for started := 0; started < c.concurrent; started++ {
		go func() {
			client, index := c.GetFreeClient()
			defer c.ReleaseClient(index)
			for payload := range request {
				if err := client.Ask(route, payload); err != nil {
					c.options.logger.Errorln(err)
					return
				}
				if processor == nil {
					continue
				}
				processor(client)
			}
		}()
	}
}
// ConcurrentClient creates a pool of n concurrent clients and dials all of them.
// n - number of parallel connections; values that are not positive are treated as 1
// bufferSize - shared buffer size; 0 selects MinimumSharedBufferSize
// client - constructor used to build every pooled client
// config, logger - passed to the constructor; logger is also kept for error reporting
//
// The pool is returned even when dialing fails, together with the dial error,
// so the caller can still Close the connections that were established.
func ConcurrentClient(n int, bufferSize int, client ClientConstructor, config *Config, logger gocli.Logger) (*concurrentClient, error) {
	c := &concurrentClient{}
	// Call reports failed requests through options.logger; store the logger
	// here so that it is not nil when the first error is logged.
	c.options.logger = logger
	// set n
	c.concurrentCount(n)
	// init buffers
	c.initBuffers(bufferSize)
	// construct clients
	return c, c.dialClients(client, config, logger)
}