Skip to content

Commit

Permalink
fix: rename to host addr
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurenceLiZhixin committed Apr 5, 2022
1 parent e41ea18 commit 53a86dd
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 56 deletions.
2 changes: 1 addition & 1 deletion registry/xds/registry.go
Expand Up @@ -163,7 +163,7 @@ func newXDSRegistry(url *common.URL) (registry.Registry, error) {
constant.PodNameEnvKey, constant.PodNamespaceEnvKey)
}

wrappedXDSClient, err := xds.NewXDSWrappedClient(pn, ns, localIP, common2.NewAddr(url.Ip+":"+url.Port))
wrappedXDSClient, err := xds.NewXDSWrappedClient(pn, ns, localIP, common2.NewHostNameOrIPAddr(url.Ip+":"+url.Port))
if err != nil {
return nil, err
}
Expand Down
22 changes: 11 additions & 11 deletions remoting/xds/client.go
Expand Up @@ -62,14 +62,14 @@ type WrappedClientImpl struct {
/*
hostAddr is local pod's cluster and hostAddr, like dubbo-go-app.default.svc.cluster.local:20000
*/
hostAddr xdsCommon.Addr
hostAddr xdsCommon.HostAddr

/*
istiod info
istiodAddr is istio $(istioSeviceFullName):$(xds-grpc-port) like istiod.istio-system.svc.cluster.local:15010
istiodPodIP is to call istiod unexposed debug port 8080
*/
istiodAddr xdsCommon.Addr
istiodAddr xdsCommon.HostAddr
istiodPodIP string

/*
Expand Down Expand Up @@ -131,7 +131,7 @@ func GetXDSWrappedClient() *WrappedClientImpl {
}

// NewXDSWrappedClient create or get singleton xdsWrappedClient
func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon.Addr) (XDSWrapperClient, error) {
func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon.HostAddr) (XDSWrapperClient, error) {
// todo @(laurence) safety problem? what if to concurrent 'new' both create new client?
if xdsWrappedClient != nil {
return xdsWrappedClient, nil
Expand Down Expand Up @@ -171,7 +171,7 @@ func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon
newClient.interfaceMapHandler = mapping.NewInterfaceMapHandlerImpl(
newClient.xdsClient,
defaultIstiodTokenPath,
xdsCommon.NewAddr(newClient.istiodPodIP+":"+defaultIstiodDebugPort),
xdsCommon.NewHostNameOrIPAddr(newClient.istiodPodIP+":"+defaultIstiodDebugPort),
newClient.hostAddr)

xdsWrappedClient = newClient
Expand Down Expand Up @@ -202,7 +202,7 @@ func (w *WrappedClientImpl) GetRouterConfig(hostAddr string) resource.RouteConfi
}

func (w *WrappedClientImpl) GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate {
addr := xdsCommon.NewAddr(hostAddr)
addr := xdsCommon.NewHostNameOrIPAddr(hostAddr)
w.cdsMapLock.RLock()
defer w.cdsMapLock.Unlock()
for clusterName, v := range w.cdsMap {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (w *WrappedClientImpl) UnSubscribe(svcUniqueName string) {
w.subscribeStopChMap.Delete(svcUniqueName)
}

func (w *WrappedClientImpl) GetHostAddress() xdsCommon.Addr {
func (w *WrappedClientImpl) GetHostAddress() xdsCommon.HostAddr {
return w.hostAddr
}

Expand All @@ -252,7 +252,7 @@ func (w *WrappedClientImpl) registerHostLevelSubscription(hostAddr, interfaceNam
w.hostAddrListenerMapLock.Unlock()
return
}
// host Addr key must not exist in map, create one
// host HostAddr key must not exist in map, create one
w.hostAddrListenerMap[hostAddr] = make(map[string]registry.NotifyListener)

w.hostAddrClusterCtxMapLock.Lock()
Expand Down Expand Up @@ -412,7 +412,7 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
}
for _, v := range endpoint.Localities {
for _, e := range v.Endpoints {
w.istiodPodIP = xdsCommon.NewAddr(e.Address).HostnameOrIP
w.istiodPodIP = xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP
foundIstiod = true
close(foundLocalStopCh)
}
Expand All @@ -428,7 +428,7 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
}
for _, v := range endpoint.Localities {
for _, e := range v.Endpoints {
if xdsCommon.NewAddr(e.Address).HostnameOrIP == w.localIP {
if xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP == w.localIP {
cluster := xdsCommon.NewCluster(update.ClusterName)
w.hostAddr = cluster.Addr
foundLocal = true
Expand Down Expand Up @@ -485,7 +485,7 @@ func (w *WrappedClientImpl) runWatchingCdsUpdateEvent() {
// 'outbound|20000||dubbo-go-app.default.svc.cluster.local',
// 'outbound|20000|v2|dubbo-go-app.default.svc.cluster.local']
func (w *WrappedClientImpl) getAllVersionClusterName(hostAddr string) []string {
addr := xdsCommon.NewAddr(hostAddr)
addr := xdsCommon.NewHostNameOrIPAddr(hostAddr)
allVersionClusterNames := make([]string, 0)
w.cdsMapLock.RLock()
defer w.cdsMapLock.RUnlock()
Expand All @@ -505,6 +505,6 @@ type XDSWrapperClient interface {
GetHostAddrByServiceUniqueKey(serviceUniqueKey string) (string, error)
ChangeInterfaceMap(serviceUniqueKey string, add bool) error
GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate
GetHostAddress() xdsCommon.Addr
GetHostAddress() xdsCommon.HostAddr
GetIstioPodIP() string
}
28 changes: 14 additions & 14 deletions remoting/xds/client_test.go
Expand Up @@ -165,10 +165,10 @@ func testWithDiscoverySuccess(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Nil(t, err)
assert.NotNil(t, xdsWrappedClient)

Expand Down Expand Up @@ -246,10 +246,10 @@ func testFailedWithIstioCDS(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Equal(t, DiscoverIstioPodError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(1), cancelCalledCounter.Load())
Expand Down Expand Up @@ -321,10 +321,10 @@ func testFailedWithLocalCDS(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Equal(t, DiscoverLocalError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(1), cancelCalledCounter.Load())
Expand Down Expand Up @@ -396,10 +396,10 @@ func testFailedWithNoneCDS(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Equal(t, DiscoverIstioPodError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(0), cancelCalledCounter.Load())
Expand Down Expand Up @@ -471,10 +471,10 @@ func testFailedWithLocalEDSFailed(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Equal(t, DiscoverLocalError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(2), cancelCalledCounter.Load())
Expand Down Expand Up @@ -546,10 +546,10 @@ func testFailedWithIstioEDSFailed(t *testing.T) {
cancelCalledCounter.Inc()
})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Equal(t, DiscoverIstioPodError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(2), cancelCalledCounter.Load())
Expand Down Expand Up @@ -717,12 +717,12 @@ func testSubscribe(t *testing.T) {
})).
Return(func() {})

xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.Addr) (client.XDSClient, error) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}

xdsWrappedClient = nil
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewAddr(istioHostAddrFoo))
xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
assert.Nil(t, err)
assert.NotNil(t, xdsWrappedClient)

Expand Down
43 changes: 43 additions & 0 deletions remoting/xds/common/addr.go
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

import (
"net"
)

type HostAddr struct {
HostnameOrIP string
Port string
}

func NewHostNameOrIPAddr(addr string) HostAddr {
host, port, _ := net.SplitHostPort(addr)
return HostAddr{
HostnameOrIP: host,
Port: port,
}
}

func (a *HostAddr) Network() string {
return "tcp"
}

func (a *HostAddr) String() string {
return net.JoinHostPort(a.HostnameOrIP, a.Port)
}
26 changes: 3 additions & 23 deletions remoting/xds/common/model.go → remoting/xds/common/cluster.go
Expand Up @@ -17,39 +17,19 @@

package common

import (
"net"
"strings"
)

type Addr struct {
HostnameOrIP string
Port string
}

func NewAddr(addr string) Addr {
host, port, _ := net.SplitHostPort(addr)
return Addr{
HostnameOrIP: host,
Port: port,
}
}

func (a *Addr) String() string {
return net.JoinHostPort(a.HostnameOrIP, a.Port)
}
import "strings"

type Cluster struct {
Bound string
Addr Addr
Addr HostAddr
Subset string
}

func NewCluster(clusterID string) Cluster {
clusterIDs := strings.Split(clusterID, "|")
return Cluster{
Bound: clusterIDs[0],
Addr: Addr{
Addr: HostAddr{
Port: clusterIDs[1],
HostnameOrIP: clusterIDs[3],
},
Expand Down
6 changes: 3 additions & 3 deletions remoting/xds/mapping/handler.go
Expand Up @@ -42,9 +42,9 @@ const (
)

type InterfaceMapHandlerImpl struct {
hostAddr common.Addr
hostAddr common.HostAddr

istioDebugAddr common.Addr
istioDebugAddr common.HostAddr

xdsClient client.XDSClient

Expand Down Expand Up @@ -138,7 +138,7 @@ func (i *InterfaceMapHandlerImpl) interfaceAppNameMap2DubboGoMetadata() *structp
return GetDubboGoMetadata(string(data))
}

func NewInterfaceMapHandlerImpl(xdsClient client.XDSClient, istioTokenPath string, istioDebugAddr, hostAddr common.Addr) InterfaceMapHandler {
func NewInterfaceMapHandlerImpl(xdsClient client.XDSClient, istioTokenPath string, istioDebugAddr, hostAddr common.HostAddr) InterfaceMapHandler {
return &InterfaceMapHandlerImpl{
xdsClient: xdsClient,
interfaceAppNameMap: map[string]string{},
Expand Down
6 changes: 3 additions & 3 deletions remoting/xds/mapping/handler_test.go
Expand Up @@ -53,15 +53,15 @@ const (

func TestNewInterfaceMapHandler(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewAddr(istiodDebugAddrStrFoo), common.NewAddr(localPodServiceAddr))
interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))
assert.NotNil(t, interfaceMapHandler)
}

func TestInterfaceMapHandlerRegisterAndUnregister(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
mockXDSClient.On("SetMetadata", mock.AnythingOfType("*structpb.Struct")).Return(nil)

interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewAddr(istiodDebugAddrStrFoo), common.NewAddr(localPodServiceAddr))
interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))

assert.Nil(t, interfaceMapHandler.Register(serviceKey1))
assert.Nil(t, interfaceMapHandler.Register(serviceKey2))
Expand All @@ -76,7 +76,7 @@ func TestInterfaceMapHandlerRegisterAndUnregister(t *testing.T) {

func TestGetServiceUniqueKeyHostAddrMapFromPilot(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewAddr(istiodDebugAddrStrFoo), common.NewAddr(localPodServiceAddr))
interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))
assert.Nil(t, generateMockToken())

// 1. start mock http server
Expand Down
2 changes: 1 addition & 1 deletion remoting/xds/xds_client_factory.go
Expand Up @@ -34,7 +34,7 @@ import (

// xdsClientFactoryFunction generates new xds client
// when running ut, it's for for ut to replace
var xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr xdsCommon.Addr) (client.XDSClient, error) {
var xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr xdsCommon.HostAddr) (client.XDSClient, error) {
// todo fix these ugly magic num
v3NodeProto := &v3corepb.Node{
Id: "sidecar~" + localIP + "~" + podName + "." + namespace + "~" + namespace + ".svc.cluster.local",
Expand Down

0 comments on commit 53a86dd

Please sign in to comment.