Skip to content

Commit

Permalink
Make HAManager configurable by Blip users
Browse files Browse the repository at this point in the history
  • Loading branch information
prudhvi committed Dec 19, 2023
1 parent 6d6adfc commit 6873106
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
50 changes: 50 additions & 0 deletions ha/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ha

import (
"fmt"
"github.com/cashapp/blip"
"sync"
)

type HAManagerFactory interface {
Make(monitor blip.ConfigMonitor) (Manager, error)
}

func Register(f HAManagerFactory) error {
hf.Lock()
defer hf.Unlock()
if hf.ha != nil {
return fmt.Errorf("HA already registered")
}
hf.ha = f
blip.Debug("register HA")
return nil
}

func Make(args blip.ConfigMonitor) (Manager, error) {
hf.Lock()
defer hf.Unlock()
if hf.ha == nil {
return nil, fmt.Errorf("HA not registered")
}
return hf.ha.Make(args)
}

type haf struct {
*sync.Mutex
ha HAManagerFactory
}

type defaultFactory struct {
}

var f = &defaultFactory{}

var hf = &haf{
Mutex: &sync.Mutex{},
ha: f,
}

func (f *defaultFactory) Make(_ blip.ConfigMonitor) (Manager, error) {
return Disabled, nil
}
8 changes: 8 additions & 0 deletions monitor/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/sha256"
"errors"
"fmt"
"github.com/cashapp/blip/ha"
"os"
"sync"
"time"
Expand Down Expand Up @@ -463,11 +464,18 @@ func (ml *Loader) makeMonitor(cfg blip.ConfigMonitor) (*Monitor, error) {
sinks = append(sinks, sink)
}

var ham ha.Manager
ham, err := ha.Make(cfg)
if err != nil {
return nil, err
}

mon := NewMonitor(MonitorArgs{
Config: cfg,
DbMaker: ml.factory.DbConn,
PlanLoader: ml.planLoader,
Sinks: sinks,
HA: ham,
TransformMetric: ml.plugin.TransformMetrics,
})
return mon, nil
Expand Down
5 changes: 4 additions & 1 deletion monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Monitor struct {

event event.MonitorReceiver
retry *backoff.ExponentialBackOff
ha ha.Manager
}

// MonitorArgs are required arguments to NewMonitor.
Expand All @@ -82,6 +83,7 @@ type MonitorArgs struct {
PlanLoader *plan.Loader
Sinks []blip.Sink
TransformMetric func(metrics *blip.Metrics) error
HA ha.Manager
}

// NewMonitor creates a new Monitor with the given arguments. The caller must
Expand All @@ -98,6 +100,7 @@ func NewMonitor(args MonitorArgs) *Monitor {
planLoader: args.PlanLoader,
sinks: args.Sinks,
transformMetric: args.TransformMetric,
ha: args.HA,
// --
runMux: &sync.RWMutex{},
wg: sync.WaitGroup{},
Expand Down Expand Up @@ -422,7 +425,7 @@ func (m *Monitor) startup() error {
Config: m.cfg.Plans.Change,
DB: m.db,
LCO: m.lco,
HA: ha.Disabled,
HA: m.ha,
})

m.wg.Add(1)
Expand Down

0 comments on commit 6873106

Please sign in to comment.