Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit b6daccc

Browse files
committed
Initialize services for hyper pod
1 parent 64ee920 commit b6daccc

File tree

3 files changed

+56
-12
lines changed

3 files changed

+56
-12
lines changed

pkg/kubelet/hyper/hyper.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ import (
3535
"k8s.io/kubernetes/pkg/api"
3636
"k8s.io/kubernetes/pkg/api/unversioned"
3737
"k8s.io/kubernetes/pkg/client/record"
38+
client "k8s.io/kubernetes/pkg/client/unversioned"
3839
"k8s.io/kubernetes/pkg/credentialprovider"
3940
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
4041
"k8s.io/kubernetes/pkg/kubelet/network"
4142
"k8s.io/kubernetes/pkg/kubelet/prober"
43+
"k8s.io/kubernetes/pkg/labels"
4244
"k8s.io/kubernetes/pkg/probe"
4345
"k8s.io/kubernetes/pkg/types"
4446
"k8s.io/kubernetes/pkg/util"
@@ -66,6 +68,7 @@ type runtime struct {
6668
readinessManager *kubecontainer.ReadinessManager
6769
volumeGetter volumeGetter
6870
hyperClient *HyperClient
71+
kubeClient client.Interface
6972
imagePuller kubecontainer.ImagePuller
7073
}
7174

@@ -81,7 +84,8 @@ func New(generator kubecontainer.RunContainerOptionsGenerator,
8184
networkPlugin network.NetworkPlugin,
8285
containerRefManager *kubecontainer.RefManager,
8386
readinessManager *kubecontainer.ReadinessManager,
84-
volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
87+
volumeGetter volumeGetter,
88+
kubeClient client.Interface) (kubecontainer.Runtime, error) {
8589

8690
// check hyper has already installed
8791
hyperBinAbsPath, err := exec.LookPath(hyperBinName)
@@ -100,6 +104,7 @@ func New(generator kubecontainer.RunContainerOptionsGenerator,
100104
readinessManager: readinessManager,
101105
volumeGetter: volumeGetter,
102106
hyperClient: NewHyperClient(),
107+
kubeClient: kubeClient,
103108
}
104109
hyper.prober = prober.New(hyper, readinessManager, containerRefManager, recorder)
105110
hyper.imagePuller = kubecontainer.NewImagePuller(recorder, hyper)
@@ -309,6 +314,40 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
309314
return kubepods, nil
310315
}
311316

317+
func (r *runtime) buildHyperPodServices(pod *api.Pod) []HyperService {
318+
items, err := r.kubeClient.Services(pod.Namespace).List(labels.Everything())
319+
if err != nil {
320+
glog.Warningf("Get services failed: %v", err)
321+
return nil
322+
}
323+
324+
var services []HyperService
325+
for _, svc := range items.Items {
326+
hyperService := HyperService{
327+
ServiceIP: svc.Spec.ClusterIP,
328+
}
329+
endpoints, _ := r.kubeClient.Endpoints(pod.Namespace).Get(svc.Name)
330+
for _, svcPort := range svc.Spec.Ports {
331+
hyperService.ServicePort = svcPort.Port
332+
for _, ep := range endpoints.Subsets {
333+
for _, epPort := range ep.Ports {
334+
if svcPort.Name == "" || svcPort.Name == epPort.Name {
335+
for _, eh := range ep.Addresses {
336+
hyperService.Hosts = append(hyperService.Hosts, HyperServiceBackend{
337+
HostIP: eh.IP,
338+
HostPort: epPort.Port,
339+
})
340+
}
341+
}
342+
}
343+
}
344+
services = append(services, hyperService)
345+
}
346+
}
347+
348+
return services
349+
}
350+
312351
func (r *runtime) buildHyperPod(pod *api.Pod, pullSecrets []api.Secret) ([]byte, error) {
313352
// check and pull image
314353
for _, c := range pod.Spec.Containers {
@@ -335,6 +374,18 @@ func (r *runtime) buildHyperPod(pod *api.Pod, pullSecrets []api.Secret) ([]byte,
335374
}
336375
specMap[KEY_VOLUMES] = volumes
337376

377+
services := r.buildHyperPodServices(pod)
378+
if services == nil {
379+
// Just for fake
380+
services = []HyperService{
381+
{
382+
ServiceIP: "127.0.0.2",
383+
ServicePort: 65534,
384+
},
385+
}
386+
}
387+
specMap["services"] = services
388+
338389
// build hyper containers spec
339390
var containers []map[string]interface{}
340391
for _, container := range pod.Spec.Containers {

pkg/kubelet/kubelet.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,9 @@ func NewMainKubelet(
437437
klet.networkPlugin,
438438
containerRefManager,
439439
readinessManager,
440-
klet.volumeManager)
440+
klet.volumeManager,
441+
klet.kubeClient,
442+
)
441443
if err != nil {
442444
return nil, err
443445
}

pkg/proxy/haproxy/proxier.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -353,15 +353,6 @@ func (proxier *Proxier) syncProxyRules() {
353353
continue
354354
}
355355

356-
// Get hyper services
357-
hyperServices, err := proxier.hyperClient.ListServices(podInfo.PodID)
358-
if err != nil {
359-
glog.Warningf("Can not get hyper service for pod %s: %v", podInfo.PodName, err)
360-
continue
361-
}
362-
363-
glog.V(4).Infof("Hyper services of pod %s is %v", podInfo.PodName, hyperServices)
364-
365356
// Build rules in same namespace
366357
consumedServices := make([]hyper.HyperService, 0, 1)
367358
for _, svcInfo := range proxier.serviceMap {
@@ -391,7 +382,7 @@ func (proxier *Proxier) syncProxyRules() {
391382
glog.V(4).Infof("Services of pod %s should consumed: %v", podInfo.PodName, consumedServices)
392383

393384
// update existing services
394-
err = proxier.hyperClient.UpdateServices(podInfo.PodID, hyperServices)
385+
err = proxier.hyperClient.UpdateServices(podInfo.PodID, consumedServices)
395386
if err != nil {
396387
glog.Warningf("Updating service for hyper pod %s failed: %v", podInfo.PodName, err)
397388
}

0 commit comments

Comments
 (0)