Skip to content

Commit

Permalink
优化代码逻辑;合并ConfigLoaderHook相关Func至ConfigPostProcessor中;
Browse files Browse the repository at this point in the history
  • Loading branch information
Changeden committed Aug 26, 2021
1 parent 34b4f9a commit 460bcd2
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 84 deletions.
3 changes: 0 additions & 3 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,6 @@ const (

// Loader Hook
const (
HookEventParamKey = "config-loader-hook-event"
HookEventErrorMessageParamKey = "config-loader-hook-error-message"

HookEventBeforeReferenceConnect = "before-reference-connect"
HookEventReferenceConnectSuccess = "reference-connect-success"
HookEventReferenceConnectFail = "reference-connect-fail"
Expand Down
14 changes: 0 additions & 14 deletions common/extension/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,3 @@ func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
}
return ret
}

func GetConfigLoaderHooks() []interfaces.ConfigLoaderHook {
var ret []interfaces.ConfigLoaderHook
for _, v := range processors {
h, ok := v.(interfaces.ConfigLoaderHook)
if ok {
ret = append(ret, h)
}
}
if ret == nil {
return make([]interfaces.ConfigLoaderHook, 0)
}
return ret
}
18 changes: 4 additions & 14 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,7 @@ func loadConsumerConfig() {
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
refconfig.postProcessConfig(referenceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventReferenceConnectFail,
constant.HookEventErrorMessageParamKey: errMsg,
})
refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
Expand All @@ -194,9 +191,7 @@ func loadConsumerConfig() {
continue
}
}
refconfig.postProcessConfig(referenceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventReferenceConnectSuccess,
})
refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
}
if checkok {
break
Expand Down Expand Up @@ -261,15 +256,10 @@ func loadProviderConfig() {
serviceURL := svs.getValidURL()
if err != nil {
errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
svs.postProcessConfig(serviceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventProviderConnectFail,
constant.HookEventErrorMessageParamKey: errMsg,
})
svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectFail, &errMsg)
panic(errMsg)
}
svs.postProcessConfig(serviceURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventProviderConnectSuccess,
})
svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectSuccess, nil)
}
registerServiceInstance()
postAllProvidersConnectComplete()
Expand Down
18 changes: 10 additions & 8 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,26 @@ type CustomEvent struct {
}

// implements interfaces.ConfigPostProcessor's functions
func (c CustomEvent) PostProcessReferenceConfig(u *common.URL) {
func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
logger.Debug("PostProcessReferenceConfig Start")
logger.Debug("Event: ", u.GetParam(constant.HookEventParamKey, ""))
logger.Debug("Event: ", event)
logger.Debug("Url: ", u)
logger.Debug("Error Message: ", u.GetParam(constant.HookEventErrorMessageParamKey, ""))
if errMsg != nil {
logger.Debug("Error Message: ", *errMsg)
}
logger.Debug("PostProcessReferenceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
}
func (c CustomEvent) PostProcessServiceConfig(u *common.URL) {
func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, errMsg *string) {
logger.Debug("PostProcessServiceConfig Start")
logger.Debug("Event: ", u.GetParam(constant.HookEventParamKey, ""))
logger.Debug("Event: ", event)
logger.Debug("Url: ", u)
logger.Debug("Error Message: ", u.GetParam(constant.HookEventErrorMessageParamKey, ""))
if errMsg != nil {
logger.Debug("Error Message: ", *errMsg)
}
logger.Debug("PostProcessServiceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
}

// implements interfaces.ConfigLoaderHook's functions
func (c CustomEvent) AllReferencesConnectComplete() {
logger.Debug("AllConsumersConnectComplete")
}
Expand Down
4 changes: 2 additions & 2 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func getConsumerProtocols() *gxset.HashSet {
}

func postBeforeShutdown() {
for _, h := range extension.GetConfigLoaderHooks() {
h.BeforeShutdown()
for _, p := range extension.GetConfigPostProcessors() {
p.BeforeShutdown()
}
}
13 changes: 4 additions & 9 deletions config/interfaces/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,12 @@ import (
// ServiceConfig during deployment time.
type ConfigPostProcessor interface {
// PostProcessReferenceConfig customizes ReferenceConfig's params.
// PostProcessReferenceConfig emit on refer reference (GetParam(constant.HOOK_EVENT_PARAM_KEY): before-reference-connect, reference-connect-success, reference-connect-fail)
PostProcessReferenceConfig(*common.URL)
// PostProcessReferenceConfig emit on refer reference (event: before-reference-connect, reference-connect-success, reference-connect-fail)
PostProcessReferenceConfig(url *common.URL, event string, errMsg *string)

// PostProcessServiceConfig customizes ServiceConfig's params.
// PostProcessServiceConfig emit on export service (GetParam(constant.HOOK_EVENT_PARAM_KEY): before-service-listen, service-listen-success, service-listen-fail)
PostProcessServiceConfig(*common.URL)
}

// ConfigLoaderHook is extends ConfigPostProcessor
type ConfigLoaderHook interface {
ConfigPostProcessor
// PostProcessServiceConfig emit on export service (event: before-service-listen, service-listen-success, service-listen-fail)
PostProcessServiceConfig(url *common.URL, event string, errMsg *string)

// AllReferencesConnectComplete emit on all references export complete
AllReferencesConnectComplete()
Expand Down
22 changes: 5 additions & 17 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
if c.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
c.postProcessConfig(cfgURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventBeforeReferenceConnect,
})
c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
if c.URL != "" {
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
Expand Down Expand Up @@ -304,19 +302,9 @@ func publishConsumerDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
func (c *ReferenceConfig) postProcessConfig(url *common.URL, params *map[string]string) {
func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
for _, p := range extension.GetConfigPostProcessors() {
if params != nil && len(*params) > 0 {
if url == nil {
url = &common.URL{}
} else {
url = url.Clone()
}
for k, v := range *params {
url.SetParam(k, v)
}
}
p.PostProcessReferenceConfig(url)
p.PostProcessReferenceConfig(url, event, errMsg)
}
}

Expand All @@ -333,7 +321,7 @@ func (c *ReferenceConfig) getValidURL() *common.URL {
}

func postAllConsumersConnectComplete() {
for _, h := range extension.GetConfigLoaderHooks() {
h.AllReferencesConnectComplete()
for _, p := range extension.GetConfigPostProcessors() {
p.AllReferencesConnectComplete()
}
}
22 changes: 5 additions & 17 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,7 @@ func (c *ServiceConfig) Export() error {
}

// post process the URL to be exported
c.postProcessConfig(ivkURL, &map[string]string{
constant.HookEventParamKey: constant.HookEventBeforeProviderConnect,
})
c.postProcessConfig(ivkURL, constant.HookEventBeforeProviderConnect, nil)
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
Expand Down Expand Up @@ -357,19 +355,9 @@ func publishServiceDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (c *ServiceConfig) postProcessConfig(url *common.URL, params *map[string]string) {
func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
for _, p := range extension.GetConfigPostProcessors() {
if params != nil && len(*params) > 0 {
if url != nil {
url = url.Clone()
} else {
url = &common.URL{}
}
for k, v := range *params {
url.SetParam(k, v)
}
}
p.PostProcessServiceConfig(url)
p.PostProcessServiceConfig(url, event, errMsg)
}
}

Expand All @@ -386,7 +374,7 @@ func (c *ServiceConfig) getValidURL() *common.URL {
}

func postAllProvidersConnectComplete() {
for _, h := range extension.GetConfigLoaderHooks() {
h.AllServicesListenComplete()
for _, p := range extension.GetConfigPostProcessors() {
p.AllServicesListenComplete()
}
}

0 comments on commit 460bcd2

Please sign in to comment.