-
Notifications
You must be signed in to change notification settings - Fork 14
/
container_events.go
84 lines (68 loc) · 2.52 KB
/
container_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package abstractions
import (
"context"
"fmt"
"strings"
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/types"
)
type ContainerEventManager struct {
containerPrefix string
keyEventChan chan common.KeyEvent
keyEventManager *common.KeyEventManager
instanceFactory func(stubId string, options ...func(IAutoscaledInstance)) (IAutoscaledInstance, error)
}
// NewContainerEventManager builds a ContainerEventManager bound to
// containerPrefix. keyEventManager supplies the key events and instanceFactory
// maps a stub ID to the instance that should receive them. The manager gets
// its own unbuffered event channel; nothing is started until Listen is
// called. The returned error is always nil.
func NewContainerEventManager(containerPrefix string, keyEventManager *common.KeyEventManager, instanceFactory func(stubId string, options ...func(IAutoscaledInstance)) (IAutoscaledInstance, error)) (*ContainerEventManager, error) {
	manager := &ContainerEventManager{
		containerPrefix: containerPrefix,
		keyEventManager: keyEventManager,
		instanceFactory: instanceFactory,
		keyEventChan:    make(chan common.KeyEvent),
	}

	return manager, nil
}
// Listen starts two goroutines and returns immediately: one asks the key event
// manager to deliver events matching this manager's container prefix onto
// keyEventChan, the other consumes that channel in handleContainerEvents.
// Both receive ctx; handleContainerEvents stops once ctx is done.
func (em *ContainerEventManager) Listen(ctx context.Context) {
	// Resolve the key pattern up front, in the caller's goroutine.
	pattern := common.RedisKeys.SchedulerContainerState(em.containerPrefix)

	go em.keyEventManager.ListenForPattern(ctx, pattern, em.keyEventChan)
	go em.handleContainerEvents(ctx)
}
// handleContainerEvents consumes key events from keyEventChan until ctx is
// done or the channel is closed. For each event it rebuilds the container ID,
// extracts the stub ID, looks up the instance for that stub via
// instanceFactory, and reports a +1 change for set/hset operations or a -1
// change for del/expired operations. Other operations are ignored.
//
// Events whose key does not yield a well-formed container ID, and events for
// which instanceFactory returns an error, are skipped.
func (em *ContainerEventManager) handleContainerEvents(ctx context.Context) {
	// A well-formed container ID splits on "-" into at least six parts: the
	// prefix at index 0 followed by the five groups of the stub ID.
	const stubIdEnd = 6

	for {
		select {
		case event, ok := <-em.keyEventChan:
			if !ok {
				// A closed channel yields zero-value events forever; stop
				// instead of spinning on them.
				return
			}

			operation := event.Operation

			/*
				Container IDs are formatted like so:
					endpoint-6f073820-3d2f-483c-8089-0a30862c3145-80fd7e36

				containerPrefix is the first portion of this string, in the above
				example "endpoint". The portion "6f073820-3d2f-483c-8089-0a30862c3145"
				is the stub ID, and the final "80fd7e36" is an identifier specific to
				this container.

				Because we listen for keyspace notifications on a certain container
				prefix, actual events come in like:
					{Key:-6f073820-3d2f-483c-8089-0a30862c3145-80fd7e36 Operation:hset}

				So what we are doing here is reconstructing the containerId using the
				known prefix, and then parsing out the stubId.
			*/
			containerId := fmt.Sprintf("%s%s", em.containerPrefix, event.Key)
			containerIdParts := strings.Split(containerId, "-")
			if len(containerIdParts) < stubIdEnd {
				// Malformed key: slicing below would panic and take down the
				// whole listener, so drop just this event.
				continue
			}
			stubId := strings.Join(containerIdParts[1:stubIdEnd], "-")

			instance, err := em.instanceFactory(stubId)
			if err != nil {
				// Best effort: without an instance there is nothing to notify.
				continue
			}

			switch operation {
			case common.KeyOperationSet, common.KeyOperationHSet:
				instance.ConsumeContainerEvent(types.ContainerEvent{
					ContainerId: containerId,
					Change:      +1,
				})
			case common.KeyOperationDel, common.KeyOperationExpired:
				instance.ConsumeContainerEvent(types.ContainerEvent{
					ContainerId: containerId,
					Change:      -1,
				})
			}
		case <-ctx.Done():
			return
		}
	}
}