/
revolution.go
189 lines (173 loc) · 5.15 KB
/
revolution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*This file is part of kuberpult.
Kuberpult is free software: you can redistribute it and/or modify
it under the terms of the Expat(MIT) License as published by
the Free Software Foundation.
Kuberpult is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
MIT License for more details.
You should have received a copy of the MIT License
along with kuberpult. If not, see <https://directory.fsf.org/wiki/License:Expat>.
Copyright 2023 freiheit.com*/
package revolution
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/freiheit-com/kuberpult/services/rollout-service/pkg/service"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
// Config carries the settings consumed by New when building a Subscriber.
type Config struct {
	// URL is the endpoint that notifications are POSTed to.
	URL string

	// Token is the secret key used to compute the HMAC-SHA256 signature
	// that accompanies every request.
	Token []byte

	// Concurrency is handed to the errgroup limit that bounds how many
	// notifications may be in flight at the same time.
	Concurrency int

	// MaxEventAge is the maximum age of events that are still reported.
	// A value of 0 disables the age check.
	MaxEventAge time.Duration
}
// New builds a Subscriber from config.
//
// The returned Subscriber uses the real clock (time.Now) and a no-op ready
// hook; its state map is left nil here and is allocated by Subscribe.
//
// NOTE(review): config.Concurrency is forwarded to errgroup's SetLimit as-is.
// According to the errgroup documentation a limit of 0 prevents any new
// goroutine from being started — confirm callers never pass 0.
func New(config Config) *Subscriber {
	sub := &Subscriber{
		url:    config.URL,
		token:  config.Token,
		maxAge: config.MaxEventAge,
		ready:  func() {},
		now:    time.Now,
	}
	// The zero-value errgroup.Group is ready for use; only the limit is set.
	sub.group.SetLimit(config.Concurrency)
	return sub
}
// Subscriber listens to broadcast events and sends a signed HTTP notification
// for production deployments that changed.
type Subscriber struct {
	// group runs the notification tasks; its concurrency limit is set in New.
	group errgroup.Group

	// token is the key for the HMAC-SHA256 signature of each request body.
	token []byte

	// url is the endpoint the notifications are POSTed to.
	url string

	// ready is called once the initial events have been recorded.
	// It is only needed to synchronize tests.
	ready func()

	// state remembers the most recent production event per key so that
	// repeated events can be detected.
	state map[service.Key]*service.BroadcastEvent

	// maxAge is the maximum age of events that should be considered.
	// If 0, all events are considered.
	maxAge time.Duration

	// now returns the current time; tests replace it to simulate a clock.
	now func() time.Time
}
// Subscribe processes events from b until ctx is done.
//
// Each time subscribeOnce returns while ctx is still active, the
// subscription is started again and the returned error is dropped. Once ctx
// is done, the result of the last subscribeOnce call is returned.
func (s *Subscriber) Subscribe(ctx context.Context, b *service.Broadcast) error {
	// Allocate the state map lazily; New leaves it nil.
	if s.state == nil {
		s.state = make(map[service.Key]*service.BroadcastEvent)
	}
	for {
		err := s.subscribeOnce(ctx, b)
		if ctx.Err() != nil {
			return err
		}
	}
}
// subscribeOnce runs a single subscription on b.
//
// The events returned by b.Start are recorded as the initial state (production
// events only) without sending notifications. Afterwards every production
// event received on the channel is compared against the recorded state and a
// notification is scheduled when shouldNotify reports a change.
//
// It returns the result of waiting for all scheduled notifications, either
// when ctx is done or when the channel is closed.
func (s *Subscriber) subscribeOnce(ctx context.Context, b *service.Broadcast) error {
	initial, updates, unsubscribe := b.Start()
	defer unsubscribe()

	for _, known := range initial {
		if known.IsProduction == nil || !*known.IsProduction {
			continue
		}
		s.state[known.Key] = known
	}
	s.ready()

	for {
		select {
		case <-ctx.Done():
			return s.group.Wait()
		case ev, open := <-updates:
			switch {
			case !open:
				return s.group.Wait()
			case ev.IsProduction == nil || !*ev.IsProduction:
				// Only production deployments are reported.
			case s.maxAge != 0 &&
				ev.ArgocdVersion != nil &&
				s.now().After(ev.ArgocdVersion.DeployedAt.Add(s.maxAge)):
				// The deployment is older than maxAge: neither notify nor
				// record it.
			default:
				// Decide against the previous state before overwriting it.
				if shouldNotify(s.state[ev.Key], ev) {
					s.group.Go(s.notify(ctx, ev))
				}
				s.state[ev.Key] = ev
			}
		}
	}
}
// shouldNotify reports whether nu warrants a notification given the
// previously recorded event old (which may be nil).
//
// It returns false when nu lacks the fields needed to build the request
// (ArgocdVersion, IsProduction, or a non-empty SourceCommitId). Otherwise it
// returns true when there is no usable previous event, or when the commit id
// or the deployment time differ from the previous event.
func shouldNotify(old *service.BroadcastEvent, nu *service.BroadcastEvent) bool {
	// check for fields that must be present to generate the request
	if nu.ArgocdVersion == nil || nu.IsProduction == nil || nu.ArgocdVersion.SourceCommitId == "" {
		return false
	}
	if old == nil || old.ArgocdVersion == nil || old.IsProduction == nil {
		return true
	}
	prev, next := old.ArgocdVersion, nu.ArgocdVersion
	if prev.SourceCommitId != next.SourceCommitId {
		return true
	}
	// Compare instants with Equal rather than !=: the operator would also
	// compare the location and monotonic clock reading, so the same instant
	// could be treated as a change and trigger a duplicate notification.
	return !prev.DeployedAt.Equal(next.DeployedAt)
}
// kuberpultEvent is the JSON payload that is POSTed for each notification.
type kuberpultEvent struct {
	// Id is an Id/UUID that allows the receiver to de-duplicate events.
	Id string `json:"id"`

	// CommitHash is the source commit id of the deployed version.
	CommitHash string `json:"commitHash"`

	// EventTime is the time of the event, optimally in RFC3339 format.
	EventTime string `json:"eventTime"`

	// URL is where to see the logs/status of the deployment.
	// It is left out of the JSON when empty.
	URL string `json:"url,omitempty"`

	// ServiceName is the name of the application the event refers to.
	ServiceName string `json:"serviceName"`
}
// notify builds the notification payload for ev and returns the task that
// delivers it; the task is meant to be run by the subscriber's errgroup.
//
// The payload is assembled eagerly, so ev.ArgocdVersion must be non-nil when
// notify is called (shouldNotify checks this).
//
// The returned task POSTs the JSON payload to s.url, signed with an
// HMAC-SHA256 of the body (keyed with s.token) in the X-Hub-Signature-256
// header. The request is bound to ctx, so cancelling ctx aborts an in-flight
// delivery instead of leaving it to block until the remote side answers.
//
// The task returns an error when the payload cannot be marshalled, the
// request cannot be created, or the response status is above 299. Transport
// errors are only recorded on the tracing span and reported as nil.
func (s *Subscriber) notify(ctx context.Context, ev *service.BroadcastEvent) func() error {
	// NOTE(review): the id is derived from DeployedAt.String(), whose output
	// depends on the time's location — confirm this is stable enough for
	// de-duplication on the receiving side.
	event := kuberpultEvent{
		Id:          uuidFor(ev.Application, ev.ArgocdVersion.SourceCommitId, ev.ArgocdVersion.DeployedAt.String()),
		CommitHash:  ev.ArgocdVersion.SourceCommitId,
		EventTime:   ev.ArgocdVersion.DeployedAt.Format(time.RFC3339),
		ServiceName: ev.Application,
	}
	return func() error {
		span, _ := tracer.StartSpanFromContext(ctx, "revolution.notify")
		defer span.Finish()
		span.SetTag("revolution.url", s.url)
		span.SetTag("revolution.id", event.Id)
		span.SetTag("environment", ev.Environment)
		span.SetTag("application", ev.Application)

		body, err := json.Marshal(event)
		if err != nil {
			return fmt.Errorf("marshal event: %w", err)
		}

		// Sign the exact bytes that are sent. hash.Hash.Write never returns
		// an error, so its result is not checked.
		mac := hmac.New(sha256.New, s.token)
		mac.Write(body)
		signature := "sha256=" + hex.EncodeToString(mac.Sum(nil))

		// Bind the request to ctx so that cancellation stops the delivery.
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, bytes.NewReader(body))
		if err != nil {
			return fmt.Errorf("creating http request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("X-Hub-Signature-256", signature)
		req.Header.Set("User-Agent", "kuberpult")

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			// Delivery is best-effort: record the failure on the span but do
			// not fail the errgroup.
			span.Finish(tracer.WithError(err))
			return nil
		}
		defer resp.Body.Close()
		span.SetTag("http.status_code", resp.Status)

		// The body is only used for the error message below, so a read
		// failure is deliberately ignored.
		content, _ := io.ReadAll(resp.Body)
		if resp.StatusCode > 299 {
			return fmt.Errorf("http status (%d): %s", resp.StatusCode, content)
		}
		return nil
	}
}
// kuberpultUuid is the namespace for all event ids: the SHA-1 based UUID of
// the name "kuberpult" inside the all-zero (nil) UUID namespace.
var kuberpultUuid uuid.UUID = uuid.NewSHA1(uuid.Nil, []byte("kuberpult"))

// uuidFor returns a deterministic SHA-1 based UUID string for the given
// application, commit hash and deployment time. The three values are joined
// with newlines and hashed inside the kuberpultUuid namespace, so equal
// inputs always produce the same id.
func uuidFor(application, commitHash, deployedAt string) string {
	name := application + "\n" + commitHash + "\n" + deployedAt
	id := uuid.NewSHA1(kuberpultUuid, []byte(name))
	return id.String()
}