Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Grpc based Health check. #1935

Merged
merged 2 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,9 @@ const (
ReflectionServiceTypeName = "XXX_serverReflectionServer"
ReflectionServiceInterface = "grpc.reflection.v1alpha.ServerReflection"
)

// healthcheck service
const (
HealthCheckServiceTypeName = "XXX_healthServer"
HealthCheckServiceInterface = "grpc.health.v1.Health"
)
18 changes: 16 additions & 2 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {

for k, v := range rc.Protocols {
if v.Name == tripleConstant.TRIPLE {
// Auto create grpc based health check service.
healthService := NewServiceConfigBuilder().
SetProtocolIDs(k).
SetNotRegister(true).
SetInterface(constant.HealthCheckServiceInterface).
Build()
if err := healthService.Init(rc); err != nil {
return err
}
c.Services[constant.HealthCheckServiceTypeName] = healthService

// Auto create reflection service configure only when provider with triple service is configured.
tripleReflectionService := NewServiceConfigBuilder().
SetProtocolIDs(k).
Expand All @@ -120,7 +131,9 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
if err := tripleReflectionService.Init(rc); err != nil {
return err
}
// Maybe only register once, If setting this service, break from traversing Protocols.
c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
break
}
}

Expand All @@ -144,8 +157,9 @@ func (c *ProviderConfig) Load() {
for registeredTypeName, service := range GetProviderServiceMap() {
serviceConfig, ok := c.Services[registeredTypeName]
if !ok {
if registeredTypeName == constant.ReflectionServiceTypeName {
// do not auto generate reflection server's configuration.
if registeredTypeName == constant.ReflectionServiceTypeName ||
registeredTypeName == constant.HealthCheckServiceTypeName {
// do not auto generate reflection or health check server's configuration.
continue
}
// service doesn't config in config file, create one with default
Expand Down
3 changes: 2 additions & 1 deletion config/provider_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestProviderConfigRootRegistry(t *testing.T) {
assert.Nil(t, err)
provider := rootConfig.Provider
assert.NotNil(t, provider)
assert.Equal(t, 2, len(provider.Services))
assert.NotNil(t, provider.Services["HelloService"])
assert.NotNil(t, provider.Services["OrderService"])

assert.Equal(t, 2, len(provider.Services["HelloService"].RegistryIDs))
assert.Equal(t, 1, len(provider.Services["OrderService"].RegistryIDs))
Expand Down
1 change: 1 addition & 0 deletions imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/reflection"
_ "dubbo.apache.org/dubbo-go/v3/protocol/grpc"
_ "dubbo.apache.org/dubbo-go/v3/protocol/jsonrpc"
Expand Down
185 changes: 185 additions & 0 deletions protocol/dubbo3/health/serverhealth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package health provides a service that exposes server's health and it must be
// imported to enable support for client-side health checks.
package health

import (
"context"
"sync"
)

import (
"github.com/dubbogo/grpc-go/codes"
"github.com/dubbogo/grpc-go/status"
)

import (
logger "dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
healthpb "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
healthtriple "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
)

// Server implements `service Health`.
type XXX_healthServer struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为何是 XXX 开头?替换成 Dubbogo 如何?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得应该是可以的,我是参考了以前grpc reflection的server的命名方法。@LaurenceLiZhixin
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议换成Dubbogo开头吧,不要使用下划线。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得应该是可以的,我是参考了以前grpc reflection的server的命名方法。@LaurenceLiZhixin image

这么说吧,XXX 开头我以前确实见过,那是 protobuf 根据 .proto 文件生成的机器代码,不是给人读的。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为何要义 XXX 开头命名?

healthtriple.UnimplementedHealthServer
mu sync.RWMutex
// If shutdown is true, it's expected all serving status is NOT_SERVING, and
// will stay in NOT_SERVING.
shutdown bool
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
updates map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
}

var healthServer *XXX_healthServer

// NewServer returns a new Server.
func NewServer() *XXX_healthServer {
return &XXX_healthServer{
statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
updates: make(map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
}
}

// Check implements `service Health`.
func (s *XXX_healthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if servingStatus, ok := s.statusMap[in.Service]; ok {
return &healthpb.HealthCheckResponse{
Status: servingStatus,
}, nil
}
return nil, status.Error(codes.NotFound, "unknown service")
}

// Watch implements `service Health`.
func (s *XXX_healthServer) Watch(in *healthpb.HealthCheckRequest, stream healthtriple.Health_WatchServer) error {
service := in.Service
// update channel is used for getting service status updates.
update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
s.mu.Lock()
// Puts the initial status to the channel.
if servingStatus, ok := s.statusMap[service]; ok {
update <- servingStatus
} else {
update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
}

// Registers the update channel to the correct place in the updates map.
if _, ok := s.updates[service]; !ok {
s.updates[service] = make(map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
}
s.updates[service][stream] = update
defer func() {
s.mu.Lock()
delete(s.updates[service], stream)
s.mu.Unlock()
}()
s.mu.Unlock()

var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
for {
select {
// Status updated. Sends the up-to-date status to the client.
case servingStatus := <-update:
if lastSentStatus == servingStatus {
continue
}
lastSentStatus = servingStatus
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
if err != nil {
return status.Error(codes.Canceled, "Stream has ended.")
}
// Context done. Removes the update channel from the updates map.
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
}
}
}

// SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap.
func (s *XXX_healthServer) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shutdown {
logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
return
}

s.setServingStatusLocked(service, servingStatus)
}

func (s *XXX_healthServer) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.statusMap[service] = servingStatus
for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel.
// This can happen if the client is not reading and the server gets flow control limited.
select {
case <-update:
default:
}
// Puts the most recent update to the channel.
update <- servingStatus
}
}

// Shutdown sets all serving status to NOT_SERVING, and configures the server to
// ignore all future status changes.
//
// This changes serving status for all services. To set status for a particular
// services, call SetServingStatus().
func (s *XXX_healthServer) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = true
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
}
}

// Resume sets all serving status to SERVING, and configures the server to
// accept all future status changes.
//
// This changes serving status for all services. To set status for a particular
// services, call SetServingStatus().
func (s *XXX_healthServer) Resume() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdown = false
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
}
}

// Set health check interface.
func init() {
healthServer = NewServer()
config.SetProviderService(healthServer)
}

func SetServingStatusServing(service string) {
healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
}

func SetServingStatusNotServing(service string) {
healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
}