/
publisher.go
136 lines (115 loc) · 4.21 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package v2
import (
"context"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
)
// Name is the identifier of this publisher version.
const (
	Name = "v2"
)
// NewPublisher builds the v2 implementation of pusher.Publisher.
//
// The provided context is stored in the publisher and passed to the run
// method of every tenant handler it starts, so it controls the lifetime of
// the publisher. tenantProvider is handed to each tenant handler on creation.
// logger and the metrics created from pr are layered on top of the package's
// default pusher options.
func NewPublisher(ctx context.Context, tenantProvider pusher.TenantProvider, logger zerolog.Logger, pr prometheus.Registerer) pusher.Publisher {
	// Start from a copy of the defaults so the package-level value is never
	// modified.
	opts := defaultPusherOptions
	opts.logger = logger
	opts.metrics = pusher.NewMetrics(pr)

	return &publisherImpl{
		ctx:            ctx,
		tenantProvider: tenantProvider,
		options:        opts,
		handlers:       make(map[model.GlobalID]payloadHandler),
	}
}
// payloadHandler is what the publisher keeps per tenant: something that can
// be run in its own goroutine and that accepts payloads for that tenant.
type payloadHandler interface {
	// run executes the handler using ctx and, once it returns, reports which
	// handler should run next. A nil result signals that the handler should
	// be terminated and nothing should take its place.
	run(ctx context.Context) payloadHandler

	// publish hands payload over to the handler.
	publish(payload pusher.Payload)
}
// publisherImpl is the v2 implementation of pusher.Publisher. It keeps one
// payloadHandler per tenant and forwards each published payload to the
// handler of the tenant the payload belongs to.
type publisherImpl struct {
	// ctx is the context passed to every handler's run method.
	ctx context.Context
	// tenantProvider is handed to newTenantPusher when a handler is created.
	tenantProvider pusher.TenantProvider
	// options are the base options; they are specialized per tenant with
	// withTenant when a handler is created.
	options      pusherOptions
	handlerMutex sync.Mutex // protects the handlers map
	// handlers holds the currently installed handler for each tenant.
	handlers map[model.GlobalID]payloadHandler
}
// Compile-time check that *publisherImpl satisfies pusher.Publisher.
var _ pusher.Publisher = (*publisherImpl)(nil)
// Publish delivers payload to the handler of the tenant it belongs to.
//
// If the tenant has no handler yet, a new one is created and installed, and a
// goroutine is started to run it. When several callers race to install the
// first handler for the same tenant, only the winner starts the goroutine;
// the others publish to the handler that won.
func (p *publisherImpl) Publish(payload pusher.Payload) {
	id := payload.Tenant()

	// Fast path: the tenant already has a handler.
	h, ok := p.getHandler(id)
	if ok {
		h.publish(payload)
		return
	}

	// Slow path: try to install a fresh handler. Passing nil as the old
	// handler means "add only if nothing is there"; on failure the handler
	// that is already installed is returned instead.
	candidate := newTenantPusher(id, p.tenantProvider, p.options.withTenant(id))
	h, installed := p.replaceHandler(id, nil, candidate)
	if installed {
		go p.runHandler(id, h)
	}

	h.publish(payload)
}
// runHandler drives the handler chain for tenantID, starting with h.
//
// Each iteration runs the current handler and then installs whatever handler
// it returned in place of the one that just ran. The loop ends when a handler
// returns nil (which removes the tenant's entry) or when the swap fails
// because the installed handler is no longer the one that just ran.
func (p *publisherImpl) runHandler(tenantID model.GlobalID, h payloadHandler) {
	tid, rid := model.GetLocalAndRegionIDs(tenantID)

	p.options.logger.Info().Int64("tenant_id", tid).Int("region_id", rid).Msg("started push handler")
	defer p.options.logger.Info().Int64("tenant_id", tid).Int("region_id", rid).Msg("stopped push handler")

	for h != nil {
		successor := h.run(p.ctx)

		var swapped bool
		h, swapped = p.replaceHandler(tenantID, h, successor)
		if !swapped {
			// Something else changed the tenant's entry while this handler
			// was running; stop rather than fight over it.
			p.options.logger.Error().Int64("tenant_id", tid).Int("region_id", rid).Msg("unable to swap handler, tenant hijacked")
			return
		}
	}
}
// replaceHandler is a compare-and-swap on the handler installed for tenantID:
// the entry is changed only if the handler currently installed is old.
//
// What the caller is asking for is expressed through nil values:
//
//   - old == nil: add new, but only if the tenant has no handler yet.
//   - new == nil: delete the entry, but only if it still holds old.
//   - both non-nil: replace old with new, but only if old is still installed.
//   - both nil: nothing to do (deleting a missing entry is a no-op).
//
// It returns the handler in effect after the call and whether the swap took
// place. When the installed handler is not old (somebody else added, replaced
// or removed it first), nothing is modified and the installed handler, which
// may be nil, is returned together with false.
func (p *publisherImpl) replaceHandler(tenantID model.GlobalID, old, new payloadHandler) (payloadHandler, bool) {
	p.handlerMutex.Lock()
	defer p.handlerMutex.Unlock()

	// A missing entry reads as nil, which is exactly what an "add" expects.
	installed := p.handlers[tenantID]
	if installed != old {
		// The caller's view is stale; leave the map untouched.
		return installed, false
	}

	if new == nil {
		delete(p.handlers, tenantID)
	} else {
		p.handlers[tenantID] = new
	}

	// Keep the gauge in sync with the number of installed handlers.
	p.options.metrics.InstalledHandlers.Set(float64(len(p.handlers)))

	return new, true
}
// getHandler looks up the handler installed for tenantID while holding the
// handler mutex. The boolean reports whether the tenant has an entry.
func (p *publisherImpl) getHandler(tenantID model.GlobalID) (payloadHandler, bool) {
	p.handlerMutex.Lock()
	h, ok := p.handlers[tenantID]
	p.handlerMutex.Unlock()

	return h, ok
}