Skip to content

Commit

Permalink
Merge pull request apache#302 from LaurenceLiZhixin/ftr/triple
Browse files Browse the repository at this point in the history
Ftr/triple
  • Loading branch information
AlexStocks committed Nov 24, 2021
2 parents c0efcee + d448f2b commit 34d8029
Show file tree
Hide file tree
Showing 53 changed files with 1,060 additions and 111 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ samples/dubbogo/simple/server/app/app
/logs
/.idea
/.vscode

log
cache

.DS_Store
vendor/
Expand Down
13 changes: 7 additions & 6 deletions cmd/pixiu/pixiu.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
"time"
)

import (
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"

"github.com/spf13/cobra"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/config"
Expand All @@ -33,12 +40,6 @@ import (
_ "github.com/apache/dubbo-go-pixiu/pkg/pluginregistry"
)

import (
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/spf13/cobra"
)

var (
// Version pixiu version
Version = "0.3.0"
Expand Down
3 changes: 3 additions & 0 deletions cmd/pixiu/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package main

import (
"fmt"
)

import (
"github.com/spf13/cobra"
)

Expand Down
22 changes: 12 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ module github.com/apache/dubbo-go-pixiu
go 1.15

require (
dubbo.apache.org/dubbo-go/v3 v3.0.0-rc4-1
github.com/alibaba/sentinel-golang v1.0.2
github.com/apache/dubbo-go v1.5.7
github.com/apache/dubbo-go-hessian2 v1.9.5
github.com/creasty/defaults v1.5.2
github.com/dubbogo/dubbo-go-pixiu-filter v0.1.4
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.14
github.com/dubbogo/gost v1.11.19
github.com/emirpasic/gods v1.12.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/gin-gonic/gin v1.7.4
github.com/go-resty/resty/v2 v2.3.0
github.com/go-resty/resty/v2 v2.7.0
github.com/gogo/protobuf v1.3.2
github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0 // indirect
github.com/jhump/protoreflect v1.9.0
github.com/mitchellh/mapstructure v1.4.1
github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/mercari/grpc-http-proxy v0.1.2
github.com/mitchellh/mapstructure v1.4.2
github.com/nacos-group/nacos-sdk-go v1.0.9
github.com/opentrx/seata-golang/v2 v2.0.5
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.29.0 // indirect
Expand All @@ -29,7 +30,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.5 // indirect
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0
go.etcd.io/etcd/api/v3 v3.5.1
go.opentelemetry.io/otel v1.0.0-RC2
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2
go.opentelemetry.io/otel/exporters/prometheus v0.21.0
Expand All @@ -38,14 +39,15 @@ require (
go.opentelemetry.io/otel/sdk/export/metric v0.21.0
go.opentelemetry.io/otel/sdk/metric v0.21.0
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/grpc v1.40.0
go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
google.golang.org/grpc v1.42.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)

replace github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0

replace github.com/go-co-op/gocron => github.com/go-co-op/gocron v0.1.1
230 changes: 187 additions & 43 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/adapter/dubboregistry/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ import (

type RegistryEventListener interface {
OnAddAPI(r router.API) error
OnRemoveAPI(r router.API) error
OnDeleteRouter(r config.Resource) error
}
5 changes: 4 additions & 1 deletion pkg/adapter/dubboregistry/registry/base/baseregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
)

var _ registry.Registry = new(mockRegFacade)

type mockRegFacade struct {
Expand Down
201 changes: 201 additions & 0 deletions pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nacos

import (
"fmt"
"strings"
"sync"
"time"
)

import (
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
dubboConfig "dubbo.apache.org/dubbo-go/v3/config"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"

"github.com/apache/dubbo-go/common"

"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/vo"
)

import (
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

const (
MaxFailTimes = 2
ConnDelay = 3 * time.Second
)

var _ registry.Listener = new(nacosIntfListener)

type nacosIntfListener struct {
exit chan struct{}
client naming_client.INamingClient
reg *NacosRegistry
wg sync.WaitGroup
dubbogoNacosRegistry dubboRegistry.Registry
addr string
adapterListener common2.RegistryEventListener
}

// newNacosIntfListener returns a new nacosIntfListener with pre-defined path according to the registered type.
func newNacosIntfListener(client naming_client.INamingClient, addr string, reg *NacosRegistry, adapterListener common2.RegistryEventListener) registry.Listener {
return &nacosIntfListener{
exit: make(chan struct{}),
client: client,
reg: reg,
addr: addr,
adapterListener: adapterListener,
}
}

func (z *nacosIntfListener) Close() {
close(z.exit)
z.wg.Wait()
}

func (z *nacosIntfListener) WatchAndHandle() {
var err error
z.dubbogoNacosRegistry, err = dubboConfig.NewRegistryConfigBuilder().
SetProtocol("nacos").
SetAddress(z.addr).
Build().GetInstance(common.CONSUMER)
if err != nil {
logger.Errorf("create nacos registry with address = %s error = %s", z.addr, err)
return
}
z.wg.Add(1)
go z.watch()
}

func (z *nacosIntfListener) watch() {
defer z.wg.Done()
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay * time.Duration(failTimes))
)
defer delayTimer.Stop()
for {
serviceList, err := z.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
PageSize: 100,
})
// error handling
if err != nil {
failTimes++
logger.Infof("watching nacos interface with error{%v}", err)
// Exit the watch if root node is in error
if err == zookeeper.ErrNilNode {
logger.Errorf("watching nacos services got errNilNode,so exit listen")
return
}
if failTimes > MaxFailTimes {
logger.Errorf("Error happens on nacos exceed max fail times: %s,so exit listen", MaxFailTimes)
return
}
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
<-delayTimer.C
continue
}
failTimes = 0
if err := z.updateServiceList(serviceList.Doms); err != nil {
logger.Errorf("update service list failed %s", err)
}
time.Sleep(time.Second * 3)
}
}

type serviceInfo struct {
interfaceName string
version string
group string
}

func (s *serviceInfo) String() string {
return fmt.Sprintf("%s:%s:%s", s.interfaceName, s.version, s.group)
}

func fromServiceFullKey(fullKey string) *serviceInfo {
serviceInfoStrs := strings.Split(fullKey, ":")
if len(serviceInfoStrs) != 4 {
return nil
}
return &serviceInfo{
interfaceName: serviceInfoStrs[1],
version: serviceInfoStrs[2],
group: serviceInfoStrs[3],
}
}

func (z *nacosIntfListener) updateServiceList(serviceList []string) error {
// todo lock all svc listener

allSvcListener := z.reg.GetAllSvcListener()
subscribedServiceKeysMap := make(map[string]bool)
for k := range allSvcListener {
subscribedServiceKeysMap[k] = true
}
serviceNeedUpdate := make([]*serviceInfo, 0)
for _, v := range serviceList {
svcInfo := fromServiceFullKey(v)
if svcInfo == nil {
// invalid nacos dubbo service key
continue
}
key := svcInfo.String()
if _, ok := allSvcListener[key]; !ok {
serviceNeedUpdate = append(serviceNeedUpdate, svcInfo)
} else {
delete(subscribedServiceKeysMap, key)
}
}
if len(serviceNeedUpdate) == 0 && len(subscribedServiceKeysMap) == 0 {
// there is no needs to update
return nil
}
// subscribedServiceKeysMap is services needs to be removed
// serviceNeedUpdate is services needs to be subscribed

for _, v := range serviceNeedUpdate {
url, _ := dubboCommon.NewURL("mock://localhost:8848")
url.SetParam(constant.InterfaceKey, v.interfaceName)
url.SetParam(constant.GroupKey, v.group)
url.SetParam(constant.VersionKey, v.version)
l := newNacosSrvListener(url, z.client, z.adapterListener)
l.wg.Add(1)
go func(v *serviceInfo) {
defer l.wg.Done()
z.reg.SetSvcListener(l.url.ServiceKey(), l)
if err := z.dubbogoNacosRegistry.Subscribe(url, l); err != nil {
logger.Errorf("subscribe listener with interfaceKey = %s, error = %s", l, err)
z.reg.RemoveSvcListener(l.url.ServiceKey())
}
}(v)
}
// todo deal with subscribedServiceKeysMap services to be removed
for k := range subscribedServiceKeysMap {
z.reg.RemoveSvcListener(k)
}
return nil
}
Loading

0 comments on commit 34d8029

Please sign in to comment.