Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 #1405

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,17 @@ const (
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
)

// Loader Hook
const (
HookEventParamKey = "config-loader-hook-event"
HookEventErrorMessageParamKey = "config-loader-hook-error-message"

HookEventBeforeReferenceConnect = "before-reference-connect"
HookEventReferenceConnectSuccess = "reference-connect-success"
HookEventReferenceConnectFail = "reference-connect-fail"

HookEventBeforeProviderConnect = "before-service-listen"
HookEventProviderConnectSuccess = "service-listen-success"
HookEventProviderConnectFail = "service-listen-fail"
)
19 changes: 19 additions & 0 deletions common/extension/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func GetConfigPostProcessor(name string) interfaces.ConfigPostProcessor {
return processors[name]
}

// RemoveConfigPostProcessor remove process from processors.
func RemoveConfigPostProcessor(name string) {
delete(processors, name)
}

// GetConfigPostProcessors returns all registered instances of ConfigPostProcessor.
func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
ret := make([]interfaces.ConfigPostProcessor, 0, len(processors))
Expand All @@ -43,3 +48,17 @@ func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
}
return ret
}

func GetConfigLoaderHooks() []interfaces.ConfigLoaderHook {
var ret []interfaces.ConfigLoaderHook
for _, v := range processors {
h, ok := v.(interfaces.ConfigLoaderHook)
if ok {
ret = append(ret, h)
}
}
if ret == nil {
return make([]interfaces.ConfigLoaderHook, 0)
}
return ret
}
25 changes: 23 additions & 2 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func loadConsumerConfig() {
for {
checkok := true
for _, refconfig := range consumerConfig.References {
referenceURL := refconfig.getValidURL()
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true
Expand All @@ -179,20 +180,29 @@ func loadConsumerConfig() {
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
refconfig.postProcessConfig(referenceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventReferenceConnectFail,
constant.HookEventErrorMessageParamKey: errMsg,
})
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
continue
}
}
refconfig.postProcessConfig(referenceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventReferenceConnectSuccess,
})
}
if checkok {
break
}
}
postAllConsumersConnectComplete()
}

func loadProviderConfig() {
Expand Down Expand Up @@ -247,11 +257,22 @@ func loadProviderConfig() {
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
err := svs.Export()
serviceURL := svs.getValidURL()
if err != nil {
errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
svs.postProcessConfig(serviceURL, &map[string]string{
ChangedenCZD marked this conversation as resolved.
Show resolved Hide resolved
constant.HookEventParamKey: constant.HookEventProviderConnectFail,
constant.HookEventErrorMessageParamKey: errMsg,
})
panic(errMsg)
}
svs.postProcessConfig(serviceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventProviderConnectSuccess,
})
}
registerServiceInstance()
postAllProvidersConnectComplete()
}

// registerServiceInstance register service instance
Expand Down
81 changes: 81 additions & 0 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,87 @@ func TestLoad(t *testing.T) {
providerConfig = nil
}

type CustomEvent struct {
t *testing.T
}

// implements interfaces.ConfigPostProcessor's functions
func (c CustomEvent) PostProcessReferenceConfig(u *common.URL) {
logger.Debug("PostProcessReferenceConfig Start")
logger.Debug("Event: ", u.GetParam(constant.HookEventParamKey, ""))
logger.Debug("Url: ", u)
logger.Debug("Error Message: ", u.GetParam(constant.HookEventErrorMessageParamKey, ""))
logger.Debug("PostProcessReferenceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
}
func (c CustomEvent) PostProcessServiceConfig(u *common.URL) {
logger.Debug("PostProcessServiceConfig Start")
logger.Debug("Event: ", u.GetParam(constant.HookEventParamKey, ""))
logger.Debug("Url: ", u)
logger.Debug("Error Message: ", u.GetParam(constant.HookEventErrorMessageParamKey, ""))
logger.Debug("PostProcessServiceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
}

// implements interfaces.ConfigLoaderHook's functions
func (c CustomEvent) AllReferencesConnectComplete() {
logger.Debug("AllConsumersConnectComplete")
}
func (c CustomEvent) AllServicesListenComplete() {
logger.Debug("AllServicesListenComplete")
}
func (c CustomEvent) BeforeShutdown() {
logger.Debug("BeforeShutdown")
}

func TestLoadWithEventDispatch(t *testing.T) {
doInitConsumer()
doInitProvider()
for _, v := range providerConfig.Services {
v.export = true
}

ms := &MockService{}
SetConsumerService(ms)
SetProviderService(ms)

extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
GetApplicationConfig().MetadataType = "mock"
var mm *mockMetadataService
extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
if mm == nil {
mm = &mockMetadataService{
exportedServiceURLs: new(sync.Map),
lock: new(sync.RWMutex),
}
}
return mm, nil
})

configPostProcessorName := "TestLoadWithEventDispatch"
extension.SetConfigPostProcessor(configPostProcessorName, CustomEvent{t})

Load()

assert.Equal(t, ms, GetRPCService(ms.Reference()))
ms2 := &struct {
MockService
}{}
RPCService(ms2)
assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))

conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
err := common.ServiceMap.UnRegister("com.MockService", "mock",
common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
assert.Nil(t, err)
extension.RemoveConfigPostProcessor(configPostProcessorName)
consumerConfig = nil
providerConfig = nil
}

func TestLoadWithSingleReg(t *testing.T) {
doInitConsumerWithSingleRegistry()
mockInitProviderWithSingleRegistry()
Expand Down
8 changes: 8 additions & 0 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func GracefulShutdownInit() {
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
postBeforeShutdown()
os.Exit(0)
})
BeforeShutdown()
Expand All @@ -76,6 +77,7 @@ func GracefulShutdownInit() {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
postBeforeShutdown()
os.Exit(0)
}
}()
Expand Down Expand Up @@ -223,3 +225,9 @@ func getConsumerProtocols() *gxset.HashSet {
}
return result
}

func postBeforeShutdown() {
for _, h := range extension.GetConfigLoaderHooks() {
h.BeforeShutdown()
}
}
16 changes: 16 additions & 0 deletions config/interfaces/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,24 @@ import (
// ServiceConfig during deployment time.
type ConfigPostProcessor interface {
// PostProcessReferenceConfig customizes ReferenceConfig's params.
// PostProcessReferenceConfig emit on refer reference (GetParam(constant.HOOK_EVENT_PARAM_KEY): before-reference-connect, reference-connect-success, reference-connect-fail)
PostProcessReferenceConfig(*common.URL)

// PostProcessServiceConfig customizes ServiceConfig's params.
// PostProcessServiceConfig emit on export service (GetParam(constant.HOOK_EVENT_PARAM_KEY): before-service-listen, service-listen-success, service-listen-fail)
PostProcessServiceConfig(*common.URL)
}

// ConfigLoaderHook is extends ConfigPostProcessor
type ConfigLoaderHook interface {
ConfigPostProcessor

// AllReferencesConnectComplete emit on all references export complete
AllReferencesConnectComplete()

// AllServicesListenComplete emit on all services export complete
AllServicesListenComplete()

// BeforeShutdown emit on before shutdown
BeforeShutdown()
}
34 changes: 32 additions & 2 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
if c.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
c.postProcessConfig(cfgURL)
c.postProcessConfig(cfgURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventBeforeReferenceConnect,
})
if c.URL != "" {
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
Expand Down Expand Up @@ -302,8 +304,36 @@ func publishConsumerDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
func (c *ReferenceConfig) postProcessConfig(url *common.URL, params *map[string]string) {
for _, p := range extension.GetConfigPostProcessors() {
if params != nil && len(*params) > 0 {
if url == nil {
url = &common.URL{}
} else {
url = url.Clone()
}
for k, v := range *params {
url.SetParam(k, v)
}
}
p.PostProcessReferenceConfig(url)
}
}

func (c *ReferenceConfig) getValidURL() *common.URL {
urls := c.urls
var u *common.URL
if urls != nil && len(urls) > 0 {
u = urls[0]
}
if u != nil && u.SubURL != nil {
return u.SubURL
}
return u
}

func postAllConsumersConnectComplete() {
for _, h := range extension.GetConfigLoaderHooks() {
h.AllReferencesConnectComplete()
}
}
34 changes: 32 additions & 2 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ func (c *ServiceConfig) Export() error {
}

// post process the URL to be exported
c.postProcessConfig(ivkURL)
c.postProcessConfig(ivkURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventBeforeProviderConnect,
})
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
Expand Down Expand Up @@ -355,8 +357,36 @@ func publishServiceDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (c *ServiceConfig) postProcessConfig(url *common.URL) {
func (c *ServiceConfig) postProcessConfig(url *common.URL, params *map[string]string) {
for _, p := range extension.GetConfigPostProcessors() {
if params != nil && len(*params) > 0 {
if url != nil {
url = url.Clone()
} else {
url = &common.URL{}
}
for k, v := range *params {
url.SetParam(k, v)
}
}
p.PostProcessServiceConfig(url)
}
}

func (c *ServiceConfig) getValidURL() *common.URL {
urls := c.GetExportedUrls()
var u *common.URL
if urls != nil && len(urls) > 0 {
u = urls[0]
}
if u != nil && u.SubURL != nil {
return u.SubURL
}
return u
}

func postAllProvidersConnectComplete() {
for _, h := range extension.GetConfigLoaderHooks() {
h.AllServicesListenComplete()
}
}