Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kube registry and examples #271

Merged
merged 2 commits into from Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions chassis.go
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/ServiceComb/go-chassis/core/registry"
// file package for file based registration
_ "github.com/ServiceComb/go-chassis/core/registry/file"
// kube package handles kubernetes service discovery
_ "github.com/ServiceComb/go-chassis/core/registry/kube"
// servicecenter package handles service center api calls
_ "github.com/ServiceComb/go-chassis/core/registry/servicecenter"
// pilot package handles istio pilot SDS api calls
Expand Down
42 changes: 24 additions & 18 deletions core/config/model/registry.go 100644 → 100755
@@ -1,28 +1,33 @@
package model

//ServiceStruct SC注册中心地址信息结构体
//ServiceStruct SC information
type ServiceStruct struct {
Registry RegistryStruct `yaml:"registry"`
}

//RegistryStruct SC注册中心地址信息
//RegistryStruct SC information
type RegistryStruct struct {
Disable bool `yaml:"disabled"`
Type string `yaml:"type"`
Scope string `yaml:"scope"`
AutoDiscovery bool `yaml:"autodiscovery"`
AutoIPIndex bool `yaml:"autoIPIndex"`
Address string `yaml:"address"`
RefreshInterval string `yaml:"refreshInterval"`
Watch bool `yaml:"watch"`
Tenant string `yaml:"tenant"`
AutoRegister string `yaml:"register"`
APIVersion RegistryAPIVersionStruct `yaml:"api"`
Registrator RegistratorStruct `yaml:"registrator"`
ServiceDiscovery ServiceDiscoveryStruct `yaml:"serviceDiscovery"`
ContractDiscovery ContractDiscoveryStruct `yaml:"contractDiscovery"`
HealthCheck bool `yaml:"healthCheck"`
CacheIndex bool `yaml:"cacheIndex"`
// NOTE: this part of struct would be deperacated later
// please use registrator instead
Disable bool `yaml:"disabled"`
Type string `yaml:"type"`
Scope string `yaml:"scope"`
AutoDiscovery bool `yaml:"autodiscovery"`
AutoIPIndex bool `yaml:"autoIPIndex"`
Address string `yaml:"address"`
RefreshInterval string `yaml:"refreshInterval"`
Watch bool `yaml:"watch"`
Tenant string `yaml:"tenant"`
AutoRegister string `yaml:"register"`
APIVersion RegistryAPIVersionStruct `yaml:"api"`

// Use Registrator ServiceDiscovery and ContractDiscovery
// to define information about service registry
Registrator RegistratorStruct `yaml:"registrator"`
ServiceDiscovery ServiceDiscoveryStruct `yaml:"serviceDiscovery"`
ContractDiscovery ContractDiscoveryStruct `yaml:"contractDiscovery"`
HealthCheck bool `yaml:"healthCheck"`
CacheIndex bool `yaml:"cacheIndex"`
}

//RegistratorStruct service registry config struct
Expand All @@ -47,6 +52,7 @@ type ServiceDiscoveryStruct struct {
RefreshInterval string `yaml:"refreshInterval"`
Watch bool `yaml:"watch"`
Tenant string `yaml:"tenant"`
ConfigPath string `yaml:"configPath"`
APIVersion RegistryAPIVersionStruct `yaml:"api"`
HealthCheck bool `yaml:"healthCheck"`
}
Expand Down
11 changes: 11 additions & 0 deletions core/config/sd_config.go 100644 → 100755
Expand Up @@ -65,3 +65,14 @@ func GetServiceDiscoveryHealthCheck() bool {
}
return archaius.GetBool("cse.service.registry.healthCheck", false)
}

// DefaultConfigPath set the default config path
const DefaultConfigPath = "/etc/.kube/config"

// GetServiceDiscoveryConfigPath returns the configpath of SD registry
func GetServiceDiscoveryConfigPath() string {
if GlobalDefinition.Cse.Service.Registry.ServiceDiscovery.ConfigPath != "" {
return GlobalDefinition.Cse.Service.Registry.ServiceDiscovery.ConfigPath
}
return DefaultConfigPath
}
4 changes: 4 additions & 0 deletions core/registry/cache.go
Expand Up @@ -3,6 +3,7 @@ package registry
import (
"github.com/ServiceComb/go-chassis/core/common"
cache "github.com/patrickmn/go-cache"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
)

Expand Down Expand Up @@ -65,3 +66,6 @@ func (t Tags) AppID() string { return t[common.BuildinTagApp] }

// Version returns buildinTagVersion of tags
func (t Tags) Version() string { return t[common.BuildinTagVersion] }

// ToLabelSelector returns selector
func (t Tags) ToLabelSelector() labels.Selector { return labels.Set(t).AsSelector() }
95 changes: 95 additions & 0 deletions core/registry/kube/controller.go
@@ -0,0 +1,95 @@
package kuberegistry

import (
"github.com/ServiceComb/go-chassis/core/lager"
"github.com/ServiceComb/go-chassis/core/registry"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

// DiscoveryController defines discovery controller for kube registry
type DiscoveryController struct {
client kubernetes.Interface

sLister corelisters.ServiceLister
eLister corelisters.EndpointsLister

sListerSynced cache.InformerSynced
eListerSynced cache.InformerSynced
}

// NewDiscoveryController returns new discovery controller
func NewDiscoveryController(
sInformer coreinformers.ServiceInformer,
eInformer coreinformers.EndpointsInformer,
client kubernetes.Interface,
) *DiscoveryController {

dc := &DiscoveryController{
client: client,
}

sInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addService,
})
eInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addEndpoints,
})
dc.sListerSynced = sInformer.Informer().HasSynced
dc.eListerSynced = eInformer.Informer().HasSynced
dc.sLister = sInformer.Lister()
dc.eLister = eInformer.Lister()
return dc
}

// Run begins discovery controller
func (dc *DiscoveryController) Run(stop <-chan struct{}) {
lager.Logger.Info("Starting Discovery Controller")
if !cache.WaitForCacheSync(stop, dc.sListerSynced, dc.eListerSynced) {
lager.Logger.Error("Time out waiting for caches to sync", nil)
return
}
lager.Logger.Info("Finish Waiting For Cache Sync")
}

func (dc *DiscoveryController) addService(obj interface{}) {
svc := obj.(*v1.Service)
lager.Logger.Infof("Add Service: %s", svc.Name)
}

func (dc *DiscoveryController) addEndpoints(obj interface{}) {
ep := obj.(*v1.Endpoints)
lager.Logger.Infof("Add Endpoint: %s", ep.Name)
}

// FindEndpoints returns microservice instances of kube registry
func (dc *DiscoveryController) FindEndpoints(service string, tags registry.Tags) ([]*registry.MicroServiceInstance, error) {
name, namespace := splitServiceKey(service)
// TODO: use labels.ToLabelSelector to trans endpoint
// use cache lister to get specific endpoints or use kubeclient instead
ep, err := dc.eLister.Endpoints(namespace).Get(name)
if err != nil {
return nil, err
}
return toMicroServiceInstances(ep), nil
}

// GetAllServices returns microservice of kube registry
func (dc *DiscoveryController) GetAllServices() ([]*registry.MicroService, error) {
microServices, err := dc.sLister.List(labels.Everything())
if err != nil {
lager.Logger.Errorf(err, "get all microservices from kube failed")
return nil, err
}
ms := make([]*registry.MicroService, len(microServices))
for i, s := range microServices {
ms[i] = toMicroService(s)
}
lager.Logger.Debugf("get all microservices success, microservices: %v", microServices)
return ms, nil
}
63 changes: 63 additions & 0 deletions core/registry/kube/controller_test.go
@@ -0,0 +1,63 @@
package kuberegistry

import (
"context"
"log"
"testing"
"time"

"github.com/ServiceComb/go-chassis/core/lager"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)

func TestDiscoveryController(t *testing.T) {
lager.Initialize("", "INFO", "", "size", true, 1, 10, 7)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

client := fake.NewSimpleClientset()
sharedfactory := informers.NewSharedInformerFactory(client, 0)
sInformer := sharedfactory.Core().V1().Services()
eInformer := sharedfactory.Core().V1().Endpoints()

dc := NewDiscoveryController(sInformer, eInformer, client)
sharedfactory.Start(ctx.Done())
dc.Run(ctx.Done())

// create endpoints
p := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "kubeserver"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1",
TargetRef: &v1.ObjectReference{UID: "12345"}}},
Ports: []v1.EndpointPort{{Name: "rest", Port: 9090}},
}}}
_, err := client.Core().Endpoints("default").Create(p)
if err != nil {
t.Errorf("error create endpoints: %v", err)
}

// create services
s := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "kubeserver"}}
_, err = client.Core().Services("default").Create(s)
if err != nil {
t.Errorf("error create service: %v", err)
}

time.Sleep(1 * time.Second)
ret, err := dc.FindEndpoints("kubeserver.default.svc.local", nil)
assert.Equal(t, len(ret), 1)
for _, ep := range ret {
log.Printf("Got endpoints %s(%s)", ep.EndpointsMap, ep.ServiceID)
assert.Equal(t, ep.EndpointsMap["rest"], "127.0.0.1:9090")
assert.Equal(t, ep.ServiceID, "kubeserver.default")
}

svc, err := dc.GetAllServices()
assert.Equal(t, len(svc), 1)
for _, ss := range svc {
assert.Equal(t, ss.ServiceName, "kubeserver")
}
}
43 changes: 43 additions & 0 deletions core/registry/kube/conversion.go
@@ -0,0 +1,43 @@
package kuberegistry

import (
"strconv"

"github.com/ServiceComb/go-chassis/core/registry"
v1 "k8s.io/api/core/v1"
)

func toMicroService(ss *v1.Service) *registry.MicroService {
return &registry.MicroService{
ServiceName: ss.Name,
ServiceID: string(ss.UID),
Metadata: ss.Spec.Selector,
RegisterBy: KubeRegistry,
}
}

func toMicroServiceInstances(ep *v1.Endpoints) []*registry.MicroServiceInstance {
ins := []*registry.MicroServiceInstance{}
for _, ss := range ep.Subsets {
for _, as := range ss.Addresses {
ins = append(ins, &registry.MicroServiceInstance{
InstanceID: string(as.TargetRef.UID),
ServiceID: ep.Name + "." + ep.Namespace,
HostName: as.Hostname,
EndpointsMap: torotocolMap(as, ss.Ports),
})
}
}
return ins
}

func torotocolMap(address v1.EndpointAddress, ports []v1.EndpointPort) map[string]string {
ret := map[string]string{}
for _, port := range ports {
if _, ok := ret[port.Name]; !ok {
ret[port.Name] = address.IP + ":" + strconv.Itoa(int(port.Port))
continue
}
}
return ret
}