Permalink
Browse files

feature(api): introduce influxdb monitoring support for cmd/runner

  • Loading branch information...
Lee Hambley
Lee Hambley committed Dec 14, 2017
1 parent f082398 commit 1e961c138763e56025688fcd6445ce9d81506248
@@ -2,6 +2,7 @@ alecthomas
fatih
nsf
rogpeppe
influxdata
golang
sqs
zmb3
@@ -20,9 +20,11 @@ const imageName = "harrow-baseimage"
type LXD struct {
config *config.Config
connURL *url.URL
log logger.Logger
containerUuid string
log logger.Logger
reporter Reporter
containerUUID string
}
func (lxd *LXD) MakeContainer() error {
@@ -47,6 +49,8 @@ func (lxd *LXD) MakeContainer() error {
return errors.Wrap(err, "expected note about 'Starting <name>', wasn't recieved")
}
lxd.reporter.MadeContainer()
return nil
}
@@ -69,6 +73,7 @@ func (lxd *LXD) DestroyContainer() error {
output, err := session.CombinedOutput(cmdStr)
if err == nil {
lxd.reporter.DestroyedContainer()
return nil // container deleted successfully
}
@@ -84,13 +89,12 @@ func (lxd *LXD) DestroyContainer() error {
if errBack := goback.Wait(b); errBack != nil { // goback.ErrMaxAttemptsExceeded incase we're over-time
return errors.Wrap(err, "max attempts exceeded waiting to destroy container")
} else {
lxd.reporter.DestroyContainerWillRetry()
lxd.log.Info().Msgf("error destroying container, will retry: %s", err)
}
return nil
}
}
return nil
}
func (lxd *LXD) WaitForContainerNetworking(d time.Duration) error {
@@ -226,6 +230,7 @@ func (lxd *LXD) sshClient() (*ssh.Client, error) {
for {
client, err := ssh.Dial("tcp", lxd.connURL.Host, sshConfig)
if err != nil {
lxd.reporter.SSHError(err)
if errBack := goback.Wait(b); errBack != nil { // goback.ErrMaxAttemptsExceeded incase we're over-time
return nil, errors.Wrap(err, "max attempts exceeded waiting to dial ssh to host")
} else {
@@ -248,6 +253,7 @@ func (lxd *LXD) sshSession() (*ssh.Session, error) {
}
session, err := client.NewSession()
if err != nil {
lxd.reporter.SSHError(err)
if errBack := goback.Wait(b); errBack != nil { // goback.ErrMaxAttemptsExceeded incase we're over-time
return nil, errors.Wrap(err, "max attempts exceeded waiting to start ssh session")
} else {
@@ -260,7 +266,7 @@ func (lxd *LXD) sshSession() (*ssh.Session, error) {
}
func (lxd *LXD) containerName() string {
return fmt.Sprintf("%s-%s", lxd.prefix(), lxd.containerUuid)
return fmt.Sprintf("%s-%s", lxd.prefix(), lxd.containerUUID)
}
func (lxd *LXD) prefix() string {
@@ -51,6 +51,9 @@ func Main() {
activityBus := activity.NewAMQPTransport(config.AmqpConnectionString(), fmt.Sprintf("runner-%s", connStr))
defer activityBus.Close()
// Reporter tracks metrics and emits stats
var reporter = NewInfluxDBReporter(log, config.InfluxDBConfig(), *connStr)
// Configure the runner with the things we have (log, interval, etc)
// and start it in a goroutine
runner := &Runner{
@@ -59,6 +62,7 @@ func Main() {
interval: 60,
log: log.With().Str("host", *connStr).Logger(),
activitySink: activityBus,
reporter: reporter,
}
if err := runner.SetLXDConnStr(*connStr); err != nil {
@@ -99,6 +103,7 @@ O:
case s := <-stop:
log.Error().Msgf("received signal, stopping: %s", s)
reporter.Signal(s)
runner.Stop("sig")
break O
@@ -14,10 +14,11 @@ import (
)
type Operation struct {
db *sqlx.DB
config *config.Config
lxd *LXD
log logger.Logger
db *sqlx.DB
config *config.Config
lxd *LXD
log logger.Logger
reporter Reporter
op *domain.Operation
}
@@ -0,0 +1,124 @@
package runner
import (
"fmt"
"os"
"time"
"github.com/harrowio/harrow/config"
"github.com/harrowio/harrow/logger"
"github.com/influxdata/influxdb/client/v2"
)
// Reporter is the generic sink for runner metrics. Each method records one
// kind of event; what happens to the event is up to the implementation.
type Reporter interface {
	// PolledNoWork is reported when looking for the next operation finds
	// no rows.
	PolledNoWork()

	// PolledFoundWork is reported when an operation has been reserved and
	// is about to be returned to the caller.
	PolledFoundWork()

	// WaitTime reports how long an operation sat in the backlog, measured
	// from its creation time until it was reserved.
	WaitTime(wait time.Duration)

	// SSHError is reported when dialling the host or opening an SSH
	// session fails.
	SSHError(err error)

	// Heartbeat is reported each time the periodic container networking
	// health check passes.
	Heartbeat()

	// LongPollLost is reported when the long running container connection
	// is lost.
	LongPollLost()

	// MadeContainer is reported after a container was created.
	MadeContainer()

	// DestroyedContainer is reported after a container was deleted.
	DestroyedContainer()

	// DestroyContainerWillRetry is reported when deleting a container
	// failed and another attempt will be made.
	DestroyContainerWillRetry()

	// Signal is reported when the process receives an OS signal.
	Signal(sig os.Signal)

	// CleanShutdown is reported once the runner has stopped cleanly.
	CleanShutdown()
}
// NoopReporter provides every Reporter method as a no-op. It carries no
// state, so the zero value is ready to use.
type NoopReporter struct{}

// PolledNoWork does nothing.
func (NoopReporter) PolledNoWork() {}

// PolledFoundWork does nothing.
func (NoopReporter) PolledFoundWork() {}

// WaitTime ignores the duration and does nothing.
func (NoopReporter) WaitTime(time.Duration) {}

// SSHError ignores the error and does nothing.
func (NoopReporter) SSHError(error) {}

// Heartbeat does nothing.
func (NoopReporter) Heartbeat() {}

// LongPollLost does nothing.
func (NoopReporter) LongPollLost() {}

// MadeContainer does nothing.
func (NoopReporter) MadeContainer() {}

// DestroyedContainer does nothing.
func (NoopReporter) DestroyedContainer() {}

// DestroyContainerWillRetry does nothing.
func (NoopReporter) DestroyContainerWillRetry() {}

// Signal ignores the signal and does nothing.
func (NoopReporter) Signal(os.Signal) {}

// CleanShutdown does nothing.
func (NoopReporter) CleanShutdown() {}
// NewInfluxDBReporter builds a reporter that writes points to the InfluxDB
// server described by influxConf. Every point it records is tagged with
// "runner-name" set to runnerName.
//
// A failure to construct the HTTP client is reported through logger.Fatal().
// The function then issues "CREATE DATABASE <influxConf.Database>"; a failure
// of that query (transport error or an error carried in the response) is
// logged at error level and the reporter is still returned, so metrics
// problems do not prevent the caller from starting.
func NewInfluxDBReporter(logger logger.Logger, influxConf config.InfluxDBConfig, runnerName string) influxDBReporter {
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     influxConf.Addr,
		Username: influxConf.Username,
		Password: influxConf.Password,
		Timeout:  influxConf.Timeout,
	})
	if err != nil {
		logger.Fatal().Msgf("can't make influxdb http client: %s", err)
	}

	// NOTE(review): the database name is interpolated unquoted, exactly as
	// before; it comes from configuration, not from user input.
	q := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", influxConf.Database), "", "")
	if resp, err := c.Query(q); err != nil {
		logger.Error().Msgf("can't create influxdb database %q: %s", influxConf.Database, err)
	} else if resp != nil && resp.Error() != nil {
		logger.Error().Msgf("can't create influxdb database %q: %s", influxConf.Database, resp.Error())
	}

	return influxDBReporter{
		client:      c,
		defaultTags: map[string]string{"runner-name": runnerName},
		database:    influxConf.Database,
		logger:      logger,
	}
}
// influxDBReporter records runner events as InfluxDB points.
type influxDBReporter struct {
	// client is the InfluxDB client that points are written through.
	client client.Client
	// defaultTags are attached to every point that is recorded.
	defaultTags map[string]string
	// database is the InfluxDB database that batches are addressed to.
	database string
	// logger receives errors raised while building points.
	logger logger.Logger
}
// recordPoint writes a single point named pointName, carrying fields and the
// reporter's default tags, timestamped with the current time at second
// precision.
//
// Reporting is best effort: any failure (building the batch, building the
// point, or writing to InfluxDB) is logged and the point is dropped. Nothing
// is returned to the caller.
func (i influxDBReporter) recordPoint(pointName string, fields map[string]interface{}) {
	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  i.database,
		Precision: "s",
	})
	if err != nil {
		i.logger.Error().Msgf("error making influxdb batch points: %s", err)
		return
	}

	pt, err := client.NewPoint(pointName, i.defaultTags, fields, time.Now())
	if err != nil {
		i.logger.Error().Msgf("error making influxdb point: %s", err)
		// Do not add the point: it is not usable when NewPoint failed.
		return
	}
	bp.AddPoint(pt)

	if err := i.client.Write(bp); err != nil {
		i.logger.Error().Msgf("error writing influxdb point %q: %s", pointName, err)
	}
}
// PolledNoWork records a "polled_no_work" point with a count of 1.
func (i influxDBReporter) PolledNoWork() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("polled_no_work", fields)
}
// PolledFoundWork records a "polled_found_work" point with a count of 1.
func (i influxDBReporter) PolledFoundWork() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("polled_found_work", fields)
}
// WaitTime records a "wait_time" point with a count of 1 and the duration d
// expressed as a whole number of milliseconds in "duration_ms".
func (i influxDBReporter) WaitTime(d time.Duration) {
	i.recordPoint("wait_time", map[string]interface{}{
		"count": 1,
		// Dividing by time.Millisecond converts the nanosecond-based
		// Duration to milliseconds; the int64 conversion stores it as a
		// plain integer field.
		"duration_ms": int64(d / time.Millisecond),
	})
}
// SSHError records an "ssh_error" point with a count of 1 and the error's
// message in "error_str". err must be non-nil, since err.Error() is called
// unconditionally.
func (i influxDBReporter) SSHError(err error) {
	fields := map[string]interface{}{
		"count":     1,
		"error_str": err.Error(),
	}
	i.recordPoint("ssh_error", fields)
}
// Heartbeat records a "heart_beat" point with a count of 1.
func (i influxDBReporter) Heartbeat() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("heart_beat", fields)
}
// LongPollLost records a "long_poll_lost" point with a count of 1.
func (i influxDBReporter) LongPollLost() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("long_poll_lost", fields)
}
// MadeContainer records a "made_container" point with a count of 1.
func (i influxDBReporter) MadeContainer() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("made_container", fields)
}
// DestroyedContainer records a "destroyed_container" point with a count of 1
// and "is_retry" set to false.
func (i influxDBReporter) DestroyedContainer() {
	fields := map[string]interface{}{
		"count":    1,
		"is_retry": false,
	}
	i.recordPoint("destroyed_container", fields)
}
// DestroyContainerWillRetry records a "destroyed_container" point with a
// count of 0 and "is_retry" set to true. It shares its measurement name with
// DestroyedContainer.
func (i influxDBReporter) DestroyContainerWillRetry() {
	fields := map[string]interface{}{
		"count":    0,
		"is_retry": true,
	}
	i.recordPoint("destroyed_container", fields)
}
// Signal records a "got_signal" point with a count of 1 and the signal's
// string form in "signal".
func (i influxDBReporter) Signal(s os.Signal) {
	fields := map[string]interface{}{
		"count":  1,
		"signal": s.String(),
	}
	i.recordPoint("got_signal", fields)
}
// CleanShutdown records a "shutdown" point with a count of 1.
func (i influxDBReporter) CleanShutdown() {
	fields := map[string]interface{}{"count": 1}
	i.recordPoint("shutdown", fields)
}
@@ -18,7 +18,9 @@ import (
type OperationFromDbOrBus struct {
dbConnStr string
db *sqlx.DB
log logger.Logger
log logger.Logger
reporter Reporter
}
func (ofdob *OperationFromDbOrBus) WaitForNew(quit chan bool) bool { // look again
@@ -124,6 +126,7 @@ func (ofdob *OperationFromDbOrBus) Next() (*domain.Operation, error) {
ofdob.log.Debug().Msg(re.ReplaceAllString(query, " "))
err = tx.Get(op, query)
if err == sql.ErrNoRows {
ofdob.reporter.PolledNoWork()
ofdob.log.Debug().Msg("no rows found, but no errors")
return nil, nil
}
@@ -147,11 +150,15 @@ func (ofdob *OperationFromDbOrBus) Next() (*domain.Operation, error) {
return ofdob.Next()
}
if err := appendStatusLog(ofdob.log, tx, op.Uuid, "vm.reserved", fmt.Sprintf("Reserved, will be started (wait time %s)", time.Now().UTC().Sub(*op.CreatedAt))); err != nil {
var waitTime = time.Now().UTC().Sub(*op.CreatedAt)
ofdob.reporter.WaitTime(waitTime)
if err := appendStatusLog(ofdob.log, tx, op.Uuid, "vm.reserved", fmt.Sprintf("Reserved, will be started (wait time %s)", waitTime)); err != nil {
return nil, errors.Wrap(err, "could not append vm.reserved message to operation status logs")
}
tx.Commit()
ofdob.reporter.PolledFoundWork()
ofdob.log.Info().Str("runnable", "Next()").Msg("returning")
return op, nil
}
@@ -33,7 +33,10 @@ type Runner struct {
config *config.Config
interval int
errs chan error
// tracking
log logger.Logger
reporter Reporter
// internally set
connURL *url.URL
@@ -58,9 +61,10 @@ func (r *Runner) Start() {
)
r.lxd = &LXD{
config: r.config,
connURL: r.connURL,
log: r.log,
config: r.config,
connURL: r.connURL,
log: r.log,
reporter: r.reporter,
}
r.db, err = r.config.DB()
@@ -115,8 +119,10 @@ func (r *Runner) Start() {
go func(quit chan bool, sendOn chan<- *domain.Operation, db *sqlx.DB) {
opdob := OperationFromDbOrBus{
db: db,
log: r.log,
dbConnStr: pgDSN,
log: r.log,
reporter: r.reporter,
}
if err := opdob.NextOn(quit, sendOn); err != nil {
r.log.Info().Msgf("searching goroutine sending on err chan (%s)", err)
@@ -141,12 +147,15 @@ func (r *Runner) Start() {
// healchChecks might be a channel that never yields (if we never go into
// <-containerNetworkUp), but then we should exit with an error immediately
case err := <-connectionLost:
r.reporter.LongPollLost()
r.errs <- errors.Wrap(err, "long running container connection lost")
// healchChecks might be a channel that never yields (if we never go into
// <-containerNetworkUp), but then we should exit with an error immediately
case <-healthChecks:
if err := r.lxd.CheckContainerNetworking(); err != nil {
if err := r.lxd.CheckContainerNetworking(); err == nil {
r.reporter.Heartbeat()
} else {
r.errs <- errors.Wrap(err, "container failed periodic networking health check")
}
}
@@ -164,6 +173,7 @@ func (r *Runner) Stop(reason string) {
r.stopper = nil
r.log.Info().Msg("waiting for clean shutdown...")
<-stopped
r.reporter.CleanShutdown()
r.log.Info().Msg("got clean shutdown notice, returning from Stop()")
}
}
@@ -178,7 +188,7 @@ func (r *Runner) SetLXDConnStr(connStr string) (err error) {
func (r *Runner) MakeContainer(uuid string, res chan error) {
r.lxd.containerUuid = uuid
r.lxd.containerUUID = uuid
r.log.Debug().Msg("checking container health")
exists, err := r.lxd.ContainerExists()
@@ -55,3 +55,11 @@ func getEnvBoolWithDefault(k string, d bool) bool {
}
return val
}
// getEnvDurationWithDefault looks up k through getEnvWithDefault, passing the
// string form of d as the default, and parses the result as a time.Duration.
// It panics when the value cannot be parsed.
func getEnvDurationWithDefault(k string, d time.Duration) time.Duration {
	raw := getEnvWithDefault(k, d.String())

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr == nil {
		return parsed
	}
	panic(fmt.Sprintf("can't parse env variable '%s' as duration", k))
}
@@ -0,0 +1,21 @@
package config
import "time"
// InfluxDBConfig holds the settings needed to talk to an InfluxDB server.
type InfluxDBConfig struct {
	// Addr is the address of the InfluxDB HTTP endpoint.
	Addr string
	// Username is the user to authenticate as; it may be empty.
	Username string
	// Password is the password for Username; it may be empty.
	Password string
	// Database is the name of the database that metrics belong to.
	Database string
	// Timeout is the timeout handed to the InfluxDB HTTP client.
	Timeout time.Duration
}
// InfluxDBConfig assembles the InfluxDB settings from HAR_INFLUXDB_*
// variables looked up via getEnvWithDefault, using these defaults:
//
//	HAR_INFLUXDB_ADDR      "http://localhost/"
//	HAR_INFLUXDB_USERNAME  ""
//	HAR_INFLUXDB_PASSWORD  ""
//	HAR_INFLUXDB_DATABASE  "har_no_database_specified"
//	HAR_INFLUXDB_TIMEOUT   5s
//
// The timeout is parsed by getEnvDurationWithDefault, which panics when the
// value is not a valid duration.
func (c *Config) InfluxDBConfig() InfluxDBConfig {
	return InfluxDBConfig{
		Addr:     getEnvWithDefault("HAR_INFLUXDB_ADDR", "http://localhost/"),
		Username: getEnvWithDefault("HAR_INFLUXDB_USERNAME", ""),
		Password: getEnvWithDefault("HAR_INFLUXDB_PASSWORD", ""),
		// The database name has its own variable; reading the password
		// variable here would use the password as the database name.
		Database: getEnvWithDefault("HAR_INFLUXDB_DATABASE", "har_no_database_specified"),
		Timeout:  getEnvDurationWithDefault("HAR_INFLUXDB_TIMEOUT", 5*time.Second),
	}
}

0 comments on commit 1e961c1

Please sign in to comment.