Skip to content

Commit de2e757

Browse files
authored
fix: refreshes versions for agent properly after restart (#3160)
1 parent ed84b9a commit de2e757

File tree

6 files changed

+132
-79
lines changed

6 files changed

+132
-79
lines changed

internal/agent/client.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import (
2323
)
2424

2525
type Client struct {
26-
client pb.AgentServiceClient
27-
host docker.Host
28-
conn *grpc.ClientConn
26+
client pb.AgentServiceClient
27+
conn *grpc.ClientConn
28+
endpoint string
2929
}
3030

3131
func NewClient(endpoint string, certificates tls.Certificate, opts ...grpc.DialOption) (*Client, error) {
@@ -51,26 +51,11 @@ func NewClient(endpoint string, certificates tls.Certificate, opts ...grpc.DialO
5151
}
5252

5353
client := pb.NewAgentServiceClient(conn)
54-
info, err := client.HostInfo(context.Background(), &pb.HostInfoRequest{})
55-
if err != nil {
56-
conn.Close()
57-
return nil, fmt.Errorf("failed to get host info: %w", err)
58-
}
5954

6055
return &Client{
61-
client: client,
62-
conn: conn,
63-
64-
host: docker.Host{
65-
ID: info.Host.Id,
66-
Name: info.Host.Name,
67-
NCPU: int(info.Host.CpuCores),
68-
MemTotal: int64(info.Host.Memory),
69-
Endpoint: endpoint,
70-
Type: "agent",
71-
DockerVersion: info.Host.DockerVersion,
72-
AgentVersion: info.Host.AgentVersion,
73-
},
56+
client: client,
57+
conn: conn,
58+
endpoint: endpoint,
7459
}, nil
7560
}
7661

@@ -361,8 +346,26 @@ func (c *Client) ListContainers() ([]docker.Container, error) {
361346
return containers, nil
362347
}
363348

364-
func (c *Client) Host() docker.Host {
365-
return c.host
349+
func (c *Client) Host() (docker.Host, error) {
350+
info, err := c.client.HostInfo(context.Background(), &pb.HostInfoRequest{})
351+
if err != nil {
352+
return docker.Host{
353+
Endpoint: c.endpoint,
354+
Type: "agent",
355+
Available: false,
356+
}, err
357+
}
358+
359+
return docker.Host{
360+
ID: info.Host.Id,
361+
Name: info.Host.Name,
362+
NCPU: int(info.Host.CpuCores),
363+
MemTotal: int64(info.Host.Memory),
364+
Endpoint: c.endpoint,
365+
Type: "agent",
366+
DockerVersion: info.Host.DockerVersion,
367+
AgentVersion: info.Host.AgentVersion,
368+
}, nil
366369
}
367370

368371
func (c *Client) Close() error {

internal/support/docker/agent_service.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
type agentService struct {
1313
client *agent.Client
14+
host docker.Host
1415
}
1516

1617
func NewAgentService(client *agent.Client) ClientService {
@@ -39,8 +40,16 @@ func (a *agentService) ListContainers() ([]docker.Container, error) {
3940
return a.client.ListContainers()
4041
}
4142

42-
func (a *agentService) Host() docker.Host {
43-
return a.client.Host()
43+
func (a *agentService) Host() (docker.Host, error) {
44+
host, err := a.client.Host()
45+
if err != nil {
46+
host := a.host
47+
host.Available = false
48+
return host, err
49+
}
50+
51+
a.host = host
52+
return a.host, err
4453
}
4554

4655
func (a *agentService) SubscribeStats(ctx context.Context, stats chan<- docker.ContainerStat) {

internal/support/docker/client_service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
type ClientService interface {
1212
FindContainer(id string) (docker.Container, error)
1313
ListContainers() ([]docker.Container, error)
14-
Host() docker.Host
14+
Host() (docker.Host, error)
1515
ContainerAction(container docker.Container, action docker.ContainerAction) error
1616
LogsBetweenDates(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (<-chan *docker.LogEvent, error)
1717
RawLogs(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (io.ReadCloser, error)
@@ -82,8 +82,8 @@ func (d *dockerClientService) ListContainers() ([]docker.Container, error) {
8282
return d.store.ListContainers()
8383
}
8484

85-
func (d *dockerClientService) Host() docker.Host {
86-
return d.client.Host()
85+
func (d *dockerClientService) Host() (docker.Host, error) {
86+
return d.client.Host(), nil
8787
}
8888

8989
func (d *dockerClientService) SubscribeStats(ctx context.Context, stats chan<- docker.ContainerStat) {

internal/support/docker/multi_host_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ func (m *MultiHostService) ListAllContainers() ([]docker.Container, []error) {
7575
for _, client := range clients {
7676
list, err := client.ListContainers()
7777
if err != nil {
78-
log.Debugf("error listing containers for host %s: %v", client.Host().ID, err)
79-
host := client.Host()
78+
host, _ := client.Host()
79+
log.Debugf("error listing containers for host %s: %v", host.ID, err)
8080
host.Available = false
8181
errors = append(errors, &HostUnavailableError{Host: host, Err: err})
8282
continue

internal/support/docker/retriable_client_manager.go

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/amir20/dozzle/internal/agent"
1010
"github.com/amir20/dozzle/internal/docker"
1111
"github.com/puzpuzpuz/xsync/v3"
12+
lop "github.com/samber/lo/parallel"
1213

1314
log "github.com/sirupsen/logrus"
1415
)
@@ -26,24 +27,39 @@ func NewRetriableClientManager(agents []string, certs tls.Certificate, clients .
2627

2728
clientMap := make(map[string]ClientService)
2829
for _, client := range clients {
29-
if _, ok := clientMap[client.Host().ID]; ok {
30-
log.Warnf("duplicate client found for host %s", client.Host().ID)
30+
host, err := client.Host()
31+
if err != nil {
32+
log.Warnf("error fetching host info for client %s: %v", host.ID, err)
33+
continue
34+
}
35+
36+
if _, ok := clientMap[host.ID]; ok {
37+
log.Warnf("duplicate client found for host %s", host.ID)
3138
} else {
32-
clientMap[client.Host().ID] = client
39+
clientMap[host.ID] = client
3340
}
3441
}
3542

3643
failed := make([]string, 0)
3744
for _, endpoint := range agents {
38-
if agent, err := agent.NewClient(endpoint, certs); err == nil {
39-
if _, ok := clientMap[agent.Host().ID]; ok {
40-
log.Warnf("duplicate client found for host %s", agent.Host().ID)
41-
} else {
42-
clientMap[agent.Host().ID] = NewAgentService(agent)
43-
}
44-
} else {
45+
agent, err := agent.NewClient(endpoint, certs)
46+
if err != nil {
4547
log.Warnf("error creating agent client for %s: %v", endpoint, err)
4648
failed = append(failed, endpoint)
49+
continue
50+
}
51+
52+
host, err := agent.Host()
53+
if err != nil {
54+
log.Warnf("error fetching host info for agent %s: %v", endpoint, err)
55+
failed = append(failed, endpoint)
56+
continue
57+
}
58+
59+
if _, ok := clientMap[host.ID]; ok {
60+
log.Warnf("duplicate client found for host %s", host.ID)
61+
} else {
62+
clientMap[host.ID] = NewAgentService(agent)
4763
}
4864
}
4965

@@ -70,29 +86,36 @@ func (m *RetriableClientManager) RetryAndList() ([]ClientService, []error) {
7086
if len(m.failedAgents) > 0 {
7187
newFailed := make([]string, 0)
7288
for _, endpoint := range m.failedAgents {
73-
if agent, err := agent.NewClient(endpoint, m.certs); err == nil {
74-
m.clients[agent.Host().ID] = NewAgentService(agent)
75-
76-
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
77-
host := agent.Host()
78-
host.Available = true
79-
80-
// We don't want to block the subscribers in event.go
81-
go func() {
82-
select {
83-
case channel <- host:
84-
case <-ctx.Done():
85-
}
86-
}()
87-
88-
return true
89-
})
90-
91-
} else {
89+
agent, err := agent.NewClient(endpoint, m.certs)
90+
if err != nil {
9291
log.Warnf("error creating agent client for %s: %v", endpoint, err)
9392
errors = append(errors, err)
9493
newFailed = append(newFailed, endpoint)
94+
continue
9595
}
96+
97+
host, err := agent.Host()
98+
if err != nil {
99+
log.Warnf("error fetching host info for agent %s: %v", endpoint, err)
100+
errors = append(errors, err)
101+
newFailed = append(newFailed, endpoint)
102+
continue
103+
}
104+
105+
m.clients[host.ID] = NewAgentService(agent)
106+
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
107+
host.Available = true
108+
109+
// We don't want to block the subscribers in event.go
110+
go func() {
111+
select {
112+
case channel <- host:
113+
case <-ctx.Done():
114+
}
115+
}()
116+
117+
return true
118+
})
96119
}
97120
m.failedAgents = newFailed
98121
}
@@ -128,12 +151,17 @@ func (m *RetriableClientManager) String() string {
128151
func (m *RetriableClientManager) Hosts() []docker.Host {
129152
clients := m.List()
130153

131-
hosts := make([]docker.Host, 0, len(clients))
132-
for _, client := range clients {
133-
host := client.Host()
134-
host.Available = true
135-
hosts = append(hosts, host)
136-
}
154+
hosts := lop.Map(clients, func(client ClientService, _ int) docker.Host {
155+
host, err := client.Host()
156+
log.Debugf("host: %v, err: %v", host, err)
157+
if err != nil {
158+
host.Available = false
159+
} else {
160+
host.Available = true
161+
}
162+
163+
return host
164+
})
137165

138166
for _, endpoint := range m.failedAgents {
139167
hosts = append(hosts, docker.Host{

internal/support/docker/swarm_client_manager.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/amir20/dozzle/internal/docker"
1212
"github.com/puzpuzpuz/xsync/v3"
1313
"github.com/samber/lo"
14+
lop "github.com/samber/lo/parallel"
1415

1516
log "github.com/sirupsen/logrus"
1617
)
@@ -78,7 +79,8 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
7879

7980
clients := lo.Values(m.clients)
8081
endpoints := lo.KeyBy(clients, func(client ClientService) string {
81-
return client.Host().Endpoint
82+
host, _ := client.Host()
83+
return host.Endpoint
8284
})
8385

8486
log.Debugf("tasks.dozzle = %v, localIP = %v, clients.endpoints = %v", ips, m.localIPs, lo.Keys(endpoints))
@@ -101,20 +103,29 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
101103
continue
102104
}
103105

104-
if agent.Host().ID == m.localClient.Host().ID {
105-
log.Debugf("skipping local client with ID %s", agent.Host().ID)
106+
host, err := agent.Host()
107+
if err != nil {
108+
log.Warnf("error getting host data for agent %s: %v", ip, err)
109+
errors = append(errors, err)
110+
if err := agent.Close(); err != nil {
111+
log.Warnf("error closing local client: %v", err)
112+
}
113+
continue
114+
}
115+
116+
if host.ID == m.localClient.Host().ID {
117+
log.Debugf("skipping local client with ID %s", host.ID)
106118
if err := agent.Close(); err != nil {
107119
log.Warnf("error closing local client: %v", err)
108120
}
109121
continue
110122
}
111123

112124
client := NewAgentService(agent)
113-
m.clients[agent.Host().ID] = client
114-
log.Infof("added client for %s", agent.Host().ID)
125+
m.clients[host.ID] = client
126+
log.Infof("added client for %s", host.ID)
115127

116128
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
117-
host := agent.Host()
118129
host.Available = true
119130
host.Type = "swarm"
120131

@@ -153,16 +164,18 @@ func (m *SwarmClientManager) Find(id string) (ClientService, bool) {
153164
func (m *SwarmClientManager) Hosts() []docker.Host {
154165
clients := m.List()
155166

156-
hosts := make([]docker.Host, 0, len(clients))
157-
158-
for _, client := range clients {
159-
host := client.Host()
160-
host.Available = true
167+
return lop.Map(clients, func(client ClientService, _ int) docker.Host {
168+
host, err := client.Host()
169+
if err != nil {
170+
host.Available = false
171+
} else {
172+
host.Available = true
173+
}
161174
host.Type = "swarm"
162-
hosts = append(hosts, host)
163-
}
164175

165-
return hosts
176+
return host
177+
})
178+
166179
}
167180

168181
func (m *SwarmClientManager) String() string {

0 commit comments

Comments
 (0)