Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/kubeadm/app/phases/kubelet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func WriteKubeletDynamicEnvFile(cfg *kubeadmapi.ClusterConfiguration, nodeReg *k
flagOpts.criSocket = ""
}
stringMap := buildKubeletArgs(flagOpts)
argList := kubeadmutil.ArgumentsToCommand(stringMap, nodeReg.KubeletExtraArgs)
return WriteKubeletArgsToFile(stringMap, nodeReg.KubeletExtraArgs, kubeletDir)
}

// WriteKubeletArgsToFile writes combined kubelet flags to KubeletEnvFile file in kubeletDir.
func WriteKubeletArgsToFile(kubeletFlags, overridesFlags []kubeadmapi.Arg, kubeletDir string) error {
argList := kubeadmutil.ArgumentsToCommand(kubeletFlags, overridesFlags)
envFileContent := fmt.Sprintf("%s=%q\n", constants.KubeletEnvFileVariableName, strings.Join(argList, " "))

return writeKubeletFlagBytesToDisk([]byte(envFileContent), kubeletDir)
Expand Down
12 changes: 11 additions & 1 deletion cmd/kubeadm/app/phases/upgrade/postupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,25 @@ func WriteKubeletConfigFiles(cfg *kubeadmapi.InitConfiguration, kubeletConfigDir
if features.Enabled(cfg.FeatureGates, features.NodeLocalCRISocket) {
// If instance-config.yaml exist on disk, we don't need to create it.
_, err := os.Stat(filepath.Join(kubeletDir, kubeadmconstants.KubeletInstanceConfigurationFileName))
// After the NodeLocalCRISocket feature gate is removed, os.IsNotExist(err) should also be removed.
// If there is no instance configuration, it indicates that the configuration on the node has been corrupted,
// and an error needs to be reported.
if os.IsNotExist(err) {
var containerRuntimeEndpoint string
var kubeletFlags []kubeadmapi.Arg
dynamicFlags, err := kubeletphase.ReadKubeletDynamicEnvFile(filepath.Join(kubeletDir, kubeadmconstants.KubeletEnvFileName))
if err == nil {
args := kubeadmutil.ArgumentsFromCommand(dynamicFlags)
for _, arg := range args {
if arg.Name == "container-runtime-endpoint" {
containerRuntimeEndpoint = arg.Value
break
continue
}
kubeletFlags = append(kubeletFlags, arg)
}
if len(containerRuntimeEndpoint) != 0 {
if err := kubeletphase.WriteKubeletArgsToFile(kubeletFlags, nil, kubeletDir); err != nil {
return err
}
}
} else if dryRun {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"

resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceslice"
Expand Down Expand Up @@ -55,24 +54,25 @@ type DRAPlugin interface {
// after it returns before all information is actually written
// to the API server.
//
// The caller must not modify the content after the call.
// It is the responsibility of the caller to ensure that the pools and
// slices described in the driver resources parameters are valid
// according to the restrictions defined in the resource.k8s.io API.
//
// Returns an error if KubeClient or NodeName options were not
// set in Start() to create the DRAPlugin instance.
PublishResources(ctx context.Context, resources Resources) error
// Invalid ResourceSlices will be rejected by the apiserver during
// publishing, which happens asynchronously and thus does not
// get returned as error here. The only error returned here is
// when publishing was not set up properly, for example missing
// [KubeClient] or [NodeName] options.
//
// The caller may modify the resources after this call returns.
PublishResources(ctx context.Context, resources resourceslice.DriverResources) error

// This unexported method ensures that we can modify the interface
// without causing an API break of the package
// (https://pkg.go.dev/golang.org/x/exp/apidiff#section-readme).
internal()
}

// Resources currently only supports devices. Might get extended in the
// future.
type Resources struct {
Devices []resourceapi.Device
}

// Option implements the functional options pattern for Start.
type Option func(o *options) error

Expand Down Expand Up @@ -407,7 +407,7 @@ func (d *draPlugin) Stop() {

// PublishResources implements [DRAPlugin.PublishResources]. Returns en error if
// kubeClient or nodeName are unset.
func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) error {
func (d *draPlugin) PublishResources(_ context.Context, resources resourceslice.DriverResources) error {
if d.kubeClient == nil {
return errors.New("no KubeClient found to publish resources")
}
Expand All @@ -425,14 +425,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
UID: d.nodeUID, // Optional, will be determined by controller if empty.
}
driverResources := &resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{
d.nodeName: {
Slices: []resourceslice.Slice{{
Devices: resources.Devices,
}},
},
},
Pools: resources.Pools,
}

if d.resourceSliceController == nil {
// Start publishing the information. The controller is using
// our background context, not the one passed into this
Expand Down
25 changes: 19 additions & 6 deletions test/e2e/dra/test-driver/app/kubeletplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
Expand Down Expand Up @@ -188,10 +189,16 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
Basic: &resourceapi.BasicDevice{},
}
}
resources := kubeletplugin.Resources{
Devices: devices,
driverResources := resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{
nodeName: {
Slices: []resourceslice.Slice{{
Devices: devices,
}},
},
},
}
if err := ex.d.PublishResources(ctx, resources); err != nil {
if err := ex.d.PublishResources(ctx, driverResources); err != nil {
return nil, fmt.Errorf("start kubelet plugin: publish resources: %w", err)
}
} else if len(ex.fileOps.Devices) > 0 {
Expand All @@ -202,10 +209,16 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
Basic: &resourceapi.BasicDevice{Attributes: ex.fileOps.Devices[deviceName]},
}
}
resources := kubeletplugin.Resources{
Devices: devices,
driverResources := resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{
nodeName: {
Slices: []resourceslice.Slice{{
Devices: devices,
}},
},
},
}
if err := ex.d.PublishResources(ctx, resources); err != nil {
if err := ex.d.PublishResources(ctx, driverResources); err != nil {
return nil, fmt.Errorf("start kubelet plugin: publish resources: %w", err)
}
}
Expand Down