Skip to content

Commit

Permalink
Merge 460bcd2 into 81c2e23
Browse files Browse the repository at this point in the history
  • Loading branch information
ChangedenCZD committed Aug 26, 2021
2 parents 81c2e23 + 460bcd2 commit fa59c63
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 10 deletions.
11 changes: 11 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,14 @@ const (
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
)

// Loader Hook
const (
HookEventBeforeReferenceConnect = "before-reference-connect"
HookEventReferenceConnectSuccess = "reference-connect-success"
HookEventReferenceConnectFail = "reference-connect-fail"

HookEventBeforeProviderConnect = "before-service-listen"
HookEventProviderConnectSuccess = "service-listen-success"
HookEventProviderConnectFail = "service-listen-fail"
)
5 changes: 5 additions & 0 deletions common/extension/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func GetConfigPostProcessor(name string) interfaces.ConfigPostProcessor {
return processors[name]
}

// RemoveConfigPostProcessor remove process from processors.
func RemoveConfigPostProcessor(name string) {
delete(processors, name)
}

// GetConfigPostProcessors returns all registered instances of ConfigPostProcessor.
func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
ret := make([]interfaces.ConfigPostProcessor, 0, len(processors))
Expand Down
15 changes: 13 additions & 2 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func loadConsumerConfig() {
for {
checkok := true
for _, refconfig := range consumerConfig.References {
referenceURL := refconfig.getValidURL()
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true
Expand All @@ -179,20 +180,24 @@ func loadConsumerConfig() {
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
continue
}
}
refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
}
if checkok {
break
}
}
postAllConsumersConnectComplete()
}

func loadProviderConfig() {
Expand Down Expand Up @@ -247,11 +252,17 @@ func loadProviderConfig() {
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
err := svs.Export()
serviceURL := svs.getValidURL()
if err != nil {
errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectFail, &errMsg)
panic(errMsg)
}
svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectSuccess, nil)
}
registerServiceInstance()
postAllProvidersConnectComplete()
}

// registerServiceInstance register service instance
Expand Down
83 changes: 83 additions & 0 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,89 @@ func TestLoad(t *testing.T) {
providerConfig = nil
}

type CustomEvent struct {
t *testing.T
}

// implements interfaces.ConfigPostProcessor's functions
func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
logger.Debug("PostProcessReferenceConfig Start")
logger.Debug("Event: ", event)
logger.Debug("Url: ", u)
if errMsg != nil {
logger.Debug("Error Message: ", *errMsg)
}
logger.Debug("PostProcessReferenceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
}
func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, errMsg *string) {
logger.Debug("PostProcessServiceConfig Start")
logger.Debug("Event: ", event)
logger.Debug("Url: ", u)
if errMsg != nil {
logger.Debug("Error Message: ", *errMsg)
}
logger.Debug("PostProcessServiceConfig End")
assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
}
func (c CustomEvent) AllReferencesConnectComplete() {
logger.Debug("AllConsumersConnectComplete")
}
func (c CustomEvent) AllServicesListenComplete() {
logger.Debug("AllServicesListenComplete")
}
func (c CustomEvent) BeforeShutdown() {
logger.Debug("BeforeShutdown")
}

func TestLoadWithEventDispatch(t *testing.T) {
doInitConsumer()
doInitProvider()
for _, v := range providerConfig.Services {
v.export = true
}

ms := &MockService{}
SetConsumerService(ms)
SetProviderService(ms)

extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
GetApplicationConfig().MetadataType = "mock"
var mm *mockMetadataService
extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
if mm == nil {
mm = &mockMetadataService{
exportedServiceURLs: new(sync.Map),
lock: new(sync.RWMutex),
}
}
return mm, nil
})

configPostProcessorName := "TestLoadWithEventDispatch"
extension.SetConfigPostProcessor(configPostProcessorName, CustomEvent{t})

Load()

assert.Equal(t, ms, GetRPCService(ms.Reference()))
ms2 := &struct {
MockService
}{}
RPCService(ms2)
assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))

conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
err := common.ServiceMap.UnRegister("com.MockService", "mock",
common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
assert.Nil(t, err)
extension.RemoveConfigPostProcessor(configPostProcessorName)
consumerConfig = nil
providerConfig = nil
}

func TestLoadWithSingleReg(t *testing.T) {
doInitConsumerWithSingleRegistry()
mockInitProviderWithSingleRegistry()
Expand Down
8 changes: 8 additions & 0 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func GracefulShutdownInit() {
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
postBeforeShutdown()
os.Exit(0)
})
BeforeShutdown()
Expand All @@ -76,6 +77,7 @@ func GracefulShutdownInit() {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
postBeforeShutdown()
os.Exit(0)
}
}()
Expand Down Expand Up @@ -223,3 +225,9 @@ func getConsumerProtocols() *gxset.HashSet {
}
return result
}

func postBeforeShutdown() {
for _, p := range extension.GetConfigPostProcessors() {
p.BeforeShutdown()
}
}
15 changes: 13 additions & 2 deletions config/interfaces/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,19 @@ import (
// ServiceConfig during deployment time.
type ConfigPostProcessor interface {
// PostProcessReferenceConfig customizes ReferenceConfig's params.
PostProcessReferenceConfig(*common.URL)
// PostProcessReferenceConfig emit on refer reference (event: before-reference-connect, reference-connect-success, reference-connect-fail)
PostProcessReferenceConfig(url *common.URL, event string, errMsg *string)

// PostProcessServiceConfig customizes ServiceConfig's params.
PostProcessServiceConfig(*common.URL)
// PostProcessServiceConfig emit on export service (event: before-service-listen, service-listen-success, service-listen-fail)
PostProcessServiceConfig(url *common.URL, event string, errMsg *string)

// AllReferencesConnectComplete emit on all references export complete
AllReferencesConnectComplete()

// AllServicesListenComplete emit on all services export complete
AllServicesListenComplete()

// BeforeShutdown emit on before shutdown
BeforeShutdown()
}
24 changes: 21 additions & 3 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
if c.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
c.postProcessConfig(cfgURL)
c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
if c.URL != "" {
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
Expand Down Expand Up @@ -302,8 +302,26 @@ func publishConsumerDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessReferenceConfig(url)
p.PostProcessReferenceConfig(url, event, errMsg)
}
}

func (c *ReferenceConfig) getValidURL() *common.URL {
urls := c.urls
var u *common.URL
if urls != nil && len(urls) > 0 {
u = urls[0]
}
if u != nil && u.SubURL != nil {
return u.SubURL
}
return u
}

func postAllConsumersConnectComplete() {
for _, p := range extension.GetConfigPostProcessors() {
p.AllReferencesConnectComplete()
}
}
24 changes: 21 additions & 3 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (c *ServiceConfig) Export() error {
}

// post process the URL to be exported
c.postProcessConfig(ivkURL)
c.postProcessConfig(ivkURL, constant.HookEventBeforeProviderConnect, nil)
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
Expand Down Expand Up @@ -355,8 +355,26 @@ func publishServiceDefinition(url *common.URL) {
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (c *ServiceConfig) postProcessConfig(url *common.URL) {
func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessServiceConfig(url)
p.PostProcessServiceConfig(url, event, errMsg)
}
}

func (c *ServiceConfig) getValidURL() *common.URL {
urls := c.GetExportedUrls()
var u *common.URL
if urls != nil && len(urls) > 0 {
u = urls[0]
}
if u != nil && u.SubURL != nil {
return u.SubURL
}
return u
}

func postAllProvidersConnectComplete() {
for _, p := range extension.GetConfigPostProcessors() {
p.AllServicesListenComplete()
}
}

0 comments on commit fa59c63

Please sign in to comment.