Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr/triple #302

Merged
merged 17 commits into from
Nov 24, 2021
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ samples/dubbogo/simple/server/app/app
/logs
/.idea
/.vscode

log
cache

.DS_Store
vendor/
Expand Down
13 changes: 7 additions & 6 deletions cmd/pixiu/pixiu.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
"time"
)

import (
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"

"github.com/spf13/cobra"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/config"
Expand All @@ -33,12 +40,6 @@ import (
_ "github.com/apache/dubbo-go-pixiu/pkg/pluginregistry"
)

import (
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/spf13/cobra"
)

var (
// Version pixiu version
Version = "0.3.0"
Expand Down
3 changes: 3 additions & 0 deletions cmd/pixiu/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package main

import (
"fmt"
)

import (
"github.com/spf13/cobra"
)

Expand Down
22 changes: 12 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ module github.com/apache/dubbo-go-pixiu
go 1.15

require (
dubbo.apache.org/dubbo-go/v3 v3.0.0-rc4-1
github.com/alibaba/sentinel-golang v1.0.2
github.com/apache/dubbo-go v1.5.7
github.com/apache/dubbo-go-hessian2 v1.9.5
github.com/creasty/defaults v1.5.2
github.com/dubbogo/dubbo-go-pixiu-filter v0.1.4
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.14
github.com/dubbogo/gost v1.11.19
github.com/emirpasic/gods v1.12.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/gin-gonic/gin v1.7.4
github.com/go-resty/resty/v2 v2.3.0
github.com/go-resty/resty/v2 v2.7.0
github.com/gogo/protobuf v1.3.2
github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0 // indirect
github.com/jhump/protoreflect v1.9.0
github.com/mitchellh/mapstructure v1.4.1
github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/mercari/grpc-http-proxy v0.1.2
github.com/mitchellh/mapstructure v1.4.2
github.com/nacos-group/nacos-sdk-go v1.0.9
github.com/opentrx/seata-golang/v2 v2.0.5
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.29.0 // indirect
Expand All @@ -29,7 +30,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.5 // indirect
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0
go.etcd.io/etcd/api/v3 v3.5.1
go.opentelemetry.io/otel v1.0.0-RC2
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2
go.opentelemetry.io/otel/exporters/prometheus v0.21.0
Expand All @@ -38,14 +39,15 @@ require (
go.opentelemetry.io/otel/sdk/export/metric v0.21.0
go.opentelemetry.io/otel/sdk/metric v0.21.0
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/grpc v1.40.0
go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
google.golang.org/grpc v1.42.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)

replace github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0

replace github.com/go-co-op/gocron => github.com/go-co-op/gocron v0.1.1
230 changes: 187 additions & 43 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/adapter/dubboregistry/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ import (

type RegistryEventListener interface {
OnAddAPI(r router.API) error
OnRemoveAPI(r router.API) error
OnDeleteRouter(r config.Resource) error
}
5 changes: 4 additions & 1 deletion pkg/adapter/dubboregistry/registry/base/baseregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
)

var _ registry.Registry = new(mockRegFacade)

type mockRegFacade struct {
Expand Down
201 changes: 201 additions & 0 deletions pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nacos

import (
"fmt"
"strings"
"sync"
"time"
)

import (
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
dubboConfig "dubbo.apache.org/dubbo-go/v3/config"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"

"github.com/apache/dubbo-go/common"

"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/vo"
)

import (
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

const (
MaxFailTimes = 2
ConnDelay = 3 * time.Second
)

var _ registry.Listener = new(nacosIntfListener)

type nacosIntfListener struct {
exit chan struct{}
client naming_client.INamingClient
reg *NacosRegistry
wg sync.WaitGroup
dubbogoNacosRegistry dubboRegistry.Registry
addr string
adapterListener common2.RegistryEventListener
}

// newNacosIntfListener returns a new nacosIntfListener with pre-defined path according to the registered type.
func newNacosIntfListener(client naming_client.INamingClient, addr string, reg *NacosRegistry, adapterListener common2.RegistryEventListener) registry.Listener {
return &nacosIntfListener{
exit: make(chan struct{}),
client: client,
reg: reg,
addr: addr,
adapterListener: adapterListener,
}
}

func (z *nacosIntfListener) Close() {
close(z.exit)
z.wg.Wait()
}

func (z *nacosIntfListener) WatchAndHandle() {
var err error
z.dubbogoNacosRegistry, err = dubboConfig.NewRegistryConfigBuilder().
SetProtocol("nacos").
SetAddress(z.addr).
Build().GetInstance(common.CONSUMER)
if err != nil {
logger.Errorf("create nacos registry with address = %s error = %s", z.addr, err)
return
}
z.wg.Add(1)
go z.watch()
}

func (z *nacosIntfListener) watch() {
defer z.wg.Done()
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay * time.Duration(failTimes))
)
defer delayTimer.Stop()
for {
serviceList, err := z.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
PageSize: 100,
})
// error handling
if err != nil {
failTimes++
logger.Infof("watching nacos interface with error{%v}", err)
// Exit the watch if root node is in error
if err == zookeeper.ErrNilNode {
logger.Errorf("watching nacos services got errNilNode,so exit listen")
return
}
if failTimes > MaxFailTimes {
logger.Errorf("Error happens on nacos exceed max fail times: %s,so exit listen", MaxFailTimes)
return
}
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
<-delayTimer.C
continue
}
failTimes = 0
if err := z.updateServiceList(serviceList.Doms); err != nil {
logger.Errorf("update service list failed %s", err)
}
time.Sleep(time.Second * 3)
}
}

type serviceInfo struct {
interfaceName string
version string
group string
}

func (s *serviceInfo) String() string {
return fmt.Sprintf("%s:%s:%s", s.interfaceName, s.version, s.group)
}

func fromServiceFullKey(fullKey string) *serviceInfo {
serviceInfoStrs := strings.Split(fullKey, ":")
if len(serviceInfoStrs) != 4 {
return nil
}
return &serviceInfo{
interfaceName: serviceInfoStrs[1],
version: serviceInfoStrs[2],
group: serviceInfoStrs[3],
}
}

func (z *nacosIntfListener) updateServiceList(serviceList []string) error {
// todo lock all svc listener

allSvcListener := z.reg.GetAllSvcListener()
subscribedServiceKeysMap := make(map[string]bool)
for k := range allSvcListener {
subscribedServiceKeysMap[k] = true
}
serviceNeedUpdate := make([]*serviceInfo, 0)
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range serviceList {
svcInfo := fromServiceFullKey(v)
if svcInfo == nil {
// invalid nacos dubbo service key
continue
}
key := svcInfo.String()
if _, ok := allSvcListener[key]; !ok {
serviceNeedUpdate = append(serviceNeedUpdate, svcInfo)
} else {
delete(subscribedServiceKeysMap, key)
}
}
if len(serviceNeedUpdate) == 0 && len(subscribedServiceKeysMap) == 0 {
// there is no needs to update
return nil
}
// subscribedServiceKeysMap is services needs to be removed
// serviceNeedUpdate is services needs to be subscribed

for _, v := range serviceNeedUpdate {
url, _ := dubboCommon.NewURL("mock://localhost:8848")
url.SetParam(constant.InterfaceKey, v.interfaceName)
url.SetParam(constant.GroupKey, v.group)
url.SetParam(constant.VersionKey, v.version)
l := newNacosSrvListener(url, z.client, z.adapterListener)
l.wg.Add(1)
go func(v *serviceInfo) {
defer l.wg.Done()
z.reg.SetSvcListener(l.url.ServiceKey(), l)
if err := z.dubbogoNacosRegistry.Subscribe(url, l); err != nil {
logger.Errorf("subscribe listener with interfaceKey = %s, error = %s", l, err)
z.reg.RemoveSvcListener(l.url.ServiceKey())
}
}(v)
}
// todo deal with subscribedServiceKeysMap services to be removed
for k := range subscribedServiceKeysMap {
z.reg.RemoveSvcListener(k)
}
return nil
}
Loading