From d2e942b35371c767e3905b42fc18fe5f166f3096 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Wed, 27 Mar 2024 11:21:12 +0800 Subject: [PATCH] add task api endpoint in task create options Signed-off-by: Abel Feng --- client/task_opts.go | 18 ++++++++++++ core/runtime/runtime.go | 6 ++++ core/runtime/v2/manager.go | 36 +++++++++++++++++------ internal/cri/server/container_start.go | 2 +- plugins/services/tasks/local.go | 40 ++++++++++++++++---------- 5 files changed, 78 insertions(+), 24 deletions(-) diff --git a/client/task_opts.go b/client/task_opts.go index ea6658192a90a..82c78bc111062 100644 --- a/client/task_opts.go +++ b/client/task_opts.go @@ -50,6 +50,24 @@ func WithRuntimePath(absRuntimePath string) NewTaskOpts { } } +// WithTaskAPIEndpoint allow task service to manage a task through a given endpoint, +// usually it is served inside a sandbox, and we can get it from sandbox status. +func WithTaskAPIEndpoint(address, protocol string, version uint32) NewTaskOpts { + return func(ctx context.Context, client *Client, info *TaskInfo) error { + if info.Options == nil { + info.Options = &options.Options{} + } + opts, ok := info.Options.(*options.Options) + if !ok { + return errors.New("invalid runtime v2 options format") + } + opts.TaskApiAddress = address + opts.TaskApiProtocol = protocol + opts.TaskApiVersion = version + return nil + } +} + // WithTaskCheckpoint allows a task to be created with live runtime and memory data from a // previous checkpoint. Additional software such as CRIU may be required to // restore a task from a checkpoint diff --git a/core/runtime/runtime.go b/core/runtime/runtime.go index 8c79c5c57db4d..fdb6834565697 100644 --- a/core/runtime/runtime.go +++ b/core/runtime/runtime.go @@ -51,6 +51,12 @@ type CreateOpts struct { Runtime string // SandboxID is an optional ID of sandbox this container belongs to SandboxID string + // Address is an optional Address for Task API server + Address string + // Protocol is an optional Protocol for Task API connection + Protocol string + // Version is an optional Version of the Task API + Version uint32 } // Exit information for a process diff --git a/core/runtime/v2/manager.go b/core/runtime/v2/manager.go index 3e12f37999c25..fdd409f8b506a 100644 --- a/core/runtime/v2/manager.go +++ b/core/runtime/v2/manager.go @@ -199,9 +199,22 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO // This container belongs to sandbox which supposed to be already started via sandbox API. if opts.SandboxID != "" { - process, err := m.Get(ctx, opts.SandboxID) - if err != nil { - return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID) + var params shimbinary.BootstrapParams + if opts.Address != "" && opts.Protocol != "" { + params = shimbinary.BootstrapParams{ + Version: int(opts.Version), + Address: opts.Address, + Protocol: opts.Protocol, + } + } else { + // For those sandbox we can not get endpoint, + // fallback to legacy implementation + p, restoreErr := m.restoreBootstrapParams(ctx, opts.SandboxID) + if restoreErr != nil { + return nil, fmt.Errorf("failed to get bootstrap "+ + "params of sandbox %s, %v, legacy restore error %v", opts.SandboxID, err, restoreErr) + } + params = p } // Write sandbox ID this task belongs to. @@ -209,11 +222,6 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO return nil, err } - params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID)) - if err != nil { - return nil, err - } - if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil { return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err) } @@ -287,6 +295,18 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, return shim, nil } +func (m *ShimManager) restoreBootstrapParams(ctx context.Context, sandboxID string) (shimbinary.BootstrapParams, error) { + process, err := m.Get(ctx, sandboxID) + if err != nil { + return shimbinary.BootstrapParams{}, fmt.Errorf("can't find sandbox %s", sandboxID) + } + params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), sandboxID)) + if err != nil { + return shimbinary.BootstrapParams{}, err + } + return params, nil +} + // restoreBootstrapParams reads bootstrap.json to restore shim configuration. // If its an old shim, this will perform migration - read address file and write default bootstrap // configuration (version = 2, protocol = ttrpc, and address). diff --git a/internal/cri/server/container_start.go b/internal/cri/server/container_start.go index 0d84694624c78..2cff127b9be4e 100644 --- a/internal/cri/server/container_start.go +++ b/internal/cri/server/container_start.go @@ -123,7 +123,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain endpoint := sandbox.Endpoint if endpoint.Protocol != "" && endpoint.Address != "" { taskOpts = append(taskOpts, - containerd.WithTaskApiEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version)) + containerd.WithTaskAPIEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version)) } task, err := container.NewTask(ctx, ioCreation, taskOpts...) diff --git a/plugins/services/tasks/local.go b/plugins/services/tasks/local.go index 514898c2ee4aa..fc703be3eed45 100644 --- a/plugins/services/tasks/local.go +++ b/plugins/services/tasks/local.go @@ -155,10 +155,25 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. if err != nil { return nil, errdefs.ToGRPC(err) } - checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options) - if err != nil { - return nil, err + + var ( + checkpointPath string + taskAPIAddress string + taskAPIProtocol string + taskAPIVersion uint32 + ) + + if r.Options != nil { + taskOptions, err := formatOptions(container.Runtime.Name, r.Options) + if err != nil { + return nil, err + } + checkpointPath = taskOptions.CriuImagePath + taskAPIAddress = taskOptions.TaskApiAddress + taskAPIProtocol = taskOptions.TaskApiProtocol + taskAPIVersion = taskOptions.TaskApiVersion } + // jump get checkpointPath from checkpoint image if checkpointPath == "" && r.Checkpoint != nil { checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint") @@ -196,6 +211,9 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. RuntimeOptions: container.Runtime.Options, TaskOptions: r.Options, SandboxID: container.SandboxID, + Address: taskAPIAddress, + Protocol: taskAPIProtocol, + Version: taskAPIVersion, } if r.RuntimePath != "" { opts.Runtime = r.RuntimePath @@ -723,22 +741,14 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) { return checkpointPath, nil } -// getRestorePath only suitable for runc runtime now -func getRestorePath(runtime string, option *ptypes.Any) (string, error) { - if option == nil { - return "", nil - } - - var restorePath string +func formatOptions(runtime string, option *ptypes.Any) (*options.Options, error) { v, err := typeurl.UnmarshalAny(option) if err != nil { - return "", err + return nil, err } opts, ok := v.(*options.Options) if !ok { - return "", fmt.Errorf("invalid task create option for %s", runtime) + return nil, fmt.Errorf("invalid task create option for %s", runtime) } - restorePath = opts.CriuImagePath - - return restorePath, nil + return opts, nil }