Skip to content

Commit

Permalink
Merge pull request #83 from gdiazlo/agents
Browse files Browse the repository at this point in the history
Agents, metrics and some tests refactor
  • Loading branch information
gdiazlo authored Mar 19, 2019
2 parents 5fb2735 + 5bee53e commit daa0759
Show file tree
Hide file tree
Showing 30 changed files with 778 additions and 837 deletions.
8 changes: 4 additions & 4 deletions api/metricshttp/metricshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ func NewMetricsHTTP(r *prometheus.Registry) *http.ServeMux {
prometheus.DefaultGatherer,
r,
}
mux.Handle(
"/metrics",
promhttp.InstrumentMetricHandler(r, promhttp.HandlerFor(g, promhttp.HandlerOpts{})),
)

handler := promhttp.HandlerFor(g, promhttp.HandlerOpts{})
instrumentedHandler := promhttp.InstrumentMetricHandler(r, handler)
mux.Handle("/metrics", instrumentedHandler)
return mux
}
8 changes: 4 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewHTTPClient(options ...HTTPClientOptionF) (*HTTPClient, error) {
if client.discoveryEnabled {
// try to discover the cluster topology initially
if err := client.discover(); err != nil {
return nil, err
log.Infof("Unable to get QED topology, we will try it later: %v", err)
}
}

Expand All @@ -153,9 +153,9 @@ func NewHTTPClient(options ...HTTPClientOptionF) (*HTTPClient, error) {
client.healthCheck(client.healthcheckTimeout)
}

// Ensure thath we have at least one endpoint, the primary, available
// Ensure that we have at least one endpoint, the primary, available
if !client.topology.HasActivePrimary() {
return nil, ErrNoPrimary
log.Infof("QED does not have a primary node or it is down, we will try it later.")
}

// if t.discoveryEnabled {
Expand Down Expand Up @@ -215,7 +215,7 @@ func (c *HTTPClient) setRetrier(maxRetries int) error {
// Create a Retrier that will wait for 100ms between requests.
ticks := make([]int, maxRetries)
for i := 0; i < len(ticks); i++ {
ticks[i] = 100
ticks[i] = 1000
}
backoff := NewSimpleBackoff(ticks...)
c.retrier = NewBackoffRequestRetrier(c.httpClient, c.maxRetries, backoff)
Expand Down
7 changes: 5 additions & 2 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ func newAgentCommand(cmdCtx *cmdContext, args []string) *cobra.Command {
f.StringVar(&config.NodeName, "node", "", "Unique name for node. If not set, fallback to hostname")
f.StringVar(&config.BindAddr, "bind", "", "Bind address for TCP/UDP gossip on (host:port)")
f.StringVar(&config.AdvertiseAddr, "advertise", "", "Address to advertise to cluster")
f.StringVar(&config.MetricsAddr, "metrics", "", "Address to bind metrics endpoint")
f.StringSliceVar(&config.StartJoin, "join", []string{}, "Comma-delimited list of nodes ([host]:port), through which a cluster can be joined")
f.StringSliceVar(&config.AlertsUrls, "alertsUrls", []string{}, "Comma-delimited list of Alert servers ([host]:port), through which an agent can post alerts")

// Lookups
v.BindPFlag("agent.node", f.Lookup("node"))
v.BindPFlag("agent.bind", f.Lookup("bind"))
v.BindPFlag("agent.advertise", f.Lookup("advertise"))
v.BindPFlag("agent.metrics", f.Lookup("metrics"))
v.BindPFlag("agent.join", f.Lookup("join"))
v.BindPFlag("agent.alert_urls", f.Lookup("alertsUrls"))
v.BindPFlag("agent.alerts_urls", f.Lookup("alertsUrls"))

agentPreRun := func(config gossip.Config) gossip.Config {
config.EnableCompression = true
config.NodeName = v.GetString("agent.node")
config.BindAddr = v.GetString("agent.bind")
config.AdvertiseAddr = v.GetString("agent.advertise")
config.MetricsAddr = v.GetString("agent.metrics")
config.StartJoin = v.GetStringSlice("agent.join")
config.AlertsUrls = v.GetStringSlice("agent.alert_urls")
config.AlertsUrls = v.GetStringSlice("agent.alerts_urls")

markStringRequired(config.NodeName, "node")
markStringRequired(config.BindAddr, "bind")
Expand Down
19 changes: 10 additions & 9 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/auditor"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand All @@ -40,13 +41,13 @@ func newAgentAuditorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f
// must be curried.
config = agentPreRun(config)

// Bindings
auditorConfig.MetricsAddr = config.BindAddr // TODO: make MetricsAddr configurable
auditorConfig.QEDUrls = v.GetStringSlice("agent.server_urls")
auditorConfig.PubUrls = v.GetStringSlice("agent.alert_urls")
auditorConfig.PubUrls = v.GetStringSlice("agent.pub_urls")
auditorConfig.AlertsUrls = v.GetStringSlice("agent.alerts_urls")

markSliceStringRequired(auditorConfig.QEDUrls, "qedUrls")
markSliceStringRequired(auditorConfig.PubUrls, "pubUrls")
markSliceStringRequired(auditorConfig.AlertsUrls, "alertsUrls")
},
Run: func(cmd *cobra.Command, args []string) {

Expand All @@ -57,8 +58,8 @@ func newAgentAuditorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{auditor})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{auditor}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED auditor: %v", err)
}
Expand All @@ -76,11 +77,11 @@ func newAgentAuditorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f

f := cmd.Flags()
f.StringSliceVarP(&auditorConfig.QEDUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
f.StringSliceVarP(&auditorConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")

f.StringSliceVarP(&auditorConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of store servers ([host]:port), through which an auditor can make queries")
f.StringSliceVarP(&auditorConfig.AlertsUrls, "alertsUrls", "", []string{}, "Comma-delimited list of alerts servers ([host]:port), through which an auditor can make queries")
// Lookups
v.BindPFlag("agent.server_urls", f.Lookup("qedUrls"))
v.BindPFlag("agent.alert_urls", f.Lookup("pubUrls"))

v.BindPFlag("agent.pub_urls", f.Lookup("pubUrls"))
v.BindPFlag("agent.alerts_urls", f.Lookup("alertsUrls"))
return cmd
}
15 changes: 8 additions & 7 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/monitor"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand All @@ -43,22 +44,22 @@ func newAgentMonitorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f
// Bindings
monitorConfig.MetricsAddr = config.BindAddr // TODO: make MetricsAddr configurable
monitorConfig.QEDUrls = v.GetStringSlice("agent.server_urls")
monitorConfig.PubUrls = v.GetStringSlice("agent.alert_urls")
monitorConfig.AlertsUrls = v.GetStringSlice("agent.alerts_urls")

markSliceStringRequired(monitorConfig.QEDUrls, "qedUrls")
markSliceStringRequired(monitorConfig.PubUrls, "pubUrls")
markSliceStringRequired(monitorConfig.AlertsUrls, "alertsUrls")
},
Run: func(cmd *cobra.Command, args []string) {

config.Role = member.Monitor
monitorConfig.APIKey = ctx.apiKey

monitor, err := monitor.NewMonitor(*monitorConfig)
monitor, err := monitor.NewMonitor(monitorConfig)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{monitor})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{monitor}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}
Expand All @@ -77,11 +78,11 @@ func newAgentMonitorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f

f := cmd.Flags()
f.StringSliceVarP(&monitorConfig.QEDUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which a monitor can make queries")
f.StringSliceVarP(&monitorConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an monitor can publish alerts")
f.StringSliceVarP(&monitorConfig.AlertsUrls, "alertsUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an monitor can publish alerts")

// Lookups
v.BindPFlag("agent.server_urls", f.Lookup("qedUrls"))
v.BindPFlag("agent.alert_urls", f.Lookup("pubUrls"))
v.BindPFlag("agent.alerts_urls", f.Lookup("alertsUrls"))

return cmd
}
12 changes: 8 additions & 4 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/publisher"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand All @@ -43,8 +44,10 @@ func newAgentPublisherCommand(ctx *cmdContext, config gossip.Config, agentPreRun
// Bindings
publisherConfig.MetricsAddr = config.BindAddr // TODO: make MetricsAddr configurable
publisherConfig.PubUrls = v.GetStringSlice("agent.snapshots_store_urls")
publisherConfig.AlertsUrls = v.GetStringSlice("agent.alerts_urls")

markSliceStringRequired(publisherConfig.PubUrls, "pubUrls")
markSliceStringRequired(publisherConfig.AlertsUrls, "alertsUrls")
},
Run: func(cmd *cobra.Command, args []string) {

Expand All @@ -54,8 +57,8 @@ func newAgentPublisherCommand(ctx *cmdContext, config gossip.Config, agentPreRun
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{publisher})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{publisher}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}
Expand All @@ -72,11 +75,12 @@ func newAgentPublisherCommand(ctx *cmdContext, config gossip.Config, agentPreRun
}

f := cmd.Flags()
f.StringSliceVarP(&publisherConfig.PubUrls, "pubUrls", "", []string{},
"Comma-delimited list of end-publishers ([host]:port), through which an publisher can send requests")
f.StringSliceVarP(&publisherConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of end-publishers ([host]:port), through which an publisher can send requests")
f.StringSliceVarP(&publisherConfig.AlertsUrls, "alertsUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an monitor can publish alerts")

// Lookups
v.BindPFlag("agent.snapshots_store_urls", f.Lookup("pubUrls"))
v.BindPFlag("agent.alerts_urls", f.Lookup("alertsUrls"))

return cmd
}
100 changes: 59 additions & 41 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,51 @@ import (
"time"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/coocood/freecache"
"github.com/hashicorp/memberlist"
)

// hashedBatch pairs a batch of snapshots with a digest identifying it.
// The agent receives these on its In channel and uses the digest as the key
// of its cache of already-processed batches, so a batch whose digest is
// already cached is skipped.
// NOTE(review): how and where the digest is computed is not visible here —
// presumably by the delegate that enqueues into In; confirm against caller.
type hashedBatch struct {
// batch is the snapshot batch handed to the processors and forwarded to Out.
batch *protocol.BatchSnapshots
// digest is the lookup key used for duplicate detection.
digest hashing.Digest
}

type Agent struct {
config *Config
Self *member.Peer

metricsServer *metrics.Server

memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue

Topology *Topology

stateLock sync.Mutex

processed *freecache.Cache
processors []Processor

In chan *protocol.BatchSnapshots
In chan *hashedBatch
Out chan *protocol.BatchSnapshots
quit chan bool
}

func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
func NewAgent(conf *Config, p []Processor, m *metrics.Server) (agent *Agent, err error) {
log.Infof("New agent %s\n", conf.NodeName)
agent = &Agent{
config: conf,
Topology: NewTopology(),
processors: p,
In: make(chan *protocol.BatchSnapshots, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
config: conf,
metricsServer: m,
Topology: NewTopology(),
processors: p,
processed: freecache.NewCache(1 << 20),
In: make(chan *hashedBatch, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
}

bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
Expand All @@ -77,7 +90,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
conf.MemberlistConfig.AdvertisePort = advertisePort
conf.MemberlistConfig.Name = conf.NodeName
conf.MemberlistConfig.Logger = log.GetLogger()

// Configure delegates
conf.MemberlistConfig.Delegate = newAgentDelegate(agent)
conf.MemberlistConfig.Events = &eventDelegate{agent}
Expand Down Expand Up @@ -106,10 +118,12 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {

return agent, nil
}
func chTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshots) {

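// ChTimedSend sends a batch into the given queue channel, giving up (and
// logging the timeout) if the send does not complete within the agent's
// configured TimeoutQueues duration.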
func (a *Agent) ChTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshots) {
for {
select {
case <-time.After(200 * time.Millisecond):
case <-time.After(a.config.TimeoutQueues):
log.Infof("Agent timed out enqueueing batch in out channel")
return
case ch <- batch:
Expand All @@ -119,47 +133,49 @@ func chTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshot
}

func (a *Agent) start() {
outTicker := time.NewTicker(2 * time.Second)

for _, p := range a.processors {
p.RegisterMetrics(a.metricsServer)
}

go func() {
a.metricsServer.Start()
}()

for {
select {
case batch := <-a.In:
case hashedBatch := <-a.In:
_, err := a.processed.Get(hashedBatch.digest)
if err == nil {
continue
}
a.processed.Set(hashedBatch.digest, []byte{0x0}, 0)

for _, p := range a.processors {
go p.Process(*batch)
go p.Process(hashedBatch.batch)
}
chTimedSend(batch, a.Out)
case <-outTicker.C:
go a.sendOutQueue()
a.ChTimedSend(hashedBatch.batch, a.Out)
case b := <-a.Out:
go a.send(b)
case <-a.quit:
return
}
}
}

// batchId returns a short human-readable identifier for a batch, built from
// its TTL and the version of its last snapshot. It is used when logging the
// batches being sent to other members.
//
// A nil batch or a batch without snapshots has no "last version" to report,
// so a placeholder is returned instead of indexing out of range and
// panicking inside a logging call.
func batchId(b *protocol.BatchSnapshots) string {
	if b == nil {
		return "( nil batch)"
	}
	if len(b.Snapshots) == 0 {
		return fmt.Sprintf("( ttl %d, lv n/a)", b.TTL)
	}
	last := b.Snapshots[len(b.Snapshots)-1]
	return fmt.Sprintf("( ttl %d, lv %d)", b.TTL, last.Snapshot.Version)
}

func (a *Agent) sendOutQueue() {
var batch *protocol.BatchSnapshots
for {
select {
case batch = <-a.Out:
default:
return
}
func (a *Agent) send(batch *protocol.BatchSnapshots) {

if batch.TTL <= 0 {
continue
}
if batch.TTL <= 0 {
return
}

batch.TTL -= 1
from := batch.From
batch.From = a.Self
msg, _ := batch.Encode()
for _, dst := range a.route(from) {
log.Debugf("Sending %+v to %+v\n", batchId(batch), dst.Name)
a.memberlist.SendReliable(dst, msg)
}
batch.TTL -= 1
from := batch.From
batch.From = a.Self
msg, _ := batch.Encode()
for _, dst := range a.route(from) {
log.Debugf("Sending batch to %+v\n", dst.Name)
a.memberlist.SendReliable(dst, msg)
}
}

Expand Down Expand Up @@ -242,10 +258,12 @@ func (a *Agent) Leave() error {
//
// It is safe to call this method multiple times.
func (a *Agent) Shutdown() error {
log.Info("\nShutting down agent %s", a.config.NodeName)
log.Infof("Shutting down agent %s", a.config.NodeName)
a.stateLock.Lock()
defer a.stateLock.Unlock()

a.metricsServer.Shutdown()

if a.Self.Status == member.Shutdown {
return nil
}
Expand Down
Loading

0 comments on commit daa0759

Please sign in to comment.