Skip to content

Commit

Permalink
[central] Formalize node information
Browse files Browse the repository at this point in the history
- central set remoteAddr in response to node register, so it knows its
own ip address in server's view
  - this is a hack because we haven't implement obtain ip address of the
client itself on the client side #18
  • Loading branch information
at15 committed Mar 3, 2018
1 parent 07d58f5 commit c111850
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 181 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion bhubcentral.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@ http:
key: icehub.key
enableTracing: false
meta:
provider: mem
provider: mem
node:
role: "central" # workload_*, database_*
provider:
name: "at15-local-cloud"
region: "us-west"
instance: "GameBoyAdvance"
48 changes: 37 additions & 11 deletions pkg/agent/server/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package server

import (
"context"
"time"

"github.com/dyweb/gommon/errors"
dlog "github.com/dyweb/gommon/log"
"time"

"github.com/benchhub/benchhub/pkg/agent/config"
cpb "github.com/benchhub/benchhub/pkg/central/centralpb"
"github.com/benchhub/benchhub/pkg/central/transport/grpc"
pbc "github.com/benchhub/benchhub/pkg/common/commonpb"
"github.com/benchhub/benchhub/pkg/common/nodeutil"
"net"
"strconv"
)

const (
Expand All @@ -20,16 +23,18 @@ const (

// Beater keep posting the server about agent state and retrieve job status
type Beater struct {
client grpc.BenchHubCentralClient
interval time.Duration
registered bool
log *dlog.Logger
client grpc.BenchHubCentralClient
interval time.Duration
globalConfig config.ServerConfig
registered bool
log *dlog.Logger
}

func NewBeater(client grpc.BenchHubCentralClient, interval time.Duration) *Beater {
func NewBeater(client grpc.BenchHubCentralClient, interval time.Duration, cfg config.ServerConfig) *Beater {
b := &Beater{
client: client,
interval: interval,
client: client,
interval: interval,
globalConfig: cfg,
}
dlog.NewStructLogger(log, b)
return b
Expand All @@ -51,6 +56,7 @@ func (b *Beater) RunWithContext(ctx context.Context) error {
} else {
b.log.Infof("register success")
b.registered = true
// TODO: publish event inside process? need a place to know node's state
}
} else {
// TODO: real heart beat logic
Expand All @@ -67,8 +73,13 @@ func (b *Beater) Register() error {
ctx, cancel := context.WithTimeout(context.Background(), registerTimeout)
defer cancel()
node, err := nodeutil.GetNode()
// TODO: update bindAddr, ip, port, etc.
// TODO: update provider etc.
node.BindAdrr = b.globalConfig.Grpc.Addr
node.BindIp, node.BindPort = splitHostPort(node.BindAdrr)
node.Provider = pbc.NodeProvider{
Name: b.globalConfig.Node.Provider.Name,
Region: b.globalConfig.Node.Provider.Region,
Instance: b.globalConfig.Node.Provider.Instance,
}
if err != nil {
return err
}
Expand All @@ -82,3 +93,18 @@ func (b *Beater) Register() error {
b.log.Infof("register res id is %s", res.Id)
return nil
}

// FIXME: exact duplicated code in central and agent, this should go to go.ice
func splitHostPort(addr string) (string, int64) {
_, ps, err := net.SplitHostPort(addr)
if err != nil {
log.Warnf("failed to split host port %s %v", addr, err)
return "", 0
}
p, err := strconv.Atoi(ps)
if err != nil {
log.Warnf("failed to convert port number %s to int %v", ps, err)
return ps, int64(p)
}
return ps, int64(p)
}
2 changes: 1 addition & 1 deletion pkg/agent/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewManager(cfg config.ServerConfig) (*Manager, error) {
}
//log.Fatalf("interval is %s %d", beatInterval, beatInterval)
client := crpc.NewClient(conn)
beater := NewBeater(client, beatInterval)
beater := NewBeater(client, beatInterval, cfg)
mgr := &Manager{
cfg: cfg,
grpcSrv: grpcSrv,
Expand Down
2 changes: 2 additions & 0 deletions pkg/central/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
iconfig "github.com/at15/go.ice/ice/config"
cconfig "github.com/benchhub/benchhub/pkg/common/config"
)

type MetaConfig struct {
Expand All @@ -12,4 +13,5 @@ type ServerConfig struct {
Http iconfig.HttpServerConfig `yaml:"http"`
Grpc iconfig.GrpcServerConfig `yaml:"grpc"`
Meta MetaConfig `yaml:"meta"`
Node cconfig.NodeConfig `yaml:"node"`
}
57 changes: 39 additions & 18 deletions pkg/central/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package server
import (
"context"
"fmt"
"net"
"os"
"strconv"

igrpc "github.com/at15/go.ice/ice/transport/grpc"
dlog "github.com/dyweb/gommon/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/benchhub/benchhub/pkg/central/centralpb"
"github.com/benchhub/benchhub/pkg/central/config"
"github.com/benchhub/benchhub/pkg/central/store/meta"
rpc "github.com/benchhub/benchhub/pkg/central/transport/grpc"
pbc "github.com/benchhub/benchhub/pkg/common/commonpb"
Expand All @@ -19,52 +23,54 @@ import (
var _ rpc.BenchHubCentralServer = (*GrpcServer)(nil)

type GrpcServer struct {
meta meta.Provider
log *dlog.Logger
meta meta.Provider
globalConfig config.ServerConfig
log *dlog.Logger
}

func NewGrpcServer(meta meta.Provider) (*GrpcServer, error) {
func NewGrpcServer(meta meta.Provider, cfg config.ServerConfig) (*GrpcServer, error) {
srv := &GrpcServer{
meta: meta,
meta: meta,
globalConfig: cfg,
}
dlog.NewStructLogger(log, srv)
return srv, nil
}

// TODO: get peer information
// TODO: https://groups.google.com/forum/#!topic/grpc-io/UodEY4N78Sk
// tell the agent what its address in central's perspective,
//peer, err := peer.FromContext(ctx)
//peer.Addr

func (srv *GrpcServer) Ping(ctx context.Context, ping *pbc.Ping) (*pbc.Pong, error) {
srv.log.Infof("got ping, message is %s", ping.Message)
res := fmt.Sprintf("pong from central %s your message is %s", hostname(), ping.Message)
return &pbc.Pong{Message: res}, nil
}

func (srv *GrpcServer) RegisterAgent(ctx context.Context, req *pb.RegisterAgentReq) (*pb.RegisterAgentRes, error) {
// TODO:
// - check if the node is already registered
// - assign it id
// - return information about itself
srv.log.Infof("register agent req from %s", req.Node.Host)
remoteAddr := igrpc.RemoteAddr(ctx)
srv.log.Infof("register agent req from %s %s", remoteAddr, req.Node.Host)
req.Node.RemoteAddr = remoteAddr

err := srv.meta.AddNode(req.Node.Uid, req.Node)
if err != nil {
log.Warnf("failed to add node %v", err)
// TODO: already exists may not be the only cause .... though for in memory, it should be ...
return nil, status.Errorf(codes.AlreadyExists, "failed to add node %v", err)
}
central, err := nodeutil.GetNode()
// TODO: update bindAddr, ip, port, etc.

node, err := nodeutil.GetNode()
if err != nil {
log.Warnf("failed to get central node info %v", err)
return nil, status.Errorf(codes.Internal, "failed to get central node info %v", err)
}
node.BindAdrr = srv.globalConfig.Grpc.Addr
node.BindIp, node.BindPort = splitHostPort(node.BindAdrr)
node.Provider = pbc.NodeProvider{
Name: srv.globalConfig.Node.Provider.Name,
Region: srv.globalConfig.Node.Provider.Region,
Instance: srv.globalConfig.Node.Provider.Instance,
}
res := &pb.RegisterAgentRes{
Id: req.Node.Uid,
Node: req.Node,
Central: *central,
Central: *node,
}
return res, nil
}
Expand All @@ -81,3 +87,18 @@ func hostname() string {
return host
}
}

// FIXME: exact duplicated code in central and agent, this should go to go.ice
func splitHostPort(addr string) (string, int64) {
_, ps, err := net.SplitHostPort(addr)
if err != nil {
log.Warnf("failed to split host port %s %v", addr, err)
return "", 0
}
p, err := strconv.Atoi(ps)
if err != nil {
log.Warnf("failed to convert port number %s to int %v", ps, err)
return ps, int64(p)
}
return ps, int64(p)
}
2 changes: 1 addition & 1 deletion pkg/central/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewManager(cfg config.ServerConfig) (*Manager, error) {
if err != nil {
return nil, errors.Wrap(err, "manager can't create meta store")
}
grpcSrv, err := NewGrpcServer(metaStore)
grpcSrv, err := NewGrpcServer(metaStore, cfg)
if err != nil {
return nil, errors.Wrap(err, "manager can't create grpc server")
}
Expand Down

0 comments on commit c111850

Please sign in to comment.