Add failover process between liaison and data nodes #433

Merged · 7 commits · Apr 16, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -120,6 +120,7 @@ jobs:
uses: ./.github/workflows/test.yml
with:
options: --fail-fast --label-filter \!slow
+ timeout-minutes: 30

result:
name: Continuous Integration
10 changes: 9 additions & 1 deletion banyand/dquery/topn.go
@@ -18,6 +18,8 @@
package dquery

import (
"time"

"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
@@ -31,6 +33,8 @@ import (
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

+ const defaultTopNQueryTimeout = 10 * time.Second

type topNQueryProcessor struct {
broadcaster bus.Broadcaster
*queryService
@@ -52,7 +56,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
agg := request.Agg
request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
now := bus.MessageID(request.TimeRange.Begin.Nanos)
- ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
+ ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request))
if err != nil {
resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err))
return
@@ -87,6 +91,10 @@ }
}
}
}
+ if allErr != nil {
+ resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), allErr))
+ return
+ }
if tags == nil {
resp = bus.NewMessage(now, &measurev1.TopNResponse{})
return
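The two changes above give the TopN fan-out a hard deadline (`defaultTopNQueryTimeout`) and make the aggregated per-node error (`allErr`, built with `multierr`) fail the whole query instead of silently returning a partial result. A minimal, self-contained sketch of that pattern — the `future` and `broadcast` shapes below are illustrative stand-ins, not the project's actual `bus` interfaces:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// future carries one data node's reply or error.
type future struct {
	result chan string
	err    chan error
}

// broadcast fans a request out to every node; each reply must arrive
// within timeout or that node's future resolves to an error.
func broadcast(timeout time.Duration, nodes []string) []future {
	futures := make([]future, 0, len(nodes))
	for _, n := range nodes {
		f := future{result: make(chan string, 1), err: make(chan error, 1)}
		go func(node string, f future) {
			select {
			case <-time.After(10 * time.Millisecond): // simulated node reply
				f.result <- "rows from " + node
			case <-time.After(timeout):
				f.err <- fmt.Errorf("node %s timed out after %s", node, timeout)
			}
		}(n, f)
		futures = append(futures, f)
	}
	return futures
}

func main() {
	var allErr error
	for _, f := range broadcast(10*time.Second, []string{"data-node-0", "data-node-1"}) {
		select {
		case rows := <-f.result:
			fmt.Println(rows)
		case err := <-f.err:
			allErr = errors.Join(allErr, err) // stdlib stand-in for multierr.Append
		}
	}
	if allErr != nil { // any node failure now fails the whole query
		fmt.Println("query failed:", allErr)
		return
	}
	fmt.Println("query succeeded")
}
```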
15 changes: 11 additions & 4 deletions banyand/internal/storage/index_test.go
@@ -209,16 +209,23 @@ func TestSeriesIndexController(t *testing.T) {
require.NoError(t, err)
defer sic.Close()
require.NoError(t, sic.run(time.Now().Add(-time.Hour*23+10*time.Minute)))
- require.NotNil(t, sic.standby)
+ sic.RLock()
+ standby := sic.standby
+ sic.RUnlock()
+ require.NotNil(t, standby)
idxNames := make([]string, 0)
walkDir(tmpDir, "idx-", func(suffix string) error {
idxNames = append(idxNames, suffix)
return nil
})
assert.Equal(t, 2, len(idxNames))
- nextTime := sic.standby.startTime
+ nextTime := standby.startTime
require.NoError(t, sic.run(time.Now().Add(time.Hour)))
- require.Nil(t, sic.standby)
- assert.Equal(t, nextTime, sic.hot.startTime)
+ sic.RLock()
+ standby = sic.standby
+ hot := sic.hot
+ sic.RUnlock()
+ require.Nil(t, standby)
+ assert.Equal(t, nextTime, hot.startTime)
})
}
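The test now snapshots `standby` and `hot` under the controller's read lock instead of reading the fields directly, since `sic.run` can swap them from another goroutine. A minimal sketch of the snapshot-under-RLock pattern, with an assumed `controller` type standing in for `seriesIndexController`:

```go
package main

import (
	"fmt"
	"sync"
)

type segment struct{ startTime int64 }

// controller stands in for seriesIndexController: an embedded RWMutex
// guards the hot and standby index pointers.
type controller struct {
	sync.RWMutex
	hot     *segment
	standby *segment
}

// snapshot reads both pointers under the read lock, so the caller can
// assert on them without racing a concurrent run().
func (c *controller) snapshot() (hot, standby *segment) {
	c.RLock()
	defer c.RUnlock()
	return c.hot, c.standby
}

func main() {
	c := &controller{hot: &segment{startTime: 42}}
	hot, standby := c.snapshot()
	fmt.Println(hot.startTime, standby == nil) // 42 true
}
```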
3 changes: 2 additions & 1 deletion banyand/liaison/grpc/measure.go
@@ -46,6 +46,7 @@ type measureService struct {
ingestionAccessLog accesslog.Log
pipeline queue.Client
broadcaster queue.Client
+ writeTimeout time.Duration
}

func (ms *measureService) setLogger(log *logger.Logger) {
@@ -67,7 +68,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
}
}
ctx := measure.Context()
- publisher := ms.pipeline.NewBatchPublisher()
+ publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
defer publisher.Close()
for {
select {
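`NewBatchPublisher` now takes the configured write timeout rather than an implicit internal default. A rough sketch of the changed call shape, with assumed stand-in types (the real `BatchPublisher` interface lives in `banyand/queue`):

```go
package main

import (
	"fmt"
	"time"
)

// batchPublisher is an illustrative stand-in for the queue client's
// BatchPublisher, not the project's actual type.
type batchPublisher struct{ timeout time.Duration }

func (b *batchPublisher) Publish(msg string) {
	fmt.Printf("publish %q, bounded by %s\n", msg, b.timeout)
}

func (b *batchPublisher) Close() { fmt.Println("flush remaining batch and close") }

func newBatchPublisher(timeout time.Duration) *batchPublisher {
	return &batchPublisher{timeout: timeout}
}

func main() {
	// Mirrors the changed call shape in Write(): the timeout now comes
	// from configuration (--measure-write-timeout) instead of a default.
	writeTimeout := 15 * time.Second
	publisher := newBatchPublisher(writeTimeout)
	defer publisher.Close()
	publisher.Publish("measure write request")
}
```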
2 changes: 2 additions & 0 deletions banyand/liaison/grpc/server.go
@@ -178,6 +178,8 @@ func (s *server) FlagSet() *run.FlagSet {
fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log")
fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path")
+ fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout")
+ fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout")
return fs
}

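These two new flags wire the write timeouts into the stream and measure services. The standard library's `flag.DurationVar`, shown below as a stand-in for the `run.FlagSet` call above, behaves the same way and accepts human-readable durations:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// DurationVar accepts values such as "15s", "500ms", or "2m".
	var writeTimeout time.Duration
	flag.DurationVar(&writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout")
	flag.Parse() // e.g. go run . -measure-write-timeout=30s
	fmt.Println("write timeout:", writeTimeout)
}
```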
3 changes: 2 additions & 1 deletion banyand/liaison/grpc/stream.go
@@ -46,6 +46,7 @@ type streamService struct {
ingestionAccessLog accesslog.Log
pipeline queue.Client
broadcaster queue.Client
+ writeTimeout time.Duration
}

func (s *streamService) setLogger(log *logger.Logger) {
@@ -66,7 +67,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
logger.Err(errResp).Msg("failed to send response")
}
}
- publisher := s.pipeline.NewBatchPublisher()
+ publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
defer publisher.Close()
ctx := stream.Context()
for {
5 changes: 3 additions & 2 deletions banyand/measure/topn.go
@@ -50,7 +50,8 @@ import (
)

const (
- timeBucketFormat = "200601021504"
+ timeBucketFormat = "200601021504"
+ resultPersistencyTimeout = 10 * time.Second
)

var (
@@ -133,7 +134,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
eventTime := t.downSampleTimeBucket(record.TimestampMillis())
timeBucket := eventTime.Format(timeBucketFormat)
var err error
- publisher := t.pipeline.NewBatchPublisher()
+ publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
defer publisher.Close()
for group, tuples := range tuplesGroups {
if e := t.l.Debug(); e.Enabled() {
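As an aside, `timeBucketFormat` is a Go reference-time layout: `200601021504` spells yyyyMMddHHmm against the canonical time `Mon Jan 2 15:04:05 MST 2006`, giving minute-granularity bucket keys. A quick demonstration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Go layouts are written against the reference time
	// "Mon Jan 2 15:04:05 MST 2006", so "200601021504" reads as
	// yyyyMMddHHmm: a minute-granularity bucket key.
	const timeBucketFormat = "200601021504"
	t := time.Date(2024, time.April, 16, 9, 30, 0, 0, time.UTC)
	fmt.Println(t.Format(timeBucketFormat)) // 202404160930
}
```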
6 changes: 4 additions & 2 deletions banyand/queue/local.go
@@ -19,6 +19,8 @@
package queue

import (
"time"

"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -63,7 +65,7 @@ func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
return l.local.Publish(topic, message...)
}

- func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) {
+ func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) {
f, err := l.Publish(topic, message)
if err != nil {
return nil, err
@@ -75,7 +77,7 @@ func (l local) Name() string {
return "local-pipeline"
}

- func (l local) NewBatchPublisher() BatchPublisher {
+ func (l local) NewBatchPublisher(_ time.Duration) BatchPublisher {
return &localBatchPublisher{
local: l.local,
}
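Both queue methods gain a leading `time.Duration`, but the in-process implementation deliberately ignores it (`_ time.Duration`): local delivery is a function call, so only network-backed implementations need a deadline. A sketch of that interface split, using assumed stand-in types:

```go
package main

import (
	"fmt"
	"time"
)

// publisher sketches the changed queue contract: callers now pass a
// timeout, but only network-backed implementations need it.
type publisher interface {
	Broadcast(timeout time.Duration, msg string) error
}

// localQueue delivers in-process, so the timeout parameter is
// deliberately ignored, as in local.go above.
type localQueue struct{}

func (localQueue) Broadcast(_ time.Duration, msg string) error {
	fmt.Println("local delivery:", msg)
	return nil
}

// remoteQueue would bound each RPC with the caller-supplied timeout.
type remoteQueue struct{}

func (remoteQueue) Broadcast(timeout time.Duration, msg string) error {
	fmt.Printf("remote delivery of %q within %s\n", msg, timeout)
	return nil
}

func main() {
	for _, p := range []publisher{localQueue{}, remoteQueue{}} {
		_ = p.Broadcast(10*time.Second, "topn query")
	}
}
```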
167 changes: 160 additions & 7 deletions banyand/queue/pub/client.go
@@ -18,17 +18,29 @@
package pub

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"

clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
)

const rpcTimeout = 2 * time.Second

var (
// Retry policy for health check.
initBackoff = time.Second
maxBackoff = 20 * time.Second
backoffMultiplier = 2.0

serviceName = clusterv1.Service_ServiceDesc.ServiceName

// The timeout is set by each RPC.
@@ -49,6 +61,7 @@ var (
type client struct {
client clusterv1.ServiceClient
conn *grpc.ClientConn
+ md schema.Metadata
}

func (p *pub) OnAddOrUpdate(md schema.Metadata) {
@@ -84,20 +97,32 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
p.mu.Lock()
defer p.mu.Unlock()

+ p.registered[name] = struct{}{}

// If the client already exists, just return
- if _, ok := p.clients[name]; ok {
+ if _, ok := p.active[name]; ok {
return
}
+ if _, ok := p.evictable[name]; ok {
+ return
+ }
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
p.log.Error().Err(err).Msg("failed to connect to grpc server")
return
}

+ if !p.checkClient(conn, md) {
+ p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
+ return
+ }

c := clusterv1.NewServiceClient(conn)
- p.clients[name] = &client{conn: conn, client: c}
+ p.active[name] = &client{conn: conn, client: c, md: md}
+ if p.handler != nil {
+ p.handler.OnAddOrUpdate(md)
+ }
+ p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue")
}
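`OnAddOrUpdate` now health-checks a node before admitting it: every announced node lands in `registered`, and it then lives in at most one of `active` or `evictable` depending on its last probe. A stripped-down sketch of that bookkeeping (assumed semantics, map values simplified to empty structs):

```go
package main

import "fmt"

// registry sketches the three maps the pub client keeps: registered
// remembers every node the metadata server has announced; active holds
// healthy nodes; evictable holds nodes awaiting recovery.
type registry struct {
	registered map[string]struct{}
	active     map[string]struct{}
	evictable  map[string]struct{}
}

func (r *registry) onAddOrUpdate(name string, healthy bool) {
	r.registered[name] = struct{}{}
	if _, ok := r.active[name]; ok {
		return // already serving traffic
	}
	if _, ok := r.evictable[name]; ok {
		return // a background goroutine is already probing it
	}
	if healthy {
		r.active[name] = struct{}{}
	} else {
		r.evictable[name] = struct{}{}
	}
}

func main() {
	r := &registry{
		registered: map[string]struct{}{},
		active:     map[string]struct{}{},
		evictable:  map[string]struct{}{},
	}
	r.onAddOrUpdate("data-node-0", true)
	r.onAddOrUpdate("data-node-1", false)
	fmt.Printf("registered: %d, active: %d, evictable: %d\n",
		len(r.registered), len(r.active), len(r.evictable))
}
```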

func (p *pub) OnDelete(md schema.Metadata) {
@@ -116,14 +141,142 @@ func (p *pub) OnDelete(md schema.Metadata) {
}
p.mu.Lock()
defer p.mu.Unlock()
+ delete(p.registered, name)
+ if en, ok := p.evictable[name]; ok {
+ close(en.c)
+ delete(p.evictable, name)
+ p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from evict queue by delete event")
+ return
+ }

- if client, ok := p.clients[name]; ok {
- if client.conn != nil {
- client.conn.Close() // Close the client connection
- }
- delete(p.clients, name)
+ if client, ok := p.active[name]; ok && !p.healthCheck(node, client.conn) {
+ _ = client.conn.Close()
+ delete(p.active, name)
+ if p.handler != nil {
+ p.handler.OnDelete(md)
+ }
+ p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from active queue by delete event")
+ }
}

func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
node, ok := md.Spec.(*databasev1.Node)
if !ok {
logger.Panicf("failed to cast node spec")
return false
}
if p.healthCheck(node, conn) {
return true
}
_ = conn.Close()
if !p.closer.AddRunning() {
return false
}
name := node.Metadata.Name
p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
if p.handler != nil {
p.handler.OnDelete(md)
}
go func(p *pub, name string, en evictNode, md schema.Metadata) {
defer p.closer.Done()
backoff := initBackoff
for {
select {
case <-time.After(backoff):
connEvict, errEvict := grpc.Dial(node.GrpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
if errEvict == nil && p.healthCheck(en.n, connEvict) {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.evictable[name]; !ok {
// The client has been removed from evict clients map, just return
return
}
c := clusterv1.NewServiceClient(connEvict)
p.active[name] = &client{conn: connEvict, client: c, md: md}
if p.handler != nil {
p.handler.OnAddOrUpdate(md)
}
delete(p.evictable, name)
p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
return
}
if errEvict != nil {
_ = connEvict.Close()
}
if _, ok := p.registered[name]; !ok {
return
}
p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
case <-en.c:
return
}
if backoff < maxBackoff {
backoff *= time.Duration(backoffMultiplier)
} else {
backoff = maxBackoff
}
}
}(p, name, p.evictable[name], md)
return false
}
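The retry goroutine introduced in `checkClient` re-probes an evicted node on a capped doubling schedule, from `initBackoff` up to `maxBackoff`. The sketch below prints that schedule; note it clamps immediately, whereas the loop above multiplies first and clamps on the following pass, so it can overshoot the cap once:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A capped doubling schedule like the health-probe retry above:
	// 1s, 2s, 4s, ... up to a 20s ceiling.
	initBackoff := time.Second
	maxBackoff := 20 * time.Second

	backoff := initBackoff
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```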

func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
var resp *grpc_health_v1.HealthCheckResponse
if err := grpchelper.Request(context.Background(), rpcTimeout, func(rpcCtx context.Context) (err error) {
resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx,
&grpc_health_v1.HealthCheckRequest{
Service: "",
})
return err
}); err != nil {
if e := p.log.Debug(); e.Enabled() {
e.Err(err).Stringer("node", node).Msg("service unhealthy")
}
return false
}
if resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING {
return true
}
return false
}
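`healthCheck` uses the standard gRPC health-checking protocol (`grpc.health.v1`); an empty `Service` name asks about the server as a whole. A self-contained probe in the same shape, minus the project's `grpchelper` wrapper:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// probe dials addr and runs one standard health check, mirroring the
// shape of healthCheck above.
func probe(addr string) bool {
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return false
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	resp, err := grpc_health_v1.NewHealthClient(conn).Check(ctx,
		&grpc_health_v1.HealthCheckRequest{Service: ""}) // "" = whole server
	return err == nil && resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
}

func main() {
	fmt.Println(probe("127.0.0.1:17912")) // 17912 is banyand's default gRPC port
}
```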

func (p *pub) failover(node string) {
p.mu.Lock()
defer p.mu.Unlock()
if en, ok := p.evictable[node]; ok {
close(en.c)
delete(p.evictable, node)
p.log.Info().Str("node", node).Str("status", p.dump()).Msg("node is removed from evict queue by wire event")
return
}

if client, ok := p.active[node]; ok && !p.checkClient(client.conn, client.md) {
_ = client.conn.Close()
delete(p.active, node)
if p.handler != nil {
p.handler.OnDelete(client.md)
}
p.log.Info().Str("status", p.dump()).Str("node", node).Msg("node is unhealthy, move it to evict queue")
}
}

func (p *pub) dump() string {
keysRegistered := make([]string, 0, len(p.registered))
for k := range p.registered {
keysRegistered = append(keysRegistered, k)
}
keysActive := make([]string, 0, len(p.active))
for k := range p.active {
keysActive = append(keysActive, k)
}
keysEvictable := make([]string, 0, len(p.evictable))
for k := range p.evictable {
keysEvictable = append(keysEvictable, k)
}
return fmt.Sprintf("registered: %v, active :%v, evictable :%v", keysRegistered, keysActive, keysEvictable)
}

type evictNode struct {
n *databasev1.Node
c chan struct{}
}
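`evictNode.c` exists only to be closed: `OnDelete` and `failover` close it, and because a receive on a closed channel returns immediately, the probe goroutine selecting on it wakes up and exits. A minimal sketch of this close-to-cancel pattern:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{}) // plays the role of evictNode.c
	go func() {
		for {
			select {
			case <-time.After(100 * time.Millisecond):
				fmt.Println("probing evicted node...")
			case <-done: // a closed channel receives immediately
				fmt.Println("node deleted; stop probing")
				return
			}
		}
	}()

	time.Sleep(250 * time.Millisecond)
	close(done) // the equivalent of close(en.c) in OnDelete/failover
	time.Sleep(50 * time.Millisecond)
}
```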