Skip to content

Commit

Permalink
add kubelet client for Pod resource info
Browse files Browse the repository at this point in the history
This change introduces kubelet client to get allocated device
information of a Pod from newly added Kubelet grpc service.
For more information please see:
[kubernetes/kubernetes#70508](kubernetes/kubernetes#70508)

Change-Id: I11e58ccdd52662601f445fa24c7d55c225441efc
Signed-off-by: Abdul Halim <abdul.halim@intel.com>
  • Loading branch information
ahalimx86 committed Mar 22, 2019
1 parent 81afd29 commit 97c195b
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 31 deletions.
15 changes: 6 additions & 9 deletions checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/intel/multus-cni/logging"
"github.com/intel/multus-cni/types"
v1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -45,22 +46,18 @@ type Data struct {
Checksum uint64
}

type Checkpoint interface {
// GetComputeDeviceMap returns an instance of a map of ResourceInfo for a PodID
GetComputeDeviceMap(string) (map[string]*types.ResourceInfo, error)
}
type checkpoint struct {
fileName string
podEntires []PodDevicesEntry
}

// GetCheckpoint returns an instance of Checkpoint
func GetCheckpoint() (Checkpoint, error) {
func GetCheckpoint() (types.ResourceClient, error) {
logging.Debugf("GetCheckpoint(): invoked")
return getCheckpoint(checkPointfile)
}

func getCheckpoint(filePath string) (Checkpoint, error) {
func getCheckpoint(filePath string) (types.ResourceClient, error) {
cp := &checkpoint{fileName: filePath}
err := cp.getPodEntries()
if err != nil {
Expand Down Expand Up @@ -89,12 +86,12 @@ func (cp *checkpoint) getPodEntries() error {
}

// GetComputeDeviceMap returns an instance of a map of ResourceInfo
func (cp *checkpoint) GetComputeDeviceMap(podID string) (map[string]*types.ResourceInfo, error) {

func (cp *checkpoint) GetPodResourceMap(pod *v1.Pod) (map[string]*types.ResourceInfo, error) {
podID := string(pod.UID)
resourceMap := make(map[string]*types.ResourceInfo)

if podID == "" {
return nil, logging.Errorf("GetComputeDeviceMap(): invalid Pod cannot be empty")
return nil, logging.Errorf("GetPodResourceMap(): invalid Pod cannot be empty")
}

for _, pod := range cp.podEntires {
Expand Down
15 changes: 13 additions & 2 deletions checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"testing"

"github.com/intel/multus-cni/types"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTypes "k8s.io/apimachinery/pkg/types"
)

const (
Expand Down Expand Up @@ -74,7 +77,7 @@ var _ = BeforeSuite(func() {
var _ = Describe("Kubelet checkpoint data read operations", func() {
Context("Using /tmp/kubelet_internal_checkpoint file", func() {
var (
cp Checkpoint
cp types.ResourceClient
err error
resourceMap map[string]*types.ResourceInfo
resourceInfo *types.ResourceInfo
Expand All @@ -87,7 +90,15 @@ var _ = Describe("Kubelet checkpoint data read operations", func() {
})

It("should return a ResourceMap instance", func() {
rmap, err := cp.GetComputeDeviceMap("970a395d-bb3b-11e8-89df-408d5c537d23")
podUID := k8sTypes.UID("970a395d-bb3b-11e8-89df-408d5c537d23")
fakePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "fakePod",
Namespace: "podNamespace",
UID: podUID,
},
}
rmap, err := cp.GetPodResourceMap(fakePod)
Expect(err).NotTo(HaveOccurred())
Expect(rmap).NotTo(BeEmpty())
resourceMap = rmap
Expand Down
64 changes: 57 additions & 7 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ import:
- kubernetes
- tools/clientcmd
- util/retry
- package: k8s.io/kubernetes
version: v1.13.0
subpackages:
- pkg/kubelet/apis/podresources
- pkg/kubelet/apis/podresources/v1alpha1
- pkg/kubelet/util
- package: github.com/vishvananda/netns
- package: github.com/pkg/errors
21 changes: 10 additions & 11 deletions k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/intel/multus-cni/checkpoint"
"github.com/intel/multus-cni/kubeletclient"
"github.com/intel/multus-cni/logging"
"github.com/intel/multus-cni/types"
)
Expand Down Expand Up @@ -336,7 +336,7 @@ func cniConfigFromNetworkResource(customResource *types.NetworkAttachmentDefinit
return config, nil
}

func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string, podID string, resourceMap map[string]*types.ResourceInfo) (*types.DelegateNetConf, map[string]*types.ResourceInfo, error) {
func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string, pod *v1.Pod, resourceMap map[string]*types.ResourceInfo) (*types.DelegateNetConf, map[string]*types.ResourceInfo, error) {

logging.Debugf("getKubernetesDelegate: %v, %v, %s", client, net, confdir)
rawPath := fmt.Sprintf("/apis/k8s.cni.cncf.io/v1/namespaces/%s/network-attachment-definitions/%s", net.Namespace, net.Name)
Expand All @@ -353,18 +353,18 @@ func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement
// Get resourceName annotation from NetworkAttachmentDefinition
deviceID := ""
resourceName, ok := customResource.Metadata.Annotations[resourceNameAnnot]
if ok && podID != "" {
if ok && pod.Name != "" && pod.Namespace != "" {
// ResourceName annotation is found; try to get device info from resourceMap
logging.Debugf("getKubernetesDelegate: found resourceName annotation : %s", resourceName)

if resourceMap == nil {
checkpoint, err := checkpoint.GetCheckpoint()
ck, err := kubeletclient.GetResourceClient()
if err != nil {
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: failed to get a checkpoint instance: %v", err)
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: failed to get a ResourceClient instance: %v", err)
}
resourceMap, err = checkpoint.GetComputeDeviceMap(podID)
resourceMap, err = ck.GetPodResourceMap(pod)
if err != nil {
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: failed to get resourceMap from kubelet checkpoint file: %v", err)
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: failed to get resourceMap from ResourceClient: %v", err)
}
logging.Debugf("getKubernetesDelegate(): resourceMap instance: %+v", resourceMap)
}
Expand All @@ -373,7 +373,7 @@ func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement
if ok {
if idCount := len(entry.DeviceIDs); idCount > 0 && idCount > entry.Index {
deviceID = entry.DeviceIDs[entry.Index]
logging.Debugf("getKubernetesDelegate: podID: %s deviceID: %s", podID, deviceID)
logging.Debugf("getKubernetesDelegate: podName: %s deviceID: %s", pod.Name, deviceID)
entry.Index++ // increment Index for next delegate
}
}
Expand Down Expand Up @@ -536,7 +536,6 @@ func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.Ne
var delegates []*types.DelegateNetConf
defaultNamespace := pod.ObjectMeta.Namespace

podID := pod.UID
for _, net := range networks {

// The pods namespace (stored as defaultNamespace, does not equal the annotation's target namespace in net.Namespace)
Expand All @@ -547,7 +546,7 @@ func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.Ne
}
}

delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, string(podID), resourceMap)
delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, pod, resourceMap)
if err != nil {
return nil, logging.Errorf("GetPodNetwork: failed getting the delegate: %v", err)
}
Expand Down Expand Up @@ -690,7 +689,7 @@ func tryLoadK8sPodDefaultNetwork(kubeClient KubeClient, pod *v1.Pod, conf *types
return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: more than one default network is specified: %s", netAnnot)
}

delegate, _, err := getKubernetesDelegate(kubeClient, networks[0], conf.ConfDir, "", nil)
delegate, _, err := getKubernetesDelegate(kubeClient, networks[0], conf.ConfDir, pod, nil)
if err != nil {
return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: failed getting the delegate: %v", err)
}
Expand Down

0 comments on commit 97c195b

Please sign in to comment.