Skip to content

Commit 8c9c9a8

Browse files
authored
fix: fixes deadlocks in store (#3617)
1 parent 571de40 commit 8c9c9a8

File tree

6 files changed

+109
-83
lines changed

6 files changed

+109
-83
lines changed

assets/components/HostList.vue

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
</ul>
3232
<ul class="flex flex-row flex-wrap gap-x-2 text-sm md:gap-3">
3333
<li class="flex items-center gap-1">
34-
<octicon:container-24 class="inline-block" /> {{ $t("label.container", hostContainers[host.id]?.length) }}
34+
<octicon:container-24 class="inline-block" />
35+
{{ $t("label.container", hostContainers[host.id]?.length ?? 0) }}
3536
</li>
3637
<li class="flex items-center gap-1"><mdi:docker class="inline-block" /> {{ host.dockerVersion }}</li>
3738
</ul>
@@ -107,7 +108,10 @@ useIntervalFn(
107108
stat.totalCPU += container.stat.cpu;
108109
stat.totalMem += container.stat.memoryUsage;
109110
}
110-
weightedStats[host].mostRecent = stat;
111+
if (weightedStats[host]) {
112+
// TODO fix this init
113+
weightedStats[host].mostRecent = stat;
114+
}
111115
}
112116
},
113117
1000,

internal/container/container_store.go

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -176,23 +176,30 @@ func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Conta
176176
defer cancel()
177177
if newContainer, err := s.client.FindContainer(ctx, id); err == nil {
178178
return &newContainer, false
179+
} else {
180+
log.Error().Err(err).Msg("failed to fetch container")
181+
return c, false
179182
}
180183
}
181184
return c, false
182185
}); ok {
183-
event := ContainerEvent{
184-
Name: "update",
185-
Host: s.client.Host().ID,
186-
ActorID: id,
187-
}
188-
s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool {
189-
select {
190-
case events <- event:
191-
case <-c.Done():
192-
s.subscribers.Delete(c)
186+
go func() {
187+
event := ContainerEvent{
188+
Name: "update",
189+
Host: newContainer.Host,
190+
ActorID: id,
191+
Container: newContainer,
193192
}
194-
return true
195-
})
193+
194+
s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool {
195+
select {
196+
case events <- event:
197+
case <-c.Done():
198+
s.subscribers.Delete(c)
199+
}
200+
return true
201+
})
202+
}()
196203
return *newContainer, nil
197204
}
198205
}
@@ -248,7 +255,7 @@ func (s *ContainerStore) init() {
248255
for {
249256
select {
250257
case event := <-s.events:
251-
log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
258+
log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
252259
switch event.Name {
253260
case "create":
254261
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -262,7 +269,6 @@ func (s *ContainerStore) init() {
262269
})
263270

264271
if valid {
265-
log.Debug().Str("id", container.ID).Msg("container started")
266272
s.containers.Store(container.ID, &container)
267273
s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool {
268274
select {
@@ -287,7 +293,6 @@ func (s *ContainerStore) init() {
287293
})
288294

289295
if valid {
290-
log.Debug().Str("id", container.ID).Msg("container started")
291296
s.containers.Store(container.ID, &container)
292297
s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool {
293298
select {
@@ -304,42 +309,41 @@ func (s *ContainerStore) init() {
304309
s.containers.Delete(event.ActorID)
305310

306311
case "update":
307-
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
312+
started := false
313+
updatedContainer, _ := s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
308314
if loaded {
309-
log.Debug().Str("id", c.ID).Msg("container updated")
310-
started := false
311-
if newContainer, err := s.client.FindContainer(context.Background(), c.ID); err == nil {
312-
if newContainer.State == "running" && c.State != "running" {
313-
started = true
314-
}
315-
c.Name = newContainer.Name
316-
c.State = newContainer.State
317-
c.Labels = newContainer.Labels
318-
c.StartedAt = newContainer.StartedAt
319-
c.FinishedAt = newContainer.FinishedAt
320-
c.Created = newContainer.Created
321-
} else {
322-
log.Error().Err(err).Str("id", c.ID).Msg("failed to update container")
323-
}
324-
if started {
325-
s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool {
326-
select {
327-
case events <- ContainerEvent{
328-
Name: "start",
329-
ActorID: c.ID,
330-
}:
331-
case <-ctx.Done():
332-
s.subscribers.Delete(ctx)
333-
}
334-
return true
335-
})
315+
newContainer := event.Container
316+
if newContainer.State == "running" && c.State != "running" {
317+
started = true
336318
}
319+
c.Name = newContainer.Name
320+
c.State = newContainer.State
321+
c.Labels = newContainer.Labels
322+
c.StartedAt = newContainer.StartedAt
323+
c.FinishedAt = newContainer.FinishedAt
324+
c.Created = newContainer.Created
325+
c.Host = newContainer.Host
337326
return c, false
338327
} else {
339328
return c, true
340329
}
341330
})
342331

332+
if started {
333+
s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool {
334+
select {
335+
case events <- ContainerEvent{
336+
Name: "start",
337+
ActorID: updatedContainer.ID,
338+
Host: updatedContainer.Host,
339+
}:
340+
case <-ctx.Done():
341+
s.subscribers.Delete(ctx)
342+
}
343+
return true
344+
})
345+
}
346+
343347
case "die":
344348
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
345349
if loaded {

internal/container/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type ContainerEvent struct {
4242
ActorID string `json:"actorId"`
4343
ActorAttributes map[string]string `json:"actorAttributes,omitempty"`
4444
Time time.Time `json:"time"`
45+
Container *Container `json:"-"`
4546
}
4647

4748
type ContainerLabels map[string][]string

internal/k8s/client.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type K8sClient struct {
2525
Clientset *kubernetes.Clientset
2626
namespace string
2727
config *rest.Config
28+
host container.Host
2829
}
2930

3031
func NewK8sClient(namespace string) (*K8sClient, error) {
@@ -56,12 +57,49 @@ func NewK8sClient(namespace string) (*K8sClient, error) {
5657
return nil, err
5758
}
5859

60+
nodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
61+
if err != nil {
62+
return nil, err
63+
}
64+
if len(nodes.Items) == 0 {
65+
return nil, fmt.Errorf("nodes not found")
66+
}
67+
node := nodes.Items[0]
68+
5969
return &K8sClient{
6070
Clientset: clientset,
6171
namespace: namespace,
6272
config: config,
73+
host: container.Host{
74+
ID: node.Status.NodeInfo.MachineID,
75+
Name: node.Name,
76+
},
6377
}, nil
6478
}
79+
80+
func podToContainers(pod *corev1.Pod) []container.Container {
81+
started := time.Time{}
82+
if pod.Status.StartTime != nil {
83+
started = pod.Status.StartTime.Time
84+
}
85+
var containers []container.Container
86+
for _, c := range pod.Spec.Containers {
87+
containers = append(containers, container.Container{
88+
ID: pod.Name + ":" + c.Name,
89+
Name: pod.Name + "/" + c.Name,
90+
Image: c.Image,
91+
Created: pod.CreationTimestamp.Time,
92+
State: phaseToState(pod.Status.Phase),
93+
StartedAt: started,
94+
Command: strings.Join(c.Command, " "),
95+
Host: pod.Spec.NodeName,
96+
Tty: c.TTY,
97+
Stats: utils.NewRingBuffer[container.ContainerStat](300),
98+
})
99+
}
100+
return containers
101+
}
102+
65103
func (k *K8sClient) ListContainers(ctx context.Context, labels container.ContainerLabels) ([]container.Container, error) {
66104
pods, err := k.Clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{})
67105
if err != nil {
@@ -77,7 +115,6 @@ func (k *K8sClient) ListContainers(ctx context.Context, labels container.Contain
77115
Image: c.Image,
78116
Created: pod.CreationTimestamp.Time,
79117
State: phaseToState(pod.Status.Phase),
80-
Tty: c.TTY,
81118
Host: pod.Spec.NodeName,
82119
})
83120
}
@@ -111,24 +148,9 @@ func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Con
111148
return container.Container{}, err
112149
}
113150

114-
for _, c := range pod.Spec.Containers {
115-
if c.Name == containerName {
116-
started := time.Time{}
117-
if pod.Status.StartTime != nil {
118-
started = pod.Status.StartTime.Time
119-
}
120-
return container.Container{
121-
ID: pod.Name + ":" + c.Name,
122-
Name: pod.Name + "/" + c.Name,
123-
Image: c.Image,
124-
Created: pod.CreationTimestamp.Time,
125-
State: phaseToState(pod.Status.Phase),
126-
StartedAt: started,
127-
Command: strings.Join(c.Command, " "),
128-
Host: pod.Spec.NodeName,
129-
Tty: c.TTY,
130-
Stats: utils.NewRingBuffer[container.ContainerStat](300),
131-
}, nil
151+
for _, c := range podToContainers(pod) {
152+
if c.ID == id {
153+
return c, nil
132154
}
133155
}
134156

@@ -183,14 +205,13 @@ func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.Con
183205
name = "update"
184206
}
185207

186-
log.Debug().Interface("event.Type", event.Type).Str("name", name).Interface("StartTime", pod.Status.StartTime).Msg("Sending container event")
187-
188-
for _, c := range pod.Spec.Containers {
208+
for _, c := range podToContainers(pod) {
189209
ch <- container.ContainerEvent{
190-
Name: name,
191-
ActorID: pod.Name + ":" + c.Name,
192-
Host: pod.Spec.NodeName,
193-
Time: time.Now(),
210+
Name: name,
211+
ActorID: c.ID,
212+
Host: pod.Spec.NodeName,
213+
Time: time.Now(),
214+
Container: &c,
194215
}
195216
}
196217
}
@@ -208,7 +229,7 @@ func (k *K8sClient) Ping(ctx context.Context) error {
208229
}
209230

210231
func (k *K8sClient) Host() container.Host {
211-
return container.Host{}
232+
return k.host
212233
}
213234

214235
func (k *K8sClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error {

internal/support/k8s/k8s_cluster_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (m *K8sClusterService) Hosts() []container.Host {
122122
}
123123

124124
func (m *K8sClusterService) LocalHost() (container.Host, error) {
125-
return container.Host{}, nil
125+
return m.client.client.Host(), nil
126126
}
127127

128128
func (m *K8sClusterService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- container.Host) {

internal/web/events.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
6868
if !ok {
6969
return
7070
}
71+
log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("container event from store")
7172
switch event.Name {
7273
case "start", "die", "destroy", "rename":
7374
if event.Name == "start" || event.Name == "rename" {
74-
log.Debug().Str("action", event.Name).Str("id", event.ActorID).Msg("container event")
75-
7675
if containers, err := h.hostService.ListContainersForHost(event.Host, userLabels); err == nil {
76+
log.Debug().Str("host", event.Host).Int("count", len(containers)).Msg("updating containers for host")
7777
if err := sseWriter.Event("containers-changed", containers); err != nil {
7878
log.Error().Err(err).Msg("error writing containers to event stream")
7979
return
@@ -87,15 +87,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
8787
}
8888

8989
case "update":
90-
log.Debug().Str("id", event.ActorID).Msg("container updated")
91-
if containerService, err := h.hostService.FindContainer(event.Host, event.ActorID, userLabels); err == nil {
92-
if err := sseWriter.Event("container-updated", containerService.Container); err != nil {
93-
log.Error().Err(err).Msg("error writing event to event stream")
94-
return
95-
}
90+
if err := sseWriter.Event("container-updated", event.Container); err != nil {
91+
log.Error().Err(err).Msg("error writing event to event stream")
92+
return
9693
}
9794
case "health_status: healthy", "health_status: unhealthy":
98-
log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status")
9995
healthy := "unhealthy"
10096
if event.Name == "health_status: healthy" {
10197
healthy = "healthy"
@@ -135,7 +131,7 @@ func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) {
135131

136132
local, err := h.hostService.LocalHost()
137133
if err == nil {
138-
b.ServerID = local.ID // TODO : fix this for k8s
134+
b.ServerID = local.ID
139135
}
140136

141137
if err := analytics.SendBeacon(b); err != nil {

0 commit comments

Comments
 (0)