Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gracefully offline without registry #1973

Merged
7 changes: 5 additions & 2 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,13 @@ func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
}
deadline := time.Now().Add(timeout)

for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount.Load() > 0 {
offlineRequestWindowTimeout := shutdownConfig.GetOfflineRequestWindowTimeout()
for time.Now().Before(deadline) &&
(shutdownConfig.ProviderActiveCount.Load() > 0 || time.Now().Before(shutdownConfig.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout))) {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount.Load())
logger.Infof("waiting for provider active invocation count = %d, provider last received request time: %v",
shutdownConfig.ProviderActiveCount.Load(), shutdownConfig.ProviderLastReceivedRequestTime.Load())
}
}

Expand Down
28 changes: 24 additions & 4 deletions config/graceful_shutdown_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
)

const (
defaultTimeout = 60 * time.Second
defaultStepTimeout = 3 * time.Second
defaultConsumerUpdateWaitTime = 3 * time.Second
defaultTimeout = 60 * time.Second
defaultStepTimeout = 3 * time.Second
defaultConsumerUpdateWaitTime = 3 * time.Second
defaultOfflineRequestWindowTimeout = 3 * time.Second
)

// ShutdownConfig is used as configuration for graceful shutdown
Expand Down Expand Up @@ -66,12 +67,16 @@ type ShutdownConfig struct {
RejectRequestHandler string `yaml:"reject-handler" json:"reject-handler,omitempty" property:"reject_handler"`
// internal listen kill signal,the default is true.
InternalSignal bool `default:"true" yaml:"internal-signal" json:"internal.signal,omitempty" property:"internal.signal"`

// offline request window length
OfflineRequestWindowTimeout string `yaml:"offline-request-window-timeout" json:"offlineRequestWindowTimeout,omitempty" property:"offlineRequestWindowTimeout"`
// true -> new request will be rejected.
RejectRequest atomic.Bool
// active invocation
ConsumerActiveCount atomic.Int32
ProviderActiveCount atomic.Int32

// provider last received request timestamp
ProviderLastReceivedRequestTime atomic.Time
}

// Prefix dubbo.shutdown
Expand Down Expand Up @@ -99,6 +104,16 @@ func (config *ShutdownConfig) GetStepTimeout() time.Duration {
return result
}

func (config *ShutdownConfig) GetOfflineRequestWindowTimeout() time.Duration {
result, err := time.ParseDuration(config.OfflineRequestWindowTimeout)
if err != nil {
logger.Errorf("The OfflineRequestWindowTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
config.OfflineRequestWindowTimeout, defaultOfflineRequestWindowTimeout.String(), err)
return defaultOfflineRequestWindowTimeout
}
return result
}

func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration {
result, err := time.ParseDuration(config.ConsumerUpdateWaitTime)
if err != nil {
Expand Down Expand Up @@ -150,3 +165,8 @@ func (scb *ShutdownConfigBuilder) Build() *ShutdownConfig {
defaults.Set(scb)
return scb.shutdownConfig
}

func (scb *ShutdownConfigBuilder) SetOfflineRequestWindowTimeout(offlineRequestWindowTimeout string) *ShutdownConfigBuilder {
scb.shutdownConfig.OfflineRequestWindowTimeout = offlineRequestWindowTimeout
return scb
}
23 changes: 18 additions & 5 deletions config/graceful_shutdown_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,37 @@ func TestShutdownConfigGetTimeout(t *testing.T) {
assert.False(t, config.RejectRequest.Load())

config = ShutdownConfig{
Timeout: "60s",
StepTimeout: "10s",
Timeout: "60s",
StepTimeout: "10s",
OfflineRequestWindowTimeout: "30s",
}

assert.Equal(t, 60*time.Second, config.GetTimeout())
assert.Equal(t, 10*time.Second, config.GetStepTimeout())

assert.Equal(t, 30*time.Second, config.GetOfflineRequestWindowTimeout())
config = ShutdownConfig{
Timeout: "34ms",
StepTimeout: "79ms",
Timeout: "34ms",
StepTimeout: "79ms",
OfflineRequestWindowTimeout: "13ms",
}

assert.Equal(t, 34*time.Millisecond, config.GetTimeout())
assert.Equal(t, 79*time.Millisecond, config.GetStepTimeout())
assert.Equal(t, 13*time.Millisecond, config.GetOfflineRequestWindowTimeout())

// test default
config = ShutdownConfig{}

assert.Equal(t, defaultTimeout, config.GetTimeout())
assert.Equal(t, defaultStepTimeout, config.GetStepTimeout())
assert.Equal(t, defaultOfflineRequestWindowTimeout, config.GetOfflineRequestWindowTimeout())
}

func TestNewShutDownConfigBuilder(t *testing.T) {
config := NewShutDownConfigBuilder().
SetTimeout("10s").
SetStepTimeout("15s").
SetOfflineRequestWindowTimeout("13s").
SetRejectRequestHandler("handler").
SetRejectRequest(true).
SetInternalSignal(true).
Expand All @@ -68,6 +79,8 @@ func TestNewShutDownConfigBuilder(t *testing.T) {
stepTimeout := config.GetStepTimeout()
assert.Equal(t, stepTimeout, 15*time.Second)

offlineRequestWindowTimeout := config.GetOfflineRequestWindowTimeout()
assert.Equal(t, offlineRequestWindowTimeout, 13*time.Second)
err := config.Init()
assert.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions filter/graceful_shutdown/provider_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package graceful_shutdown
import (
"context"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -75,6 +76,7 @@ func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker pro
}
}
f.shutdownConfig.ProviderActiveCount.Inc()
f.shutdownConfig.ProviderLastReceivedRequestTime.Store(time.Now())
return invoker.Invoke(ctx, invocation)
}

Expand Down