Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

feat: support managing heartbeat #12

Merged
merged 5 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/grpcservice/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

var (
ErrRecvHeartbeat = coderr.NewCodeError(coderr.Internal, "receive heartbeat")
ErrRegisterHeartbeatSender = coderr.NewCodeError(coderr.Internal, "register heartbeat sender")
ErrRecvHeartbeat = coderr.NewCodeError(coderr.Internal, "receive heartbeat")
ErrBindHeartbeatStream = coderr.NewCodeError(coderr.Internal, "bind heartbeat sender")
ErrUnbindHeartbeatStream = coderr.NewCodeError(coderr.Internal, "unbind heartbeat sender")
)
3 changes: 0 additions & 3 deletions server/grpcservice/handler.go

This file was deleted.

76 changes: 59 additions & 17 deletions server/grpcservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,73 @@ func NewService(opTimeout time.Duration, h Handler) *Service {
}
}

type HeartbeatSender interface {
type HeartbeatStreamSender interface {
Send(response *metapb.NodeHeartbeatResponse) error
}

// Handler is needed by grpc service to process the requests.
type Handler interface {
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
BindHeartbeatStream(ctx context.Context, node string, sender HeartbeatSender) error
UnbindHeartbeatStream(ctx context.Context, node string) error
BindHeartbeatStream(ctx context.Context, node string, sender HeartbeatStreamSender) error
ProcessHeartbeat(ctx context.Context, req *metapb.NodeHeartbeatRequest) error

// TODO: define the methods for handling other grpc requests.
}

func (s *Service) NodeHeartbeat(heartbeatSrv metapb.CeresmetaRpcService_NodeHeartbeatServer) error {
ctx, cancel := context.WithCancel(context.Background())
type streamBinder struct {
timeout time.Duration
h Handler
stream HeartbeatStreamSender

// States of the binder which may be updated.
node string
bound bool
}

func (b *streamBinder) bindIfNot(ctx context.Context, node string) error {
if b.bound {
return nil
}

ctx, cancel := context.WithTimeout(ctx, b.timeout)
defer cancel()
if err := b.h.BindHeartbeatStream(ctx, node, b.stream); err != nil {
return ErrBindHeartbeatStream.WithCausef("node:%s, err:%v", node, err)
}

isStreamBound := false
bindStreamIfNot := func(ctx context.Context, node string) {
if isStreamBound {
return
}
b.bound = true
b.node = node
return nil
}

ctx1, cancel := context.WithTimeout(ctx, s.opTimeout)
defer cancel()
func (b *streamBinder) unbind(ctx context.Context) error {
if !b.bound {
return nil
}

if err := s.h.BindHeartbeatStream(ctx1, node, heartbeatSrv); err != nil {
log.Error("fail to bind node stream", zap.String("node", node), zap.Error(err))
} else {
isStreamBound = true
}
ctx, cancel := context.WithTimeout(ctx, b.timeout)
defer cancel()
if err := b.h.UnbindHeartbeatStream(ctx, b.node); err != nil {
return ErrUnbindHeartbeatStream.WithCausef("node:%s, err:%v", b.node, err)
}

return nil
}

func (s *Service) NodeHeartbeat(heartbeatSrv metapb.CeresmetaRpcService_NodeHeartbeatServer) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

binder := streamBinder{
timeout: s.opTimeout,
h: s.h,
stream: heartbeatSrv,
}
defer func() {
if err := binder.unbind(ctx); err != nil {
log.Error("fail to unbind stream", zap.Error(err))
}
}()

// Process the message from the stream sequentially.
for {
Expand All @@ -66,7 +105,10 @@ func (s *Service) NodeHeartbeat(heartbeatSrv metapb.CeresmetaRpcService_NodeHear
return ErrRecvHeartbeat.WithCause(err)
}

bindStreamIfNot(ctx, req.Info.Node)
if err := binder.bindIfNot(ctx, req.Info.Node); err != nil {
log.Error("fail to bind node stream", zap.Error(err))
}

func() {
ctx1, cancel := context.WithTimeout(ctx, s.opTimeout)
defer cancel()
Expand Down
14 changes: 14 additions & 0 deletions server/schedule/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
*/

package schedule

import "github.com/CeresDB/ceresmeta/pkg/coderr"

var (
ErrStreamNotAvailable = coderr.NewCodeError(coderr.Internal, "stream to node is not available")
ErrStreamSendMsg = coderr.NewCodeError(coderr.Internal, "send msg by stream to node")
ErrStreamSendTimeout = coderr.NewCodeError(coderr.Internal, "send msg timeout")
ErrHeartbeatStreamsClosed = coderr.NewCodeError(coderr.Internal, "HeartbeatStreams closed")
)
136 changes: 136 additions & 0 deletions server/schedule/hbstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package schedule

import (
"context"
"sync"

"github.com/CeresDB/ceresdbproto/pkg/metapb"
"github.com/CeresDB/ceresmeta/pkg/log"
"go.uber.org/zap"
)

const (
defaultHeartbeatMsgCap int = 1024
)

type sendReq struct {
ctx context.Context
node string
msg *metapb.NodeHeartbeatResponse
}

// HeartbeatStreams manages all the streams connected by ceresdb node.
type HeartbeatStreams struct {
ctx context.Context
cancel context.CancelFunc
bgJobWg *sync.WaitGroup

reqCh chan *sendReq

// mu protects nodeStreams
mu *sync.RWMutex
// TODO: now these streams only can be removed by Unbind method and it is better add a active way to clear the dead
// streams in background.
nodeStreams map[string]HeartbeatStreamSender
}

func NewHeartbeatStreams(ctx context.Context) *HeartbeatStreams {
ctx, cancel := context.WithCancel(ctx)
h := &HeartbeatStreams{
ctx: ctx,
cancel: cancel,
bgJobWg: &sync.WaitGroup{},

reqCh: make(chan *sendReq, defaultHeartbeatMsgCap),
mu: &sync.RWMutex{},
nodeStreams: make(map[string]HeartbeatStreamSender),
}

go h.runBgJob()

return h
}

type HeartbeatStreamSender interface {
Send(response *metapb.NodeHeartbeatResponse) error
}

func (h *HeartbeatStreams) runBgJob() {
h.bgJobWg.Add(1)
defer h.bgJobWg.Done()

for {
select {
case req := <-h.reqCh:
err := h.sendMsgOnce(req)
if err != nil {
log.Error("fail to send msg", zap.Error(err), zap.String("node", req.node), zap.Any("msg", req.msg))
}
case <-h.ctx.Done():
log.Warn("exit from background jobs")
return
}
}
}

func (h *HeartbeatStreams) sendMsgOnce(hbMsg *sendReq) error {
sender := h.getStream(hbMsg.node)
if sender == nil {
return ErrStreamNotAvailable
}

// TODO: set a timeout for sending messages
if err := sender.Send(hbMsg.msg); err != nil {
return ErrStreamSendMsg.WithCause(err)
}

return nil
}

// getStream finds and returns the sender bound to the node, and it is nil if not exist.
func (h *HeartbeatStreams) getStream(node string) HeartbeatStreamSender {
h.mu.RLock()
defer h.mu.RUnlock()

return h.nodeStreams[node]
}

func (h *HeartbeatStreams) Bind(node string, sender HeartbeatStreamSender) {
h.mu.Lock()
defer h.mu.Unlock()

h.nodeStreams[node] = sender
}

func (h *HeartbeatStreams) Unbind(node string) {
h.mu.Lock()
defer h.mu.Unlock()

delete(h.nodeStreams, node)
}

// SendMsgAsync sends messages to node and this procedure is asynchronous.
func (h *HeartbeatStreams) SendMsgAsync(ctx context.Context, node string, msg *metapb.NodeHeartbeatResponse) error {
req := &sendReq{
ctx,
node,
msg,
}

select {
case h.reqCh <- req:
return nil
case <-ctx.Done():
return ErrStreamSendTimeout
case <-h.ctx.Done():
return ErrHeartbeatStreamsClosed
}
}

// Close cancels and waits for all the waiting goroutines.
func (h *HeartbeatStreams) Close() {
h.cancel()
h.bgJobWg.Wait()
}
20 changes: 16 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/CeresDB/ceresmeta/server/etcdutil"
"github.com/CeresDB/ceresmeta/server/grpcservice"
"github.com/CeresDB/ceresmeta/server/member"
"github.com/CeresDB/ceresmeta/server/schedule"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
Expand All @@ -26,6 +27,7 @@ type Server struct {
etcdCfg *embed.Config

// The fields below are initialized after Run of server is called.
hbStreams *schedule.HeartbeatStreams

// member describes membership in ceresmeta cluster.
member *member.Member
Expand Down Expand Up @@ -66,7 +68,7 @@ func (srv *Server) Run(ctx context.Context) error {
return err
}

if err := srv.startHTTPServer(ctx); err != nil {
if err := srv.startServer(ctx); err != nil {
return err
}

Expand All @@ -77,6 +79,7 @@ func (srv *Server) Run(ctx context.Context) error {

func (srv *Server) Close() {
atomic.StoreInt32(&srv.isClosed, 1)

srv.stopBgJobs()

if srv.etcdCli != nil {
Expand All @@ -86,6 +89,8 @@ func (srv *Server) Close() {
}
}

srv.hbStreams.Close()

// TODO: release other resources: httpclient, etcd server and so on.
}

Expand Down Expand Up @@ -126,8 +131,9 @@ func (srv *Server) startEtcd(ctx context.Context) error {
return nil
}

/// startServer starts the http services.
func (srv *Server) startHTTPServer(_ context.Context) error {
/// startServer starts involved services.
func (srv *Server) startServer(ctx context.Context) error {
srv.hbStreams = schedule.NewHeartbeatStreams(ctx)
return nil
}

Expand Down Expand Up @@ -176,7 +182,13 @@ func (ctx *leaderWatchContext) EtcdLeaderID() uint64 {
return ctx.srv.etcdSrv.Server.Lead()
}

func (*Server) BindHeartbeatStream(_ context.Context, _ string, _ grpcservice.HeartbeatSender) error {
func (srv *Server) BindHeartbeatStream(_ context.Context, node string, sender grpcservice.HeartbeatStreamSender) error {
srv.hbStreams.Bind(node, sender)
return nil
}

func (srv *Server) UnbindHeartbeatStream(_ context.Context, node string) error {
srv.hbStreams.Unbind(node)
return nil
}

Expand Down