forked from knechtionscoding/keel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.go
148 lines (123 loc) · 3.56 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package pubsub
import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/datagravity-ai/keel/provider"
log "github.com/sirupsen/logrus"
)
// DefaultManager is the pubsub subscription manager. It keeps the providers
// it queries for tracked images, the client it subscribes through, and the
// subscriptions it has already started.
type DefaultManager struct {
	providers provider.Providers
	client    Subscriber

	// mu is locked around every access to subscribers.
	mu *sync.Mutex

	// subscribers holds one entry per existing subscription: the key is a
	// GCR URI (it could be something like gcr.io%2Fmy-project) and the value
	// is the context created for that subscription.
	subscribers map[string]context.Context

	// projectID is required to correctly set GCR subscriptions.
	projectID string

	// clusterName is used to build unique subscription names. Every
	// subscription needs its own name to receive all events; if two keel
	// instances shared a name, only one of them would receive a GCR event
	// after a push.
	clusterName string

	// scanTick is the scan interval in seconds. NewDefaultManager sets it
	// to 60.
	scanTick int

	// ctx is the root context; Start assigns it.
	ctx context.Context
}
// Subscriber is the client the manager subscribes through. An implementation
// is responsible for listening for repository events and informing providers.
type Subscriber interface {
	// Subscribe listens on topic using the subscription with the given name.
	// The manager calls it from a dedicated goroutine and forgets the
	// subscription as soon as the call returns, logging any non-nil error.
	Subscribe(ctx context.Context, topic string, subscription string) error
}
// NewDefaultManager builds the pubsub manager that creates subscriptions for
// deployments. The returned manager has an empty subscriber map, its own
// mutex and a 60-second scan interval; its root context stays unset until
// Start is called.
func NewDefaultManager(clusterName, projectID string, providers provider.Providers, subClient Subscriber) *DefaultManager {
	mgr := new(DefaultManager)

	mgr.clusterName = clusterName
	mgr.projectID = projectID
	mgr.providers = providers
	mgr.client = subClient

	mgr.mu = new(sync.Mutex)
	mgr.subscribers = map[string]context.Context{}
	mgr.scanTick = 60 // seconds between scans

	return mgr
}
// Start stores ctx as the manager's root context, runs one scan right away
// and then rescans every scanTick seconds. A failed scan is logged and the
// loop keeps going. Start blocks until ctx is done and then returns nil.
//
// A non-positive scanTick (for example on a manager built without
// NewDefaultManager) falls back to 60 seconds instead of panicking.
func (s *DefaultManager) Start(ctx context.Context) error {
	// setting root context; ensureSubscription derives subscription
	// contexts from it
	s.ctx = ctx

	// runScan performs a single scan and logs, rather than returns, its error
	// so that one bad scan never terminates the manager.
	runScan := func() {
		if err := s.scan(ctx); err != nil {
			log.WithFields(log.Fields{
				"error": err,
			}).Error("trigger.pubsub.manager: scan failed")
		}
	}

	// initial scan
	runScan()

	// time.NewTicker panics when given a non-positive duration, so guard
	// against an unset interval.
	interval := s.scanTick
	if interval <= 0 {
		interval = 60
	}

	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			log.Debug("performing scan")
			runScan()
		}
	}
}
// scan asks the providers for their tracked images and makes sure a
// subscription exists for every image whose registry passes
// isGoogleArtifactRegistry. Images from any other registry are skipped with a
// debug log line. The only error returned is the one from TrackedImages.
//
// The ctx argument is not read here; subscriptions are derived from the
// manager's root context inside ensureSubscription.
func (s *DefaultManager) scan(ctx context.Context) error {
	images, err := s.providers.TrackedImages()
	if err != nil {
		return err
	}

	for _, img := range images {
		if isGoogleArtifactRegistry(img.Image.Registry()) {
			// uri
			// https://cloud.google.com/container-registry/docs/configuring-notifications
			s.ensureSubscription("gcr")
			continue
		}

		log.Debugf("registry %s is not a GCR, skipping", img.Image.Registry())
	}

	return nil
}
// subscribed reports whether the subscriber map currently has an entry for
// gcrURI. The lookup is done while holding the manager's mutex.
func (s *DefaultManager) subscribed(gcrURI string) bool {
	s.mu.Lock()
	_, found := s.subscribers[gcrURI]
	s.mu.Unlock()

	return found
}
// ensureSubscription starts a subscription for gcrURI unless the subscriber
// map already has one. It holds the manager's mutex while it checks and
// registers the entry, then runs the blocking Subscribe call in a new
// goroutine.
//
// The subscription gets its own cancellable context derived from the
// manager's root context. That context is stored in the subscriber map and is
// also the one handed to Subscribe, so the stored value really is the context
// the subscription runs under. When Subscribe returns, any error is logged,
// the map entry is removed and the context is cancelled.
func (s *DefaultManager) ensureSubscription(gcrURI string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.subscribers[gcrURI]; ok {
		// already subscribed, nothing to do
		return
	}

	ctx, cancel := context.WithCancel(s.ctx)
	s.subscribers[gcrURI] = ctx
	subName := containerRegistrySubName(s.clusterName, s.projectID, gcrURI)

	go func() {
		// release the derived context once the subscription is over
		defer cancel()

		if err := s.client.Subscribe(ctx, gcrURI, subName); err != nil {
			log.WithFields(log.Fields{
				"error":             err,
				"gcr_uri":           gcrURI,
				"subscription_name": subName,
			}).Error("trigger.pubsub.manager: failed to create subscription")
		}

		// cleanup: removeSubscription takes the mutex itself, which is safe
		// because this goroutine does not hold it.
		s.removeSubscription(gcrURI)
	}()
}
// removeSubscription drops the entry for gcrURI from the subscriber map while
// holding the manager's mutex. Removing a URI that is not present is a no-op.
func (s *DefaultManager) removeSubscription(gcrURI string) {
	s.mu.Lock()
	delete(s.subscribers, gcrURI)
	s.mu.Unlock()
}