Skip to content

Commit

Permalink
feat: Grpc based Health check. (#1935)
Browse files Browse the repository at this point in the history
* feat: Add health check service. Health check service starts with dubbo.
	* Use grpc health check proto, ref: https://github.com/grpc-ecosystem/grpc-health-probe
	* Fix test config
	* Current health check only support triple protocol.

* chore: Change inner serivice name.
  • Loading branch information
sheny1xuan committed Jun 22, 2022
1 parent e10e208 commit 48fad4b
Show file tree
Hide file tree
Showing 10 changed files with 892 additions and 19 deletions.
8 changes: 7 additions & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ const (

// reflection service
const (
ReflectionServiceTypeName = "XXX_serverReflectionServer"
ReflectionServiceTypeName = "DubbogoServerReflectionServer"
ReflectionServiceInterface = "grpc.reflection.v1alpha.ServerReflection"
)

// healthcheck service
const (
HealthCheckServiceTypeName = "DubbogoHealthServer"
HealthCheckServiceInterface = "grpc.health.v1.Health"
)
18 changes: 16 additions & 2 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {

for k, v := range rc.Protocols {
if v.Name == tripleConstant.TRIPLE {
// Auto create grpc based health check service.
healthService := NewServiceConfigBuilder().
SetProtocolIDs(k).
SetNotRegister(true).
SetInterface(constant.HealthCheckServiceInterface).
Build()
if err := healthService.Init(rc); err != nil {
return err
}
c.Services[constant.HealthCheckServiceTypeName] = healthService

// Auto create reflection service configure only when provider with triple service is configured.
tripleReflectionService := NewServiceConfigBuilder().
SetProtocolIDs(k).
Expand All @@ -120,7 +131,9 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
if err := tripleReflectionService.Init(rc); err != nil {
return err
}
// Maybe only register once, If setting this service, break from traversing Protocols.
c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
break
}
}

Expand All @@ -144,8 +157,9 @@ func (c *ProviderConfig) Load() {
for registeredTypeName, service := range GetProviderServiceMap() {
serviceConfig, ok := c.Services[registeredTypeName]
if !ok {
if registeredTypeName == constant.ReflectionServiceTypeName {
// do not auto generate reflection server's configuration.
if registeredTypeName == constant.ReflectionServiceTypeName ||
registeredTypeName == constant.HealthCheckServiceTypeName {
// do not auto generate reflection or health check server's configuration.
continue
}
// service doesn't config in config file, create one with default
Expand Down
3 changes: 2 additions & 1 deletion config/provider_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestProviderConfigRootRegistry(t *testing.T) {
assert.Nil(t, err)
provider := rootConfig.Provider
assert.NotNil(t, provider)
assert.Equal(t, 2, len(provider.Services))
assert.NotNil(t, provider.Services["HelloService"])
assert.NotNil(t, provider.Services["OrderService"])

assert.Equal(t, 2, len(provider.Services["HelloService"].RegistryIDs))
assert.Equal(t, 1, len(provider.Services["OrderService"].RegistryIDs))
Expand Down
1 change: 1 addition & 0 deletions imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/reflection"
_ "dubbo.apache.org/dubbo-go/v3/protocol/grpc"
_ "dubbo.apache.org/dubbo-go/v3/protocol/jsonrpc"
Expand Down
185 changes: 185 additions & 0 deletions protocol/dubbo3/health/serverhealth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package health provides a service that exposes server's health and it must be
// imported to enable support for client-side health checks.
package health

import (
"context"
"sync"
)

import (
"github.com/dubbogo/grpc-go/codes"
"github.com/dubbogo/grpc-go/status"
)

import (
logger "dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
healthpb "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
healthtriple "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
)

// Server implements `service Health`.
type DubbogoHealthServer struct {
healthtriple.UnimplementedHealthServer
mu sync.RWMutex
// If shutdown is true, it's expected all serving status is NOT_SERVING, and
// will stay in NOT_SERVING.
shutdown bool
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
updates map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
}

var healthServer *DubbogoHealthServer

// NewServer returns a new Server.
func NewServer() *DubbogoHealthServer {
return &DubbogoHealthServer{
statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
updates: make(map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
}
}

// Check implements `service Health`.
func (s *DubbogoHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if servingStatus, ok := s.statusMap[in.Service]; ok {
return &healthpb.HealthCheckResponse{
Status: servingStatus,
}, nil
}
return nil, status.Error(codes.NotFound, "unknown service")
}

// Watch implements `service Health`.
func (s *DubbogoHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthtriple.Health_WatchServer) error {
service := in.Service
// update channel is used for getting service status updates.
update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
s.mu.Lock()
// Puts the initial status to the channel.
if servingStatus, ok := s.statusMap[service]; ok {
update <- servingStatus
} else {
update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
}

// Registers the update channel to the correct place in the updates map.
if _, ok := s.updates[service]; !ok {
s.updates[service] = make(map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
}
s.updates[service][stream] = update
defer func() {
s.mu.Lock()
delete(s.updates[service], stream)
s.mu.Unlock()
}()
s.mu.Unlock()

var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
for {
select {
// Status updated. Sends the up-to-date status to the client.
case servingStatus := <-update:
if lastSentStatus == servingStatus {
continue
}
lastSentStatus = servingStatus
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
if err != nil {
return status.Error(codes.Canceled, "Stream has ended.")
}
// Context done. Removes the update channel from the updates map.
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
}
}
}

// SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap.
func (s *DubbogoHealthServer) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shutdown {
logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
return
}

s.setServingStatusLocked(service, servingStatus)
}

func (s *DubbogoHealthServer) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.statusMap[service] = servingStatus
for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel.
// This can happen if the client is not reading and the server gets flow control limited.
select {
case <-update:
default:
}
// Puts the most recent update to the channel.
update <- servingStatus
}
}

// Shutdown sets all serving status to NOT_SERVING, and configures the server to
// ignore all future status changes.
//
// This changes serving status for all services. To set status for a particular
// services, call SetServingStatus().
func (s *DubbogoHealthServer) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = true
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
}
}

// Resume sets all serving status to SERVING, and configures the server to
// accept all future status changes.
//
// This changes serving status for all services. To set status for a particular
// services, call SetServingStatus().
func (s *DubbogoHealthServer) Resume() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = false
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
}
}

// Set health check interface.
func init() {
healthServer = NewServer()
config.SetProviderService(healthServer)
}

func SetServingStatusServing(service string) {
healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
}

func SetServingStatusNotServing(service string) {
healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
}

0 comments on commit 48fad4b

Please sign in to comment.