Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update DRA driver error injection #20

Open
wants to merge 6 commits into
base: PR144-e2e_node-DRA-test-plugin-failures
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/e2e/dra/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, nodename,
app.FileOperations{
app.PluginConfig{
Create: func(name string, content []byte) error {
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
return d.createFile(&pod, name, content)
Expand Down
36 changes: 28 additions & 8 deletions test/e2e/dra/test-driver/app/gomega.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,42 @@ var BeRegistered = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error
return false, nil
}).WithMessage("contain successful NotifyRegistrationStatus call")

// NodePrepareResouceCalled checks that NodePrepareResource API has been called
var NodePrepareResourceCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
// NodePrepareResoucesSucceeded checks that NodePrepareResources API has been called and succeeded
var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResource") && call.Err == nil {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Response != nil && call.Err == nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain NodePrepareResource call")
}).WithMessage("contain successful NodePrepareResources call")

// NodePrepareResoucesCalled checks that NodePrepareResources API has been called
var NodePrepareResourcesCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
// NodePrepareResoucesErrored checks that NodePrepareResources API has been called and returned an error
var NodePrepareResourcesErrored = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err == nil {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err != nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain NodePrepareResources call")
}).WithMessage("contain unsuccessful NodePrepareResources call")

// NodeUnprepareResoucesSucceeded checks that NodeUnprepareResources API has been called and succeeded
var NodeUnprepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodeUnprepareResources") && call.Response != nil && call.Err == nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain successful NodeUnprepareResources call")

// NodeUnprepareResoucesErrored checks that NodeUnprepareResources API has been called and returned an error
var NodeUnprepareResourcesErrored = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodeUnprepareResources") && call.Err != nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain unsuccessful NodeUnprepareResources call")
71 changes: 31 additions & 40 deletions test/e2e/dra/test-driver/app/kubeletplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
)

type ExamplePlugin struct {
stopCh <-chan struct{}
logger klog.Logger
d kubeletplugin.DRAPlugin
fileOps FileOperations
stopCh <-chan struct{}
logger klog.Logger
d kubeletplugin.DRAPlugin
config PluginConfig

cdiDir string
driverName string
Expand All @@ -53,8 +53,6 @@ type ExamplePlugin struct {
instancesInUse sets.Set[string]
prepared map[ClaimID]any
gRPCCalls []GRPCCall

block bool
}

type GRPCCall struct {
Expand Down Expand Up @@ -86,32 +84,37 @@ func (ex *ExamplePlugin) getJSONFilePath(claimUID string) string {
return filepath.Join(ex.cdiDir, fmt.Sprintf("%s-%s.json", ex.driverName, claimUID))
}

// FileOperations defines optional callbacks for handling CDI files
// PluginConfig defines optional callbacks for handling CDI files
// and some other configuration.
type FileOperations struct {
type PluginConfig struct {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming this was overdue. Should have done it already earlier when adding NumResourceInstances...

// Create must overwrite the file.
Create func(name string, content []byte) error

// Remove must remove the file. It must not return an error when the
// file does not exist.
Remove func(name string) error

// InterceptGRPCCall, if set, gets invoked after recording that a GRPC
// call started and before invoking the actual implementation of that
// call. Can be used to inject errors or delays.
InterceptGRPCCall grpc.UnaryServerInterceptor

// NumResourceInstances determines whether the plugin reports resources
// instances and how many. A negative value causes it to report "not implemented"
// in the NodeListAndWatchResources gRPC call.
NumResourceInstances int
}

// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string, config PluginConfig, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
logger := klog.FromContext(ctx)
if fileOps.Create == nil {
fileOps.Create = func(name string, content []byte) error {
if config.Create == nil {
config.Create = func(name string, content []byte) error {
return os.WriteFile(name, content, os.FileMode(0644))
}
}
if fileOps.Remove == nil {
fileOps.Remove = func(name string) error {
if config.Remove == nil {
config.Remove = func(name string) error {
if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -121,7 +124,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string
ex := &ExamplePlugin{
stopCh: ctx.Done(),
logger: logger,
fileOps: fileOps,
config: config,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
Expand All @@ -130,14 +133,23 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, nodeName string
prepared: make(map[ClaimID]any),
}

for i := 0; i < ex.fileOps.NumResourceInstances; i++ {
for i := 0; i < ex.config.NumResourceInstances; i++ {
ex.instances.Insert(fmt.Sprintf("instance-%02d", i))
}

interceptor := ex.recordGRPCCall
if config.InterceptGRPCCall != nil {
// Chain the interceptors: recordGRPCCall -> InterceptGRPCCall -> original implementation.
interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return ex.recordGRPCCall(ctx, req, info, func(ctx context.Context, req any) (any, error) {
return config.InterceptGRPCCall(ctx, req, info, handler)
})
}
}
opts = append(opts,
kubeletplugin.Logger(logger),
kubeletplugin.DriverName(driverName),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCInterceptor(interceptor),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
)
d, err := kubeletplugin.Start(ex, opts...)
Expand All @@ -162,26 +174,13 @@ func (ex *ExamplePlugin) IsRegistered() bool {
return status.PluginRegistered
}

// Block sets a flag to block Node[Un]PrepareResources
// to emulate time consuming or stuck calls
func (ex *ExamplePlugin) Block() {
ex.block = true
}

// NodePrepareResource ensures that the CDI file for the claim exists. It uses
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
// gets written again).
func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) ([]string, error) {
logger := klog.FromContext(ctx)

// Block to emulate plugin stuckness or slowness.
// By default the call will not be blocked as ex.block = false.
if ex.block {
<-ctx.Done()
return nil, ctx.Err()
}

ex.mutex.Lock()
defer ex.mutex.Unlock()

Expand Down Expand Up @@ -275,7 +274,7 @@ func (ex *ExamplePlugin) nodePrepareResource(ctx context.Context, claimName stri
if err != nil {
return nil, fmt.Errorf("marshal spec: %w", err)
}
if err := ex.fileOps.Create(filePath, buffer); err != nil {
if err := ex.config.Create(filePath, buffer); err != nil {
return nil, fmt.Errorf("failed to write CDI file %v", err)
}

Expand Down Expand Up @@ -330,15 +329,8 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimName string, claimUID string, resourceHandle string, structuredResourceHandle []*resourceapi.StructuredResourceHandle) error {
logger := klog.FromContext(ctx)

// Block to emulate plugin stuckness or slowness.
// By default the call will not be blocked as ex.block = false.
if ex.block {
<-ctx.Done()
return ctx.Err()
}

filePath := ex.getJSONFilePath(claimUID)
if err := ex.fileOps.Remove(filePath); err != nil {
if err := ex.config.Remove(filePath); err != nil {
return fmt.Errorf("error removing CDI file: %w", err)
}
logger.V(3).Info("CDI file removed", "path", filePath)
Expand Down Expand Up @@ -373,7 +365,6 @@ func (ex *ExamplePlugin) nodeUnprepareResource(ctx context.Context, claimName st
}
}
delete(ex.prepared, ClaimID{Name: claimName, UID: claimUID})

return nil
}

Expand All @@ -395,7 +386,7 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
}

func (ex *ExamplePlugin) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, stream drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
if ex.fileOps.NumResourceInstances < 0 {
if ex.config.NumResourceInstances < 0 {
ex.logger.Info("Sending no NodeResourcesResponse")
return status.New(codes.Unimplemented, "node resource support disabled").Err()
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/dra/test-driver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func NewCommand() *cobra.Command {
return fmt.Errorf("create socket directory: %w", err)
}

plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", FileOperations{},
plugin, err := StartPlugin(cmd.Context(), *cdiDir, *driverName, "", PluginConfig{},
kubeletplugin.PluginSocketPath(*endpoint),
kubeletplugin.RegistrarSocketPath(path.Join(*pluginRegistrationPath, *driverName+"-reg.sock")),
kubeletplugin.KubeletPluginSocketPath(*draAddress),
Expand Down
Loading