Skip to content

Commit

Permalink
[agent] Add cron for register and heartbeat
Browse files Browse the repository at this point in the history
- if register success, switch to heartbeat mode
  • Loading branch information
at15 committed Mar 3, 2018
1 parent 40582af commit 07d58f5
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 15 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ install:
go install -ldflags "$(FLAGS)" ./cmd/bhubagent
go install -ldflags "$(FLAGS)" ./cmd/bhubcentral
go install -ldflags "$(FLAGS)" ./cmd/bhubctl
go install -ldflags "$(FLAGS)" ./cmd/bhubdoctor
# go install -ldflags "$(FLAGS)" ./cmd/bhubdoctor

.PHONY: clean
clean:
rm $(shell which bhubagent)
rm $(shell which bhubcentral)
rm $(shell which bhubctl)

.PHONY: generate
generate:
Expand Down
2 changes: 2 additions & 0 deletions bhubagent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ http:
enableTracing: false
central:
addr: "localhost:6081"
heartbeat:
interval: "1s"
node:
role: "any" # workload_*, database_*
provider:
Expand Down
13 changes: 9 additions & 4 deletions pkg/agent/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ type CentralConfig struct {
Addr string `yaml:"addr"`
}

type HeartbeatConfig struct {
Interval string `yaml:"interval"` // TODO: this requires time.ParseDuration
}

type ServerConfig struct {
Http iconfig.HttpServerConfig `yaml:"http"`
Grpc iconfig.GrpcServerConfig `yaml:"grpc"`
Central CentralConfig `yaml:"central"`
Node cconfig.NodeConfig `yaml:"node"`
Http iconfig.HttpServerConfig `yaml:"http"`
Grpc iconfig.GrpcServerConfig `yaml:"grpc"`
Central CentralConfig `yaml:"central"`
Node cconfig.NodeConfig `yaml:"node"`
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
}
39 changes: 33 additions & 6 deletions pkg/agent/server/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,45 @@ const (

// Beater keep posting the server about agent state and retrieve job status
type Beater struct {
client grpc.BenchHubCentralClient
log *dlog.Logger
client grpc.BenchHubCentralClient
interval time.Duration
registered bool
log *dlog.Logger
}

func NewBeater(client grpc.BenchHubCentralClient) *Beater {
b := &Beater{client: client}
func NewBeater(client grpc.BenchHubCentralClient, interval time.Duration) *Beater {
b := &Beater{
client: client,
interval: interval,
}
dlog.NewStructLogger(log, b)
return b
}

func (b *Beater) RunWithContext(ctx context.Context) error {
// TODO: based on agent state, either register or heartbeat
b.log.Warn(b.Register())
for {
select {
case <-ctx.Done():
// TODO: should we return nil or return context error?
b.log.Infof("beater stop due to context finished, its error is %v", ctx.Err())
return nil
default:
// TODO: based on agent state, either register or heartbeat
if !b.registered {
err := b.Register()
if err != nil {
b.log.Warnf("register failed %s, retry in %s", err.Error(), b.interval)
} else {
b.log.Infof("register success")
b.registered = true
}
} else {
// TODO: real heart beat logic
b.log.Infof("TODO: heartbeat")
}
time.Sleep(b.interval)
}
}
return nil
}

Expand All @@ -42,6 +68,7 @@ func (b *Beater) Register() error {
defer cancel()
node, err := nodeutil.GetNode()
// TODO: update bindAddr, ip, port, etc.
// TODO: update provider etc.
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/agent/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"sync"
"time"

igrpc "github.com/at15/go.ice/ice/transport/grpc"
ihttp "github.com/at15/go.ice/ice/transport/http"
Expand Down Expand Up @@ -55,8 +56,13 @@ func NewManager(cfg config.ServerConfig) (*Manager, error) {
if err != nil {
return nil, errors.Wrap(err, "can't dial central server")
}
beatInterval, err := time.ParseDuration(cfg.Heartbeat.Interval)
if err != nil || beatInterval <= 0 {
return nil, errors.Wrapf(err, "invalid heartbeat interval config %d", beatInterval)
}
//log.Fatalf("interval is %s %d", beatInterval, beatInterval)
client := crpc.NewClient(conn)
beater := NewBeater(client)
beater := NewBeater(client, beatInterval)
mgr := &Manager{
cfg: cfg,
grpcSrv: grpcSrv,
Expand Down Expand Up @@ -136,9 +142,7 @@ func (mgr *Manager) Run() error {
return
}
}()
// create client for central server
//if conn, err := grpc.Dial(cfg)
// register + heartbeat, client for central
// heartbeat with server
go func() {
// TODO: logic here might be incorrect, beater can exit if ctx is canceled by other go routine, i.e. grpc, http server
if err := mgr.beater.RunWithContext(ctx); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/central/server/node_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package server

// node monitor check status of registered node to update their status

0 comments on commit 07d58f5

Please sign in to comment.