Skip to content

Commit

Permalink
refactor manager key to be a struct
Browse files Browse the repository at this point in the history
  • Loading branch information
leklund committed Nov 29, 2022
1 parent e86aa0c commit c3e93ec
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 42 deletions.
65 changes: 33 additions & 32 deletions pkg/rt/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"

"github.com/fastly/fastly-exporter/pkg/api"
Expand All @@ -26,6 +25,11 @@ type MetricsProvider interface {
MetricsFor(serviceID string) *prom.Metrics
}

type subscriberKey struct {
serviceID string
product string
}

// Manager owns a set of subscribers. On refresh, it asks the ServiceIdentifier
// for a set of service IDs that should be active, and manages the lifecycles of
// the corresponding subscribers.
Expand All @@ -39,7 +43,7 @@ type Manager struct {
logger log.Logger

mtx sync.RWMutex
managed map[string]interrupt
managed map[subscriberKey]interrupt
}

// NewManager returns a usable manager. Callers should invoke Refresh on a
Expand All @@ -56,7 +60,7 @@ func NewManager(ids ServiceIdentifier, client HTTPClient, token string, metrics
productCache: productCache,
logger: logger,

managed: map[string]interrupt{},
managed: map[subscriberKey]interrupt{},
}
}

Expand All @@ -71,11 +75,11 @@ func (m *Manager) Refresh() {
m.mtx.Lock()
defer m.mtx.Unlock()

nextgen := map[string]interrupt{}
nextgen := map[subscriberKey]interrupt{}
for _, product := range api.Products {
if m.productCache.HasAccess(product) {
for _, id := range m.ids.ServiceIDs() {
key := id + sep + product
key := subscriberKey{serviceID: id, product: product}

if irq, ok := m.managed[key]; ok {
level.Debug(m.logger).Log("service_id", id, "type", product, "subscriber", "maintain")
Expand All @@ -88,30 +92,28 @@ func (m *Manager) Refresh() {
}
}

for _, key := range m.managedIDsWithLock() {
id, managedProduct := parseKey(key)
if managedProduct != product {
for _, key := range m.managedKeysWithLock() {
if key.product != product {
continue
}

level.Info(m.logger).Log("service_id", id, "type", product, "subscriber", "stop")
level.Info(m.logger).Log("service_id", key.serviceID, "type", key.product, "subscriber", "stop")
irq := m.managed[key]
irq.cancel()
err := <-irq.done
delete(m.managed, key)
level.Debug(m.logger).Log("service_id", id, "type", product, "interrupt", err)
level.Debug(m.logger).Log("service_id", key.serviceID, "type", key.product, "interrupt", err)
}

for key, irq := range nextgen {
id, managedProduct := parseKey(key)
if managedProduct != product {
if key.product != product {
continue
}

select {
default: // still running (good)
case err := <-irq.done: // exited (bad)
level.Error(m.logger).Log("service_id", id, "type", product, "interrupt", err, "err", "premature termination", "msg", "will attempt to reconnect on next refresh")
level.Error(m.logger).Log("service_id", key.serviceID, "type", key.product, "interrupt", err, "err", "premature termination", "msg", "will attempt to reconnect on next refresh")
delete(nextgen, key)
}
}
Expand All @@ -122,25 +124,28 @@ func (m *Manager) Refresh() {

// Active returns the set of service IDs currently being managed.
// Mostly useful for tests.
func (m *Manager) Active() (serviceIDs []string) {
func (m *Manager) Active() []string {
serviceIDs := []string{}
m.mtx.RLock()
defer m.mtx.RUnlock()
return m.managedIDsWithLock()
for _, key := range m.managedKeysWithLock() {
serviceIDs = append(serviceIDs, key.serviceID)
}
return serviceIDs
}

// StopAll terminates and cleans up all active subscribers.
func (m *Manager) StopAll() {
m.mtx.Lock()
defer m.mtx.Unlock()

for _, key := range m.managedIDsWithLock() {
id, product := parseKey(key)
level.Info(m.logger).Log("service_id", id, "type", product, "subscriber", "stop")
for _, key := range m.managedKeysWithLock() {
level.Info(m.logger).Log("service_id", key.serviceID, "type", key.product, "subscriber", "stop")
irq := m.managed[key]
irq.cancel()
for i := 0; i < cap(irq.done); i++ {
err := <-irq.done
level.Debug(m.logger).Log("service_id", id, "goroutine", i+1, "of", cap(irq.done), "interrupt", err)
level.Debug(m.logger).Log("service_id", key.serviceID, "goroutine", i+1, "of", cap(irq.done), "interrupt", err)
}
delete(m.managed, key)
}
Expand All @@ -162,23 +167,19 @@ func (m *Manager) spawn(serviceID string, product string) interrupt {
return interrupt{cancel, done}
}

func (m *Manager) managedIDsWithLock() []string {
ids := make([]string, 0, len(m.managed))
for id := range m.managed {
ids = append(ids, id)
func (m *Manager) managedKeysWithLock() []subscriberKey {
keys := make([]subscriberKey, 0, len(m.managed))
for key := range m.managed {
keys = append(keys, key)
}
sort.Strings(ids)
return ids
sort.Slice(keys, func(i, j int) bool {
return keys[i].serviceID < keys[j].serviceID
})

return keys
}

type interrupt struct {
cancel func()
done <-chan error
}

func parseKey(key string) (string, string) {
ks := strings.Split(key, sep)
return ks[0], ks[1]
}

const sep = "|"
20 changes: 10 additions & 10 deletions pkg/rt/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ func TestManager(t *testing.T) {
assertStringSliceEqual(t, []string{}, manager.Active())

if want, have := []string{
`level=info service_id=101010 subscriber=create`,
`level=info service_id=2f2f2f subscriber=create`,
`level=info service_id=3a3b3c subscriber=create`,
`level=info service_id=101010 subscriber=stop`,
`level=info service_id=3a3b3c subscriber=stop`,
`level=info service_id=2f2f2f subscriber=stop`,
`level=info service_id=2f2f2f subscriber=create`,
`level=info service_id=3a3b3c subscriber=create`,
`level=info service_id=2f2f2f subscriber=stop`,
`level=info service_id=3a3b3c subscriber=stop`,
`level=info service_id=101010 type=default subscriber=create`,
`level=info service_id=2f2f2f type=default subscriber=create`,
`level=info service_id=3a3b3c type=default subscriber=create`,
`level=info service_id=101010 type=default subscriber=stop`,
`level=info service_id=3a3b3c type=default subscriber=stop`,
`level=info service_id=2f2f2f type=default subscriber=stop`,
`level=info service_id=2f2f2f type=default subscriber=create`,
`level=info service_id=3a3b3c type=default subscriber=create`,
`level=info service_id=2f2f2f type=default subscriber=stop`,
`level=info service_id=3a3b3c type=default subscriber=stop`,
}, strings.Split(strings.TrimSpace(logbuf.String()), "\n"); !cmp.Equal(want, have) {
t.Error(cmp.Diff(want, have))
}
Expand Down

0 comments on commit c3e93ec

Please sign in to comment.